| """
|
| Deduplication: MinHash LSH for near-duplicate detection.
|
| """
|
|
|
| import hashlib
|
| import random
|
| from typing import List, Set, Tuple, Optional
|
| from dataclasses import dataclass
|
|
|
|
|
@dataclass
class MinHashSignature:
    """
    A document's MinHash signature together with the document's ID.

    Attributes:
        hash_values: The signature itself, a list of integer minimum hash
            values.
        doc_id: Identifier of the document the signature belongs to.
    """

    # Field order is part of the positional constructor: (hash_values, doc_id).
    hash_values: List[int]
    doc_id: str
|
|
|
|
|
class MinHashLSH:
    """
    MinHash LSH index for near-duplicate text detection.

    Text is lower-cased and cut into overlapping character k-grams
    (shingles). Each shingle set is summarised by a MinHash signature; the
    fraction of positions on which two signatures agree estimates the
    Jaccard similarity of the underlying shingle sets. Signatures are split
    into bands, and documents sharing at least one identical band become
    candidates that are then filtered against ``threshold``.
    """

    # Modulus of the (a*x + b) mod p hash family.
    _MERSENNE_PRIME = 2**61 - 1
    # Hash values are truncated to 32 bits.
    _HASH_MASK = 0xFFFFFFFF
    # Signature entry used for an empty shingle set. It lies just above the
    # 32-bit range of real hash values, so it can never collide with one.
    _EMPTY_MIN_HASH = 2**32

    def __init__(
        self,
        num_permutations: int = 128,
        threshold: float = 0.8,
        bands: int = 16,
        rows_per_band: int = 8,
        seed: Optional[int] = None,
    ):
        """
        Initialize MinHash LSH.

        Args:
            num_permutations: Number of hash permutations for MinHash
            threshold: Similarity threshold for considering duplicates
            bands: Number of bands for LSH
            rows_per_band: Rows per band (bands * rows_per_band = num_permutations)
            seed: Optional seed for the hash-function parameters. With the
                same seed, separate instances produce comparable signatures.
                When None, the module-level ``random`` generator is used.

        Raises:
            ValueError: If any size is not positive, or if
                ``bands * rows_per_band != num_permutations``.
        """
        # Explicit exceptions instead of ``assert``: asserts vanish under -O,
        # and a bare product check would accept negative pairs (-2 * -4 == 8).
        if num_permutations < 1 or bands < 1 or rows_per_band < 1:
            raise ValueError(
                "num_permutations, bands and rows_per_band must be positive"
            )
        if bands * rows_per_band != num_permutations:
            raise ValueError(
                f"bands * rows_per_band ({bands} * {rows_per_band}) must equal "
                f"num_permutations ({num_permutations})"
            )

        self.num_permutations = num_permutations
        self.threshold = threshold
        self.bands = bands
        self.rows_per_band = rows_per_band

        # A private generator keeps seeded instances reproducible without
        # touching the global random state.
        self._rng = random.Random(seed) if seed is not None else random
        self.hash_functions = self._generate_hash_functions(num_permutations)

        # One bucket table per band: bucket hash -> set of doc IDs.
        self.index = [dict() for _ in range(bands)]

        # doc_id -> MinHashSignature of every indexed document.
        self.signatures = {}

    def _generate_hash_functions(self, n: int) -> List[Tuple[int, int]]:
        """Generate n random (a, b) parameter pairs, one per hash function."""
        rng = getattr(self, "_rng", random)
        return [
            (rng.randint(1, 2**32 - 1), rng.randint(0, 2**32 - 1))
            for _ in range(n)
        ]

    def _hash(self, x: int, a: int, b: int) -> int:
        """Universal hash function: (a*x + b) mod prime, truncated to 32 bits."""
        return ((a * x + b) % self._MERSENNE_PRIME) & self._HASH_MASK

    def _compute_minhash(self, shingles: Set[int]) -> List[int]:
        """
        Compute MinHash signature for a set of shingles.

        Args:
            shingles: Set of shingle hash values

        Returns:
            List of minhash values (one per permutation). An empty shingle
            set yields a signature filled with a sentinel value, so empty
            inputs only ever match other empty inputs.
        """
        if not shingles:
            # min() of an empty sequence would raise.
            return [self._EMPTY_MIN_HASH] * len(self.hash_functions)
        return [
            min(self._hash(shingle, a, b) for shingle in shingles)
            for a, b in self.hash_functions
        ]

    def _shingle_text(
        self,
        text: str,
        k: int = 5,
    ) -> Set[int]:
        """
        Extract k-gram shingles from text.

        Args:
            text: Input text
            k: Shingle size (characters)

        Returns:
            Set of shingle hashes. Non-empty text shorter than ``k`` yields a
            single shingle covering the whole text; empty text yields an
            empty set.

        Raises:
            ValueError: If ``k`` is smaller than 1.
        """
        if k < 1:
            raise ValueError("k must be a positive integer")

        text = text.lower()
        if not text:
            return set()

        # max(1, ...) makes text shorter than k produce one window; slicing
        # past the end simply returns the whole (short) text.
        window_count = max(1, len(text) - k + 1)
        return {
            # The first 8 hex digits of the MD5 digest give a stable 32-bit ID.
            int(hashlib.md5(text[i:i + k].encode()).hexdigest()[:8], 16)
            for i in range(window_count)
        }

    def _band_keys(self, signature: List[int]) -> List[int]:
        """Return one bucket hash per band for the given signature."""
        rows = self.rows_per_band
        return [
            hash(tuple(signature[band * rows:(band + 1) * rows]))
            for band in range(self.bands)
        ]

    def _remove_from_index(self, doc_id: str, hash_values: List[int]) -> None:
        """Remove doc_id from the buckets that hash_values maps to."""
        for buckets, bucket_hash in zip(self.index, self._band_keys(hash_values)):
            members = buckets.get(bucket_hash)
            if members is None:
                continue
            members.discard(doc_id)
            if not members:
                # Drop emptied buckets so the index does not grow with churn.
                del buckets[bucket_hash]

    def add_document(
        self,
        doc_id: str,
        text: str,
        compute_signature: bool = True,
    ) -> "MinHashSignature":
        """
        Add a document to the LSH index.

        Adding a doc_id that is already indexed replaces its signature and
        removes the entries left behind by the previous text.

        Args:
            doc_id: Unique document ID
            text: Document text
            compute_signature: Whether to compute signature (or use precomputed)

        Returns:
            MinHash signature

        Raises:
            ValueError: If ``compute_signature`` is False; precomputed
                signatures are not supported.
        """
        if not compute_signature:
            raise ValueError("Must compute signature")

        signature = self._compute_minhash(self._shingle_text(text))

        previous = self.signatures.get(doc_id)
        if previous is not None:
            # Without this, buckets for the old text would keep the doc_id.
            self._remove_from_index(doc_id, previous.hash_values)

        signature_obj = MinHashSignature(hash_values=signature, doc_id=doc_id)
        self.signatures[doc_id] = signature_obj

        for buckets, bucket_hash in zip(self.index, self._band_keys(signature)):
            buckets.setdefault(bucket_hash, set()).add(doc_id)

        return signature_obj

    def query(
        self,
        text: str,
        candidate_doc_ids: Optional[Set[str]] = None,
    ) -> List[Tuple[str, float]]:
        """
        Query for near-duplicate documents.

        Args:
            text: Query text
            candidate_doc_ids: Optional set of candidate doc IDs to check

        Returns:
            List of (doc_id, similarity) above threshold, sorted by
            descending similarity with ties broken by doc_id.
        """
        query_signature = self._compute_minhash(self._shingle_text(text))

        # A document is a candidate if it shares a bucket in any band.
        candidates: Set[str] = set()
        for buckets, bucket_hash in zip(self.index, self._band_keys(query_signature)):
            candidates.update(buckets.get(bucket_hash, ()))

        if candidate_doc_ids is not None:
            candidates = candidates.intersection(candidate_doc_ids)

        results = []
        for doc_id in candidates:
            similarity = self._estimate_similarity(query_signature, doc_id)
            if similarity >= self.threshold:
                results.append((doc_id, similarity))

        # The doc_id tie-break keeps output independent of set iteration order.
        return sorted(results, key=lambda item: (-item[1], item[0]))

    def _estimate_similarity(
        self,
        signature1: List[int],
        doc_id2: str,
    ) -> float:
        """
        Estimate Jaccard similarity between two signatures.
        Uses MinHash similarity: proportion of matching hash values.

        Args:
            signature1: First MinHash signature
            doc_id2: Second document ID (signature retrieved from storage)

        Returns:
            Estimated Jaccard similarity; 0.0 if doc_id2 is not indexed or
            signature1 is empty.
        """
        stored = self.signatures.get(doc_id2)
        if stored is None or not signature1:
            return 0.0

        matches = sum(
            1 for h1, h2 in zip(signature1, stored.hash_values) if h1 == h2
        )
        return matches / len(signature1)

    def compute_signature(self, text: str) -> "MinHashSignature":
        """Compute MinHash signature for text without adding it to the index."""
        signature = self._compute_minhash(self._shingle_text(text))
        return MinHashSignature(hash_values=signature, doc_id="")
