# Import necessary libraries
import requests
import numpy as np
import nest_asyncio
import fasttext
import torch

# Applied before the remaining imports, exactly as in the original module.
nest_asyncio.apply()

from typing import List
from rank_bm25 import BM25L
from normalizer import Normalizer
from fastapi import HTTPException
from optimum.onnxruntime import ORTModelForFeatureExtraction
from sentenceTranformer import SentenceEmbeddingPipeline
from transformers import AutoTokenizer

# Initialize
# model_path = "Abdul-Ib/all-MiniLM-L6-v2-2024"
# semantic_model = SentenceTransformer(model_path, cache_folder="./assets")
try:
    # Load the semantic model
    tokenizer = AutoTokenizer.from_pretrained("./assets/onnx")
    model = ORTModelForFeatureExtraction.from_pretrained(
        "./assets/onnx", file_name="model_quantized.onnx"
    )
    semantic_model = SentenceEmbeddingPipeline(model=model, tokenizer=tokenizer)
except Exception as e:
    raise HTTPException(
        status_code=500,
        detail=f"An error occurred during semantic model loading: {e}",
    ) from e

# Initialization
try:
    normalizer = Normalizer()
    categorizer = fasttext.load_model("./assets/categorization_pipeline.ftz")
    # NOTE(review): allow_pickle=True unpickles the file's contents; only load
    # this asset from a trusted source.
    category_map = np.load("./assets/category_map.npy", allow_pickle=True).item()
except Exception as e:
    raise HTTPException(
        status_code=500,
        detail=f"An error occurred during initialization of categorizer and normalizer: {e}",
    ) from e


def make_request(url: str, timeout: float = 30.0) -> dict:
    """
    Make a GET request to the given URL and return the JSON response.

    Args:
    - url (str): The URL to make the request to.
    - timeout (float): Seconds to wait for the server before giving up.

    Returns:
    - dict: The JSON response.

    Raises:
    - HTTPException: With the upstream status code if the response is not 200,
      or with status 404 if the request itself or JSON decoding fails.
    """
    try:
        response = requests.get(url, timeout=timeout)
    except Exception as e:
        raise HTTPException(
            status_code=404,
            detail=f"An error occurred during the request: {e}",
        ) from e

    # Raised outside any try block so the upstream status code is not
    # swallowed and re-labelled as 404.
    if response.status_code != 200:
        raise HTTPException(
            status_code=response.status_code,
            detail=f"Request failed with status code: {response.status_code}",
        )

    try:
        return response.json()
    except Exception as e:
        raise HTTPException(
            status_code=404,
            detail=f"An error occurred during the request: {e}",
        ) from e


async def full_text_search(query: str, keyword_search: BM25L) -> np.ndarray:
    """
    Perform full-text search using the given query and BM25L model.

    Args:
    - query (str): The query to search for.
    - keyword_search (BM25L): The BM25L model for keyword search.

    Returns:
    - np.ndarray: The scores of the search results.

    Raises:
    - HTTPException: With status 500 if translation or scoring fails.
    """
    try:
        translated_query = await normalizer.translate_text(query)
        # Split on single spaces, as the original code does; the corpus
        # tokenization is not visible here, so this is left unchanged.
        tokenized_query = translated_query.split(" ")
        return keyword_search.get_scores(tokenized_query)
    except HTTPException:
        raise
    except Exception as e:
        # Handle exceptions such as AttributeError and ValueError
        raise HTTPException(
            status_code=500,
            detail=f"An error occurred during full-text search: {e}",
        ) from e


async def semantic_search(query: str, doc_embeddings: torch.Tensor) -> torch.Tensor:
    """
    Perform semantic search using the given query and document embeddings.

    Args:
    - query (str): The query to search for.
    - doc_embeddings (torch.Tensor): The document embeddings for semantic search.

    Returns:
    - torch.Tensor: The cosine similarity scores of the search results.

    Raises:
    - HTTPException: With status 500 if translation, embedding or scoring fails.
    """
    try:
        translated_query = await normalizer.translate_text(query)
        query_embedding = semantic_model(translated_query)[0]
        return torch.nn.functional.cosine_similarity(
            query_embedding, doc_embeddings, dim=-1
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"An error occurred during semantic search: {e}",
        ) from e


