import asyncio
import logging
from typing import Dict, List, Optional, Union

import numpy as np
from datasets import Dataset
from openai import AsyncOpenAI
from sklearn.metrics.pairwise import cosine_similarity

from src.api.exceptions import OpenAIError

# Set up structured logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


class EmbeddingService:
    def __init__(
        self,
        openai_api_key: str,
        model: str = "text-embedding-3-small",
        batch_size: int = 10,
        max_concurrent_requests: int = 10,  # Limit to 10 concurrent requests
    ):
        self.client = AsyncOpenAI(api_key=openai_api_key)
        self.model = model
        self.batch_size = batch_size
        self.semaphore = asyncio.Semaphore(max_concurrent_requests)  # Rate limiter
        self.total_requests = 0  # Total number of requests to process
        self.completed_requests = 0  # Number of completed requests

    async def get_embedding(self, text: str) -> List[float]:
        """Generate an embedding for the given text using OpenAI."""
        text = text.replace("\n", " ")
        try:
            async with self.semaphore:  # Acquire a semaphore slot
                response = await self.client.embeddings.create(
                    input=[text], model=self.model
                )
                self.completed_requests += 1  # Increment completed requests
                self._log_progress()  # Log progress
                return response.data[0].embedding
        except Exception as e:
            logger.error(f"Failed to generate embedding: {e}")
            raise OpenAIError(f"OpenAI API error: {e}")

    async def create_embeddings(
        self,
        data: Union[Dataset, List[str]],
        target_column: Optional[str] = None,
        output_column: str = "embeddings",
    ) -> Union[Dataset, List[List[float]]]:
        """
        Create embeddings for either a Dataset or a list of strings.

        Args:
            data: Either a Hugging Face Dataset or a list of strings.
            target_column: The column in the Dataset to generate embeddings for
                (required if data is a Dataset).
            output_column: The column to store embeddings in the Dataset
                (default: "embeddings").

        Returns:
            If data is a Dataset, the Dataset with the embeddings column added.
            If data is a list of strings, a list of embeddings.
        """
        if isinstance(data, Dataset):
            if not target_column:
                raise ValueError("target_column is required when data is a Dataset.")
            return await self._create_embeddings_for_dataset(
                data, target_column, output_column
            )
        elif isinstance(data, list):
            return await self._create_embeddings_for_texts(data)
        else:
            raise TypeError(
                "data must be either a Hugging Face Dataset or a list of strings."
            )
    async def _create_embeddings_for_dataset(
        self, dataset: Dataset, target_column: str, output_column: str
    ) -> Dataset:
        """Create embeddings for the target column in the Dataset."""
        logger.info("Generating embeddings for Dataset...")
        self.total_requests = len(dataset)  # Set total number of requests
        self.completed_requests = 0  # Reset completed requests counter
        embeddings = []
        for i in range(0, len(dataset), self.batch_size):
            # Slicing a Dataset returns a dict of columns, so index the target column.
            batch = dataset[i : i + self.batch_size]
            batch_embeddings = await asyncio.gather(
                *[self.get_embedding(text) for text in batch[target_column]]
            )
            embeddings.extend(batch_embeddings)
        dataset = dataset.add_column(output_column, embeddings)
        return dataset

    async def _create_embeddings_for_texts(
        self, texts: List[str]
    ) -> List[List[float]]:
        """Create embeddings for a list of strings."""
        logger.info("Generating embeddings for list of texts...")
        self.total_requests = len(texts)  # Set total number of requests
        self.completed_requests = 0  # Reset completed requests counter
        batches = [
            texts[i : i + self.batch_size]
            for i in range(0, len(texts), self.batch_size)
        ]
        embeddings = []
        for batch in batches:
            batch_embeddings = await asyncio.gather(
                *[self.get_embedding(text) for text in batch]
            )
            embeddings.extend(batch_embeddings)
        return embeddings

    def _log_progress(self):
        """Log the progress of embedding generation."""
        if self.total_requests == 0:  # Avoid division by zero before any work starts
            return
        progress = (self.completed_requests / self.total_requests) * 100
        logger.info(
            f"Progress: {self.completed_requests}/{self.total_requests} ({progress:.2f}%)"
        )
    async def search_embeddings(
        self,
        query_embeddings: List[List[float]],
        dataset: Dataset,
        embedding_column: str,
        target_column: str,
        num_results: int,
        additional_columns: Optional[List[str]] = None,
    ) -> Dict[str, List]:
        """
        Perform a cosine similarity search between query embeddings and dataset embeddings.

        Args:
            query_embeddings: List of embeddings for the query texts.
            dataset: The dataset to search in.
            embedding_column: The column in the dataset containing embeddings.
            target_column: The column to return in the results.
            num_results: The number of results to return per query.
            additional_columns: Optional list of additional columns to include
                in the results.

        Returns:
            A dictionary of lists containing the target column values, their
            similarity scores, and any additional columns specified.
        """
        dataset_embeddings = np.array(dataset[embedding_column])
        query_embeddings = np.array(query_embeddings)

        # Compute cosine similarity between every query and every dataset row
        similarities = cosine_similarity(query_embeddings, dataset_embeddings)

        # Initialize the results dictionary
        results = {
            target_column: [],
            "similarity": [],
        }

        # Add additional columns to the results dictionary
        if additional_columns:
            for column in additional_columns:
                results[column] = []

        # Get the top-k results for each query
        for query_similarities in similarities:
            top_k_indices = np.argsort(query_similarities)[-num_results:][::-1]
            for idx in top_k_indices:
                results[target_column].append(dataset[target_column][idx])
                results["similarity"].append(float(query_similarities[idx]))
                if additional_columns:
                    for column in additional_columns:
                        results[column].append(dataset[column][idx])

        return results
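

# ---------------------------------------------------------------------------
# Usage sketch (not part of the service): a minimal end-to-end run showing how
# the pieces above fit together. The OPENAI_API_KEY environment variable, the
# sample texts, and the "text" column name are illustrative assumptions, not
# part of this module; running it makes real API calls.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import os

    async def _demo() -> None:
        service = EmbeddingService(openai_api_key=os.environ["OPENAI_API_KEY"])

        # Embed a plain list of strings; returns a list of embedding vectors.
        query_texts = ["hello world", "goodbye world"]
        query_embeddings = await service.create_embeddings(query_texts)

        # Embed a Dataset column, then search it with the query embeddings.
        dataset = Dataset.from_dict({"text": ["hello there", "farewell friend"]})
        dataset = await service.create_embeddings(dataset, target_column="text")
        results = await service.search_embeddings(
            query_embeddings=query_embeddings,
            dataset=dataset,
            embedding_column="embeddings",
            target_column="text",
            num_results=1,
        )
        print(results)

    asyncio.run(_demo())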