import re

import faiss
import gradio as gr
import kagglehub
import numpy as np
import pandas as pd
import spaces
from sentence_transformers import SentenceTransformer
from tqdm import tqdm
from transformers import pipeline

# Download the latest version of the dataset
path = kagglehub.dataset_download("carlosgdcj/genius-song-lyrics-with-language-information")
print("Path to dataset files:", path)

# Load a fast sentence-embedding model
embedding_model = SentenceTransformer('all-mpnet-base-v2')

def embed_text(text):
    """Embed a piece of text with the sentence-transformer model."""
    return embedding_model.encode(text)

# Initialize a FAISS index over L2 distance; all-mpnet-base-v2 produces
# 768-dimensional embeddings, so adjust this if you swap in another model.
embedding_dim = 768
index = faiss.IndexFlatL2(embedding_dim)

metadata = {}

# Process the CSV in chunks to stay within memory limits
chunk_size = 3000
csv_path = path + "/song_lyrics.csv"

# Number of chunks to index; raise this (or remove the early break below)
# to read more of the file. One chunk keeps start-up fast for testing.
total_chunks = 1

for chunk_idx, chunk in enumerate(tqdm(pd.read_csv(csv_path, chunksize=chunk_size),
                                       total=total_chunks, desc="Processing Chunks")):
    # Stop once the desired number of chunks has been indexed (for testing)
    if chunk_idx >= total_chunks:
        break
    embeddings = []
    # Enumerate for the position within the chunk rather than using the DataFrame
    # index, which keeps running across chunks and would be double-counted by
    # chunk_idx * chunk_size + idx.
    for row_offset, (_, row) in enumerate(tqdm(chunk.iterrows(), total=len(chunk),
                                               desc=f"Embedding Chunk {chunk_idx + 1}")):
        # Embed an "artist title" identifier for each song
        identifier = f"{row['artist']} {row['title']}"
        embeddings.append(embed_text(identifier))
        # Store metadata under the row's eventual position in the FAISS index
        global_idx = chunk_idx * chunk_size + row_offset
        metadata[global_idx] = {"identifier": identifier, "lyrics": row["lyrics"]}
    # Convert the batch to a float32 array and add it to the FAISS index
    embeddings_np = np.array(embeddings).astype("float32")
    index.add(embeddings_np)

print("Indexing complete.")

def chunk_query(query, max_chunk_length=5):
    """Break a query into overlapping word n-grams, ignoring non-word characters."""
    # Replace non-word characters with spaces (keeping letters, digits, and spaces)
    cleaned_query = re.sub(r'\W+', ' ', query)
    words = cleaned_query.split()
    chunks = set()
    # Generate n-grams with lengths from 1 up to max_chunk_length
    for length in range(1, min(max_chunk_length, len(words)) + 1):
        for i in range(len(words) - length + 1):
            chunks.add(" ".join(words[i:i + length]))
    return list(chunks)

def retrieve_lyrics(query, top_k=3, similarity_threshold=0.3):
    """Return the lyrics of the closest indexed song, or None if nothing is close enough."""
    query_chunks = chunk_query(query)
    all_distances = []
    all_indices = []
    # Embed each chunk and search the FAISS index
    for chunk in query_chunks:
        chunk_embedding = embed_text(chunk).astype("float32")
        distances, indices = index.search(np.array([chunk_embedding]), top_k)
        all_distances.extend(distances[0])
        all_indices.extend(indices[0])
    # Find the closest match across all chunks
    if all_distances:
        min_distance = min(all_distances)
        min_index = all_indices[all_distances.index(min_distance)]
        # Accept the match only if it falls within the distance threshold
        if min_distance < similarity_threshold and min_index in metadata:
            print(f"Closest match distance: {min_distance}")
            return metadata[min_index]["lyrics"]
    # No suitable match found
    return None
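# Optional sanity check before wiring up the chat UI: probe the retriever with a
# known "artist title" string. This is only a sketch; the probe below is a
# hypothetical example, so substitute a song you know falls inside the indexed chunk.
# sample = retrieve_lyrics("Queen Bohemian Rhapsody")
# print(sample[:200] if sample else "No match within the distance threshold.")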
def prepare_qwen_input(user_query):
    """Prepend retrieved lyrics, if any, to the user's query."""
    lyrics = retrieve_lyrics(user_query)
    if lyrics:
        # Combine the retrieved lyrics with the user's query as context
        return f"Lyrics:\n'{lyrics}'. {user_query}."
    # No lyrics found, so pass the query through unchanged
    return user_query

pipe = pipeline("text-generation", model="Qwen/Qwen2.5-1.5B-Instruct",
                max_length=1250, device=0)

@spaces.GPU
def respond(
    message,
    history: list[tuple[str, str]],
):
    # Replay the conversation history first, then append the current message,
    # augmented with any retrieved lyrics, as the final user turn.
    messages = []
    for user_msg, assistant_msg in history:
        if user_msg:
            messages.append({"role": "user", "content": user_msg})
        if assistant_msg:
            messages.append({"role": "assistant", "content": assistant_msg})
    messages.append({"role": "user", "content": prepare_qwen_input(message)})

    # The pipeline returns the completed reply in one pass (no token-level
    # streaming); yield it so ChatInterface can display it.
    response = ""
    for output in pipe(messages, max_length=1250, return_full_text=False, do_sample=True):
        response += output["generated_text"]
        yield response

# For information on how to customize the ChatInterface, peruse the Gradio docs:
# https://www.gradio.app/docs/chatinterface
demo = gr.ChatInterface(
    respond,
    additional_inputs=[],
)

if __name__ == "__main__":
    demo.launch()