import json
import os
import shutil
from pathlib import Path
from typing import Any, Dict, List, Optional

from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from pymongo import MongoClient

from backend.config.database import db_settings
from backend.config.logging_config import get_logger
from backend.config.settings import settings
from backend.services.custom_mongo_vector import CustomMongoDBVectorStore, VectorSearchOptions
|
| | |
# Module-level logger shared by everything in this service.
logger = get_logger("vector_store")
|
class VectorStoreService:
    """Simple vector store service - creates or retrieves vector store for retriever use.

    On construction this sets up the configured embeddings provider and then
    loads (or builds) the configured vector store; both steps raise on
    failure so the module-level singleton fails fast at import time.
    """

    def __init__(self):
        logger.info("Initializing Vector Store Service...")

        try:
            # Embeddings must exist before the vector store, which embeds
            # queries (and, for ChromaDB, documents) through this object.
            self.embeddings = self._get_embeddings()
            logger.info("Embeddings setup completed")

            self.vector_store = self._get_or_create_vector_store()
            logger.info("Vector store setup completed")

            logger.info("Vector Store Service initialization successful")

        except Exception as e:
            logger.error(f"Vector Store Service initialization failed: {str(e)}", exc_info=True)
            raise

    def _get_embeddings(self):
        """Return the embeddings provider selected by configuration.

        Provider packages are imported lazily so only the configured one
        needs to be installed.

        Raises:
            ImportError: if the selected provider's package is not installed.
        """
        embedding_config = settings.get_embedding_config()
        provider = embedding_config["provider"]

        logger.info(f"Setting up embeddings provider: {provider}")

        if provider == "openai":
            try:
                from langchain_openai import OpenAIEmbeddings
                logger.info("OpenAI embeddings imported successfully")
                return OpenAIEmbeddings(
                    openai_api_key=embedding_config["api_key"],
                    model=embedding_config["model"]
                )
            except ImportError as e:
                logger.error(f"OpenAI embeddings not available: {e}")
                raise ImportError("OpenAI provider selected but langchain_openai not installed") from e

        elif provider == "google":
            try:
                from langchain_google_genai import GoogleGenerativeAIEmbeddings
                logger.info("Google embeddings imported successfully")
                return GoogleGenerativeAIEmbeddings(
                    google_api_key=embedding_config["api_key"],
                    model=embedding_config["model"]
                )
            except ImportError as e:
                logger.error(f"Google embeddings not available: {e}")
                raise ImportError("Google provider selected but langchain_google_genai not installed") from e

        elif provider == "huggingface":
            try:
                from langchain_huggingface import HuggingFaceEmbeddings
                logger.info("HuggingFace embeddings imported successfully")
                return HuggingFaceEmbeddings(
                    model_name=embedding_config["model"]
                )
            except ImportError:
                # Fall back to raw sentence-transformers when the LangChain
                # integration package is not installed.
                try:
                    from sentence_transformers import SentenceTransformer  # noqa: F401 (availability probe)
                    logger.warning("Using sentence-transformers directly (langchain-huggingface not available)")
                    return self._create_sentence_transformer_wrapper(embedding_config["model"])
                except ImportError as e:
                    logger.error(f"HuggingFace embeddings not available: {e}")
                    logger.error("To fix this, install sentence-transformers: pip install sentence-transformers")
                    raise ImportError("HuggingFace provider selected but sentence-transformers not installed. Run: pip install sentence-transformers") from e

        elif provider == "ollama":
            try:
                from langchain_community.embeddings import OllamaEmbeddings
                logger.info("Ollama embeddings imported successfully")
                return OllamaEmbeddings(
                    base_url=embedding_config["base_url"],
                    model=embedding_config["model"]
                )
            except ImportError as e:
                logger.error(f"Ollama embeddings not available: {e}")
                raise ImportError("Ollama provider selected but langchain_community not installed") from e

        else:
            logger.warning(f"Unknown embedding provider '{provider}', falling back to OpenAI")
            try:
                from langchain_openai import OpenAIEmbeddings
                return OpenAIEmbeddings()
            except ImportError as e:
                logger.error(f"No valid embedding provider available: {e}")
                raise ImportError("No valid embedding provider available") from e

    def _create_sentence_transformer_wrapper(self, model_name):
        """Create a minimal LangChain-compatible wrapper around sentence-transformers.

        LangChain vector stores call ``embed_documents`` / ``embed_query`` on
        an embeddings object; the previous wrapper only exposed ``encode``,
        so document ingestion (e.g. ``Chroma.from_documents``) would fail.
        ``embed_documents`` is added; ``encode`` is kept for any direct callers.
        """
        from sentence_transformers import SentenceTransformer

        class SentenceTransformerWrapper:
            def __init__(self, model_name):
                self.model = SentenceTransformer(model_name)

            def encode(self, texts):
                # Kept for backward compatibility with direct callers.
                return self.model.encode(texts).tolist()

            def embed_documents(self, texts):
                # Required by the LangChain embeddings interface.
                return self.model.encode(texts).tolist()

            def embed_query(self, text):
                return self.model.encode([text])[0].tolist()

        return SentenceTransformerWrapper(model_name)

    def _get_or_create_vector_store(self):
        """Load or build the vector store selected in the database settings.

        Supports ChromaDB (locally persisted, optionally wiped and rebuilt on
        start) and MongoDB Atlas (pre-existing embeddings through the custom
        store). Unknown providers fall back to a local ChromaDB collection.

        Raises:
            ImportError: if the selected provider's package is missing.
            ConnectionError: if the MongoDB Atlas connection fails.
        """
        db_config = db_settings.get_vector_store_config()
        provider = db_config["provider"]

        if provider == "chromadb":
            try:
                from langchain_chroma import Chroma

                persist_dir = Path(db_config["persist_directory"])
                collection_name = db_config["collection_name"]
                refresh_on_start = db_config.get("refresh_on_start", False)

                # Optionally wipe the persisted store so it is rebuilt below.
                if refresh_on_start and persist_dir.exists():
                    logger.info(f"CHROMADB_REFRESH_ON_START=true - Deleting existing ChromaDB at {persist_dir}")
                    shutil.rmtree(persist_dir)
                    logger.info("Existing ChromaDB deleted successfully")

                # Reuse an existing non-empty store instead of re-embedding.
                if persist_dir.exists() and any(persist_dir.iterdir()):
                    logger.info(f"Loading existing ChromaDB from {persist_dir}")
                    return Chroma(
                        collection_name=collection_name,
                        embedding_function=self.embeddings,
                        persist_directory=str(persist_dir)
                    )

                logger.info(f"Creating new ChromaDB at {persist_dir}")
                documents = self._load_documents_from_folder()

                if documents:
                    vector_store = Chroma.from_documents(
                        documents=documents,
                        embedding=self.embeddings,
                        collection_name=collection_name,
                        persist_directory=str(persist_dir)
                    )
                    logger.info(f"Created ChromaDB with {len(documents)} document chunks")
                    return vector_store

                logger.info("No documents found, creating empty ChromaDB")
                return Chroma(
                    collection_name=collection_name,
                    embedding_function=self.embeddings,
                    persist_directory=str(persist_dir)
                )
            except ImportError as e:
                logger.error(f"ChromaDB not available: {e}")
                raise ImportError("ChromaDB provider selected but langchain_chroma not installed") from e

        elif provider == "mongodb":
            try:
                logger.info("Setting up MongoDB Atlas connection...")
                client = MongoClient(db_config["uri"])
                client.admin.command('ping')  # fail fast on a bad URI/credentials
                logger.info("MongoDB Atlas connection verified")
                # Was a stray print() to stdout; demoted to debug logging.
                logger.debug(f"Available databases: {client.list_database_names()}")

                database = client[db_config["database"]]
                collection = database[db_config["collection_name"]]

                options = VectorSearchOptions(
                    index_name=db_config.get("index_name", "vector_index"),
                    embedding_key=db_config.get("vector_field", "ingredients_emb"),
                    text_key="title",
                    num_candidates=db_config.get("num_candidates", 50),
                    similarity_metric=db_config.get("similarity_metric", "cosine")
                )

                vector_store = CustomMongoDBVectorStore(
                    collection=collection,
                    embedding_function=self.embeddings,
                    options=options
                )

                logger.info("Custom MongoDB Vector Store created successfully")
                logger.info("Using pre-existing embeddings without requiring vector search index")
                return vector_store

            except ImportError as e:
                logger.error(f"MongoDB packages not available: {e}")
                raise ImportError("MongoDB provider selected but langchain-mongodb not installed. Run: pip install langchain-mongodb pymongo") from e
            except Exception as e:
                logger.error(f"MongoDB Atlas connection failed: {e}")
                raise ConnectionError(f"Failed to connect to MongoDB Atlas: {e}") from e

        else:
            logger.warning(f"Unknown vector store provider '{provider}', falling back to ChromaDB")
            try:
                from langchain_chroma import Chroma
                return Chroma(
                    collection_name="fallback_collection",
                    embedding_function=self.embeddings,
                    persist_directory="./vector_store/fallback_chroma"
                )
            except ImportError as e:
                logger.error(f"No valid vector store provider available: {e}")
                raise ImportError("No valid vector store provider available") from e

    def _load_documents_from_folder(self, folder_path: str = "./data/recipes") -> List[Document]:
        """Load and chunk all documents from folder with UTF-8 encoding, fallback to sample data.

        JSON files are reformatted into readable recipe text before chunking;
        files that cannot be read are logged and skipped.
        """
        logger.info(f"Loading documents from: {folder_path}")

        documents: List[Document] = []
        folder = Path(folder_path)

        # Detect whether the recipes folder contains any non-empty files.
        has_recipe_files = False
        if folder.exists():
            recipe_files = list(folder.rglob("*"))
            has_recipe_files = any(f.is_file() and f.stat().st_size > 0 for f in recipe_files)

        # Fall back to the bundled sample data when no recipes are present.
        if not has_recipe_files:
            logger.info(f"No recipe files found in {folder_path}, using sample data")
            folder_path = "./data"
            folder = Path(folder_path)

        if not folder.exists():
            logger.error(f"Folder does not exist: {folder.absolute()}")
            return documents

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )

        for file_path in folder.rglob("*"):
            if not file_path.is_file():
                continue
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()

                # Skip empty / whitespace-only files.
                if not content.strip():
                    continue

                # Reformat JSON recipe files into plain text for embedding;
                # on formatting failure the raw JSON text is embedded as-is.
                if file_path.suffix.lower() == '.json':
                    formatted_content = self._format_json_recipes(content, file_path)
                    if formatted_content:
                        content = formatted_content

                chunks = text_splitter.split_text(content)

                for i, chunk in enumerate(chunks):
                    documents.append(Document(
                        page_content=chunk,
                        metadata={
                            "source": str(file_path),
                            "filename": file_path.name,
                            "chunk_index": i,
                            "file_type": file_path.suffix
                        }
                    ))

            except Exception as e:
                logger.error(f"Error loading {file_path}: {e}")
                continue

        logger.info(f"Loaded and chunked {len(documents)} document segments")
        return documents

    def _format_json_recipes(self, json_content: str, file_path: Path) -> Optional[str]:
        """Format JSON recipe data into readable text format similar to MongoDB output.

        Accepts either a single recipe object or a list of recipes. Returns
        the formatted text, or None when the JSON is invalid or has an
        unexpected structure (the caller then falls back to the raw content).
        """
        try:
            # json is imported at module level; the previous local re-import
            # was redundant.
            recipes = json.loads(json_content)

            # Normalize a single recipe object into a one-element list.
            if isinstance(recipes, dict):
                recipes = [recipes]
            elif not isinstance(recipes, list):
                logger.warning(f"Unexpected JSON structure in {file_path}")
                return None

            formatted_recipes = []

            for recipe in recipes:
                if not isinstance(recipe, dict):
                    continue

                title = recipe.get("title", "Untitled Recipe")
                ingredients = recipe.get("ingredients", [])
                instructions = recipe.get("instructions", "")

                formatted_content = f"Recipe: {title}\n"

                if ingredients:
                    if isinstance(ingredients, list):
                        formatted_content += f"Ingredients: {', '.join(ingredients)}\n"
                    else:
                        formatted_content += f"Ingredients: {ingredients}\n"

                if instructions:
                    if isinstance(instructions, list):
                        formatted_content += f"Instructions: {' '.join(instructions)}"
                    else:
                        formatted_content += f"Instructions: {instructions}"

                # Append a whitelisted subset of metadata fields when present.
                metadata = recipe.get("metadata", {})
                if metadata:
                    formatted_content += "\n"
                    for key, value in metadata.items():
                        if key in ["cook_time", "difficulty", "servings", "category"]:
                            formatted_content += f"{key.replace('_', ' ').title()}: {value}\n"

                formatted_recipes.append(formatted_content)

            result = "\n\n".join(formatted_recipes)
            logger.info(f"Formatted {len(recipes)} JSON recipes from {file_path.name}")
            return result

        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON in {file_path}: {e}")
            return None
        except Exception as e:
            logger.error(f"Error formatting JSON recipes from {file_path}: {e}")
            return None

    def get_retriever(self):
        """Return a retriever over the vector store, configured to fetch k=5 results.

        Intended for use with ConversationalRetrievalChain.
        """
        logger.info("Creating retriever from vector store...")

        retriever = self.vector_store.as_retriever()

        # Both the MongoDB and ChromaDB branches previously set the same
        # k=5, so the duplicated if/else is collapsed into one assignment.
        retriever.search_kwargs = {"k": 5}
        logger.info(f"{type(self.vector_store).__name__} retriever configured with k=5")

        return retriever
| |
|
# Module-level singleton instantiated at import time so the whole backend
# shares one embeddings client and one vector-store connection. Construction
# raises on failure (see VectorStoreService.__init__), failing fast on import.
vector_store_service = VectorStoreService()