| | """ |
| | Streamlined MongoDB Vector Store with Atlas Vector Search |
| | """ |
| |
|
| | from typing import List, Dict, Any, Optional, NamedTuple |
| | import numpy as np |
| | from langchain.schema import Document |
| | from langchain.vectorstores.base import VectorStore |
| | from pymongo.collection import Collection |
| | from backend.config.logging_config import get_logger |
| |
|
| | logger = get_logger("custom_mongo_vector") |
| |
|
class VectorSearchOptions(NamedTuple):
    """Immutable settings bundle read by the MongoDB vector store.

    Every field carries a default, so ``VectorSearchOptions()`` is a valid
    configuration; override individual values by keyword or with
    ``_replace``.
    """

    # Search index name, sent as ``index`` in the ``$vectorSearch`` stage.
    index_name: str = "foodInstructionIndex"

    # Document field holding the stored embedding vector (the search ``path``).
    embedding_key: str = "ingredients_emb"

    # Document field used as the recipe title of each returned result.
    text_key: str = "title"

    # Value sent as ``numCandidates`` in the ``$vectorSearch`` stage.
    num_candidates: int = 50

    # Metric for the Python fallback: "dotProduct" selects a raw dot product,
    # any other value is treated as cosine similarity.
    similarity_metric: str = "cosine"