|
|
|
|
|
def test_deduplication():
    """
    Smoke-test MinHash LSH on a few near-duplicate sentences.

    Indexes four short documents, queries with the exact text of the first
    one, and checks that the exact match is found with similarity 1.0, that
    every hit respects the threshold and ordering, and that the unrelated
    document is not reported.

    Raises:
        AssertionError: If any of the checks fails.
    """
    lsh = MinHashLSH(num_permutations=64, threshold=0.7, bands=8, rows_per_band=8)

    docs = [
        ("doc1", "The quick brown fox jumps over the lazy dog."),
        ("doc2", "The quick brown fox jumps over the lazy dog!!!"),
        ("doc3", "The quick brown fox leaps over the lazy dog."),
        ("doc4", "Completely unrelated text about science and experiments."),
    ]

    for doc_id, text in docs:
        signature = lsh.add_document(doc_id, text)
        assert signature.doc_id == doc_id
        assert len(signature.hash_values) == lsh.num_permutations

    results = lsh.query(docs[0][1])
    print(f"Query results for doc1: {results}")

    # Identical text gives an identical signature, so doc1 always matches
    # itself in every band with an estimated similarity of exactly 1.0.
    assert ("doc1", 1.0) in results

    similarities = [similarity for _, similarity in results]
    assert all(similarity >= lsh.threshold for similarity in similarities)
    assert similarities == sorted(similarities, reverse=True)

    # doc4 shares no 5-character shingle with doc1, so it can only appear
    # through hash collisions, which is vanishingly unlikely at this size.
    assert "doc4" not in {doc_id for doc_id, _ in results}

    print("Deduplication test passed!")
|
|
|
|
|
# Script entry point: run the smoke test on direct execution only, so that
# importing this module has no side effects.
if __name__ == "__main__":
    test_deduplication()
|
|
|