# ChatWithDoc / task_manager.py
# NeelTA's picture
# initial commit
# d2fe6cc
from pdfHandler import PDFProcessor
from docHandler import DocProcessor
from txtHandler import TextProcessor
from webHandler import WebProcessor
from typing import Dict, Any, List
class DocumentManager:
    """Route uploaded files and URLs to type-specific processors and query them.

    One processor instance per source type (PDF, Word, plain text, web) is
    created up front. Every successfully processed source is recorded in
    ``processed_documents``; ``query_document`` then asks the processor
    recorded for each entry and concatenates the successful answers.

    NOTE(review): the same processor instance is recorded for every document
    of a given type. Whether a processor answers for all files it has seen or
    only for the most recent one depends on the processor implementation,
    which is not visible here -- confirm before relying on per-file answers.
    """

    # Both Word MIME types are routed to the same DocProcessor entry point.
    # NOTE(review): "application/msword" is the legacy .doc type; confirm that
    # DocProcessor.process_docx can actually read it.
    _WORD_CONTENT_TYPES = (
        "application/msword",
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    )

    def __init__(self):
        self.pdf_processor = PDFProcessor()
        self.doc_processor = DocProcessor()
        self.txt_processor = TextProcessor()
        self.web_processor = WebProcessor()
        # One dict per processed source, with keys:
        # "processor", "file_path", "content_type", "filename".
        self.processed_documents = []
        # Concatenated text of every source whose processor exposes
        # get_content(); each chunk is preceded by a "--- ... ---" header.
        self.all_content = ""

    @staticmethod
    def _extract_filename(path: str) -> str:
        """Return the last component of *path*, accepting '/' and '\\' separators."""
        # Upload paths may come from either OS, so both separators are
        # normalised instead of relying on the host's os.sep.
        return path.replace("\\", "/").rsplit("/", 1)[-1]

    def _append_content(self, processor, header: str) -> None:
        """Append *processor*'s content to ``all_content`` under *header*.

        Best effort: processors without ``get_content`` are skipped, and a
        failing ``get_content`` is reported on stdout without aborting the
        caller.
        """
        if not hasattr(processor, "get_content"):
            return
        try:
            content = processor.get_content()
        except Exception as e:
            # Combined content is optional; report the failure, do not hide it.
            print(f"Could not collect content for '{header}': {e}")
            return
        self.all_content += f"\n\n--- {header} ---\n{content}"

    def process_document(self, file_path: str, content_type: str) -> Dict[str, Any]:
        """Process a file with the processor matching *content_type*.

        Args:
            file_path: Path of the file to process.
            content_type: MIME type of the file. PDF, the two Word types and
                ``text/plain`` are supported.

        Returns:
            The processor's result dict. For an unsupported type, or when
            anything raises, ``{"status": "error", "message": ...}``.
        """
        try:
            print(f"Processing file: {file_path} with content type: {content_type}")

            if content_type == "application/pdf":
                processor = self.pdf_processor
                result = processor.process_pdf(file_path)
            elif content_type in self._WORD_CONTENT_TYPES:
                processor = self.doc_processor
                result = processor.process_docx(file_path)
            elif content_type == "text/plain":
                processor = self.txt_processor
                result = processor.process_text(file_path)
            else:
                return {"status": "error", "message": "Unknown file type"}

            if result["status"] == "success":
                doc_info = {
                    "processor": processor,
                    "file_path": file_path,
                    "content_type": content_type,
                    "filename": self._extract_filename(file_path),
                }
                self.processed_documents.append(doc_info)
                self._append_content(processor, f"Document: {doc_info['filename']}")
                print(f"Document added to collection. Total documents: {len(self.processed_documents)}")

            return result
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def query_document(self, query: str) -> Dict[str, Any]:
        """Ask every recorded document *query* and combine the answers.

        Args:
            query: The question passed to each processor's ``query_response``.

        Returns:
            ``{"status": "success", "answer": ...}`` with one
            ``"From <filename>:"`` section per document that returned a
            non-blank answer, otherwise ``{"status": "error", "message": ...}``.
        """
        if not self.processed_documents:
            return {"status": "error", "message": "No documents processed"}

        print(f"Querying {len(self.processed_documents)} documents with question: {query}")

        try:
            all_responses = []
            for doc_info in self.processed_documents:
                processor = doc_info["processor"]
                filename = doc_info["filename"]
                display_name = self._extract_filename(filename)

                try:
                    response = processor.query_response(query)
                except Exception as e:
                    # One failing document must not abort the whole query.
                    print(f"Error querying {filename}: {e}")
                    continue

                if response.get("status") != "success":
                    continue
                answer = response.get("answer", "")
                if answer and answer.strip():
                    all_responses.append(f"From {display_name}:\n {answer}")

            if not all_responses:
                return {"status": "error", "message": "No relevant information found in any documents"}

            return {
                "status": "success",
                "answer": "\n\n".join(all_responses),
            }
        except Exception as e:
            # Fallback: answer from the most recently processed document only.
            print(f"Multi-document query failed, using last document: {e}")
            try:
                last_processor = self.processed_documents[-1]["processor"]
                return last_processor.query_response(query)
            except Exception as fallback_error:
                # Keep the "always return a status dict" contract used by the
                # other methods even when the fallback itself fails.
                return {"status": "error", "message": str(fallback_error)}

    def clear_documents(self):
        """Clear all previously processed documents"""
        self.processed_documents = []
        self.all_content = ""
        print("All documents cleared - ready for new uploads")

    def process_url(self, url: str) -> Dict[str, Any]:
        """Process a URL and add it to the document collection

        Args:
            url: Address handed to the web processor.

        Returns:
            The web processor's result dict, or
            ``{"status": "error", "message": ...}`` when anything raises.
        """
        try:
            result = self.web_processor.process_url(url)

            if result["status"] == "success":
                doc_info = {
                    "processor": self.web_processor,
                    "file_path": url,
                    "content_type": "text/html",
                    # A URL ending in "/" has an empty last segment -> "index".
                    "filename": f"webpage_{url.split('/')[-1] or 'index'}",
                }
                self.processed_documents.append(doc_info)
                self._append_content(self.web_processor, f"Web Page: {url}")
                print(f"URL processed and added to collection: {url}")

            return result
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def get_status(self) -> Dict[str, Any]:
        """Get current status of processed documents

        Returns:
            A dict with ``total_documents`` (count), ``document_types``
            (distinct content types, sorted so the order is deterministic)
            and ``filenames`` (in processing order).
        """
        return {
            "total_documents": len(self.processed_documents),
            "document_types": sorted({doc["content_type"] for doc in self.processed_documents}),
            "filenames": [doc["filename"] for doc in self.processed_documents],
        }