""" | |
Metadata Manager Module | |
Handles document metadata storage and retrieval operations. | |
""" | |
import json
import hashlib
from datetime import datetime, timezone
from typing import List, Dict, Any
from pathlib import Path

from config.config import EMBEDDING_MODEL, CHUNK_SIZE, CHUNK_OVERLAP


class MetadataManager:
    """Handles document metadata operations."""

    def __init__(self, base_db_path: Path):
        """
        Initialize the metadata manager.

        Args:
            base_db_path: Base path for storing metadata files
        """
        self.base_db_path = base_db_path
        self.processed_docs_file = self.base_db_path / "processed_documents.json"
        self.processed_docs = self._load_processed_docs()

    def _load_processed_docs(self) -> Dict[str, Dict]:
        """Load the registry of processed documents."""
        if self.processed_docs_file.exists():
            try:
                with open(self.processed_docs_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
            except Exception as e:
                print(f"⚠️ Warning: Could not load processed docs registry: {e}")
        return {}

    def _save_processed_docs(self):
        """Save the registry of processed documents."""
        try:
            with open(self.processed_docs_file, 'w', encoding='utf-8') as f:
                json.dump(self.processed_docs, f, indent=2, ensure_ascii=False)
        except Exception as e:
            print(f"⚠️ Warning: Could not save processed docs registry: {e}")

    def generate_doc_id(self, document_url: str) -> str:
        """
        Generate a unique document ID from the URL.

        Args:
            document_url: URL of the document

        Returns:
            str: Unique document ID
        """
        # MD5 is used here only as a short, deterministic fingerprint, not for security.
        url_hash = hashlib.md5(document_url.encode()).hexdigest()[:12]
        return f"doc_{url_hash}"

    def is_document_processed(self, document_url: str) -> bool:
        """
        Check if a document has already been processed.

        Args:
            document_url: URL of the document

        Returns:
            bool: True if the document has already been processed
        """
        doc_id = self.generate_doc_id(document_url)
        return doc_id in self.processed_docs

    def get_document_info(self, document_url: str) -> Dict[str, Any]:
        """
        Get information about a processed document.

        Args:
            document_url: URL of the document

        Returns:
            Dict[str, Any]: Document information, or an empty dict if not found
        """
        doc_id = self.generate_doc_id(document_url)
        return self.processed_docs.get(doc_id, {})

    def save_document_metadata(self, chunks: List[str], doc_id: str, document_url: str):
        """
        Save document metadata to a JSON file and update the registry.

        Args:
            chunks: List of text chunks
            doc_id: Document identifier
            document_url: Original document URL
        """
        # Calculate statistics
        total_chars = sum(len(chunk) for chunk in chunks)
        total_words = sum(len(chunk.split()) for chunk in chunks)
        avg_chunk_size = total_chars / len(chunks) if chunks else 0

        # Create metadata object
        metadata = {
            "doc_id": doc_id,
            "document_url": document_url,
            "chunk_count": len(chunks),
            "total_chars": total_chars,
            "total_words": total_words,
            "avg_chunk_size": avg_chunk_size,
            # Use a wall-clock UTC timestamp; the asyncio event-loop clock is
            # monotonic and not meaningful across runs.
            "processed_at": datetime.now(timezone.utc).isoformat(),
            "embedding_model": EMBEDDING_MODEL,
            "chunk_size": CHUNK_SIZE,
            "chunk_overlap": CHUNK_OVERLAP,
            "processing_config": {
                "chunk_size": CHUNK_SIZE,
                "chunk_overlap": CHUNK_OVERLAP,
                "embedding_model": EMBEDDING_MODEL
            }
        }
        # Save individual document metadata
        metadata_path = self.base_db_path / f"{doc_id}_metadata.json"
        try:
            with open(metadata_path, "w", encoding="utf-8") as f:
                json.dump(metadata, f, indent=2, ensure_ascii=False)
            print(f"✅ Saved individual metadata for {doc_id}")
        except Exception as e:
            print(f"⚠️ Warning: Could not save individual metadata for {doc_id}: {e}")

        # Update processed documents registry
        self.processed_docs[doc_id] = {
            "document_url": document_url,
            "chunk_count": len(chunks),
            "processed_at": metadata["processed_at"],
            "collection_name": f"{doc_id}_collection",
            "total_chars": total_chars,
            "total_words": total_words
        }
        self._save_processed_docs()
        print(f"✅ Updated registry for document {doc_id}")

    def get_document_metadata(self, doc_id: str) -> Dict[str, Any]:
        """
        Load individual document metadata from file.

        Args:
            doc_id: Document identifier

        Returns:
            Dict[str, Any]: Document metadata, or an empty dict if not found
        """
        metadata_path = self.base_db_path / f"{doc_id}_metadata.json"
        if not metadata_path.exists():
            return {}
        try:
            with open(metadata_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            print(f"⚠️ Warning: Could not load metadata for {doc_id}: {e}")
            return {}

    def list_processed_documents(self) -> Dict[str, Dict]:
        """
        List all processed documents.

        Returns:
            Dict[str, Dict]: Copy of the processed documents registry
        """
        return self.processed_docs.copy()

    def get_collection_stats(self) -> Dict[str, Any]:
        """
        Get statistics about all collections.

        Returns:
            Dict[str, Any]: Collection statistics
        """
        stats = {
            "total_documents": len(self.processed_docs),
            "total_collections": 0,
            "total_chunks": 0,
            "total_characters": 0,
            "total_words": 0,
            "documents": []
        }

        for doc_id, info in self.processed_docs.items():
            # Count a collection only if its database file is present on disk
            collection_path = self.base_db_path / f"{info['collection_name']}.db"
            if collection_path.exists():
                stats["total_collections"] += 1

            stats["total_chunks"] += info.get("chunk_count", 0)
            stats["total_characters"] += info.get("total_chars", 0)
            stats["total_words"] += info.get("total_words", 0)
            stats["documents"].append({
                "doc_id": doc_id,
                "url": info["document_url"],
                "chunk_count": info.get("chunk_count", 0),
                "total_chars": info.get("total_chars", 0),
                "total_words": info.get("total_words", 0),
                "processed_at": info.get("processed_at", "unknown")
            })

        # Add averages
        if stats["total_documents"] > 0:
            stats["avg_chunks_per_doc"] = stats["total_chunks"] / stats["total_documents"]
            stats["avg_chars_per_doc"] = stats["total_characters"] / stats["total_documents"]
            stats["avg_words_per_doc"] = stats["total_words"] / stats["total_documents"]

        return stats

    def remove_document_metadata(self, doc_id: str) -> bool:
        """
        Remove document metadata and registry entry.

        Args:
            doc_id: Document identifier

        Returns:
            bool: True if successfully removed, False otherwise
        """
        try:
            # Remove individual metadata file
            metadata_path = self.base_db_path / f"{doc_id}_metadata.json"
            if metadata_path.exists():
                metadata_path.unlink()
                print(f"🗑️ Removed metadata file for {doc_id}")

            # Remove from registry
            if doc_id in self.processed_docs:
                del self.processed_docs[doc_id]
                self._save_processed_docs()
                print(f"🗑️ Removed registry entry for {doc_id}")

            return True
        except Exception as e:
            print(f"❌ Error removing metadata for {doc_id}: {e}")
            return False

    def update_document_status(self, doc_id: str, status_info: Dict[str, Any]):
        """
        Update status information for a document.

        Args:
            doc_id: Document identifier
            status_info: Status information to update
        """
        if doc_id in self.processed_docs:
            self.processed_docs[doc_id].update(status_info)
            self._save_processed_docs()
            print(f"✅ Updated status for document {doc_id}")

    def get_registry_path(self) -> str:
        """
        Get the path to the processed documents registry.

        Returns:
            str: Path to the registry file
        """
        return str(self.processed_docs_file)
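

# The block below is a minimal, hypothetical usage sketch, not part of the
# module's public API. It assumes config.config provides EMBEDDING_MODEL,
# CHUNK_SIZE, and CHUNK_OVERLAP; the directory, document URL, and chunk
# strings are placeholders chosen for illustration only.
if __name__ == "__main__":
    base = Path("./db")
    base.mkdir(parents=True, exist_ok=True)  # ensure the metadata directory exists
    manager = MetadataManager(base)

    url = "https://example.com/sample.pdf"  # placeholder URL
    if not manager.is_document_processed(url):
        doc_id = manager.generate_doc_id(url)
        sample_chunks = ["first chunk of text", "second chunk of text"]
        manager.save_document_metadata(sample_chunks, doc_id, url)

    # Inspect what the registry now records for this document and overall.
    print(manager.get_document_info(url))
    print(manager.get_collection_stats())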