def hybrid_search(
    keyword_scores: np.ndarray, semantic_scores: torch.Tensor, alpha: float = 0.7
) -> np.ndarray:
    """
    Perform hybrid search combining keyword and semantic scores.

    Keyword scores are squashed with ``2 / pi * arctan(score) - 0.5`` and
    negative results are clamped to 0 before the weighted sum.

    Args:
    - keyword_scores (np.ndarray): The keyword search scores.
    - semantic_scores (torch.Tensor): The semantic search scores. An array-like
      is accepted as well.
    - alpha (float): The weight for the keyword scores.

    Returns:
    - np.ndarray: The hybrid scores.

    Raises:
    - HTTPException: With status 500 if the scores cannot be combined.
    """
    try:
        squashed = 2 / np.pi * np.arctan(keyword_scores) - 0.5
        squashed = np.maximum(squashed, 0)

        if isinstance(semantic_scores, torch.Tensor):
            # detach()/cpu() so tensors that track gradients or live on an
            # accelerator can still be converted.
            semantic_array = semantic_scores.detach().cpu().numpy()
        else:
            semantic_array = np.asarray(semantic_scores)

        return alpha * squashed + (1 - alpha) * semantic_array
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"An error occurred during hybrid search: {e}",
        ) from e


def rerank_results(request_json: List[dict], hybrid_scores: np.ndarray) -> List[dict]:
    """
    Rerank search results based on hybrid scores.

    Each product dict is mutated in place to receive a "score" entry.

    Args:
    - request_json (List[dict]): The list of search results.
    - hybrid_scores (np.ndarray): The hybrid scores, one per search result.

    Returns:
    - List[dict]: The reranked search results, highest score first.

    Raises:
    - HTTPException: With status 500 if scoring or sorting fails.
    """
    try:
        for index, product in enumerate(request_json):
            # Store a plain Python float rather than a NumPy scalar.
            product["score"] = float(hybrid_scores[index])
        return sorted(request_json, key=lambda k: k["score"], reverse=True)
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"An error occurred during reranking: {e}",
        ) from e


def calculate_interrelations(
    request_json: List[dict],
    doc_embeddings: torch.Tensor,
    interrelation_threshold: float = 0.9,
) -> None:
    """
    Calculate interrelations between products based on cosine similarity of their embeddings.

    Each product dict receives an "interrelations" list holding the "key" of
    every other product whose similarity exceeds the threshold.

    Args:
    - request_json (List[dict]): The list of products.
    - doc_embeddings (torch.Tensor): The document embeddings for products, one
      row per product. A NumPy array is accepted as well.
    - interrelation_threshold (float): Similarity above which two products are
      considered related.

    Raises:
    - HTTPException: If an error occurs during interrelation calculation.

    Returns:
    - None
    """
    try:
        num_products = len(request_json)
        embeddings = torch.as_tensor(doc_embeddings)
        embeddings_norm = torch.nn.functional.normalize(embeddings, p=2, dim=1)
        # Rows are unit length, so the Gram matrix is the pairwise cosine similarity.
        cos_sim_matrix = torch.mm(embeddings_norm, embeddings_norm.transpose(0, 1))
        related_mask = cos_sim_matrix > interrelation_threshold

        for i in range(num_products):
            related_indices = torch.nonzero(related_mask[i], as_tuple=True)[0].tolist()
            request_json[i]["interrelations"] = [
                request_json[idx]["key"] for idx in related_indices if idx != i
            ]
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"An error occurred during interrelation calculation: {e}",
        ) from e


