# Multimodal RAG LLM System — Streamlit application.
# (The original "Spaces: Sleeping" lines were Hugging Face Spaces UI
# scraping residue, not source code.)
# Standard library.
import os
from pathlib import Path

# Third-party.
import streamlit as st

# Project modules.
from config import UPLOAD_FOLDER, MAX_PDF_SIZE_MB
from pdf_parser import PDFParser
from rag_system import VisualMultimodalRAG
from vector_store import VectorStore
# Page chrome; st.set_page_config must run before any other st.* call.
_PAGE_CONFIG = {
    "page_title": "π Multimodal RAG LLM System (PDF Parsing)",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_PAGE_CONFIG)
# One-time bootstrap of every session-state slot the app reads.
# A defaults table replaces 22 near-identical `if` blocks: the key list
# lives in one place and each key keeps the same default as before.
_SESSION_DEFAULTS = {
    'api_key_set': False,        # True once an OpenAI key has been entered
    'api_key': None,             # raw key string
    'visual_rag_system': None,   # VisualMultimodalRAG instance
    'vector_store': None,        # VectorStore instance
    'parser': None,              # PDFParser instance
    'current_document': None,    # filename of the last parsed PDF
    'current_text': None,        # extracted text of the current document
    'current_images': None,      # extracted images of the current document
    'current_tables': None,      # extracted tables of the current document
    'processing_results': None,  # output of process_and_store_document()
    'answering_rag': None,       # AnsweringRAG instance (created lazily below)
}
for _key, _default in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default
st.title("π Multimodal RAG LLM System (PDF Parsing)")

# ---- Sidebar: configuration, store status, maintenance actions ----
with st.sidebar:
    st.header("βοΈ Configuration")

    # API key entry; backend systems are built once, on the first valid key.
    st.subheader("π OpenAI API Key")
    api_key = st.text_input(
        "Enter your OpenAI API key:",
        type="password",
        key="api_key_input",
    )
    if api_key:
        st.session_state.api_key = api_key
        st.session_state.api_key_set = True
        if st.session_state.visual_rag_system is None:
            try:
                st.session_state.visual_rag_system = VisualMultimodalRAG(api_key=api_key, debug=True)
                st.session_state.vector_store = VectorStore()
                st.session_state.parser = PDFParser(debug=True)
                st.success("β API Key set & systems initialized")
            except Exception as e:
                st.error(f"Error initializing systems: {e}")
    else:
        st.session_state.api_key_set = False
        st.warning("β οΈ Please enter your API key to continue")

    st.divider()

    # Live view of the vector store backing the RAG pipeline.
    st.subheader("π Vector Store Status")
    _vs = st.session_state.vector_store
    if _vs:
        try:
            info = _vs.get_collection_info()
            st.metric("Items in Store", info['count'])
            st.metric("Status", info['status'])
            st.caption(f"Path: {info['persist_path']}")
        except Exception as e:
            st.error(f"Error getting store info: {e}")
    else:
        st.info("Set API key to initialize vector store")

    st.divider()

    # Maintenance: wipe everything stored so far.
    st.subheader("π Document Management")
    if st.button("π Clear Vector Store"):
        if st.session_state.vector_store:
            try:
                st.session_state.vector_store.clear_all()
                st.success("β Vector store cleared")
            except Exception as e:
                st.error(f"Error clearing store: {e}")
st.header("π€ Upload PDF Document")

uploaded_file = st.file_uploader(
    "Choose a PDF file",
    type=['pdf'],
    help="PDF with text, images, and tables",
)

if uploaded_file is not None:
    # BUGFIX: MAX_PDF_SIZE_MB was imported but never enforced — reject
    # oversized uploads before writing them to disk. UploadedFile.size
    # is the file size in bytes.
    if uploaded_file.size > MAX_PDF_SIZE_MB * 1024 * 1024:
        st.error(f"β File exceeds the {MAX_PDF_SIZE_MB} MB limit")
    else:
        # Persist the upload so the parser can read it from disk.
        # parents=True tolerates a nested UPLOAD_FOLDER path.
        upload_path = Path(UPLOAD_FOLDER)
        upload_path.mkdir(parents=True, exist_ok=True)
        file_path = upload_path / uploaded_file.name
        with open(file_path, 'wb') as f:
            f.write(uploaded_file.getbuffer())
        st.success(f"β File saved: {uploaded_file.name}")

        if st.button("π Parse PDF"):
            if not st.session_state.api_key_set:
                st.error("β Please set OpenAI API key first")
            else:
                try:
                    with st.spinner("π Parsing PDF..."):
                        print(f"\n{'='*70}")
                        print(f"PARSING: {uploaded_file.name}")
                        print(f"{'='*70}")

                        # parse_pdf returns (text, images, tables); the
                        # element schemas are defined in pdf_parser.py.
                        parser = st.session_state.parser
                        text, images, tables = parser.parse_pdf(str(file_path))

                        # Cache the parse so the later sections can use it
                        # across Streamlit reruns.
                        st.session_state.current_document = uploaded_file.name
                        st.session_state.current_text = text
                        st.session_state.current_images = images
                        st.session_state.current_tables = tables

                        # Quick summary of what was extracted.
                        col1, col2, col3 = st.columns(3)
                        with col1:
                            st.metric("π Text", f"{len(text):,} chars")
                        with col2:
                            st.metric("πΌοΈ Images", len(images))
                        with col3:
                            st.metric("π Tables", len(tables))

                        st.success("β PDF parsing complete!")
                except Exception as e:
                    st.error(f"β Error parsing PDF: {e}")
                    print(f"Error: {e}")