| |
|
class CustomMongoDBVectorStore(VectorStore):
    """Read-only LangChain vector store over a MongoDB recipe collection.

    Queries are answered with an Atlas ``$vectorSearch`` aggregation. When
    that aggregation raises or returns nothing, the store falls back to
    scoring every eligible document in Python.

    Both paths skip documents whose ``needs_review`` field is present and
    not ``False``.

    Note:
        ``options.similarity_metric`` is only consulted by the Python
        fallback; the Atlas pipeline does not send it. Scores reported by the
        two paths therefore come from different computations and should not
        be assumed to share a scale.
    """

    # Transient field added by the Atlas pipeline to carry each hit's score.
    _SCORE_FIELD = "_similarity_score"

    def __init__(
        self,
        collection: Collection,
        embedding_function,
        options: Optional[VectorSearchOptions] = None
    ):
        """Bind the store to a collection and an embedding provider.

        Args:
            collection: MongoDB collection holding documents with
                pre-computed embeddings.
            embedding_function: Object exposing ``embed_query(text)`` that
                returns the query vector.
            options: Search configuration; defaults to
                ``VectorSearchOptions()`` when omitted.
        """
        self.collection = collection
        self.embedding_function = embedding_function
        self.options = options or VectorSearchOptions()

        logger.info("🔧 Streamlined MongoDB Vector Store initialized")
        logger.info(f"⚙️ Config: {self.options.index_name} index, {self.options.similarity_metric} similarity")

    @staticmethod
    def _reviewed_filter() -> Dict[str, Any]:
        """Return a fresh query matching documents that are not awaiting review."""
        return {
            '$or': [
                {'needs_review': {'$exists': False}},
                {'needs_review': False}
            ]
        }

    def _calculate_similarity(self, vec1: List[float], vec2: List[float]) -> float:
        """Score two equal-length vectors with the configured metric.

        Args:
            vec1: First vector.
            vec2: Second vector.

        Returns:
            The dot product when ``similarity_metric`` is ``"dotProduct"``,
            otherwise the cosine similarity. Cosine similarity is reported as
            ``0.0`` when either vector has zero norm.
        """
        a = np.asarray(vec1, dtype=float)
        b = np.asarray(vec2, dtype=float)
        dot = float(np.dot(a, b))

        if self.options.similarity_metric == "dotProduct":
            return dot

        denominator = float(np.linalg.norm(a) * np.linalg.norm(b))
        if denominator == 0.0:
            # A zero vector has no direction; dividing would yield NaN, which
            # also breaks the ordering when results are sorted by score.
            return 0.0
        return dot / denominator

    def _build_pipeline(self, qvec: List[float], k: int) -> List[Dict[str, Any]]:
        """Build the Atlas aggregation returning up to ``k`` reviewed hits."""
        opts = self.options
        # Atlas rejects a ``limit`` larger than ``numCandidates``.
        num_candidates = max(opts.num_candidates, k)
        return [
            {
                "$vectorSearch": {
                    "index": opts.index_name,
                    "path": opts.embedding_key,
                    "queryVector": qvec,
                    "numCandidates": num_candidates,
                    # Over-fetch so the review filter below cannot starve the
                    # result set; the real cut to ``k`` happens after it.
                    "limit": num_candidates
                }
            },
            {"$match": self._reviewed_filter()},
            {"$limit": k},
            {
                # Return only the fields that are read later (``_id`` is kept
                # by default) plus the score, leaving the embedding behind.
                "$project": {
                    opts.text_key: 1,
                    "ingredients": 1,
                    "instructions": 1,
                    self._SCORE_FIELD: {"$meta": "vectorSearchScore"}
                }
            }
        ]

    def _ranked_hits(self, query: str, k: int) -> List[tuple]:
        """Return up to ``k`` ``(raw_document, score)`` pairs, best first.

        Tries Atlas Vector Search first and uses the Python fallback when the
        aggregation raises or yields no results.
        """
        logger.info(f"🔍 Searching: '{query}' (k={k})")
        if k <= 0:
            return []

        qvec = self.embedding_function.embed_query(query)

        try:
            results = list(self.collection.aggregate(self._build_pipeline(qvec, k)))
        except Exception as e:
            # Best-effort boundary: any Atlas failure degrades to the fallback.
            logger.warning(f"⚠️ Atlas Vector Search failed: {e}")
        else:
            if results:
                logger.info(f"✅ Atlas Vector Search: {len(results)} results")
                return [
                    (doc, float(doc.get(self._SCORE_FIELD, 0.0)))
                    for doc in results
                ]

        logger.info("🔄 Using Python similarity fallback")
        return self._python_ranked_hits(qvec, k)

    def _python_ranked_hits(self, qvec: List[float], k: int) -> List[tuple]:
        """Score every eligible document in Python and keep the best ``k``.

        Documents without an embedding, or whose embedding length differs
        from the query vector, are skipped.
        """
        if k <= 0:
            # A negative slice bound would otherwise return almost everything.
            return []

        opts = self.options
        cursor = self.collection.find(
            self._reviewed_filter(),
            {opts.text_key: 1, opts.embedding_key: 1, "ingredients": 1, "instructions": 1}
        )

        similarities = []
        for doc in cursor:
            doc_emb = doc.get(opts.embedding_key)
            if doc_emb and len(doc_emb) == len(qvec):
                similarities.append((doc, self._calculate_similarity(qvec, doc_emb)))

        similarities.sort(key=lambda pair: pair[1], reverse=True)
        top_hits = similarities[:k]

        logger.info(f"📊 Python fallback: {len(similarities)} processed, {len(top_hits)} returned")
        return top_hits

    def similarity_search(self, query: str, k: int = 4, **kwargs: Any) -> List[Document]:
        """Return the ``k`` documents most similar to ``query``.

        Args:
            query: Free-text query; embedded via ``embed_query``.
            k: Maximum number of documents to return. Values ``<= 0`` yield
                an empty list.
            **kwargs: Accepted for interface compatibility and ignored.

        Returns:
            Matching documents, best match first.
        """
        hits = self._ranked_hits(query, k)
        return self._create_documents([doc for doc, _ in hits])

    def _python_similarity_search(self, qvec: List[float], k: int) -> List[Document]:
        """Python-only similarity search over an already embedded query."""
        hits = self._python_ranked_hits(qvec, k)
        return self._create_documents([doc for doc, _ in hits])

    def _create_documents(self, docs: List[Dict]) -> List[Document]:
        """Convert raw MongoDB documents into LangChain ``Document`` objects.

        The page content is the title followed by the ingredients and
        instructions sections (each only when non-empty), separated by blank
        lines. Metadata carries the stringified ``_id`` and the title.
        """
        documents = []
        for doc in docs:
            title = doc.get(self.options.text_key, "Untitled Recipe")
            ingredients = doc.get("ingredients", "")
            instructions = doc.get("instructions", "")

            content_parts = [f"Recipe: {title}"]
            if ingredients:
                content_parts.append(f"Ingredients: {ingredients}")
            if instructions:
                content_parts.append(f"Instructions: {instructions}")

            documents.append(Document(
                page_content="\n\n".join(content_parts),
                metadata={"_id": str(doc["_id"]), "title": title}
            ))

        return documents

    def similarity_search_with_score(self, query: str, k: int = 4, **kwargs: Any) -> List[tuple]:
        """Return the ``k`` most similar documents with their scores.

        Args:
            query: Free-text query; embedded via ``embed_query``.
            k: Maximum number of results. Values ``<= 0`` yield an empty list.
            **kwargs: Accepted for interface compatibility and ignored.

        Returns:
            ``(Document, score)`` pairs, best match first. The score is the
            Atlas ``vectorSearchScore`` on the Atlas path and the value of
            ``_calculate_similarity`` on the Python fallback path.
        """
        hits = self._ranked_hits(query, k)
        documents = self._create_documents([doc for doc, _ in hits])
        return [(document, score) for document, (_, score) in zip(documents, hits)]

    def add_texts(self, texts: List[str], metadatas: Optional[List[dict]] = None, **kwargs: Any) -> List[str]:
        """Read-only vector store - adding texts not supported.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("This vector store is read-only for pre-existing embeddings")

    @classmethod
    def from_texts(cls, texts: List[str], embedding_function, metadatas: Optional[List[dict]] = None, **kwargs: Any):
        """Read-only vector store - creating from texts not supported.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("This vector store is read-only for pre-existing embeddings")
| |
|