# """ | |
# Qdrant service for vector database operations. | |
# """ | |
# import sys | |
# from pathlib import Path | |
# import structlog | |
# from typing import List, Dict, Any, Optional | |
# # Add parent directory to import database module | |
# parent_dir = Path(__file__).parent.parent.parent | |
# sys.path.append(str(parent_dir)) | |
# try: | |
# from database.qdrant_manager import QdrantManager | |
# from database.utils import EmbeddingGenerator | |
# from backend.models.schemas import SearchResult, APILogEntry | |
# except ImportError as e: | |
# # Services will be initialized when packages are available | |
# pass | |
# logger = structlog.get_logger() | |
# class QdrantService: | |
# """Service layer for Qdrant vector database operations.""" | |
# def __init__(self): | |
# """Initialize Qdrant service.""" | |
# self.qdrant_manager = None | |
# self.embedding_generator = None | |
# self._initialize() | |
# def _initialize(self): | |
# """Initialize Qdrant manager and embedding generator.""" | |
# try: | |
# import os | |
# from dotenv import load_dotenv | |
# # Load .env from project root (3 levels up from services) | |
# env_path = Path(__file__).parent.parent.parent / '.env' | |
# load_dotenv(env_path) | |
# # Qdrant configuration from environment variables | |
# qdrant_config = { | |
# 'url': os.getenv('QDRANT_URL'), | |
# 'api_key': os.getenv('QDRANT_API_KEY'), | |
# 'collection_name': os.getenv('QDRANT_COLLECTION', 'nuinamath') | |
# } | |
# if not qdrant_config['url'] or not qdrant_config['api_key']: | |
# raise ValueError("QDRANT_URL and QDRANT_API_KEY must be set in environment variables") | |
# self.qdrant_manager = QdrantManager( | |
# url=qdrant_config['url'], | |
# api_key=qdrant_config['api_key'] | |
# ) | |
# self.embedding_generator = EmbeddingGenerator() | |
# logger.info("Qdrant service initialized successfully") | |
# except Exception as e: | |
# logger.error("Failed to initialize Qdrant service", error=str(e)) | |
# # Service will work in degraded mode | |
# async def search_similar(self, question: str, limit: int = 5) -> List[SearchResult]: | |
# """ | |
# Search for similar math problems in the knowledge base. | |
# Args: | |
# question: The math question to search for | |
# limit: Maximum number of results to return | |
# Returns: | |
# List of SearchResult objects | |
# """ | |
# if not self.qdrant_manager or not self.embedding_generator: | |
# logger.warning("Qdrant service not properly initialized") | |
# return [] | |
# try: | |
# import os | |
# # Generate embedding for the question | |
# query_embedding = self.embedding_generator.embed_text(question) | |
# # Search in Qdrant | |
# collection_name = os.getenv('QDRANT_COLLECTION', 'nuinamath') | |
# results = self.qdrant_manager.search_similar( | |
# collection_name=collection_name, | |
# query_vector=query_embedding, | |
# limit=limit | |
# ) | |
# # Convert to SearchResult objects | |
# search_results = [] | |
# for result in results: | |
# payload = result.payload | |
# search_result = SearchResult( | |
# problem=payload.get('problem', ''), | |
# solution=payload.get('solution', ''), | |
# score=result.score | |
# ) | |
# search_results.append(search_result) | |
# logger.info("Knowledge base search completed", | |
# question_length=len(question), | |
# results_count=len(search_results), | |
# best_score=search_results[0].score if search_results else 0) | |
# return search_results | |
# except Exception as e: | |
# logger.error("Knowledge base search failed", error=str(e)) | |
# return [] | |
# async def log_api_call( | |
# self, | |
# endpoint: str, | |
# method: str, | |
# request_data: Dict[str, Any], | |
# response_data: Dict[str, Any], | |
# response_time_ms: float, | |
# source: str | |
# ): | |
# """ | |
# Log API call to Qdrant for analytics. | |
# Args: | |
# endpoint: API endpoint called | |
# method: HTTP method | |
# request_data: Request payload | |
# response_data: Response payload | |
# response_time_ms: Response time in milliseconds | |
# source: Source of the response (KB/MCP) | |
# """ | |
# if not self.qdrant_manager or not self.embedding_generator: | |
# logger.warning("Cannot log API call - Qdrant service not initialized") | |
# return | |
# try: | |
# # Create log entry | |
# log_entry = APILogEntry( | |
# endpoint=endpoint, | |
# method=method, | |
# request_data=request_data, | |
# response_data=response_data, | |
# response_time_ms=response_time_ms, | |
# source=source, | |
# status_code=200 # Default to 200 for successful responses | |
# ) | |
# # TODO: Store log entry in Qdrant analytics collection | |
# # For now, just log to stdout | |
# logger.info("API call logged", | |
# endpoint=endpoint, | |
# method=method, | |
# response_time_ms=response_time_ms, | |
# source=source) | |
# except Exception as e: | |
# logger.warning("Failed to log API call", error=str(e)) | |
""" | |
Qdrant service for vector database operations. | |
""" | |
import sys | |
from pathlib import Path | |
import structlog | |
from typing import List, Dict, Any, Optional, TYPE_CHECKING | |
# Add parent directory to import database module | |
parent_dir = Path(__file__).parent.parent.parent | |
sys.path.append(str(parent_dir)) | |
# Import for type checking only | |
if TYPE_CHECKING: | |
from backend.models.schemas import SearchResult, APILogEntry | |
try: | |
from database.qdrant_manager import QdrantManager | |
from database.utils import EmbeddingGenerator | |
from backend.models.schemas import SearchResult, APILogEntry | |
except ImportError as e: | |
# Services will be initialized when packages are available | |
pass | |
logger = structlog.get_logger() | |
class QdrantService: | |
"""Service layer for Qdrant vector database operations.""" | |
def __init__(self): | |
"""Initialize Qdrant service.""" | |
self.qdrant_manager = None | |
self.embedding_generator = None | |
self._initialize() | |
def _initialize(self): | |
"""Initialize Qdrant manager and embedding generator.""" | |
try: | |
import os | |
from dotenv import load_dotenv | |
# Load .env from project root (3 levels up from services) | |
env_path = Path(__file__).parent.parent.parent / '.env' | |
load_dotenv(env_path) | |
# Qdrant configuration from environment variables | |
qdrant_config = { | |
'url': os.getenv('QDRANT_URL'), | |
'api_key': os.getenv('QDRANT_API_KEY'), | |
'collection_name': os.getenv('QDRANT_COLLECTION', 'nuinamath') | |
} | |
if not qdrant_config['url'] or not qdrant_config['api_key']: | |
raise ValueError("QDRANT_URL and QDRANT_API_KEY must be set in environment variables") | |
self.qdrant_manager = QdrantManager( | |
url=qdrant_config['url'], | |
api_key=qdrant_config['api_key'] | |
) | |
self.embedding_generator = EmbeddingGenerator() | |
logger.info("Qdrant service initialized successfully") | |
except Exception as e: | |
logger.error("Failed to initialize Qdrant service", error=str(e)) | |
# Service will work in degraded mode | |
async def search_similar(self, question: str, limit: int = 5) -> List['SearchResult']: | |
""" | |
Search for similar math problems in the knowledge base. | |
Args: | |
question: The math question to search for | |
limit: Maximum number of results to return | |
Returns: | |
List of SearchResult objects | |
""" | |
if not self.qdrant_manager or not self.embedding_generator: | |
logger.warning("Qdrant service not properly initialized") | |
return [] | |
try: | |
import os | |
# Import SearchResult here to avoid NameError | |
from backend.models.schemas import SearchResult | |
# Generate embedding for the question | |
query_embedding = self.embedding_generator.embed_text(question) | |
# Search in Qdrant | |
collection_name = os.getenv('QDRANT_COLLECTION', 'nuinamath') | |
results = self.qdrant_manager.search_similar( | |
collection_name=collection_name, | |
query_vector=query_embedding, | |
limit=limit | |
) | |
# Convert to SearchResult objects | |
search_results = [] | |
for result in results: | |
payload = result.payload | |
search_result = SearchResult( | |
problem=payload.get('problem', ''), | |
solution=payload.get('solution', ''), | |
score=result.score | |
) | |
search_results.append(search_result) | |
logger.info("Knowledge base search completed", | |
question_length=len(question), | |
results_count=len(search_results), | |
best_score=search_results[0].score if search_results else 0) | |
return search_results | |
except Exception as e: | |
logger.error("Knowledge base search failed", error=str(e)) | |
return [] | |
async def log_api_call( | |
self, | |
endpoint: str, | |
method: str, | |
request_data: Dict[str, Any], | |
response_data: Dict[str, Any], | |
response_time_ms: float, | |
source: str | |
): | |
""" | |
Log API call to Qdrant for analytics. | |
Args: | |
endpoint: API endpoint called | |
method: HTTP method | |
request_data: Request payload | |
response_data: Response payload | |
response_time_ms: Response time in milliseconds | |
source: Source of the response (KB/MCP) | |
""" | |
if not self.qdrant_manager or not self.embedding_generator: | |
logger.warning("Cannot log API call - Qdrant service not initialized") | |
return | |
try: | |
# Import APILogEntry here to avoid NameError | |
from backend.models.schemas import APILogEntry | |
# Create log entry | |
log_entry = APILogEntry( | |
endpoint=endpoint, | |
method=method, | |
request_data=request_data, | |
response_data=response_data, | |
response_time_ms=response_time_ms, | |
source=source, | |
status_code=200 # Default to 200 for successful responses | |
) | |
# TODO: Store log entry in Qdrant analytics collection | |
# For now, just log to stdout | |
logger.info("API call logged", | |
endpoint=endpoint, | |
method=method, | |
response_time_ms=response_time_ms, | |
source=source) | |
except Exception as e: | |
logger.warning("Failed to log API call", error=str(e)) |