Spaces:
Running
Running
# Load variables from .env before the remaining modules are imported.
from dotenv import load_dotenv
load_dotenv()

import io
import os
import shutil
import tempfile
import traceback
from typing import List

import chainlit as cl
from llama_index.core import Settings
from llama_index.core import VectorStoreIndex, StorageContext
from llama_index.core.node_parser import SentenceSplitter
from llama_index.embeddings.nvidia import NVIDIAEmbedding
from llama_index.llms.nvidia import NVIDIA
from llama_index.vector_stores.milvus import MilvusVectorStore
from PIL import Image

from document_processor import load_multimodal_data, load_data_from_directory
from utils import set_environment_variables
# Configure the shared LlamaIndex ``Settings`` object
def initialize_setting():
    """Install the embedding model, LLM and text splitter on ``Settings``.

    Assigns, in this order:
      * ``Settings.embed_model`` - ``NVIDIAEmbedding`` for
        ``nvidia/nv-embedqa-e5-v5`` with ``truncate="END"``.
      * ``Settings.llm`` - ``NVIDIA`` for ``meta/llama-3.1-70b-instruct``.
      * ``Settings.text_splitter`` - ``SentenceSplitter`` with a chunk size
        of 600.
    """
    embedder = NVIDIAEmbedding(
        model="nvidia/nv-embedqa-e5-v5",
        truncate="END",
    )
    Settings.embed_model = embedder

    language_model = NVIDIA(model="meta/llama-3.1-70b-instruct")
    Settings.llm = language_model

    splitter = SentenceSplitter(chunk_size=600)
    Settings.text_splitter = splitter
# Create index from documents
def create_index(documents):
    """Build a ``VectorStoreIndex`` over *documents*, stored in Milvus.

    The Milvus API token is read from the ``MILVUS_TOKEN`` environment
    variable so that the secret is not kept in source control. Any token that
    was previously committed to this file should be treated as leaked and
    rotated.

    Args:
        documents: Documents passed to ``VectorStoreIndex.from_documents``.

    Returns:
        The index created by ``VectorStoreIndex.from_documents`` using a
        ``StorageContext`` that wraps the Milvus vector store.

    Raises:
        RuntimeError: If ``MILVUS_TOKEN`` is unset or empty.
    """
    token = os.environ.get("MILVUS_TOKEN")
    if not token:
        raise RuntimeError(
            "MILVUS_TOKEN is not set; export the Milvus/Zilliz API token "
            "before creating an index."
        )

    # NOTE(review): confirm that the installed MilvusVectorStore honours
    # ``host``/``port``; the connection may need to be passed as ``uri``.
    vector_store = MilvusVectorStore(
        token=token,
        host="https://in03-341eca982e73331.serverless.gcp-us-west1.cloud.zilliz.com",
        port=19530,
        dim=1024,
    )
    storage_context = StorageContext.from_defaults(vector_store=vector_store)
    return VectorStoreIndex.from_documents(documents, storage_context=storage_context)
# File extensions that are echoed back to the user as inline images.
_IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.gif', '.bmp')


async def _announce_upload(display_name, image_path=None):
    """Send the "received" chat message, with an inline preview if *image_path* is given."""
    if image_path is None:
        await cl.Message(content=f"📄 Received file: {display_name}").send()
        return
    await cl.Message(
        content=f"📸 Received image: {display_name}",
        elements=[cl.Image(path=image_path, name=display_name, display="inline")],
    ).send()


