Spaces:
Sleeping
Sleeping
| import os | |
| import numpy as np | |
| import pandas as pd | |
| import torch | |
| from sentence_transformers import SentenceTransformer | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| from .utils import ( | |
| bonus_lexical, | |
| inferir_categoria_consulta, | |
| limpar_texto, | |
| mapear_categoria, | |
| ) | |
| BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | |
| DATA_DIR = os.path.join(BASE_DIR, "data") | |
| LOGS_DIR = os.path.join(BASE_DIR, "logs") | |
| PATH_PRODUCTS = os.path.join(DATA_DIR, "produtos_finetunado.csv") | |
| PATH_EMBEDDINGS = os.path.join(DATA_DIR, "embeddings_produtos_finetunado.npy") | |
| PATH_NEGATIVE_MEMORY = os.path.join(LOGS_DIR, "negative_memory.csv") | |
| MODEL_NAME = os.getenv("HF_MODEL_REPO", "Ana2012/bertimbau-buscador").strip() | |
| HF_API_TOKEN = os.getenv("HF_API_TOKEN", "").strip() | |
| class SearchEngine: | |
| def __init__(self): | |
| self.device = "cuda" if torch.cuda.is_available() else "cpu" | |
| self.model = None | |
| self.df_produtos = None | |
| self.emb_produtos = None | |
| self.df_negative_memory = pd.DataFrame() | |
| self.negative_memory_mtime = None | |
| def load(self): | |
| self._load_products() | |
| self._load_model() | |
| self._load_embeddings() | |
| self._refresh_negative_memory(force=True) | |
| def _load_products(self): | |
| df = pd.read_csv(PATH_PRODUCTS) | |
| df.columns = df.columns.str.strip().str.lower() | |
| df["product_name"] = df["product_name"].fillna("").astype(str) | |
| df["description"] = df["description"].fillna("").astype(str) | |
| df["categoria_principal"] = df["categoria_principal"].fillna("").astype(str) | |
| df["category_names_text"] = df["category_names_text"].fillna("").astype(str) | |
| df["region"] = df["region"].fillna("").astype(str) | |
| df["neighborhood"] = df["neighborhood"].fillna("").astype(str) | |
| df["product_name_limpo"] = df["product_name"].apply(limpar_texto) | |
| df["description_limpa"] = df["description"].apply(limpar_texto) | |
| df["categoria_principal_limpa"] = df["categoria_principal"].apply(limpar_texto) | |
| df["category_names_text_limpo"] = df["category_names_text"].apply(limpar_texto) | |
| df["region_limpa"] = df["region"].apply(limpar_texto) | |
| df["neighborhood_limpo"] = df["neighborhood"].apply(limpar_texto) | |
| df["texto_busca_reforcado"] = ( | |
| "produto " + df["product_name_limpo"] + " " | |
| + "categoria " + df["categoria_principal_limpa"] + " " | |
| + "categorias " + df["category_names_text_limpo"] + " " | |
| + "bairro " + df["neighborhood_limpo"] + " " | |
| + "regiao " + df["region_limpa"] + " " | |
| + "descricao " + df["description_limpa"] | |
| ).str.strip() | |
| df["categoria_grupo"] = df["categoria_principal"].apply(mapear_categoria) | |
| self.df_produtos = df | |
| def _load_model(self): | |
| kwargs = {"device": self.device} | |
| if HF_API_TOKEN: | |
| kwargs["token"] = HF_API_TOKEN | |
| # Usa o mesmo pipeline validado localmente com SentenceTransformer. | |
| self.model = SentenceTransformer(MODEL_NAME, **kwargs) | |
| def _load_embeddings(self): | |
| self.emb_produtos = np.load(PATH_EMBEDDINGS) | |
| # Se estes embeddings .npy foram gerados com outro pipeline | |
| # (por exemplo, AutoModel + mean pooling manual), os scores podem ficar inconsistentes. | |
| # Nesse caso, regenere os embeddings dos produtos com o mesmo SentenceTransformer. | |
| if self.emb_produtos.ndim != 2: | |
| raise RuntimeError("O arquivo de embeddings precisa conter uma matriz 2D.") | |
| def runtime_info(self): | |
| return { | |
| "model_repo": MODEL_NAME, | |
| "device": self.device, | |
| "products_loaded": 0 if self.df_produtos is None else int(len(self.df_produtos)), | |
| "embeddings_loaded": 0 if self.emb_produtos is None else int(len(self.emb_produtos)), | |
| "embedding_dim": 0 if self.emb_produtos is None else int(self.emb_produtos.shape[1]), | |
| } | |
| def _refresh_negative_memory(self, force=False): | |
| if not os.path.exists(PATH_NEGATIVE_MEMORY): | |
| self.df_negative_memory = pd.DataFrame() | |
| self.negative_memory_mtime = None | |
| return | |
| current_mtime = os.path.getmtime(PATH_NEGATIVE_MEMORY) | |
| if not force and self.negative_memory_mtime == current_mtime: | |
| return | |
| df = pd.read_csv(PATH_NEGATIVE_MEMORY) | |
| df.columns = df.columns.str.strip().str.lower() | |
| for col in ["query", "product_id", "product_name", "motivo", "rating"]: | |
| if col not in df.columns: | |
| df[col] = "" | |
| df["query"] = df["query"].fillna("").astype(str) | |
| df["query_limpa"] = df["query"].apply(limpar_texto) | |
| df["product_id"] = df["product_id"].fillna("").astype(str) | |
| df["product_name"] = df["product_name"].fillna("").astype(str) | |
| df["motivo"] = df["motivo"].fillna("").astype(str) | |
| df["rating_num"] = pd.to_numeric(df["rating"], errors="coerce") | |
| self.df_negative_memory = df | |
| self.negative_memory_mtime = current_mtime | |
| def _similaridade_consulta(self, query_atual, query_memoria): | |
| if not query_atual or not query_memoria: | |
| return 0.0 | |
| if query_atual == query_memoria: | |
| return 1.0 | |
| termos_atuais = set(query_atual.split()) | |
| termos_memoria = set(query_memoria.split()) | |
| if not termos_atuais or not termos_memoria: | |
| return 0.0 | |
| intersecao = len(termos_atuais & termos_memoria) | |
| if intersecao == 0: | |
| return 0.0 | |
| return intersecao / max(len(termos_atuais), len(termos_memoria)) | |
| def _calcular_penalidade_feedback(self, query_text, df_filtrado): | |
| self._refresh_negative_memory() | |
| if self.df_negative_memory.empty: | |
| return np.zeros(len(df_filtrado)) | |
| query_limpa = limpar_texto(query_text) | |
| memorias = self.df_negative_memory[ | |
| self.df_negative_memory["product_id"].isin(df_filtrado["product_id"].astype(str)) | |
| ] | |
| if memorias.empty: | |
| return np.zeros(len(df_filtrado)) | |
| penalidades = {} | |
| for _, memoria in memorias.iterrows(): | |
| similaridade = self._similaridade_consulta(query_limpa, memoria["query_limpa"]) | |
| if similaridade <= 0: | |
| continue | |
| penalidade = 0.08 + (0.12 * similaridade) | |
| if memoria["motivo"] == "nao_foi_util": | |
| penalidade += 0.04 | |
| if pd.notna(memoria["rating_num"]) and memoria["rating_num"] <= 2: | |
| penalidade += 0.04 | |
| product_id = memoria["product_id"] | |
| penalidades[product_id] = min(penalidades.get(product_id, 0.0) + penalidade, 0.45) | |
| return df_filtrado["product_id"].astype(str).map(lambda x: penalidades.get(x, 0.0)).values | |
| def gerar_embedding_unico(self, texto): | |
| embedding = self.model.encode( | |
| texto, | |
| convert_to_numpy=True, | |
| normalize_embeddings=False, | |
| show_progress_bar=False, | |
| ) | |
| return np.asarray(embedding, dtype=np.float32) | |
| def buscar(self, query_text, top_k=5): | |
| query_limpa = limpar_texto(query_text) | |
| categoria = inferir_categoria_consulta(query_limpa) | |
| if categoria is not None: | |
| mask = self.df_produtos["categoria_grupo"] == categoria | |
| df_filtrado = self.df_produtos[mask].copy() | |
| idx_filtrado = df_filtrado.index.tolist() | |
| else: | |
| df_filtrado = self.df_produtos.copy() | |
| idx_filtrado = df_filtrado.index.tolist() | |
| if len(df_filtrado) == 0: | |
| df_filtrado = self.df_produtos.copy() | |
| idx_filtrado = df_filtrado.index.tolist() | |
| emb_query = self.gerar_embedding_unico(query_text).reshape(1, -1) | |
| emb_base = self.emb_produtos[idx_filtrado] | |
| if emb_base.shape[1] != emb_query.shape[1]: | |
| raise RuntimeError( | |
| "Dimensao incompatível entre os embeddings salvos e o embedding da consulta. " | |
| "Regenere o arquivo .npy com o mesmo modelo SentenceTransformer." | |
| ) | |
| sims = cosine_similarity(emb_query, emb_base)[0] | |
| bonus = df_filtrado.apply( | |
| lambda row: bonus_lexical( | |
| query_text, | |
| row["product_name"], | |
| row["categoria_principal"], | |
| row["neighborhood"], | |
| row["region"], | |
| row["description"], | |
| row["texto_busca_reforcado"], | |
| ), | |
| axis=1, | |
| ).values | |
| penalidade_feedback = self._calcular_penalidade_feedback(query_text, df_filtrado) | |
| score_final = sims + bonus - penalidade_feedback | |
| top_idx_local = np.argsort(score_final)[::-1][:top_k] | |
| resultados = [] | |
| for rank, idx_local in enumerate(top_idx_local, start=1): | |
| idx_global = idx_filtrado[idx_local] | |
| prod = self.df_produtos.iloc[idx_global] | |
| resultados.append({ | |
| "rank": rank, | |
| "establishment_id": str(prod["establishment_id"]), | |
| "product_id": str(prod["product_id"]), | |
| "product_name": prod["product_name"], | |
| "categoria_principal": prod["categoria_principal"], | |
| "categoria_grupo": prod["categoria_grupo"], | |
| "region": prod["region"], | |
| "neighborhood": prod["neighborhood"], | |
| "score_semantico": float(sims[idx_local]), | |
| "bonus_lexical": float(bonus[idx_local]), | |
| "penalidade_feedback": float(penalidade_feedback[idx_local]), | |
| "score_final": float(score_final[idx_local]), | |
| }) | |
| return { | |
| "query": query_text, | |
| "categoria_inferida": categoria, | |
| "resultados": resultados, | |
| } | |