import os
import re
import logging
from typing import List, Dict, Tuple

import chromadb
from chromadb.utils import embedding_functions

from config import EMBEDDING_MODEL, DATABASE_DIR

# Improved logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class KodeksProcessor:
    def __init__(self):
        logger.info(f"Initializing database client in directory: {DATABASE_DIR}")
        if not os.path.exists(DATABASE_DIR):
            os.makedirs(DATABASE_DIR)
            logger.info(f"Created directory {DATABASE_DIR}")

        self.client = chromadb.PersistentClient(path=DATABASE_DIR)
        logger.info("Database client initialized")

        try:
            self.collection = self.client.get_or_create_collection(
                name="kodeksy",
                embedding_function=embedding_functions.SentenceTransformerEmbeddingFunction(
                    model_name=EMBEDDING_MODEL
                )
            )
            logger.info("Collection 'kodeksy' retrieved or created")
        except Exception as e:
            logger.error(f"Error while getting or creating collection: {e}")
            raise

    def extract_metadata(self, text: str) -> Dict:
        metadata = {}
        # ... (rest of the method remains the same)
        logger.info("Extracted metadata: %s", metadata)
        return metadata

    def split_header_and_content(self, text: str) -> Tuple[str, str]:
        # ... (method remains the same; the original body was elided, so the
        # minimal sketch below stands in for it. It assumes the header is
        # everything before the first "Art." marker.)
        match = re.search(r'Art\.\s*\d+', text)
        if match:
            return text[:match.start()], text[match.start():]
        return "", text

    def process_article(self, article_text: str) -> Dict:
        # ... (method remains the same; the original body was elided, so the
        # minimal sketch below stands in for it. It assumes paragraphs are
        # marked with "§ n." inside the article text.)
        num_match = re.match(r'Art\.\s*(\d+[a-z]?)', article_text)
        article_num = num_match.group(1) if num_match else ""
        paragraphs = re.findall(
            r'§\s*(\d+[a-z]?)\.\s*(.*?)(?=§\s*\d+[a-z]?\.|$)',
            article_text,
            re.DOTALL
        )
        return {
            "article_num": article_num,
            "has_paragraphs": bool(paragraphs),
            "paragraphs": paragraphs,
            "content": article_text,
        }

    def split_into_chunks(self, text: str, metadata: Dict) -> List[Dict]:
        chunks = []
        articles = re.split(r'(Art\.\s*\d+[a-z]?)', text)

        for i in range(1, len(articles), 2):
            article_title = articles[i].strip()
            article_content = articles[i + 1].strip() if i + 1 < len(articles) else ""

            processed_article = self.process_article(article_title + " " + article_content)
            chunk_metadata = {
                **metadata,
                "article": processed_article["article_num"]
            }

            if processed_article["has_paragraphs"]:
                for par_num, par_content in processed_article["paragraphs"]:
                    chunks.append({
                        "text": f"{article_title} §{par_num}.\n{par_content.strip()}",
                        "metadata": {**chunk_metadata, "paragraph": par_num}
                    })
            else:
                chunks.append({
                    "text": processed_article["content"],
                    "metadata": chunk_metadata
                })

        logger.info("Split text into %d chunks.", len(chunks))
        return chunks

    def process_file(self, filepath: str) -> None:
        logger.info("Processing file: %s", filepath)
        try:
            with open(filepath, 'r', encoding='utf-8') as file:
                content = file.read()
        except Exception as e:
            logger.error(f"Error reading file {filepath}: {e}")
            return

        header, main_content = self.split_header_and_content(content)
        metadata = self.extract_metadata(main_content)
        metadata['filename'] = os.path.basename(filepath)
        chunks = self.split_into_chunks(main_content, metadata)

        if chunks:
            try:
                self.collection.add(
                    documents=[chunk["text"] for chunk in chunks],
                    metadatas=[chunk["metadata"] for chunk in chunks],
                    ids=[
                        f"{metadata['filename']}_{chunk['metadata']['article']}_{i}"
                        for i, chunk in enumerate(chunks)
                    ]
                )
                logger.info(f"Added {len(chunks)} chunks from file {metadata['filename']}")
            except Exception as e:
                logger.error(f"Error adding chunks to collection: {e}")
        else:
            logger.warning(f"No chunks to add from file: {filepath}")

    def process_all_files(self, directory: str) -> None:
        logger.info("Starting to process all files in directory: %s", directory)
        for filename in os.listdir(directory):
            if filename.endswith('.txt'):
                filepath = os.path.join(directory, filename)
                self.process_file(filepath)
        logger.info("Finished processing files.")

    def search(self, query: str, n_results: int = 3) -> Dict:
        logger.info("Searching database for query: %s", query)
        try:
            results = self.collection.query(
                query_texts=[query],
                n_results=n_results
            )
            logger.info("Found %d results for query: %s", len(results['documents'][0]), query)
            return results
        except Exception as e:
            logger.error(f"Error during search: {e}")
            return {"documents": [[]], "metadatas": [[]], "distances": [[]]}

    def list_all_documents(self) -> None:
        try:
            all_docs = self.collection.get(include=['metadatas'])
            if all_docs['metadatas']:
                for metadata in all_docs['metadatas']:
                    logger.info("Document: %s", metadata)
            else:
                logger.info("No documents in the database.")
        except Exception as e:
            logger.error(f"Error listing documents: {e}")


if __name__ == "__main__":
    processor = KodeksProcessor()
    processor.process_all_files("data/kodeksy")
    processor.list_all_documents()