async def process_uploaded_files(files: List[cl.File]) -> List[str]:
    """Copy uploaded files into a working directory and return the new paths.

    Each element is handled in one of three ways:
      * a ``str`` is treated as a filesystem path and copied;
      * an object whose ``path`` attribute points at an existing file is
        copied from that path;
      * otherwise the object's ``content`` attribute is written out as bytes.

    A chat message is sent for every element that is received, skipped or
    fails. A failure on one element never aborts the remaining ones.

    Args:
        files: Uploaded elements taken from the incoming message.

    Returns:
        Paths of the copies that were created successfully.
    """
    # Prefer the directory stored under the session's 'temp_dir' key so that
    # the session cleanup hook removes the copies; a fresh directory per call
    # would never be deleted.
    temp_dir = cl.user_session.get('temp_dir') or tempfile.mkdtemp()
    os.makedirs(temp_dir, exist_ok=True)
    processed_paths = []

    print("\n=== Starting File Processing ===")
    print(f"Number of files received: {len(files)}")
    print(f"Temporary directory: {temp_dir}")

    for upload in files:
        try:
            print("\n--- Processing file ---")
            print(f"File object type: {type(upload)}")
            print(f"File attributes: {dir(upload)}")

            # Handle string paths (direct file paths)
            if isinstance(upload, str):
                print("Processing as string path")
                if not os.path.exists(upload):
                    print(f"File path does not exist: {upload}")
                    continue
                file_name = os.path.basename(upload)
                file_extension = os.path.splitext(file_name)[1].lower()
                temp_path = os.path.join(temp_dir, file_name)
                print(f"File exists at path: {upload}")
                print(f"File name: {file_name}")
                print(f"File extension: {file_extension}")
                print(f"Temp path: {temp_path}")
                shutil.copy2(upload, temp_path)
                is_image = file_extension in _IMAGE_EXTENSIONS
                await _announce_upload(file_name, upload if is_image else None)
                processed_paths.append(temp_path)
                print("File processed successfully as string path")
                continue

            # Handle Chainlit File objects
            print("Processing as Chainlit File object")
            # The uploaded name is untrusted input: keep only its final
            # component so it cannot escape temp_dir (e.g. "../../x").
            safe_name = os.path.basename(upload.name)
            if safe_name in ("", ".", ".."):
                print(f"Unusable file name: {upload.name!r}")
                await cl.Message(
                    content=f"⚠️ Warning: Skipping upload with unusable name: {upload.name!r}"
                ).send()
                continue
            file_extension = os.path.splitext(safe_name)[1].lower()
            temp_path = os.path.join(temp_dir, safe_name)
            print(f"File name: {upload.name}")
            print(f"File extension: {file_extension}")
            print(f"Temp path: {temp_path}")

            # getattr with a default also covers objects whose ``path`` is None,
            # which would make os.path.exists raise TypeError.
            source_path = getattr(upload, 'path', None)
            if source_path and os.path.exists(source_path):
                print(f"File has path attribute: {source_path}")
                shutil.copy2(source_path, temp_path)
                print("File copied successfully")
                is_image = file_extension in _IMAGE_EXTENSIONS
                print("Processing as image file" if is_image else "Processing as non-image file")
                await _announce_upload(upload.name, source_path if is_image else None)
            else:
                print("Attempting to process file content")
                file_content = getattr(upload, 'content', None)
                if not file_content:
                    print("No file content available")
                    await cl.Message(
                        content=f"⚠️ Warning: Could not access content for {upload.name}"
                    ).send()
                    continue
                print("Writing file content to temp path")
                with open(temp_path, 'wb') as out_file:
                    out_file.write(file_content)
                await _announce_upload(upload.name)

            processed_paths.append(temp_path)
            print("File processed successfully")
        except Exception as e:
            # Per-file boundary: report the problem and move on to the next upload.
            print(f"Error processing file: {str(e)}")
            print(f"Error type: {type(e)}")
            print(f"Traceback: {traceback.format_exc()}")
            await cl.Message(
                content=f"❌ Error processing file: {str(e)}"
            ).send()
            continue

    print("\n=== File Processing Summary ===")
    print(f"Total files processed: {len(processed_paths)}")
    print(f"Processed paths: {processed_paths}")
    return processed_paths
# NOTE(review): no Chainlit hook decorator (e.g. ``@cl.on_chat_start``) is
# visible above this function in the reviewed text; confirm it is registered.
async def start():
    """Initialize the chat session.

    Calls ``set_environment_variables()`` and ``initialize_setting()``, resets
    the session's ``'index'`` entry to ``None``, stores a newly created
    temporary directory under ``'temp_dir'`` and sends the welcome message.
    """
    set_environment_variables()
    initialize_setting()

    # Initialize session variables
    cl.user_session.set('index', None)
    cl.user_session.set('temp_dir', tempfile.mkdtemp())

    # Send welcome message (the leading emoji was mis-decoded in the original
    # literal and has been restored).
    await cl.Message(
        content="👋 Welcome! You can:\n"
        "1. Upload images or documents using the paperclip icon\n"
        "2. Ask questions about the uploaded content\n"
        "3. Get detailed analysis including text extraction and scene descriptions"
    ).send()
# Phrases that trigger the extra "Image Analysis" section in a reply.
_IMAGE_QUERY_KEYWORDS = (
    "what do you see", "describe the image", "what's in the image",
    "analyze the image", "text in image", "extract text",
)


