Spaces:
Build error
Build error
import os
from datetime import datetime
from typing import Dict, List

import streamlit as st
class CaseManagerInterface:
    """Streamlit front end for creating cases and managing their documents.

    The class is a presentation layer over three collaborators supplied by the
    caller. Only the calls listed here are made on them:

    * ``case_manager``: ``create_case``, ``get_all_cases``, ``delete_case``,
      ``list_documents``, ``add_document``, ``remove_document``.
    * ``vector_store``: ``add_document``, ``delete_document``.
    * ``document_processor``: ``process_and_tag_document``.

    Cases are read as mappings with the keys ``id``, ``title``, ``case_type``,
    ``created_at`` and ``description``; documents as mappings with ``id`` and
    ``title`` plus optional ``text``, ``metadata``, ``chunks`` and
    ``added_at``.
    """

    # Session-state slot for a one-shot success message. A message written
    # with st.success() right before st.rerun() is wiped by the rerun, so it
    # is parked here and displayed by render() on the next script run.
    _FLASH_KEY = "case_manager_flash"

    def __init__(self, case_manager, vector_store, document_processor):
        """Store the collaborators; nothing is rendered here."""
        self.case_manager = case_manager
        self.vector_store = vector_store
        self.document_processor = document_processor

    def render(self):
        """Render the page: title, pending notice, and the three tabs."""
        st.title("📁 Case Management")

        notice = st.session_state.pop(self._FLASH_KEY, None)
        if notice:
            st.success(notice)

        overview_tab, documents_tab, batch_tab = st.tabs(
            ["Case Overview", "Document Management", "Batch Operations"]
        )
        with overview_tab:
            self._render_case_overview()
        with documents_tab:
            self._render_document_manager()
        with batch_tab:
            self._render_batch_operations()

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def _flash(self, message: str) -> None:
        """Queue ``message`` for the next run and rerun the script."""
        st.session_state[self._FLASH_KEY] = message
        st.rerun()

    def _confirm_pending(self, state_key: str, prompt: str) -> bool:
        """Render a confirm/cancel prompt when ``state_key`` is armed.

        A button only reports ``True`` during the run triggered by its own
        click, so the "delete was requested" fact is kept in session state
        and the actual confirmation is a second, separate button.

        Returns:
            ``True`` only in the run where the user pressed the confirm
            button; ``False`` otherwise (including when nothing is pending).
        """
        if not st.session_state.get(state_key):
            return False

        st.warning(prompt)
        yes_col, no_col = st.columns(2)
        with yes_col:
            confirmed = st.button("Confirm delete", key=f"{state_key}_yes")
        with no_col:
            cancelled = st.button("Cancel", key=f"{state_key}_no")

        if confirmed or cancelled:
            st.session_state.pop(state_key, None)
        if cancelled:
            st.rerun()
        return confirmed

    @staticmethod
    def _upload_token(uploaded_file):
        """Return a hashable identity for an uploaded file.

        Uses the file's ``file_id`` attribute when it has one, otherwise the
        ``(name, size)`` pair.
        """
        file_id = getattr(uploaded_file, "file_id", None)
        if file_id:
            return file_id
        return (
            getattr(uploaded_file, "name", None),
            getattr(uploaded_file, "size", None),
        )

    def _ingested_tokens(self, case_id) -> set:
        """Return the per-case set of upload tokens already attempted."""
        slot = f"ingested_uploads_{case_id}"
        if slot not in st.session_state:
            st.session_state[slot] = set()
        return st.session_state[slot]

    def _new_uploads(self, case_id, uploaded_files) -> list:
        """Filter ``uploaded_files`` down to those not yet attempted.

        The uploader widget keeps returning its files on every rerun; without
        this filter each widget interaction would ingest them again.
        """
        seen = self._ingested_tokens(case_id)
        return [
            item for item in (uploaded_files or [])
            if self._upload_token(item) not in seen
        ]

    def _mark_ingested(self, case_id, uploaded_file) -> None:
        """Record that ``uploaded_file`` has been attempted for ``case_id``."""
        self._ingested_tokens(case_id).add(self._upload_token(uploaded_file))

    def _ingest_file(self, case_id, uploaded_file) -> dict:
        """Process one file, index its chunks, and attach it to a case.

        Args:
            case_id: Identifier of the target case.
            uploaded_file: Object passed to the document processor; its
                ``name`` attribute becomes the document title.

        Returns:
            The document record handed to ``case_manager.add_document``.

        Raises:
            Whatever the processor, vector store or case manager raise;
            callers report the error in the UI.
        """
        text, chunks, metadata = self.document_processor.process_and_tag_document(
            uploaded_file
        )

        total_chunks = len(chunks)
        for position, chunk in enumerate(chunks):
            # NOTE(review): every chunk is stored under the same doc_id;
            # confirm the vector store appends rather than overwrites.
            self.vector_store.add_document(
                doc_id=metadata['doc_id'],
                text=chunk['text'],
                metadata={
                    **metadata,
                    # Fall back to the position so chunks without an explicit
                    # id do not all collapse onto index 0.
                    'chunk_index': chunk.get('chunk_id', position),
                    'total_chunks': total_chunks,
                },
            )

        doc_data = {
            "id": metadata['doc_id'],
            "title": uploaded_file.name,
            "text": text,
            "metadata": metadata,
            "chunks": chunks,
        }
        self.case_manager.add_document(case_id, doc_data)
        return doc_data

    # ------------------------------------------------------------------
    # Tab 1: case overview
    # ------------------------------------------------------------------

    def _render_case_overview(self):
        """Render the create-case form and the list of existing cases."""
        st.subheader("Create New Case")
        with st.form("create_case_form"):
            title = st.text_input("Case Title")
            description = st.text_area("Description")
            case_type = st.selectbox(
                "Case Type", ["Judgment", "Contract", "MOU", "Will", "Other"]
            )
            create_case = st.form_submit_button("Create Case")

        if create_case:
            if not title.strip():
                st.error("Case title is required.")
            else:
                # Broad catch: the collaborator's exception types are not
                # known here and this is the UI boundary.
                try:
                    self.case_manager.create_case(title, description, case_type)
                except Exception as e:
                    st.error(f"Error creating case: {str(e)}")
                else:
                    self._flash(f"Case '{title}' created successfully!")

        st.subheader("Existing Cases")
        for case in self.case_manager.get_all_cases():
            with st.container():
                info_col, action_col = st.columns([4, 1])
                with info_col:
                    st.markdown(f"### {case['title']}")
                    st.markdown(f"**Type:** {case['case_type']}")
                    st.markdown(f"**Created:** {case['created_at']}")
                    st.markdown(f"**Description:** {case['description']}")
                    docs = self.case_manager.list_documents(case['id'])
                    st.markdown(f"**Documents:** {len(docs)}")

                pending_key = f"confirm_del_case_{case['id']}"
                with action_col:
                    if st.button("🗑️ Delete", key=f"del_case_{case['id']}"):
                        st.session_state[pending_key] = True

                prompt = (
                    f"Delete {case['title']}? "
                    "This will remove all associated documents."
                )
                if self._confirm_pending(pending_key, prompt):
                    try:
                        # Remove the case's documents from the vector store
                        # before dropping the case itself.
                        for doc in docs:
                            self.vector_store.delete_document(doc['id'])
                        self.case_manager.delete_case(case['id'])
                    except Exception as e:
                        st.error(f"Error: {str(e)}")
                    else:
                        self._flash("Case deleted!")
                st.divider()

    # ------------------------------------------------------------------
    # Tab 2: document management
    # ------------------------------------------------------------------

    def _render_document_manager(self):
        """Render upload, preview, download and delete for one case."""
        cases = self.case_manager.get_all_cases()
        if not cases:
            st.info("No cases found. Please create a case first.")
            return

        selected_case = st.selectbox(
            "Select Case",
            cases,
            format_func=lambda x: x['title'],
        )
        if not selected_case:
            return
        case_id = selected_case['id']

        st.subheader("Upload Documents")
        uploaded_files = st.file_uploader(
            "Upload one or more documents",
            accept_multiple_files=True,
            key=f"upload_{case_id}",
        )
        for uploaded_file in self._new_uploads(case_id, uploaded_files):
            with st.status(f"Processing {uploaded_file.name}..."):
                try:
                    self._ingest_file(case_id, uploaded_file)
                except Exception as e:
                    st.error(f"Error processing {uploaded_file.name}: {str(e)}")
                else:
                    st.success(f"Added: {uploaded_file.name}")
                finally:
                    # Attempt each upload once; re-adding the file in the
                    # uploader is how the user retries a failure.
                    self._mark_ingested(case_id, uploaded_file)

        # Fetched after the upload step so freshly added files are listed.
        documents = self.case_manager.list_documents(case_id)
        if not documents:
            st.info("No documents in this case yet.")
            return

        st.subheader("Documents")
        list_tab, grid_tab = st.tabs(["List View", "Grid View"])
        with list_tab:
            for doc in documents:
                self._render_document_row(case_id, doc)
        with grid_tab:
            self._render_document_grid(documents)

    def _render_document_row(self, case_id, doc) -> None:
        """Render one list-view entry: preview expander and action buttons."""
        with st.container():
            with st.expander(f"📄 {doc['title']}"):
                content_tab, metadata_tab, chunks_tab = st.tabs(
                    ["Content", "Metadata", "Chunks"]
                )
                with content_tab:
                    st.text_area(
                        "Document Content",
                        doc.get('text', 'No content available'),
                        height=300,
                        key=f"content_{doc['id']}",
                    )
                with metadata_tab:
                    st.json(doc.get('metadata', {}))
                with chunks_tab:
                    for idx, chunk in enumerate(doc.get('chunks', [])):
                        st.markdown(f"**Chunk {idx + 1}**")
                        st.text_area(
                            "Chunk Content",
                            chunk.get('text', 'No content available'),
                            height=100,
                            key=f"chunk_{doc['id']}_{idx}",
                        )

            pending_key = f"confirm_del_doc_{doc['id']}"
            download_col, reprocess_col, delete_col = st.columns([1, 1, 1])
            with download_col:
                # Rendered directly: nesting it under a plain button would
                # hide it again on the rerun caused by that button.
                st.download_button(
                    "📥 Download",
                    doc.get('text') or '',
                    file_name=doc['title'],
                    mime='text/plain',
                    key=f"download_{doc['id']}",
                )
            with reprocess_col:
                if st.button("🔄 Reprocess", key=f"reprocess_{doc['id']}"):
                    # TODO: reprocessing needs the original file bytes, which
                    # are not part of the stored document record.
                    st.info("Reprocessing is not available yet.")
            with delete_col:
                if st.button("🗑️ Delete", key=f"delete_{doc['id']}"):
                    st.session_state[pending_key] = True

            if self._confirm_pending(
                pending_key, "Are you sure you want to delete this document?"
            ):
                try:
                    self.vector_store.delete_document(doc['id'])
                    self.case_manager.remove_document(case_id, doc['id'])
                except Exception as e:
                    st.error(f"Error deleting: {str(e)}")
                else:
                    self._flash(f"Deleted: {doc['title']}")
            st.divider()

    def _render_document_grid(self, documents) -> None:
        """Render documents as cards spread over three columns."""
        grid_cols = st.columns(3)
        for idx, doc in enumerate(documents):
            with grid_cols[idx % 3]:
                st.markdown(f"### 📄 {doc['title']}")
                st.markdown(f"Added: {doc.get('added_at', 'Unknown')}")
                st.markdown(f"Size: {len(doc.get('text', ''))}")
                if st.button("Preview", key=f"grid_preview_{doc['id']}"):
                    with st.expander("Document Preview", expanded=True):
                        st.text_area(
                            "Content",
                            doc.get('text', ''),
                            height=200,
                            key=f"grid_content_{doc['id']}",
                        )

    # ------------------------------------------------------------------
    # Tab 3: batch operations
    # ------------------------------------------------------------------

    def _render_batch_operations(self):
        """Render the batch-upload tab with a progress bar and a summary."""
        st.subheader("Batch Operations")

        cases = self.case_manager.get_all_cases()
        if not cases:
            st.info("No cases found. Please create a case first.")
            return

        selected_case = st.selectbox(
            "Select Target Case",
            cases,
            format_func=lambda x: x['title'],
            key="batch_case",
        )
        if not selected_case:
            return
        case_id = selected_case["id"]

        st.markdown("### 📤 Batch Upload")
        uploaded_files = st.file_uploader(
            "Upload multiple documents",
            accept_multiple_files=True,
            key="batch_upload",
        )

        pending = self._new_uploads(case_id, uploaded_files)
        if not pending:
            return

        progress_bar = st.progress(0)
        status_text = st.empty()
        succeeded = 0
        for done, file in enumerate(pending, start=1):
            status_text.text(f"Processing {file.name}...")
            try:
                self._ingest_file(case_id, file)
            except Exception as e:
                st.error(f"Error processing {file.name}: {str(e)}")
            else:
                succeeded += 1
            finally:
                # Advance even on failure so the bar always reaches 100%.
                self._mark_ingested(case_id, file)
                progress_bar.progress(done / len(pending))

        status_text.text("Batch upload complete!")
        st.success(f"Processed {succeeded} of {len(pending)} documents")