st.divider()
st.header("πΌοΈ Analysis & Storage")

if st.button("πΌοΈ Analyze & Store Components"):
    # Guard clauses: need both a key and a parsed document first.
    if not st.session_state.api_key_set:
        st.error("β Please set OpenAI API key first")
    elif st.session_state.current_text is None:
        st.error("β Please parse a PDF document first")
    else:
        try:
            with st.spinner("πΌοΈ Analyzing..."):
                print(f"\n{'='*70}")
                print(f"VISUAL IMAGE ANALYSIS")
                print(f"{'='*70}")

                # Analyze text/images/tables and persist everything into
                # the vector store under the current document's id.
                results = st.session_state.visual_rag_system.process_and_store_document(
                    text=st.session_state.current_text,
                    images=st.session_state.current_images,
                    tables=st.session_state.current_tables,
                    vector_store=st.session_state.vector_store,
                    doc_id=st.session_state.current_document or "current_doc",
                )
                st.session_state.processing_results = results
                st.success("β Visual analysis complete & stored!")

                # Per-modality counters plus the grand total stored.
                col1, col2, col3 = st.columns(3)
                with col1:
                    st.metric("πΌοΈ Images Analyzed", len(results['image_visual_analyses']))
                with col2:
                    st.metric("π Text Chunks", len(results['text_summaries']))
                with col3:
                    st.metric("π Tables Analyzed", len(results['table_summaries']))
                st.metric("π Total Stored in Vector", results['total_stored'])

                print(f"\nβ Analysis processing complete!")
        except Exception as e:
            st.error(f"β Error during analysis: {e}")
            print(f"Error: {e}")
st.divider()
st.header("β Ask Questions About Document")

# Lazily build the answering model once an API key is available.
if 'answering_rag' not in st.session_state:
    st.session_state.answering_rag = None
if st.session_state.api_key_set and st.session_state.answering_rag is None:
    from rag_system import AnsweringRAG
    st.session_state.answering_rag = AnsweringRAG(api_key=st.session_state.api_key, debug=True)

question = st.text_area(
    "Enter your question:",
    height=100,
    placeholder="What does the document say about...?",
)

if st.button("π Search & Generate Answer"):
    if not st.session_state.api_key_set:
        st.error("β Please set OpenAI API key first")
    elif st.session_state.current_text is None:
        st.error("β Please parse a PDF document first")
    elif not question:
        st.error("β Please enter a question")
    else:
        try:
            with st.spinner("π Searching document and analyzing..."):
                print(f"\n{'='*70}")
                print(f"QUESTION: {question}")
                print(f"{'='*70}")

                store = st.session_state.vector_store
                doc_name = st.session_state.current_document or "current_doc"
                # NOTE(review): this re-adds the raw document text on every
                # question; if add_documents() does not upsert by doc id it
                # will duplicate chunks in the store — verify vector_store.py.
                doc_data = {
                    'text': st.session_state.current_text,
                    'images': [],
                    'tables': [],
                }
                store.add_documents(doc_data, doc_name)
                search_results = store.search(question, n_results=5)
                print(f"\nπ Search Results Found: {len(search_results)}")

                result = st.session_state.answering_rag.analyze_and_answer(question, search_results)

                # BUGFIX: persist the answer instead of rendering it only
                # inside this button branch. Previously, interacting with
                # the "Show Source Documents" checkbox reran the script,
                # the button read False, and the answer (and sources)
                # vanished before they could ever be shown.
                st.session_state.last_answer = result
                st.session_state.last_search_results = search_results
                print(f"\nβ Answer generation complete!")
        except Exception as e:
            st.error(f"β Error processing question: {e}")
            print(f"Error: {e}")

# Render the most recent answer; survives Streamlit reruns.
if st.session_state.get('last_answer'):
    result = st.session_state.last_answer
    search_results = st.session_state.last_search_results
    st.success("β Analysis complete!")
    st.subheader("π Answer")
    col1, col2, col3 = st.columns(3)
    with col1:
        st.metric("Confidence", f"{result['confidence'].upper()}")
    with col2:
        st.metric("Sources Used", result['sources_used'])
    with col3:
        # Average of (1 - distance) over the retrieved hits; guard the
        # division in case the result list is empty.
        if result['sources_used'] > 0 and search_results:
            avg = sum(1 - r.get('distance', 0) for r in search_results) / len(search_results)
            st.metric("Avg Relevance", f"{avg:.0%}")
    st.write(result['answer'])

    if st.checkbox("π Show Source Documents"):
        st.subheader("Sources Used in Answer")
        for idx, source in enumerate(result['formatted_sources'], 1):
            relevance = source['relevance']
            filled = int(relevance * 10)
            relevance_bar = "β" * filled + "β" * (10 - filled)
            with st.expander(
                f"Source {idx} - {source['type'].upper()} "
                f"[{relevance_bar}] {relevance:.0%}"
            ):
                st.write(source['content'])

st.divider()