async def _ingest_attachments(message):
    """Index the files attached to *message*; return True if an index was stored."""
    print("\n--- Processing File Upload ---")
    print(f"Number of elements: {len(message.elements)}")
    print(f"Elements types: {[type(elem) for elem in message.elements]}")
    try:
        print("Starting file processing...")
        processed_paths = await process_uploaded_files(message.elements)
        print(f"Processed paths: {processed_paths}")
        if not processed_paths:
            print("No files were processed successfully")
            await cl.Message(
                content="⚠️ No files were successfully processed."
            ).send()
            return False

        print("\n--- Creating Documents ---")
        # Document loading and indexing are synchronous; run them off the
        # event loop so other sessions stay responsive.
        documents = await cl.make_async(load_multimodal_data)(processed_paths)
        print(f"Number of documents created: {len(documents) if documents else 0}")
        if not documents:
            print("No documents were created")
            await cl.Message(
                content="⚠️ No documents were created from the uploaded files."
            ).send()
            return False

        print("\n--- Creating Index ---")
        index = await cl.make_async(create_index)(documents)
        cl.user_session.set('index', index)
        print("Index created and stored in session")
        await cl.Message(
            content="✅ Files processed successfully! You can now ask questions about the content."
        ).send()
        return True
    except Exception as e:
        print("\n!!! Error in file processing !!!")
        print(f"Error type: {type(e)}")
        print(f"Error message: {str(e)}")
        print(f"Traceback: {traceback.format_exc()}")
        await cl.Message(
            content=f"❌ Error processing files: {str(e)}"
        ).send()
        return False


async def _answer_query(index, question):
    """Run *question* against *index* and send the answer, or an error message."""
    try:
        print("Creating query engine...")
        # Send an empty message first; it is filled in once the answer is ready.
        msg = cl.Message(content="")
        await msg.send()

        query_engine = index.as_query_engine(similarity_top_k=20)
        print("Executing query...")
        # The query call is synchronous, so run it off the event loop.
        response = await cl.make_async(query_engine.query)(question)
        print("Query executed successfully")

        response_text = str(response)
        lowered = question.lower()
        if any(keyword in lowered for keyword in _IMAGE_QUERY_KEYWORDS):
            print("Processing special image analysis query")
            # NOTE(review): the lines below append fixed placeholder text, not
            # real analysis results; confirm this is intended.
            response_text += "\n\n**Image Analysis:**\n"
            response_text += "- Visible Text: [Extracted text from the image]\n"
            response_text += "- Scene Description: [Description of the image content]\n"
            response_text += "- Objects Detected: [List of detected objects]\n"

        print("Updating response message...")
        msg.content = response_text
        await msg.update()
        print("Response sent successfully")
    except Exception as e:
        print("\n!!! Error in query processing !!!")
        print(f"Error type: {type(e)}")
        print(f"Error message: {str(e)}")
        print(f"Traceback: {traceback.format_exc()}")
        await cl.Message(
            content=f"❌ Error processing query: {str(e)}"
        ).send()


# NOTE(review): no Chainlit hook decorator (e.g. ``@cl.on_message``) is visible
# above this function in the reviewed text; confirm it is registered.
async def main(message: cl.Message):
    """Handle incoming messages and files.

    Attached elements are processed and indexed first; if that does not yield
    an index, the handler stops. Any text content is then answered from the
    index stored in the session, or the user is asked to upload files when no
    index exists yet.

    Args:
        message: The incoming Chainlit message.
    """
    print("\n=== Starting Message Processing ===")
    print(f"Message type: {type(message)}")
    print(f"Message content: {message.content}")
    print(f"Message elements: {message.elements}")
    print(f"Message attributes: {dir(message)}")

    # Process any uploaded files
    if message.elements and not await _ingest_attachments(message):
        return

    # Handle text queries
    if message.content:
        print("\n--- Processing Text Query ---")
        print(f"Query content: {message.content}")
        index = cl.user_session.get('index')
        print(f"Index exists: {index is not None}")
        if index is None:
            print("No index found in session")
            await cl.Message(
                content="⚠️ Please upload some files first before asking questions."
            ).send()
            return
        await _answer_query(index, message.content)

    print("\n=== Message Processing Complete ===\n")
# NOTE(review): no Chainlit hook decorator is visible above this function in
# the reviewed text; confirm which hook it is registered for.
def on_stop():
    """Clean up resources when the chat session ends.

    Removes the directory stored under the session's ``'temp_dir'`` key if it
    exists. Cleanup is best-effort: a failure is reported on stdout and never
    raised to the caller.
    """
    temp_dir = cl.user_session.get('temp_dir')
    if not temp_dir or not os.path.exists(temp_dir):
        return
    try:
        shutil.rmtree(temp_dir)
    except OSError as exc:
        # Report the failure so that leftover directories can be traced.
        print(f"Could not remove temporary directory {temp_dir}: {exc}")
# Script entry point: call ``cl.run()`` only when the file is executed directly.
# NOTE(review): confirm that the installed Chainlit version exposes ``cl.run``;
# apps are commonly started with the ``chainlit run`` command instead.
if __name__ == "__main__":
    cl.run()