from typing import Any, Dict, List, Optional

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import base64
import os
from pathlib import Path

from config import (
    OPENAI_API_KEY, OPENAI_MODEL, TEMPERATURE, MAX_TOKENS,
    LANGUAGE, CHROMA_DB_PATH
)
class VisualMultimodalRAG:
    """Summarizes document text, tables, and images (via a vision model) and stores the results in a vector store."""

    def __init__(self, api_key: Optional[str] = None, debug: bool = True):
        api_key = api_key or OPENAI_API_KEY
        self.debug = debug
        self.llm = ChatOpenAI(
            model=OPENAI_MODEL,
            api_key=api_key,
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
        )
        self.language = LANGUAGE
        self.visual_summaries_log = []
        if self.debug:
            print("✅ VisualMultimodalRAG initialized")
    def _debug_print(self, label: str, data: Any):
        if self.debug:
            print(f"\n🔍 DEBUG [{label}]:")
            if isinstance(data, (list, dict)):
                print(f"  Type: {type(data).__name__}")
                print(f"  Content: {str(data)[:300]}...")
            else:
                print(f"  {data}")
    def _image_to_base64(self, image_path: str) -> Optional[str]:
        """Read an image file and return its base64-encoded contents, or None on failure."""
        try:
            with open(image_path, 'rb') as image_file:
                return base64.b64encode(image_file.read()).decode('utf-8')
        except Exception as e:
            print(f"Error converting image to base64: {e}")
            return None
    def analyze_image_visually(self, image_path: str, image_idx: int) -> str:
        """Send one image to the vision model and return its textual analysis."""
        if not os.path.exists(image_path):
            return f"[Image {image_idx}: File not found - {image_path}]"
        try:
            image_base64 = self._image_to_base64(image_path)
            if not image_base64:
                return f"[Image {image_idx}: Could not convert to base64]"
            file_ext = Path(image_path).suffix.lower()
            media_type_map = {
                '.jpg': 'image/jpeg',
                '.jpeg': 'image/jpeg',
                '.png': 'image/png',
                '.gif': 'image/gif',
                '.webp': 'image/webp'
            }
            media_type = media_type_map.get(file_ext, 'image/png')
            print(f"🔍 Analyzing image {image_idx} visually (as {media_type})...")
            # Create a multimodal message: the image plus the analysis prompt
            message = HumanMessage(
                content=[
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:{media_type};base64,{image_base64}",
                        },
                    },
                    {
                        "type": "text",
                        "text": f"""You are an assistant for analyzing and aggregating information. Analyze this image.
Provide a visual analysis that includes:
1. Main objects and elements
2. Data/Content - any numbers, text, charts, graphs
3. What this image is showing or representing
4. Important patterns, trends, or information
5. How the image relates to the document content
Be brief and meaningful. Focus on visual information that cannot be extracted from text. Respond in {self.language}.
Analysis:"""
                    }
                ],
            )
            response = self.llm.invoke([message])
            analysis = response.content.strip()
            if self.debug:
                self._debug_print(f"Image {image_idx} Visual Analysis", analysis)
            print(f"✅ Image {image_idx} analyzed successfully")
            return analysis
        except Exception as e:
            error_msg = f"[Image {image_idx}: Vision analysis failed - {str(e)}]"
            print(f"❌ Error analyzing image {image_idx}: {e}")
            return error_msg
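
    # Note: the message above embeds the image inline as a base64 data URL
    # ("data:<media_type>;base64,<payload>") inside an "image_url" content part,
    # the multimodal content format that langchain_openai's ChatOpenAI forwards
    # to the OpenAI chat API.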
    def analyze_images_visually(self, images: List[Dict]) -> List[Dict]:
        visual_analyses = []
        for idx, image in enumerate(images):
            image_path = image.get('path', '')
            if not image_path:
                print(f"⚠️ Image {idx}: No path provided")
                continue
            visual_analysis = self.analyze_image_visually(image_path, idx)
            visual_analyses.append({
                'type': 'image_visual',
                'image_index': idx,
                'image_path': image_path,
                'visual_analysis': visual_analysis,
                'ocr_text': image.get('ocr_text', '')  # Keep OCR as backup
            })
        return visual_analyses
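
    # Note: analyze_images_visually expects each image dict to carry a 'path'
    # (and optionally an 'ocr_text' fallback), e.g.:
    #   [{'path': 'page1_img0.png', 'ocr_text': 'text extracted by OCR'}]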
    def summarize_text_chunks(self, text: str, chunk_size: int = 1500) -> List[Dict]:
        chunks = []
        text_chunks = self._chunk_text(text, chunk_size=chunk_size, overlap=300)
        self._debug_print("Text Chunking", f"Created {len(text_chunks)} chunks")
        for idx, chunk in enumerate(text_chunks):
            if len(chunk.strip()) < 50:
                continue
            try:
                prompt = f"""Summarize this text chunk in {self.language}.
Be brief and meaningful. Extract key points, facts, and main ideas.
Text Chunk:
{chunk}
Summary:"""
                message = HumanMessage(content=prompt)
                response = self.llm.invoke([message])
                summary = response.content.strip()
                chunks.append({
                    'type': 'text_chunk',
                    'chunk_index': len(chunks),
                    'original_text': chunk[:500],
                    'summary': summary,
                    'chunk_length': len(chunk)
                })
                if self.debug:
                    self._debug_print(f"Text Chunk {len(chunks)-1} Summary", summary)
            except Exception as e:
                print(f"Error summarizing text chunk: {e}")
        return chunks
    def summarize_tables(self, tables: List[Dict]) -> List[Dict]:
        summaries = []
        for idx, table in enumerate(tables):
            table_content = table.get('content', '')
            if not table_content or len(table_content.strip()) < 10:
                continue
            try:
                prompt = f"""Analyze and summarize this table/structured data in {self.language}.
Extract key insights, row/column meanings, and important figures. Be brief and meaningful.
Table Content:
{table_content}
Summary:"""
                message = HumanMessage(content=prompt)
                response = self.llm.invoke([message])
                summary = response.content.strip()
                summaries.append({
                    'type': 'table',
                    'table_index': idx,
                    'original_content': table_content[:500],
                    'summary': summary,
                    'table_length': len(table_content)
                })
                if self.debug:
                    self._debug_print(f"Table {idx} Summary", summary)
            except Exception as e:
                print(f"Error summarizing table {idx}: {e}")
        return summaries
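
    # Note: summarize_tables reads only the 'content' field of each table dict,
    # e.g. [{'content': 'Year | Revenue\n2023 | 1.2M'}]; entries under 10
    # characters are skipped.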
    def process_and_store_document(
        self,
        text: str,
        images: List[Dict],
        tables: List[Dict],
        vector_store,
        doc_id: str
    ) -> Dict:
        """Run the full pipeline: analyze images, summarize text and tables, and store everything."""
        print(f"\n{'='*70}")
        print(f"PROCESSING WITH VISUAL IMAGE ANALYSIS: {doc_id}")
        print(f"{'='*70}")
        results = {
            'doc_id': doc_id,
            'image_visual_analyses': [],
            'text_summaries': [],
            'table_summaries': [],
            'total_stored': 0
        }
        print(f"\n🖼️ VISUAL IMAGE ANALYSIS (vision model) ({len(images)} total)")
        print(f"{'─'*70}")
        image_analyses = self.analyze_images_visually(images)
        results['image_visual_analyses'] = image_analyses
        # All image analyses are joined into a single document entry for the store
        image_docs = {
            'text': ' | '.join([
                f"Image {a['image_index']}: {a['visual_analysis']}"
                for a in image_analyses
            ]),
            'images': [],
            'tables': []
        }
        for analysis in image_analyses:
            print(f"  ✓ Image {analysis['image_index']} (visual analysis)")
            print(f"    Path: {analysis['image_path']}")
            print(f"    Analysis: {analysis['visual_analysis'][:100]}...")
        if image_analyses:
            try:
                vector_store.add_documents(
                    image_docs,
                    f"{doc_id}_images_visual"
                )
                results['total_stored'] += len(image_analyses)
                print(f"✅ Stored {len(image_analyses)} image visual analyses")
            except Exception as e:
                print(f"❌ Error storing image analyses: {e}")
        print(f"\n📝 TEXT CHUNK SUMMARIZATION")
        print(f"{'─'*70}")
        text_summaries = self.summarize_text_chunks(text)
        results['text_summaries'] = text_summaries
        text_docs = {
            'text': ' | '.join([f"Chunk {s['chunk_index']}: {s['summary']}"
                                for s in text_summaries]),
            'images': [],
            'tables': []
        }
        for summary in text_summaries:
            print(f"  ✓ Chunk {summary['chunk_index']}: {summary['summary'][:50]}...")
        if text_summaries:
            try:
                vector_store.add_documents(
                    text_docs,
                    f"{doc_id}_text_chunks"
                )
                results['total_stored'] += len(text_summaries)
                print(f"✅ Stored {len(text_summaries)} text chunk summaries")
            except Exception as e:
                print(f"❌ Error storing text summaries: {e}")
        print(f"\n📊 TABLE SUMMARIZATION ({len(tables)} total)")
        print(f"{'─'*70}")
        table_summaries = self.summarize_tables(tables)
        results['table_summaries'] = table_summaries
        table_docs = {
            'text': ' | '.join([f"Table {s['table_index']}: {s['summary']}"
                                for s in table_summaries]),
            'images': [],
            'tables': []
        }
        for summary in table_summaries:
            print(f"  ✓ Table {summary['table_index']}: {summary['summary'][:50]}...")
        if table_summaries:
            try:
                vector_store.add_documents(
                    table_docs,
                    f"{doc_id}_tables"
                )
                results['total_stored'] += len(table_summaries)
                print(f"✅ Stored {len(table_summaries)} table summaries")
            except Exception as e:
                print(f"❌ Error storing table summaries: {e}")
        print(f"\n{'='*70}")
        print(f"📊 STORAGE SUMMARY")
        print(f"{'='*70}")
        print(f"  Images analyzed visually & stored: {len(image_analyses)}")
        print(f"  Text chunks summarized & stored: {len(text_summaries)}")
        print(f"  Tables summarized & stored: {len(table_summaries)}")
        print(f"  Total items stored in vector store: {results['total_stored']}")
        print(f"{'='*70}")
        self.visual_summaries_log.append(results)
        return results
    def _chunk_text(self, text: str, chunk_size: int = 1500, overlap: int = 300) -> List[str]:
        """Split text into fixed-size chunks, each overlapping the previous one by `overlap` characters."""
        chunks = []
        start = 0
        while start < len(text):
            end = start + chunk_size
            chunks.append(text[start:end])
            if end >= len(text):
                break  # end of text reached; prevents a redundant overlapping tail chunk
            start = end - overlap
        return chunks
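
    # Example: _chunk_text("x" * 3000) returns three chunks covering
    # [0:1500], [1200:2700], and [2400:3000]; consecutive chunks share
    # 300 characters so content split at a boundary appears in both.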
    def get_visual_summaries_log(self) -> List[Dict]:
        return self.visual_summaries_log

class AnsweringRAG:
    """Answers user questions from retrieved document content, with source attribution."""

    def __init__(self, api_key: Optional[str] = None, debug: bool = True):
        api_key = api_key or OPENAI_API_KEY
        self.debug = debug
        self.llm = ChatOpenAI(
            model=OPENAI_MODEL,
            api_key=api_key,
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
        )
        self.language = LANGUAGE
        self.answer_log = []
        if self.debug:
            print("✅ AnsweringRAG initialized")
    def _debug_print(self, label: str, data: Any):
        if self.debug:
            print(f"\n🔍 DEBUG [{label}]:")
            if isinstance(data, (list, dict)):
                print(f"  Type: {type(data).__name__}")
                print(f"  Content: {str(data)[:300]}...")
            else:
                print(f"  {data}")
    def analyze_and_answer(
        self,
        question: str,
        search_results: List[Dict]
    ) -> Dict:
        """Build a context from search results and ask the LLM to answer the question."""
        print(f"\n{'='*70}")
        print(f"ANALYZING QUESTION & GENERATING ANSWER")
        print(f"{'='*70}")
        print(f"\n❓ Question: {question}")
        print(f"🔍 Search Results Found: {len(search_results)}")
        if not search_results:
            print(f"⚠️ No search results found!")
            answer = f'No relevant information in the document to answer the question: "{question}"'
            result = {
                'question': question,
                'answer': answer,
                'sources_used': 0,
                'confidence': 'low',
                'search_results': []
            }
            self.answer_log.append(result)
            return result
        context_parts = []
        for idx, result in enumerate(search_results, 1):
            content = result.get('content', '')
            metadata = result.get('metadata', {})
            content_type = result.get('type', 'unknown')
            distance = result.get('distance')
            # A distance of exactly 0 is a perfect match, so test for None rather than falsiness
            relevance = 1 - distance if distance is not None else 0
            context_parts.append(f"""
[Source {idx} - {content_type.upper()} (relevance: {relevance:.1%})]
{content}""")
        full_context = "\n".join(context_parts)
        self._debug_print("Context Prepared", f"{len(context_parts)} sources, {len(full_context)} chars")
        analysis_prompt = f"""You are a helpful assistant analyzing document content to answer user questions.
USER QUESTION:
{question}
RELEVANT CONTENT FROM DOCUMENT:
{full_context}
INSTRUCTIONS:
1. Analyze the provided content carefully
2. Extract information relevant to the question
3. Synthesize a clear, comprehensive answer in {self.language}
4. If the content doesn't fully answer the question, explain what information is available
5. Be specific and cite the content when relevant
6. Structure your answer clearly with key points
ANSWER:"""
        print(f"\n🔍 Analyzing search results...")
        print(f"  Context size: {len(full_context)} characters")
        print(f"  Sources: {len(search_results)}")
        try:
            message = HumanMessage(content=analysis_prompt)
            response = self.llm.invoke([message])
            answer = response.content.strip()
            confidence = self._estimate_confidence(len(search_results), answer)
            print(f"✅ Answer generated successfully")
            print(f"  Confidence: {confidence}")
            print(f"  Answer length: {len(answer)} characters")
            result = {
                'question': question,
                'answer': answer,
                'sources_used': len(search_results),
                'confidence': confidence,
                'search_results': search_results
            }
            self.answer_log.append(result)
            return result
        except Exception as e:
            print(f"❌ Error generating answer: {e}")
            answer = "I encountered an error while analyzing the search results. Please try again."
            result = {
                'question': question,
                'answer': answer,
                'sources_used': len(search_results),
                'confidence': 'low',
                'error': str(e),
                'search_results': search_results
            }
            self.answer_log.append(result)
            return result
    def _estimate_confidence(self, sources_count: int, answer: str) -> str:
        """Crude heuristic: more sources and a longer answer suggest higher confidence."""
        answer_length = len(answer)
        if sources_count >= 3 and answer_length > 500:
            return "high"
        elif sources_count >= 2 and answer_length > 200:
            return "medium"
        else:
            return "low"
    def get_answer_with_sources(
        self,
        question: str,
        search_results: List[Dict]
    ) -> Dict:
        result = self.analyze_and_answer(question, search_results)
        formatted_sources = []
        for idx, source in enumerate(result['search_results'], 1):
            distance = source.get('distance')
            formatted_sources.append({
                'index': idx,
                'type': source.get('type', 'unknown'),
                'content': source.get('content', ''),
                # Same None check as above: distance 0 means a perfect match, not zero relevance
                'relevance': 1 - distance if distance is not None else 0
            })
        result['formatted_sources'] = formatted_sources
        return result
    def get_answer_log(self) -> List[Dict]:
        return self.answer_log
    def print_answer_with_sources(self, result: Dict, max_source_length: int = 300):
        print(f"\n{'='*70}")
        print(f"ANSWER TO: {result['question']}")
        print(f"{'='*70}")
        print(f"\n📝 ANSWER (Confidence: {result['confidence'].upper()}):")
        print(f"{'-'*70}")
        print(result['answer'])
        print(f"{'-'*70}")
        if result.get('formatted_sources'):
            print(f"\n📚 SOURCES USED ({len(result['formatted_sources'])} total):")
            for source in result['formatted_sources']:
                print(f"\n[Source {source['index']} - {source['type'].upper()} ({source['relevance']:.0%} relevant)]")
                print(f"{source['content'][:max_source_length]}...")
        print(f"\n{'='*70}")