James Edmunds
Checkpoint: local pipeline working. Added new embeddings, updated the README, added additional scripts, updated process_lyrics and upload_embeddings, and added test scripts.
import json
import shutil
import sqlite3
import sys
import time
from datetime import datetime
from pathlib import Path

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from tenacity import (
    retry,
    retry_if_exception,
    stop_after_attempt,
    wait_exponential
)
from tqdm import tqdm

# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from config.settings import Settings  # noqa: E402
from src.utils.data_loader import LyricsLoader  # noqa: E402
class LyricsProcessor:
    """Process lyrics files into embeddings for vector search.

    Loads lyric documents, splits them into overlapping chunks,
    embeds each chunk with OpenAI embeddings, and persists the result
    to a Chroma store under ``output_dir``, alongside a JSON metadata
    summary of the run.
    """

    def __init__(
        self,
        lyrics_dir: str,
        output_dir: str,
        batch_size: int = 100
    ):
        """Set up paths, the embedding model, splitter, and loader.

        Args:
            lyrics_dir: Directory containing the raw lyrics files.
            output_dir: Directory for the Chroma store and metadata
                (created if missing).
            batch_size: Number of chunks embedded per batch.
        """
        self.lyrics_dir = Path(lyrics_dir)
        self.output_dir = Path(output_dir)
        self.batch_size = batch_size
        self.embeddings = OpenAIEmbeddings()
        self.collection_name = Settings.CHROMA_COLLECTION_NAME
        print(f"Using collection name: {self.collection_name}")
        # Small chunks with generous overlap keep lyric context intact;
        # separators prefer stanza, then line, then word boundaries.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=300,
            chunk_overlap=75,
            separators=["\n\n", "\n", " ", ""],
            keep_separator=True
        )
        self.loader = LyricsLoader(lyrics_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def validate_text_splitter(self) -> None:
        """Fail fast if the splitter lacks the private attributes that
        the metadata report later reads.

        Raises:
            AttributeError: If ``_chunk_size`` or ``_chunk_overlap`` is
                missing from the text splitter.
        """
        required_attrs = ['_chunk_size', '_chunk_overlap']
        missing_attrs = [
            attr for attr in required_attrs
            if not hasattr(self.text_splitter, attr)
        ]
        if missing_attrs:
            msg = f"Text splitter missing required attributes: {missing_attrs}"
            raise AttributeError(msg)

    def _is_rate_limit_error(self, exception) -> bool:
        """Heuristically detect a rate-limit error from its message text."""
        return "rate_limit" in str(exception).lower()

    def _retry_on_rate_limit(self, fn, max_attempts: int = 5):
        """Call ``fn()``; retry rate-limit errors with exponential backoff.

        Non-rate-limit errors, and the final failed attempt, are
        re-raised to the caller. Backoff doubles from 4s, capped at 60s.
        """
        delay = 4
        for attempt in range(max_attempts):
            try:
                return fn()
            except Exception as e:
                if attempt == max_attempts - 1 or not self._is_rate_limit_error(e):
                    raise
                print("Rate limit reached. Waiting before retry...")
                time.sleep(delay)
                delay = min(delay * 2, 60)

    def _create_embeddings_with_backoff(self, vector_store, batch):
        """Add ``batch`` to ``vector_store``, retrying rate-limit errors.

        The original implementation had no retry despite its name;
        rate-limited batches were silently dropped by the caller.
        """
        return self._retry_on_rate_limit(
            lambda: vector_store.add_documents(batch))

    def _confirm_collection_overwrite(self) -> bool:
        """Check for an existing collection and prompt before wiping it.

        Returns:
            False if the collection exists and the user declined to
            delete it (caller should abort); True otherwise. Errors
            while probing the store are reported and treated as "no
            existing collection" (best-effort, as before).
        """
        chroma_dir = self.output_dir / "chroma"
        sqlite_file = chroma_dir / "chroma.sqlite3"
        if not sqlite_file.exists():
            return True
        try:
            conn = sqlite3.connect(sqlite_file)
            try:
                cursor = conn.cursor()
                cursor.execute("SELECT name FROM collections WHERE name = ?",
                               (self.collection_name,))
                exists = cursor.fetchone() is not None
            finally:
                # Close BEFORE any rmtree: the old code deleted the
                # database file while the connection was still open and
                # leaked the connection on the abort path.
                conn.close()
            if exists:
                response = input(
                    f"\nWarning: Collection '{self.collection_name}' already exists.\n"
                    "Do you want to delete and recreate? (y/N): "
                )
                if response.lower() != 'y':
                    print("Aborting.")
                    return False
                print("Removing existing collection...")
                shutil.rmtree(chroma_dir)
                chroma_dir.mkdir(parents=True)
        except Exception as e:
            print(f"Error checking existing collection: {e}")
            print("Continuing with processing...")
        return True

    def _split_documents(self, documents):
        """Split each document into chunks; return the flat chunk list."""
        processed_docs = []
        for doc in tqdm(documents, desc="Processing documents"):
            processed_docs.extend(self.text_splitter.split_documents([doc]))
        return processed_docs

    def _embed_batches(self, processed_docs) -> None:
        """Embed chunks in batches; the first batch creates the store.

        Both the store creation and subsequent adds go through the
        rate-limit retry, so no batch is ever skipped. (The old loop
        `continue`d past a rate-limited batch, losing its documents,
        and a rate limit on batch 0 left ``vector_store`` undefined.)
        """
        vector_store = None
        for i in tqdm(range(0, len(processed_docs), self.batch_size)):
            batch = processed_docs[i:i + self.batch_size]
            if vector_store is None:
                vector_store = self._retry_on_rate_limit(
                    lambda: Chroma.from_documents(
                        documents=batch,
                        embedding=self.embeddings,
                        persist_directory=str(self.output_dir / "chroma"),
                        collection_name=self.collection_name
                    )
                )
            else:
                self._create_embeddings_with_backoff(vector_store, batch)
            # Rate limit cooldown between batches
            time.sleep(2)

    def _write_metadata(self, total_documents: int, total_chunks: int) -> None:
        """Persist a JSON summary of this processing run."""
        metadata = {
            'processed_at': datetime.now().isoformat(),
            'total_documents': total_documents,
            'total_chunks': total_chunks,
            'chunk_size': getattr(self.text_splitter, '_chunk_size', 300),
            'chunk_overlap': getattr(self.text_splitter, '_chunk_overlap', 75)
        }
        meta_path = self.output_dir / 'processing_metadata.json'
        with open(meta_path, 'w') as f:
            json.dump(metadata, f, indent=2)

    def process_lyrics(self) -> None:
        """Main processing pipeline for lyrics.

        Validates configuration, optionally clears a pre-existing
        collection (after user confirmation), loads and chunks the
        lyrics, embeds the chunks in batches, and writes a metadata
        summary next to the Chroma store.

        Raises:
            ValueError: If no valid lyrics files are found.
        """
        print("Starting lyrics processing pipeline...")
        print("Validating configuration...")
        self.validate_text_splitter()

        # Bail out early if the user declines to overwrite.
        if not self._confirm_collection_overwrite():
            return

        print("Loading lyrics files...")
        documents = self.loader.load_lyrics()
        if not documents:
            raise ValueError("No valid lyrics files found")
        print(f"Loaded {len(documents)} valid lyrics files")

        print("Processing documents...")
        processed_docs = self._split_documents(documents)
        n_docs = len(documents)
        n_chunks = len(processed_docs)
        chunks_msg = f"Created {n_chunks} chunks from {n_docs}"
        print(f"{chunks_msg} documents")

        print("Creating embeddings...")
        self._embed_batches(processed_docs)

        self._write_metadata(n_docs, n_chunks)
        print("Processing complete!")
        print(f"Processed {n_docs} documents into {n_chunks} chunks")
        print(f"Embeddings saved to {self.output_dir / 'chroma'}")
if __name__ == "__main__":
    # Run the full pipeline against the directories from project settings.
    pipeline = LyricsProcessor(
        lyrics_dir=str(Settings.LYRICS_DIR),
        output_dir=str(Settings.EMBEDDINGS_DIR),
        batch_size=100,
    )
    pipeline.process_lyrics()