"""Deduplication module for fabric-to-espanso.""" | |
import logging | |
from typing import List, Dict, Any, Tuple, Set | |
import difflib | |
from qdrant_client import QdrantClient | |
from qdrant_client.http.models import Filter, PointIdsList | |
from .config import config | |
from .database import get_dense_vector_name, get_sparse_vector_name | |
logger = logging.getLogger('fabric_to_espanso') | |


def calculate_text_difference_percentage(text1: str, text2: str) -> float:
    """
    Calculate the percentage difference between two text strings.

    Args:
        text1: First text string
        text2: Second text string

    Returns:
        Percentage difference as a float between 0.0 (identical) and 1.0 (completely different)
    """
    # Use difflib's SequenceMatcher to calculate the similarity ratio
    similarity = difflib.SequenceMatcher(None, text1, text2).ratio()

    # Convert similarity to a difference percentage
    difference_percentage = 1.0 - similarity

    return difference_percentage
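
# Illustrative values for the helper above (a sanity check, not part of the
# module's behavior): identical strings differ by 0.0, and strings with no
# characters in common differ by 1.0, since SequenceMatcher.ratio() returns
# 1.0 and 0.0 respectively for those cases.
#   calculate_text_difference_percentage("foo bar", "foo bar")  # -> 0.0
#   calculate_text_difference_percentage("abc", "xyz")          # -> 1.0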

# TODO: Consider moving the vector similarity search functionality to database_query.py and importing it here.
# This would create a more structured codebase with search functionality centralized in one place.
def find_duplicates(client: QdrantClient, collection_name: str = config.embedding.collection_name) -> List[Tuple[str, List[str]]]:
    """
    Find duplicate entries in the database based on semantic similarity and text difference.

    Args:
        client: Initialized Qdrant client
        collection_name: Name of the collection to query

    Returns:
        List of tuples containing (kept_point_id, [duplicate_point_ids])
    """
    # Constants for duplicate detection
    SIMILARITY_THRESHOLD = 0.85  # Minimum semantic similarity to consider as a potential duplicate
    DIFFERENCE_THRESHOLD = 0.1   # Maximum text difference (10%) to consider as a duplicate

    # Get all points from the database
    all_points = client.scroll(
        collection_name=collection_name,
        with_vectors=True,  # Include vector data; otherwise no vectors are returned
        limit=10000  # Adjust based on expected file count
    )[0]
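    # Note (assumption about scale): scroll() returns a single page of at most
    # `limit` points plus a next-page offset. If the collection could ever
    # exceed 10000 points, this call would need to loop using that offset.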
logger.info(f"Checking {len(all_points)} entries for duplicates") | |
# Track processed points to avoid redundant comparisons | |
processed_points = set() | |
# Store duplicates as (kept_id, [duplicate_ids]) | |
duplicates = [] | |
# For each point, find semantically similar points | |
for i, point in enumerate(all_points): | |
if point.id in processed_points: | |
continue | |
point_id = point.id | |
point_content = point.payload.get('content', '') | |
logger.debug(f"Checking point {point_id} for duplicates") | |
logger.debug(f"Content: {point_content}") | |
# Skip if no content | |
if not point_content: | |
logger.debug(f"Skipping point {point_id} as it has no content") | |
continue | |
# Get the actual vector names from the collection configuration | |
dense_vector_name = get_dense_vector_name(client, collection_name) | |
# Skip points without vector or without the required vector type | |
if not point.vector or dense_vector_name not in point.vector: | |
logger.debug(f"Skipping point {point_id} as it has no valid vector") | |
continue | |
# Find semantically similar points using Qdrant's search | |
similar_points = client.search( | |
collection_name=collection_name, | |
query_vector=(dense_vector_name, point.vector.get(dense_vector_name)), | |
limit=100, | |
score_threshold=SIMILARITY_THRESHOLD # Only consider points with similarity > threshold | |
) | |
# Skip the first result (which is the point itself) | |
similar_points = [p for p in similar_points if p.id != point_id] | |
if not similar_points: | |
continue | |
logger.debug(f"Found {len(similar_points)} semantically similar points for {point.payload.get('filename', 'unknown')}") | |
# Check text difference for each similar point | |
duplicate_ids = [] | |
for similar_point in similar_points: | |
similar_id = similar_point.id | |
# Skip if already processed | |
if similar_id in processed_points: | |
continue | |
# Get content of similar point | |
similar_content = None | |
for p in all_points: | |
if p.id == similar_id: | |
similar_content = p.payload.get('content', '') | |
break | |
if not similar_content: | |
continue | |
# Calculate text difference percentage | |
diff_percentage = calculate_text_difference_percentage(point_content, similar_content) | |
# If difference is less than threshold, consider it a duplicate | |
if diff_percentage <= DIFFERENCE_THRESHOLD: | |
duplicate_ids.append(similar_id) | |
processed_points.add(similar_id) | |
logger.debug(f"Found duplicate: {similar_id} (diff: {diff_percentage:.2%})") | |
if duplicate_ids: | |
duplicates.append((point_id, duplicate_ids)) | |
processed_points.add(point_id) | |
logger.info(f"Found {sum(len(dups) for _, dups in duplicates)} duplicate entries in {len(duplicates)} groups") | |
return duplicates | |
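
# Illustrative dry run (assumption: a reachable Qdrant instance; the URL is a
# placeholder): inspect duplicate groups without deleting anything.
#   client = QdrantClient(url="http://localhost:6333")
#   for kept_id, dup_ids in find_duplicates(client):
#       print(f"keeping {kept_id}, duplicates: {dup_ids}")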

def remove_duplicates(client: QdrantClient, collection_name: str = config.embedding.collection_name) -> int:
    """
    Remove duplicate entries from the database based on semantic similarity and text difference.

    Uses a two-step verification process:
    1. Find entries with semantic similarity >= 0.85 (using vector search)
    2. Of those, treat entries whose text difference is <= 10% as duplicates

    Args:
        client: Initialized Qdrant client
        collection_name: Name of the collection to query

    Returns:
        Number of removed duplicate entries
    """
    # Find duplicates
    duplicate_groups = find_duplicates(client, collection_name)

    if not duplicate_groups:
        logger.info("No duplicates found")
        return 0

    # Count total duplicates
    total_duplicates = sum(len(dups) for _, dups in duplicate_groups)

    # Remove duplicates
    for _, duplicate_ids in duplicate_groups:
        if duplicate_ids:
            client.delete(
                collection_name=collection_name,
                points_selector=PointIdsList(points=duplicate_ids)
            )

    logger.info(f"Removed {total_duplicates} duplicate entries from the database")
    return total_duplicates
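
# Minimal usage sketch (assumptions: a Qdrant instance is reachable at the
# placeholder URL below, and config.embedding.collection_name names an
# existing collection; adjust both before running).
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    qdrant_client = QdrantClient(url="http://localhost:6333")
    removed_count = remove_duplicates(qdrant_client)
    print(f"Removed {removed_count} duplicate entries")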