Spaces:
Sleeping
Sleeping
| """ | |
| Qdrant client wrapper for vector database operations. | |
| """ | |
| import logging | |
| from typing import List, Dict, Any | |
| from qdrant_client import QdrantClient | |
| from qdrant_client.models import Distance, VectorParams, PointStruct | |
| import time | |
| logger = logging.getLogger(__name__) | |
| class QdrantManager: | |
| """Manages Qdrant vector database operations.""" | |
| def __init__(self, url: str, api_key: str): | |
| """Initialize Qdrant client.""" | |
| self.client = QdrantClient(url=url, api_key=api_key) | |
| logger.info(f"Connected to Qdrant at {url}") | |
| def create_collection(self, collection_name: str, vector_size: int, distance: str = "Cosine"): | |
| """ | |
| Create a new collection in Qdrant. | |
| Args: | |
| collection_name: Name of the collection | |
| vector_size: Dimension of vectors | |
| distance: Distance metric (Cosine, Euclidean, Dot) | |
| """ | |
| try: | |
| # Check if collection already exists | |
| collections = self.client.get_collections().collections | |
| existing_names = [col.name for col in collections] | |
| if collection_name in existing_names: | |
| logger.info(f"Collection '{collection_name}' already exists") | |
| return True | |
| # Create new collection | |
| distance_map = { | |
| "Cosine": Distance.COSINE, | |
| "Euclidean": Distance.EUCLID, | |
| "Dot": Distance.DOT | |
| } | |
| self.client.create_collection( | |
| collection_name=collection_name, | |
| vectors_config=VectorParams( | |
| size=vector_size, | |
| distance=distance_map.get(distance, Distance.COSINE) | |
| ) | |
| ) | |
| logger.info(f"Created collection '{collection_name}' with vector size {vector_size}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error creating collection: {e}") | |
| return False | |
| def upsert_points(self, collection_name: str, points_data: List[Dict[str, Any]], | |
| embeddings: List[List[float]], max_retries: int = 3): | |
| """ | |
| Upsert points into Qdrant collection with retry logic. | |
| Args: | |
| collection_name: Name of the collection | |
| points_data: List of point data dictionaries | |
| embeddings: List of embedding vectors | |
| max_retries: Maximum number of retry attempts | |
| """ | |
| points = [] | |
| for i, (data, embedding) in enumerate(zip(points_data, embeddings)): | |
| point = PointStruct( | |
| id=data['id'], | |
| vector=embedding, | |
| payload={ | |
| 'problem': data['problem'], | |
| 'solution': data['solution'], | |
| 'source': data['source'] | |
| } | |
| ) | |
| points.append(point) | |
| # Retry logic for network issues | |
| for attempt in range(max_retries): | |
| try: | |
| self.client.upsert( | |
| collection_name=collection_name, | |
| points=points | |
| ) | |
| logger.info(f"Successfully upserted {len(points)} points") | |
| return True | |
| except Exception as e: | |
| logger.warning(f"Attempt {attempt + 1} failed: {e}") | |
| if attempt < max_retries - 1: | |
| time.sleep(2 ** attempt) # Exponential backoff | |
| else: | |
| logger.error(f"Failed to upsert points after {max_retries} attempts") | |
| raise e | |
| def search_similar(self, collection_name: str, query_vector: List[float], | |
| limit: int = 3, score_threshold: float = 0.0): | |
| """ | |
| Search for similar vectors in the collection. | |
| Args: | |
| collection_name: Name of the collection | |
| query_vector: Query embedding vector | |
| limit: Number of results to return | |
| score_threshold: Minimum similarity score | |
| Returns: | |
| Search results from Qdrant | |
| """ | |
| try: | |
| results = self.client.search( | |
| collection_name=collection_name, | |
| query_vector=query_vector, | |
| limit=limit, | |
| score_threshold=score_threshold | |
| ) | |
| logger.info(f"Found {len(results)} similar results") | |
| return results | |
| except Exception as e: | |
| logger.error(f"Error searching collection: {e}") | |
| return [] | |
| def get_collection_info(self, collection_name: str): | |
| """Get information about a collection.""" | |
| try: | |
| info = self.client.get_collection(collection_name) | |
| logger.info(f"Collection info: {info}") | |
| return info | |
| except Exception as e: | |
| logger.error(f"Error getting collection info: {e}") | |
| return None | |