"""Process lyrics files into embeddings for vector search.

LyrGen2 / scripts/process_lyrics.py
Author: James Edmunds
Checkpoint (d147321): local working; new embeddings, added to README,
additional scripts, updated process_lyrics and upload_embeddings, and
added some test scripts.
"""
import json
import sys
import time
from datetime import datetime
from pathlib import Path
import sqlite3
import shutil
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from tenacity import (
retry,
retry_if_exception,
stop_after_attempt,
wait_exponential
)
from tqdm import tqdm
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from config.settings import Settings # noqa: E402
from src.utils.data_loader import LyricsLoader # noqa: E402
class LyricsProcessor:
    """Process lyrics files into embeddings for vector search.

    Pipeline: load raw lyrics via ``LyricsLoader``, split them into
    overlapping chunks, embed the chunks with OpenAI embeddings, and
    persist them to an on-disk Chroma collection, plus a small JSON
    metadata file describing the run.
    """

    def __init__(
        self,
        lyrics_dir: str,
        output_dir: str,
        batch_size: int = 100
    ):
        """Set up the splitter, embeddings client, loader, and output dir.

        Args:
            lyrics_dir: Directory containing the raw lyrics files.
            output_dir: Directory for the Chroma store and metadata
                (created if missing).
            batch_size: Number of chunks sent per embedding batch.
        """
        self.lyrics_dir = Path(lyrics_dir)
        self.output_dir = Path(output_dir)
        self.batch_size = batch_size
        self.embeddings = OpenAIEmbeddings()
        self.collection_name = Settings.CHROMA_COLLECTION_NAME
        print(f"Using collection name: {self.collection_name}")
        # Small chunks with generous overlap keep verse/line context
        # together when splitting song lyrics.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=300,
            chunk_overlap=75,
            separators=["\n\n", "\n", " ", ""],
            keep_separator=True
        )
        # Initialize loader
        self.loader = LyricsLoader(lyrics_dir)
        # Ensure output directory exists
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def validate_text_splitter(self):
        """Fail fast if the splitter lacks the private attributes that
        the metadata writer reads later.

        Raises:
            AttributeError: if ``_chunk_size`` or ``_chunk_overlap`` is
                missing (e.g. after a langchain upgrade).
        """
        required_attrs = ['_chunk_size', '_chunk_overlap']
        missing_attrs = [
            attr for attr in required_attrs
            if not hasattr(self.text_splitter, attr)
        ]
        if missing_attrs:
            msg = f"Text splitter missing required attributes: {missing_attrs}"
            raise AttributeError(msg)

    @staticmethod
    def _is_rate_limit_error(exception) -> bool:
        """Return True if *exception* looks like an API rate-limit error."""
        return "rate_limit" in str(exception).lower()

    # BUG FIX: tenacity invokes the predicate as ``predicate(exc)``.
    # The original passed the unbound two-argument method, so the
    # exception bound to ``self`` and the call itself raised TypeError.
    # Passing the staticmethod's underlying function gives tenacity a
    # one-argument callable.
    @retry(
        retry=retry_if_exception(_is_rate_limit_error.__func__),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        stop=stop_after_attempt(3)
    )
    def _create_embeddings_with_backoff(self, vector_store, batch):
        """Add *batch* to *vector_store*, retrying on rate limits."""
        return vector_store.add_documents(batch)

    def _handle_existing_collection(self) -> bool:
        """Check for an existing collection and offer to recreate it.

        Returns:
            False when the user declines the overwrite prompt (caller
            should abort); True otherwise. Errors while inspecting the
            store are logged and treated as "continue processing".
        """
        chroma_dir = self.output_dir / "chroma"
        sqlite_file = chroma_dir / "chroma.sqlite3"
        if not sqlite_file.exists():
            return True
        try:
            # BUG FIX: close the connection *before* any rmtree. The
            # original deleted the directory while the sqlite handle
            # was still open (fails on Windows) and leaked the handle
            # entirely on the "abort" path.
            conn = sqlite3.connect(sqlite_file)
            try:
                cursor = conn.cursor()
                cursor.execute(
                    "SELECT name FROM collections WHERE name = ?",
                    (self.collection_name,)
                )
                exists = cursor.fetchone() is not None
            finally:
                conn.close()
            if not exists:
                return True
            response = input(
                f"\nWarning: Collection '{self.collection_name}' already exists.\n"
                "Do you want to delete and recreate? (y/N): "
            )
            if response.lower() != 'y':
                print("Aborting.")
                return False
            print("Removing existing collection...")
            shutil.rmtree(chroma_dir)
            chroma_dir.mkdir(parents=True)
        except Exception as e:
            # Best-effort check: a broken/partial store should not stop
            # a fresh processing run.
            print(f"Error checking existing collection: {e}")
            print("Continuing with processing...")
        return True

    def _load_documents(self):
        """Load all lyrics files.

        Raises:
            ValueError: if no valid lyrics files are found.
        """
        print("Loading lyrics files...")
        documents = self.loader.load_lyrics()
        if not documents:
            raise ValueError("No valid lyrics files found")
        print(f"Loaded {len(documents)} valid lyrics files")
        return documents

    def _split_documents(self, documents):
        """Split each document into overlapping chunks."""
        print("Processing documents...")
        processed_docs = []
        for doc in tqdm(documents, desc="Processing documents"):
            processed_docs.extend(self.text_splitter.split_documents([doc]))
        print(
            f"Created {len(processed_docs)} chunks from "
            f"{len(documents)} documents"
        )
        return processed_docs

    def _embed_in_batches(self, processed_docs) -> None:
        """Embed all chunks batch-by-batch into a persisted Chroma store."""
        print("Creating embeddings...")
        vector_store = None
        total_chunks = len(processed_docs)
        for start in tqdm(range(0, total_chunks, self.batch_size)):
            batch = processed_docs[start:start + self.batch_size]
            while True:
                try:
                    if vector_store is None:
                        # First batch also creates the collection.
                        vector_store = Chroma.from_documents(
                            documents=batch,
                            embedding=self.embeddings,
                            persist_directory=str(self.output_dir / "chroma"),
                            collection_name=self.collection_name
                        )
                    else:
                        self._create_embeddings_with_backoff(
                            vector_store, batch
                        )
                    break
                except Exception as e:
                    if self._is_rate_limit_error(e):
                        # BUG FIX: wait out the rate limit and retry the
                        # SAME batch. The original ``continue`` advanced
                        # the loop and silently dropped the batch; a
                        # first-batch failure also left ``vector_store``
                        # unbound, crashing every later batch.
                        print("Rate limit reached. Waiting before retry...")
                        time.sleep(60)
                        continue
                    raise
            # Gentle pacing between batches to stay under rate limits.
            time.sleep(2)

    def _write_metadata(self, total_documents: int, total_chunks: int) -> None:
        """Persist a small JSON summary of this processing run."""
        metadata = {
            'processed_at': datetime.now().isoformat(),
            'total_documents': total_documents,
            'total_chunks': total_chunks,
            'chunk_size': getattr(self.text_splitter, '_chunk_size', 300),
            'chunk_overlap': getattr(self.text_splitter, '_chunk_overlap', 75)
        }
        meta_path = self.output_dir / 'processing_metadata.json'
        with open(meta_path, 'w') as f:
            json.dump(metadata, f, indent=2)

    def process_lyrics(self) -> None:
        """Main processing pipeline for lyrics.

        Steps: validate configuration, optionally clear an existing
        Chroma collection (interactive prompt), load and split the
        documents, embed them in batches, then write run metadata.

        Raises:
            ValueError: if no valid lyrics files are found.
        """
        print("Starting lyrics processing pipeline...")
        print("Validating configuration...")
        self.validate_text_splitter()
        if not self._handle_existing_collection():
            return
        documents = self._load_documents()
        processed_docs = self._split_documents(documents)
        total_chunks = len(processed_docs)
        self._embed_in_batches(processed_docs)
        self._write_metadata(len(documents), total_chunks)
        print("Processing complete!")
        print(f"Processed {len(documents)} documents into {total_chunks} chunks")
        print(f"Embeddings saved to {self.output_dir / 'chroma'}")
def main() -> None:
    """Build a LyricsProcessor from project Settings and run the pipeline."""
    processor = LyricsProcessor(
        lyrics_dir=str(Settings.LYRICS_DIR),
        output_dir=str(Settings.EMBEDDINGS_DIR),
        batch_size=100,
    )
    processor.process_lyrics()


if __name__ == "__main__":
    main()