def _has_match(scores) -> bool:
    """Return True if the score vector is non-empty and its maximum is non-zero."""
    scores = np.asarray(scores)
    return scores.size > 0 and bool(scores.max() != 0.0)


async def check_validity(query: str, keyword_search: BM25L) -> np.ndarray:
    """
    Check the validity of the input query against keyword match search.

    This function attempts to find valid search results for the input query by following these steps:
    1. Perform a keyword match search on the original query.
    2. If any matches are found in step 1, return the search scores.
    3. Generate a modified query with normalizer.keep_one_char and perform a keyword match search.
    4. If any matches are found in step 3, return the search scores.
    5. Check the spelling of the original query. If a correction is returned, perform a keyword
       match search with the corrected query.
    6. If any matches are found in step 5, return the search scores.
    7. If none of the attempts yield non-zero scores, return the scores of the original query.

    Args:
    - query (str): The input query to check its validity.
    - keyword_search (BM25L): The BM25L model for keyword search.

    Returns:
    - np.ndarray: The scores of the search results.

    Raises:
    - HTTPException: With status 500 if any step fails.
    """
    try:
        # Steps 1-2: the original query
        keyword_scores = await full_text_search(query, keyword_search)
        if _has_match(keyword_scores):
            return keyword_scores

        # Steps 3-4: the query reduced by normalizer.keep_one_char
        one_char_query = normalizer.keep_one_char(query)
        one_char_scores = await full_text_search(one_char_query, keyword_search)
        if _has_match(one_char_scores):
            return one_char_scores

        # Steps 5-6: the spell-corrected query, when a correction is available
        spelled_query = normalizer.check_spelling(query)
        if spelled_query is not None:
            spelled_scores = await full_text_search(spelled_query, keyword_search)
            if _has_match(spelled_scores):
                return spelled_scores

        # Step 7: nothing matched; fall back to the original query's scores
        return keyword_scores
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"An error occurred during query validity check: {e}",
        ) from e


async def is_cheapest(query: str, request_json: list) -> list:
    """
    Flag the cheapest product among those whose category matches the query.

    The query is categorized with the fastText model (top 3 labels with
    probability >= 0.5). Every product gets a "cheapest" field; it is True only
    for the lowest-priced product whose "Inferred Category" is one of the
    predicted categories. On a price tie the later product wins. If no product
    matches, or the list is empty, no product is flagged.

    Args:
        query (str): The input query
        request_json (list): List of products

    Returns:
        list: The same list, with the "cheapest" field set on every product.

    Raises:
        HTTPException: With status 500 if categorization or a product lookup fails.
    """
    try:
        # fastText predicts one line at a time and rejects embedded newlines.
        single_line_query = query.replace("\n", " ")
        query_categories = [
            category_map[category][0]
            for category in categorizer.predict(single_line_query, k=3, threshold=0.5)[0]
        ]

        min_idx = None  # stays None when no product matches the categories
        min_price = float("inf")
        for idx, product in enumerate(request_json):
            if (
                product["Inferred Category"] in query_categories
                and product["price"] <= min_price
            ):
                min_idx = idx
                min_price = product["price"]

        # Reset the field everywhere, then mark the winner if there is one.
        for product in request_json:
            product["cheapest"] = False
        if min_idx is not None:
            request_json[min_idx]["cheapest"] = True

        return request_json
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"An error occurred during cheapest product identification: {e}",
        ) from e


def check_keys(request_json: List[dict], required_keys: list) -> None:
    """
    Check that each dictionary in a list contains all the required keys.

    Parameters:
    request_json (list): A list of dictionaries to be checked.
    required_keys (list): A list of keys that each dictionary must contain.

    Returns:
    None

    Raises:
    HTTPException: With status 400 for the first dictionary missing any required key.
    """
    for item in request_json:
        if not all(key in item for key in required_keys):
            raise HTTPException(
                status_code=400, detail=f"Missing keys in dictionary: {item}"
            )