| | """ |
| | RAG Engine v2.0 - Advanced retrieval with: |
| | β’ Multilingual embeddings (BGE-M3 / gte-multilingual / multilingual-e5) |
| | β’ Hybrid search (Vector + BM25 keyword via Reciprocal Rank Fusion) |
| | β’ Reranking with BGE-Reranker-v2 |
| | β’ Metadata filtering by document type (CV vs Job Offer vs LinkedIn) |
| | All 100% free & local. |
| | """ |
| | import os |
| | import hashlib |
| | import logging |
| | from typing import List, Tuple, Optional, Dict |
| |
|
| | from langchain_huggingface import HuggingFaceEmbeddings |
| | from langchain_chroma import Chroma |
| | from langchain_core.documents import Document |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| | |
| |
|
# Registry of selectable embedding models, keyed by the short identifier used
# by RAGEngine(embedding_key=...). "name" is the Hugging Face model id that is
# actually loaded; the remaining fields ("display", "description", "size",
# "languages", "performance") are UI-facing metadata only and play no role in
# retrieval itself.
EMBEDDING_MODELS = {
    "bge-m3": {
        "name": "BAAI/bge-m3",
        "display": "π BGE-M3 (Multilingual Β· Recomendado)",
        "description": "Mejor modelo multilingual 2025. Dense+sparse, 100+ idiomas, ideal para RAG.",
        "size": "~2.3 GB",
        "languages": "100+",
        "performance": "βββββ",
    },
    "gte-multilingual": {
        "name": "Alibaba-NLP/gte-multilingual-base",
        "display": "π GTE Multilingual (Ligero Β· 70+ idiomas)",
        "description": "Excelente balance tamaΓ±o/calidad. 70+ idiomas, encoder-only.",
        "size": "~580 MB",
        "languages": "70+",
        "performance": "ββββ",
    },
    "multilingual-e5": {
        "name": "intfloat/multilingual-e5-base",
        "display": "π Multilingual E5 Base (EstΓ‘ndar)",
        "description": "Modelo estΓ‘ndar multilingual para retrieval y similitud semΓ‘ntica.",
        "size": "~1.1 GB",
        "languages": "100+",
        "performance": "ββββ",
    },
    "minilm-v2": {
        "name": "sentence-transformers/all-MiniLM-L6-v2",
        "display": "β‘ MiniLM v2 (Ultra-ligero Β· Solo inglΓ©s)",
        "description": "Modelo original, muy rΓ‘pido pero solo inglΓ©s. Ideal para pruebas.",
        "size": "~90 MB",
        "languages": "InglΓ©s",
        "performance": "βββ",
    },
}

