Praneeth Etta
hugging face space commit
ac6128c
# """
# Qdrant service for vector database operations.
# """
# import sys
# from pathlib import Path
# import structlog
# from typing import List, Dict, Any, Optional
# # Add parent directory to import database module
# parent_dir = Path(__file__).parent.parent.parent
# sys.path.append(str(parent_dir))
# try:
# from database.qdrant_manager import QdrantManager
# from database.utils import EmbeddingGenerator
# from backend.models.schemas import SearchResult, APILogEntry
# except ImportError as e:
# # Services will be initialized when packages are available
# pass
# logger = structlog.get_logger()
# class QdrantService:
# """Service layer for Qdrant vector database operations."""
# def __init__(self):
# """Initialize Qdrant service."""
# self.qdrant_manager = None
# self.embedding_generator = None
# self._initialize()
# def _initialize(self):
# """Initialize Qdrant manager and embedding generator."""
# try:
# import os
# from dotenv import load_dotenv
# # Load .env from project root (3 levels up from services)
# env_path = Path(__file__).parent.parent.parent / '.env'
# load_dotenv(env_path)
# # Qdrant configuration from environment variables
# qdrant_config = {
# 'url': os.getenv('QDRANT_URL'),
# 'api_key': os.getenv('QDRANT_API_KEY'),
# 'collection_name': os.getenv('QDRANT_COLLECTION', 'nuinamath')
# }
# if not qdrant_config['url'] or not qdrant_config['api_key']:
# raise ValueError("QDRANT_URL and QDRANT_API_KEY must be set in environment variables")
# self.qdrant_manager = QdrantManager(
# url=qdrant_config['url'],
# api_key=qdrant_config['api_key']
# )
# self.embedding_generator = EmbeddingGenerator()
# logger.info("Qdrant service initialized successfully")
# except Exception as e:
# logger.error("Failed to initialize Qdrant service", error=str(e))
# # Service will work in degraded mode
# async def search_similar(self, question: str, limit: int = 5) -> List[SearchResult]:
# """
# Search for similar math problems in the knowledge base.
# Args:
# question: The math question to search for
# limit: Maximum number of results to return
# Returns:
# List of SearchResult objects
# """
# if not self.qdrant_manager or not self.embedding_generator:
# logger.warning("Qdrant service not properly initialized")
# return []
# try:
# import os
# # Generate embedding for the question
# query_embedding = self.embedding_generator.embed_text(question)
# # Search in Qdrant
# collection_name = os.getenv('QDRANT_COLLECTION', 'nuinamath')
# results = self.qdrant_manager.search_similar(
# collection_name=collection_name,
# query_vector=query_embedding,
# limit=limit
# )
# # Convert to SearchResult objects
# search_results = []
# for result in results:
# payload = result.payload
# search_result = SearchResult(
# problem=payload.get('problem', ''),
# solution=payload.get('solution', ''),
# score=result.score
# )
# search_results.append(search_result)
# logger.info("Knowledge base search completed",
# question_length=len(question),
# results_count=len(search_results),
# best_score=search_results[0].score if search_results else 0)
# return search_results
# except Exception as e:
# logger.error("Knowledge base search failed", error=str(e))
# return []
# async def log_api_call(
# self,
# endpoint: str,
# method: str,
# request_data: Dict[str, Any],
# response_data: Dict[str, Any],
# response_time_ms: float,
# source: str
# ):
# """
# Log API call to Qdrant for analytics.
# Args:
# endpoint: API endpoint called
# method: HTTP method
# request_data: Request payload
# response_data: Response payload
# response_time_ms: Response time in milliseconds
# source: Source of the response (KB/MCP)
# """
# if not self.qdrant_manager or not self.embedding_generator:
# logger.warning("Cannot log API call - Qdrant service not initialized")
# return
# try:
# # Create log entry
# log_entry = APILogEntry(
# endpoint=endpoint,
# method=method,
# request_data=request_data,
# response_data=response_data,
# response_time_ms=response_time_ms,
# source=source,
# status_code=200 # Default to 200 for successful responses
# )
# # TODO: Store log entry in Qdrant analytics collection
# # For now, just log to stdout
# logger.info("API call logged",
# endpoint=endpoint,
# method=method,
# response_time_ms=response_time_ms,
# source=source)
# except Exception as e:
# logger.warning("Failed to log API call", error=str(e))
"""
Qdrant service for vector database operations.
"""
import sys
from pathlib import Path
import structlog
from typing import List, Dict, Any, Optional, TYPE_CHECKING
# Add parent directory to import database module
parent_dir = Path(__file__).parent.parent.parent
sys.path.append(str(parent_dir))
# Import for type checking only
if TYPE_CHECKING:
from backend.models.schemas import SearchResult, APILogEntry
try:
from database.qdrant_manager import QdrantManager
from database.utils import EmbeddingGenerator
from backend.models.schemas import SearchResult, APILogEntry
except ImportError as e:
# Services will be initialized when packages are available
pass
logger = structlog.get_logger()
class QdrantService:
"""Service layer for Qdrant vector database operations."""
def __init__(self):
"""Initialize Qdrant service."""
self.qdrant_manager = None
self.embedding_generator = None
self._initialize()
def _initialize(self):
"""Initialize Qdrant manager and embedding generator."""
try:
import os
from dotenv import load_dotenv
# Load .env from project root (3 levels up from services)
env_path = Path(__file__).parent.parent.parent / '.env'
load_dotenv(env_path)
# Qdrant configuration from environment variables
qdrant_config = {
'url': os.getenv('QDRANT_URL'),
'api_key': os.getenv('QDRANT_API_KEY'),
'collection_name': os.getenv('QDRANT_COLLECTION', 'nuinamath')
}
if not qdrant_config['url'] or not qdrant_config['api_key']:
raise ValueError("QDRANT_URL and QDRANT_API_KEY must be set in environment variables")
self.qdrant_manager = QdrantManager(
url=qdrant_config['url'],
api_key=qdrant_config['api_key']
)
self.embedding_generator = EmbeddingGenerator()
logger.info("Qdrant service initialized successfully")
except Exception as e:
logger.error("Failed to initialize Qdrant service", error=str(e))
# Service will work in degraded mode
async def search_similar(self, question: str, limit: int = 5) -> List['SearchResult']:
"""
Search for similar math problems in the knowledge base.
Args:
question: The math question to search for
limit: Maximum number of results to return
Returns:
List of SearchResult objects
"""
if not self.qdrant_manager or not self.embedding_generator:
logger.warning("Qdrant service not properly initialized")
return []
try:
import os
# Import SearchResult here to avoid NameError
from backend.models.schemas import SearchResult
# Generate embedding for the question
query_embedding = self.embedding_generator.embed_text(question)
# Search in Qdrant
collection_name = os.getenv('QDRANT_COLLECTION', 'nuinamath')
results = self.qdrant_manager.search_similar(
collection_name=collection_name,
query_vector=query_embedding,
limit=limit
)
# Convert to SearchResult objects
search_results = []
for result in results:
payload = result.payload
search_result = SearchResult(
problem=payload.get('problem', ''),
solution=payload.get('solution', ''),
score=result.score
)
search_results.append(search_result)
logger.info("Knowledge base search completed",
question_length=len(question),
results_count=len(search_results),
best_score=search_results[0].score if search_results else 0)
return search_results
except Exception as e:
logger.error("Knowledge base search failed", error=str(e))
return []
async def log_api_call(
self,
endpoint: str,
method: str,
request_data: Dict[str, Any],
response_data: Dict[str, Any],
response_time_ms: float,
source: str
):
"""
Log API call to Qdrant for analytics.
Args:
endpoint: API endpoint called
method: HTTP method
request_data: Request payload
response_data: Response payload
response_time_ms: Response time in milliseconds
source: Source of the response (KB/MCP)
"""
if not self.qdrant_manager or not self.embedding_generator:
logger.warning("Cannot log API call - Qdrant service not initialized")
return
try:
# Import APILogEntry here to avoid NameError
from backend.models.schemas import APILogEntry
# Create log entry
log_entry = APILogEntry(
endpoint=endpoint,
method=method,
request_data=request_data,
response_data=response_data,
response_time_ms=response_time_ms,
source=source,
status_code=200 # Default to 200 for successful responses
)
# TODO: Store log entry in Qdrant analytics collection
# For now, just log to stdout
logger.info("API call logged",
endpoint=endpoint,
method=method,
response_time_ms=response_time_ms,
source=source)
except Exception as e:
logger.warning("Failed to log API call", error=str(e))