import os
import re
from pathlib import Path
from typing import List, Optional

import chardet
from langchain_core.documents import Document
from tqdm import tqdm


class LyricsLoader:
    """Load lyric text files from a per-artist directory tree.

    The expected layout is ``<lyrics_dir>/<artist>/<song title>.txt``. Each
    accepted file is decoded, cleaned with :meth:`clean_lyrics_text` and
    wrapped in a ``Document`` whose metadata records the artist, the song
    title (file stem), the source path, the encoding used and the text sizes
    before and after cleaning.
    """

    # File-size window in bytes (inclusive) for a file to count as lyrics.
    MIN_FILE_SIZE = 100
    MAX_FILE_SIZE = 1_000_000

    # Encodings tried in order when reading a file. latin-1 maps every byte
    # to a character and therefore never fails, so it has to come last;
    # anything listed after it would never be tried.
    ENCODINGS = ("utf-8", "cp1252", "latin-1")

    # Lower-case substrings that disqualify a file name.
    # NOTE(review): matching is by substring, so a title such as
    # "Undercover.txt" is rejected by 'cover' - confirm this is acceptable.
    INVALID_NAME_PATTERNS = (
        "[artwork]", "artwork", "cover", ".jpg", ".png",
        "tracklist", "credits", "booklet", "album art",
    )

    # Page-furniture fragments removed from lyric lines (case-insensitive).
    # They are matched as substrings, not whole words, so a fragment glued to
    # other characters (e.g. "42Embed") is removed as well.
    HEADER_PATTERNS = (
        "contributors",
        "translations",
        "lyrics",
        "tradução",
        "traducción",
        "written by",
        "produced by",
        "you might also like",
        "embed",
    )

    # One regex per header fragment; each also swallows the separator
    # characters (':', '-', ' ') that directly follow the fragment.
    _HEADER_RES = tuple(
        re.compile(re.escape(fragment) + r"[:\- ]*", re.IGNORECASE)
        for fragment in HEADER_PATTERNS
    )

    _SECTION_WORDS = (
        r"(?:pre-chorus|post-chorus|verse|chorus|bridge|hook|intro|outro)"
    )
    # A fully bracketed line that mentions a section word: "[Verse 1: Name]".
    _BRACKETED_SECTION_RE = re.compile(
        r"^\[[^\]]*\b" + _SECTION_WORDS + r"\b[^\]]*\]$", re.IGNORECASE
    )
    # An unbracketed label: "Chorus", "Verse 2", "Chorus x2", "Verse 1: Name".
    _BARE_SECTION_RE = re.compile(
        r"^" + _SECTION_WORDS + r"(?:\s+(?:\d+|x\s?\d+))?\s*(?::.*)?$",
        re.IGNORECASE,
    )

    # Reflow thresholds used when the input arrives as one unbroken line.
    _REFLOW_MIN_LENGTH = 100   # only reflow newline-free text longer than this
    _LONG_LINE_LENGTH = 200    # lines still longer than this get hard-wrapped
    _WRAP_WIDTH = 50           # a wrapped line ends once it exceeds this width

    def __init__(self, lyrics_dir: str = "lyrics"):
        """Remember the root directory that holds one sub-directory per artist."""
        self.lyrics_dir = Path(lyrics_dir)

    def detect_file_encoding(self, file_path: Path) -> str:
        """Return chardet's best guess for the encoding of *file_path*.

        The whole file is read in binary mode. chardet may report no encoding
        at all (for example on an empty file); ``'utf-8'`` is returned in that
        case so the result is always a usable codec name.
        """
        with open(file_path, 'rb') as file:
            raw_data = file.read()
        result = chardet.detect(raw_data)
        return result['encoding'] or 'utf-8'

    def clean_lyrics_text(self, text: str) -> str:
        """Normalize raw lyrics text and return the cleaned result.

        Steps, in order:

        1. Text longer than 100 characters without any newline is reflowed
           into lines (see :meth:`_reflow_unbroken_text`).
        2. Every line is stripped; runs of blank lines are capped at two.
        3. Header fragments (``HEADER_PATTERNS``) are removed from each line,
           and lines left empty by that removal are dropped.
        4. A line identical to the previously kept lyric line is dropped.
        5. Section labels such as ``Chorus`` or ``[Verse 1]`` are emitted
           wrapped in exactly one pair of square brackets.

        Leading and trailing whitespace of the final text is removed.
        """
        if '\n' not in text and len(text) > self._REFLOW_MIN_LENGTH:
            text = self._reflow_unbroken_text(text)

        cleaned_lines = []
        prev_line = ""
        consecutive_empty = 0

        for raw_line in text.split('\n'):
            line = raw_line.strip()

            if not line:
                consecutive_empty += 1
                if consecutive_empty <= 2:
                    cleaned_lines.append("")
                continue
            consecutive_empty = 0

            cleaned_line = self._strip_header_fragments(line)
            if not cleaned_line:
                continue

            # Blank lines and section labels do not reset prev_line, so a
            # repeat is dropped even when one of those sits in between.
            if cleaned_line == prev_line:
                continue

            if self._is_section_marker(cleaned_line):
                cleaned_lines.append(f"[{cleaned_line.strip('[]')}]")
                continue

            cleaned_lines.append(cleaned_line)
            prev_line = cleaned_line

        # strip() also removes the blank lines kept at either end.
        return '\n'.join(cleaned_lines).strip()

    def _reflow_unbroken_text(self, text: str) -> str:
        """Insert line breaks into lyrics that arrived as one long line."""
        # Break after sentence-ending punctuation.
        for punct in ('. ', '? ', '! '):
            text = text.replace(punct, punct.rstrip() + '\n')

        # Break before common conjunctions. Only the leading space is
        # replaced, so the conjunction stays separated from the next word.
        for word in (' cause ', ' cos ', ' when ', ' and '):
            text = text.replace(word, '\n' + word.lstrip())

        # Hard-wrap only the lines that are still very long, so the breaks
        # inserted above are kept.
        lines = []
        for line in text.split('\n'):
            if len(line) > self._LONG_LINE_LENGTH:
                lines.extend(self._wrap_by_length(line))
            else:
                lines.append(line)
        return '\n'.join(lines)

    def _wrap_by_length(self, line: str) -> List[str]:
        """Split *line* greedily into chunks that end once past ``_WRAP_WIDTH``."""
        chunks = []
        current = []
        current_len = 0
        for word in line.split():
            current_len += len(word) + (1 if current else 0)
            current.append(word)
            if current_len > self._WRAP_WIDTH:
                chunks.append(' '.join(current))
                current = []
                current_len = 0
        if current:
            chunks.append(' '.join(current))
        return chunks

    def _strip_header_fragments(self, line: str) -> str:
        """Remove the first occurrence of every header fragment from *line*."""
        for header_re in self._HEADER_RES:
            match = header_re.search(line)
            if match is None:
                continue
            before = line[:match.start()].strip()
            after = line[match.end():].strip()
            # Keep working on the already-shortened line so that removals
            # of different fragments accumulate.
            line = f"{before} {after}".strip()
        return line

    def _is_section_marker(self, line: str) -> bool:
        """Tell whether *line* is a section label rather than a lyric line."""
        return bool(
            self._BRACKETED_SECTION_RE.match(line)
            or self._BARE_SECTION_RE.match(line)
        )

    def is_valid_lyric_file(self, file_path: Path) -> bool:
        """Return True when *file_path* looks like a usable lyrics file.

        A file qualifies when its name ends in ``.txt`` (any letter case),
        contains none of ``INVALID_NAME_PATTERNS``, it is a regular file, and
        its size lies within ``MIN_FILE_SIZE``..``MAX_FILE_SIZE`` bytes.
        Missing or unreadable paths yield False instead of raising.
        """
        lower_name = file_path.name.lower()
        if not lower_name.endswith('.txt'):
            return False
        if any(pattern in lower_name for pattern in self.INVALID_NAME_PATTERNS):
            return False

        try:
            if not file_path.is_file():
                return False
            file_size = file_path.stat().st_size
        except OSError:
            # The file vanished or cannot be inspected.
            return False

        return self.MIN_FILE_SIZE <= file_size <= self.MAX_FILE_SIZE

    def _read_text(self, file_path: Path):
        """Decode *file_path* with the first encoding in ``ENCODINGS`` that works.

        Returns ``(text, encoding)`` with a leading byte-order mark and the
        surrounding whitespace removed, or ``None`` if every encoding raised
        ``UnicodeDecodeError``.
        """
        for encoding in self.ENCODINGS:
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    text = f.read()
            except UnicodeDecodeError:
                continue
            # str.strip() does not treat U+FEFF as whitespace.
            return text.lstrip('\ufeff').strip(), encoding
        return None

    # The Document annotations below are quoted so that they are not
    # evaluated when the class is created.
    def read_and_validate_lyrics(
        self,
        file_path: Path,
        artist_name: str
    ) -> "Optional[Document]":
        """Read one lyrics file and wrap its cleaned text in a ``Document``.

        The file is decoded with the first encoding from ``ENCODINGS`` that
        succeeds. ``None`` is returned, after printing a message, when the
        file cannot be decoded or read, holds fewer than 10 characters, or is
        empty after cleaning. This method does not raise.
        """
        try:
            decoded = self._read_text(file_path)
            if decoded is None:
                # Only reachable if ENCODINGS has no catch-all codec.
                print(f"Error: Could not decode {file_path.name} with supported encodings")
                return None
            text, encoding = decoded

            if len(text) < 10:
                print(f"Warning: Invalid or empty lyrics in {file_path.name}")
                return None

            cleaned_text = self.clean_lyrics_text(text)
            if not cleaned_text:
                print(f"Warning: No valid content after cleaning in {file_path.name}")
                return None

            metadata = {
                'artist': artist_name,
                'song_title': file_path.stem,
                'source': str(file_path),
                'encoding': encoding,
                'original_size': len(text),
                'cleaned_size': len(cleaned_text)
            }
            return Document(page_content=cleaned_text, metadata=metadata)

        except Exception as e:
            # Best-effort loader: one bad file must not abort a whole run.
            print(f"Error reading {file_path.name}: {e}")
            return None

    def load_lyrics(self) -> "List[Document]":
        """Load every valid lyrics file below ``lyrics_dir``.

        Each sub-directory is treated as one artist. Artists and files are
        visited in sorted order, so the result order is reproducible. Files
        that fail to read or clean are skipped.

        Raises:
            FileNotFoundError: ``lyrics_dir`` does not exist.
            ValueError: no file passed :meth:`is_valid_lyric_file`.
        """
        if not self.lyrics_dir.exists():
            raise FileNotFoundError(
                f"Lyrics directory not found: {self.lyrics_dir}"
            )

        # Collect the work list once: every file is validated a single time
        # and the progress-bar total always matches what is processed.
        lyric_files = []
        for artist_dir in sorted(self.lyrics_dir.iterdir()):
            if not artist_dir.is_dir():
                continue
            # iterdir() instead of glob('*.txt'): the validator already
            # checks the extension, and does so case-insensitively.
            for candidate in sorted(artist_dir.iterdir()):
                if self.is_valid_lyric_file(candidate):
                    lyric_files.append((artist_dir.name, candidate))

        if not lyric_files:
            raise ValueError("No valid lyrics files found")

        documents = []
        for artist_name, lyric_file in tqdm(lyric_files, desc="Loading lyrics"):
            doc = self.read_and_validate_lyrics(lyric_file, artist_name)
            if doc is not None:
                documents.append(doc)

        print(f"Successfully loaded {len(documents)} valid lyrics files")
        return documents