# Fallback key: RAGEngine falls back to this entry when an unknown
# embedding_key is requested.
DEFAULT_EMBEDDING = "bge-m3"
| |
|
| |
|
| | |
| |
|
class BM25Index:
    """Lightweight BM25 keyword index for hybrid search.

    Documents and metadata are kept in memory; scoring is delegated to
    ``rank_bm25.BM25Okapi`` when that package is installed. If it is not,
    ``is_ready`` stays False and keyword search is silently disabled.
    """

    def __init__(self):
        self._documents: List[str] = []
        self._metadatas: List[dict] = []
        # BM25Okapi instance, or None when not built / rank_bm25 missing.
        self._index = None

    @property
    def is_ready(self) -> bool:
        """True when the index is built and holds at least one document."""
        return self._index is not None and len(self._documents) > 0

    def add(self, texts: List[str], metadatas: List[dict]):
        """Add documents to the BM25 index and rebuild it."""
        self._documents.extend(texts)
        self._metadatas.extend(metadatas)
        self._rebuild()

    def _rebuild(self):
        """Rebuild the BM25 index from scratch.

        Tokenization is naive lowercase whitespace splitting — adequate for
        keyword matching alongside the dense retriever.
        """
        try:
            from rank_bm25 import BM25Okapi
            tokenized = [doc.lower().split() for doc in self._documents]
            if tokenized:
                self._index = BM25Okapi(tokenized)
        except ImportError:
            logger.warning("rank_bm25 not installed β keyword search disabled. pip install rank_bm25")
            self._index = None

    @staticmethod
    def _flatten_filter(filter_dict: Optional[dict]) -> dict:
        """Normalize a filter into flat ``{key: value}`` equality pairs.

        BUGFIX: callers (RAGEngine.search) may pass a Chroma-style
        ``{"$and": [{k: v}, ...]}`` wrapper. Previously the literal "$and" key
        was compared against chunk metadata, which never matched and silently
        dropped every BM25 result whenever a multi-key filter was active.
        """
        if not filter_dict:
            return {}
        if "$and" in filter_dict:
            flat: dict = {}
            for clause in filter_dict["$and"]:
                if isinstance(clause, dict):
                    flat.update(clause)
            return flat
        return dict(filter_dict)

    def search(
        self, query: str, k: int = 10, filter_dict: Optional[dict] = None,
    ) -> List[Tuple[str, dict, float]]:
        """Search using BM25 keyword matching.

        Returns up to ``k`` ``(text, metadata, score)`` tuples sorted by
        descending BM25 score; zero-score documents are excluded.
        """
        if not self.is_ready:
            return []

        tokenized_query = query.lower().split()
        scores = self._index.get_scores(tokenized_query)
        flat_filter = self._flatten_filter(filter_dict)

        results = []
        for idx, score in enumerate(scores):
            if score <= 0:
                continue
            meta = self._metadatas[idx] if idx < len(self._metadatas) else {}
            meta = meta or {}  # guard: Chroma may yield None metadata entries

            if flat_filter and not all(
                meta.get(k_f) == v_f for k_f, v_f in flat_filter.items()
            ):
                continue
            results.append((self._documents[idx], meta, float(score)))

        results.sort(key=lambda x: x[2], reverse=True)
        return results[:k]

    def clear(self):
        """Clear all documents and drop the index."""
        self._documents.clear()
        self._metadatas.clear()
        self._index = None

    def rebuild_from_chroma(self, chroma_collection):
        """Rebuild BM25 index from an existing ChromaDB collection."""
        try:
            data = chroma_collection.get()
            if data and data.get("documents"):
                self._documents = list(data["documents"])
                # Normalize missing/None metadata to {} so filtering is safe.
                metas = data.get("metadatas") or [{}] * len(self._documents)
                self._metadatas = [m or {} for m in metas]
                self._rebuild()
                logger.info(f"BM25 index rebuilt with {len(self._documents)} documents")
        except Exception as e:
            logger.warning(f"Failed to rebuild BM25 index: {e}")
| |
|
| |
|
| | |
| |
|
class Reranker:
    """Cross-encoder reranker using BGE-Reranker-v2-m3 (free, local, multilingual)."""

    def __init__(self, model_name: str = "BAAI/bge-reranker-v2-m3"):
        # The heavyweight model is only pulled in on demand — see load().
        self.model_name = model_name
        self._model = None

    @property
    def is_ready(self) -> bool:
        """True once the cross-encoder model has been loaded."""
        return self._model is not None

    def load(self):
        """Lazy-load the reranker model; a no-op when already loaded."""
        if self._model is not None:
            return
        try:
            from sentence_transformers import CrossEncoder
            self._model = CrossEncoder(self.model_name, max_length=512)
            logger.info(f"Reranker loaded: {self.model_name}")
        except ImportError:
            logger.warning("sentence-transformers not installed for reranking")
        except Exception as e:
            logger.warning(f"Failed to load reranker: {e}")

    def rerank(
        self,
        query: str,
        results: List[Tuple[str, dict, float]],
        top_k: int = 5,
    ) -> List[Tuple[str, dict, float]]:
        """Rerank results using cross-encoder scoring.

        When the model is not loaded (or results are empty) the input order
        is kept and simply truncated to ``top_k``.
        """
        if not self.is_ready or not results:
            return results[:top_k]

        try:
            scores = self._model.predict([(query, text) for text, _, _ in results])
            rescored = [
                (text, meta, float(new_score))
                for (text, meta, _), new_score in zip(results, scores)
            ]
            rescored.sort(key=lambda item: item[2], reverse=True)
            return rescored[:top_k]
        except Exception as e:
            logger.warning(f"Reranking failed, returning original order: {e}")
            return results[:top_k]
| |
|
| |
|
| | |
| |
|
def reciprocal_rank_fusion(
    results_list: List[List[Tuple[str, dict, float]]],
    k: int = 60,
    top_n: int = 15,
) -> List[Tuple[str, dict, float]]:
    """
    Merge multiple ranked result lists using Reciprocal Rank Fusion (RRF).

    Results are identified by an MD5 hash of their first 200 characters, so
    the same chunk appearing in several lists accumulates score. The fused
    score of a result is sum(1 / (k + rank)) over its 1-based ranks.
    """
    scores: Dict[str, float] = {}
    payloads: Dict[str, Tuple[str, dict]] = {}

    for ranked in results_list:
        for position, (text, meta, _unused) in enumerate(ranked, start=1):
            digest = hashlib.md5(text[:200].encode()).hexdigest()
            scores[digest] = scores.get(digest, 0.0) + 1.0 / (k + position)
            # First occurrence wins: keep the text/metadata seen earliest.
            payloads.setdefault(digest, (text, meta))

    best = sorted(scores.items(), key=lambda kv: kv[1], reverse=True)[:top_n]
    return [(payloads[digest][0], payloads[digest][1], score) for digest, score in best]
| |
|
| |
|
| | |
| |
|
class RAGEngine:
    """
    Advanced RAG Engine v2.0 with:
    - Selectable multilingual embeddings
    - Hybrid search (vector + BM25 keyword)
    - Cross-encoder reranking
    - Metadata filtering

    Per-user scoping: every chunk is stored with a "user_id" metadata field.
    Chunks indexed before that field existed are treated as belonging to the
    "anonymous" user via the legacy fallbacks in the query methods below.
    """

    def __init__(
        self,
        persist_directory: str = None,
        embedding_key: str = DEFAULT_EMBEDDING,
        enable_reranking: bool = True,
        enable_hybrid: bool = True,
    ):
        """Build the engine.

        Args:
            persist_directory: ChromaDB directory (defaults to <project>/data/vectordb).
            embedding_key: key into EMBEDDING_MODELS; unknown keys fall back
                to DEFAULT_EMBEDDING's model (but keep the requested key for
                the collection name).
            enable_reranking: lazily load a cross-encoder to rerank results.
            enable_hybrid: maintain a BM25 keyword index alongside vectors.
        """
        if persist_directory is None:
            persist_directory = os.path.join(
                os.path.dirname(os.path.dirname(__file__)), "data", "vectordb"
            )
        self.persist_directory = persist_directory
        os.makedirs(persist_directory, exist_ok=True)

        # Embedding model — unknown keys fall back to the default entry.
        self.embedding_key = embedding_key
        model_info = EMBEDDING_MODELS.get(embedding_key, EMBEDDING_MODELS[DEFAULT_EMBEDDING])
        model_name = model_info["name"]

        self.embeddings = HuggingFaceEmbeddings(
            model_name=model_name,
            model_kwargs={"device": "cpu", "trust_remote_code": True},
            encode_kwargs={"normalize_embeddings": True},
        )

        # One collection per embedding model so incompatible vectors never mix.
        collection_name = f"career_docs_{embedding_key.replace('-', '_')}"
        self.vectorstore = Chroma(
            collection_name=collection_name,
            embedding_function=self.embeddings,
            persist_directory=persist_directory,
        )

        # BM25 keyword index mirroring the Chroma collection (hybrid search).
        self.enable_hybrid = enable_hybrid
        self.bm25 = BM25Index()
        if enable_hybrid:
            try:
                self.bm25.rebuild_from_chroma(self.vectorstore._collection)
            except Exception as e:
                # Best-effort: hybrid search simply degrades to vector-only.
                logger.debug(f"Initial BM25 rebuild failed: {e}")

        self.enable_reranking = enable_reranking
        self.reranker = Reranker() if enable_reranking else None

    def add_document(self, chunks: List[str], metadata: dict, user_id: str = "anonymous") -> int:
        """Add document chunks to the vector store and the BM25 index.

        Each chunk gets a deterministic-ish id derived from the filename,
        chunk position and a text prefix. Returns the number of chunks added.
        """
        if not chunks:
            return 0

        docs = []
        chunk_metas = []
        for i, chunk in enumerate(chunks):
            doc_id = hashlib.md5(
                f"{metadata.get('filename', 'unknown')}_{i}_{chunk[:50]}".encode()
            ).hexdigest()

            doc_metadata = {
                **metadata,
                "user_id": user_id,
                "chunk_index": i,
                "total_chunks": len(chunks),
                "doc_id": doc_id,
            }
            docs.append(Document(page_content=chunk, metadata=doc_metadata))
            chunk_metas.append(doc_metadata)

        self.vectorstore.add_documents(docs)

        # Keep the keyword index in sync with the vector store.
        if self.enable_hybrid:
            self.bm25.add(chunks, chunk_metas)

        logger.info(f"Added {len(docs)} chunks for '{metadata.get('filename', '?')}'")
        return len(docs)

    def delete_document(self, filename: str, user_id: str = "anonymous"):
        """Delete all chunks of one document for a given user.

        For the "anonymous" user, legacy chunks stored before user scoping
        (no "user_id" in metadata) are matched as well. Returns True when at
        least one chunk was deleted.
        """
        try:
            collection = self.vectorstore._collection
            results = collection.get(
                where={"$and": [{"filename": filename}, {"user_id": user_id}]}
            )

            if user_id == "anonymous" and not (results and results.get("ids")):
                # Legacy fallback: chunks indexed before user scoping existed.
                all_file_docs = collection.get(where={"filename": filename})
                if all_file_docs and all_file_docs.get("ids"):
                    metas = all_file_docs.get("metadatas") or []
                    # zip keeps ids/metadatas aligned and tolerates None entries
                    # (the old index arithmetic raised on missing metadata).
                    legacy_ids = [
                        chunk_id
                        for chunk_id, meta in zip(all_file_docs["ids"], metas)
                        if not meta or "user_id" not in meta
                    ]
                    results = {"ids": legacy_ids}

            if results and results.get("ids"):
                collection.delete(ids=results["ids"])
                if self.enable_hybrid:
                    self.bm25.rebuild_from_chroma(collection)
                return True
            return False
        except Exception as e:
            logger.error(f"Error deleting document: {e}")
            return False

    def search(
        self,
        query: str,
        k: int = 5,
        filter_dict: Optional[dict] = None,
        user_id: str = "anonymous"
    ) -> List[Tuple[str, dict, float]]:
        """
        Advanced search pipeline:
        1. Vector similarity search (semantic)
        2. BM25 keyword search (lexical) - if hybrid enabled
        3. Reciprocal Rank Fusion to merge results
        4. Reranking with cross-encoder - if enabled
        """
        # Always scope results to the requesting user. Copy so the caller's
        # dict is not mutated.
        filter_dict = dict(filter_dict or {})
        filter_dict.setdefault("user_id", user_id)

        # Chroma requires an explicit $and wrapper for multi-key filters.
        if len(filter_dict) > 1:
            chroma_filter = {"$and": [{key: value} for key, value in filter_dict.items()]}
        else:
            chroma_filter = filter_dict

        # 1) Semantic retrieval — over-fetch to give fusion/reranking headroom.
        vector_results = self._vector_search(query, k=k * 2, filter_dict=chroma_filter)

        # 2+3) Lexical retrieval and rank fusion, when hybrid is available.
        if self.enable_hybrid and self.bm25.is_ready:
            # BUGFIX: pass the flat dict — BM25Index compares metadata keys
            # directly and does not understand Chroma's "$and" wrapper, so the
            # wrapped filter silently discarded every keyword hit.
            bm25_results = self.bm25.search(query, k=k * 2, filter_dict=filter_dict)
            merged = reciprocal_rank_fusion(
                [vector_results, bm25_results],
                top_n=k * 2,
            )
        else:
            merged = vector_results

        # 4) Cross-encoder reranking (model is lazy-loaded on first use).
        if self.enable_reranking and self.reranker is not None:
            if not self.reranker.is_ready:
                self.reranker.load()
            if self.reranker.is_ready:
                return self.reranker.rerank(query, merged, top_k=k)
        return merged[:k]

    def _vector_search(
        self, query: str, k: int = 10, filter_dict: Optional[dict] = None,
    ) -> List[Tuple[str, dict, float]]:
        """Pure vector similarity search; returns (text, metadata, score)."""
        try:
            results = self.vectorstore.similarity_search_with_score(
                query, k=k, filter=filter_dict
            )
            return [
                (doc.page_content, doc.metadata, score) for doc, score in results
            ]
        except Exception as e:
            logger.warning(f"Vector search failed: {e}")
            return []

    def search_by_type(
        self,
        query: str,
        doc_type: str,
        k: int = 5,
        user_id: str = "anonymous",
    ) -> List[Tuple[str, dict, float]]:
        """Search filtered by document type (cv, job_offer, linkedin, other).

        The new ``user_id`` parameter defaults to the previously-implicit
        "anonymous", so existing callers are unaffected.
        """
        return self.search(query, k=k, filter_dict={"doc_type": doc_type}, user_id=user_id)

    def get_context(
        self,
        query: str,
        k: int = 8,
        filter_type: Optional[str] = None,
        user_id: str = "anonymous"
    ) -> str:
        """Get a formatted context string for LLM consumption.

        Deduplicates near-identical chunks (same first 100 characters) and
        labels each chunk with its document type, source file and score.
        """
        filter_dict = {"doc_type": filter_type} if filter_type else {}
        results = self.search(query, k=k, filter_dict=filter_dict, user_id=user_id)

        if not results:
            return "β οΈ No se encontraron documentos relevantes. Por favor, sube tu CV u otros documentos primero."

        # UI labels per document type (loop-invariant, hoisted out of the loop).
        type_labels = {
            "cv": "π CV/Resume",
            "job_offer": "πΌ Oferta de Trabajo",
            "linkedin": "π€ LinkedIn",
            "other": "π Documento",
        }

        context_parts = []
        seen_content = set()

        for content, metadata, score in results:
            # Skip near-duplicate chunks (same 100-char prefix).
            content_hash = hashlib.md5(content[:100].encode()).hexdigest()
            if content_hash in seen_content:
                continue
            seen_content.add(content_hash)

            source = metadata.get("filename", "Desconocido")
            doc_type = metadata.get("doc_type", "documento")
            type_label = type_labels.get(doc_type, "π Documento")

            context_parts.append(
                f"[{type_label} | Fuente: {source} | Score: {score:.3f}]\n{content}"
            )

        # BUGFIX: the divider line is meant to separate chunks. Operator
        # precedence previously made it a one-off prefix:
        # "\n\n" + "β"*50 + "\n\n".join(parts).
        separator = "\n\n" + "β" * 50 + "\n\n"
        return separator.join(context_parts)

    def get_document_list(self, user_id: str = "anonymous") -> List[str]:
        """Get the sorted list of indexed document filenames for a user."""
        try:
            collection = self.vectorstore._collection
            results = collection.get(where={"user_id": user_id})
            if user_id == "anonymous" and not results.get("ids"):
                # Legacy chunks (no user_id metadata) count as anonymous.
                all_docs = collection.get()
                results = {
                    "metadatas": [
                        m for m in all_docs.get("metadatas", [])
                        if not m or "user_id" not in m  # tolerate None entries
                    ]
                }

            filenames = {
                meta["filename"]
                for meta in results.get("metadatas", [])
                if meta and "filename" in meta
            }
            return sorted(filenames)
        except Exception:
            return []

    def get_stats(self, user_id: str = "anonymous") -> dict:
        """Get vector store statistics for a user."""
        try:
            collection = self.vectorstore._collection
            results = collection.get(where={"user_id": user_id})
            if user_id == "anonymous" and not results.get("ids"):
                # Legacy fallback: count chunks stored before user scoping.
                all_docs = collection.get()
                ids = all_docs.get("ids", [])
                metas = all_docs.get("metadatas") or [None] * len(ids)
                # zip avoids the IndexError the old code hit on short/missing
                # metadatas (which zeroed the stats via the except branch).
                results = {
                    "ids": [
                        chunk_id
                        for chunk_id, meta in zip(ids, metas)
                        if not meta or "user_id" not in meta
                    ]
                }

            count = len(results.get("ids") or [])
            docs = self.get_document_list(user_id=user_id)
            return {
                "total_chunks": count,
                "total_documents": len(docs),
                "documents": docs,
                "embedding_model": self.embedding_key,
                "hybrid_search": self.enable_hybrid and self.bm25.is_ready,
                "reranking": self.enable_reranking,
            }
        except Exception:
            return {
                "total_chunks": 0,
                "total_documents": 0,
                "documents": [],
                "embedding_model": self.embedding_key,
                "hybrid_search": False,
                "reranking": False,
            }

    def get_all_text(self, user_id: str = "anonymous") -> str:
        """Get all document text for a user (for full-context queries)."""
        try:
            collection = self.vectorstore._collection
            results = collection.get(where={"user_id": user_id})
            if results and results.get("documents"):
                return "\n\n".join(results["documents"])
        except Exception:
            pass
        return ""

    def get_documents_by_type(self) -> Dict[str, List[str]]:
        """Get filenames grouped by document type (all users)."""
        try:
            collection = self.vectorstore._collection
            results = collection.get()
            by_type: Dict[str, List[str]] = {}
            for meta in results.get("metadatas", []):
                if not meta:
                    continue
                doc_type = meta.get("doc_type", "other")
                filename = meta.get("filename", "?")
                files = by_type.setdefault(doc_type, [])
                if filename not in files:
                    files.append(filename)
            return by_type
        except Exception:
            return {}
|