# telegram-analytics / vector_search.py
# Uploaded via huggingface_hub by rottg (revision 4a21e7e)
#!/usr/bin/env python3
"""
Vector Search Module for Semantic Similarity
Optional module that adds semantic search capabilities using:
- Sentence embeddings (sentence-transformers)
- FAISS for efficient similarity search
Dependencies (optional, install with):
pip install sentence-transformers faiss-cpu numpy
If dependencies are not installed, the module gracefully degrades.
"""
import sqlite3
import pickle
from pathlib import Path
from typing import Optional
# Try importing optional dependencies.
# Each import is attempted independently so check_dependencies() can report
# exactly which ones are missing; the module-level names are bound to None
# so later references fail loudly rather than with NameError.
VECTOR_SEARCH_AVAILABLE = False

try:
    import numpy as np
    NUMPY_AVAILABLE = True
except ImportError:
    NUMPY_AVAILABLE = False
    np = None

try:
    import faiss
    FAISS_AVAILABLE = True
except ImportError:
    FAISS_AVAILABLE = False
    faiss = None

try:
    from sentence_transformers import SentenceTransformer
    SENTENCE_TRANSFORMERS_AVAILABLE = True
except ImportError:
    SENTENCE_TRANSFORMERS_AVAILABLE = False
    SentenceTransformer = None

# Vector search works only when every optional dependency imported cleanly.
VECTOR_SEARCH_AVAILABLE = all([NUMPY_AVAILABLE, FAISS_AVAILABLE, SENTENCE_TRANSFORMERS_AVAILABLE])
class VectorSearchUnavailable:
    """Placeholder used when the optional dependencies are not installed.

    Construction always succeeds (so it is drop-in signature-compatible with
    VectorSearch); any method call then raises RuntimeError with install
    instructions, surfacing the problem at the point of use.
    """

    def __init__(self, *args, **kwargs):
        # Swallow whatever constructor arguments the caller passes.
        pass

    def __getattr__(self, name):
        def _fail(*args, **kwargs):
            raise RuntimeError(
                "Vector search requires additional dependencies. Install with:\n"
                "pip install sentence-transformers faiss-cpu numpy"
            )
        return _fail
class VectorSearch:
    """
    Semantic search using sentence embeddings and FAISS.

    Features:
    - Generate embeddings for messages
    - Build FAISS index for fast similarity search
    - Find semantically similar messages (not just keyword match)
    - Supports Hebrew and multilingual text

    Example:
        vs = VectorSearch(db_path='telegram.db')
        vs.build_index()  # One-time, can take a while

        # Find similar messages
        results = vs.search("ืžื” ืงื•ืจื” ื”ื™ื•ื?", limit=10)
        for msg_id, score, text in results:
            print(f"{score:.3f}: {text[:50]}")
    """

    # Recommended models for multilingual/Hebrew support
    MODELS = {
        'fast': 'paraphrase-multilingual-MiniLM-L12-v2',      # Fast, good multilingual
        'accurate': 'paraphrase-multilingual-mpnet-base-v2',  # More accurate
        'small': 'all-MiniLM-L6-v2',                          # Smallest, English-focused
    }

    def __init__(
        self,
        db_path: str = 'telegram.db',
        model_name: str = 'fast',
        index_path: Optional[str] = None
    ):
        """
        Initialize vector search.

        Args:
            db_path: Path to SQLite database
            model_name: Model preset ('fast', 'accurate', 'small') or full model name
            index_path: Path to save/load FAISS index (default: db_path + '.faiss')

        Raises:
            RuntimeError: If the optional dependencies are not installed.
        """
        if not VECTOR_SEARCH_AVAILABLE:
            raise RuntimeError(
                "Vector search requires additional dependencies. Install with:\n"
                "pip install sentence-transformers faiss-cpu numpy"
            )

        self.db_path = db_path
        self.index_path = index_path or f"{db_path}.faiss"
        self.id_map_path = f"{self.index_path}.ids"

        # Resolve preset name ('fast'/'accurate'/'small') to the full model id;
        # remember it so stats() can report which model is loaded (bug fix:
        # stats previously reported the embedding dimension under 'model').
        self.model_id = self.MODELS.get(model_name, model_name)
        print(f"Loading embedding model: {self.model_id}")
        self.model = SentenceTransformer(self.model_id)
        self.dimension = self.model.get_sentence_embedding_dimension()

        # FAISS index is built/loaded lazily.
        self.index = None
        self.id_map: list[int] = []  # Maps FAISS index position to message_id

        # Reuse a previously built index if one exists on disk.
        if Path(self.index_path).exists():
            self.load_index()

    def _get_connection(self) -> sqlite3.Connection:
        """Open a connection to the SQLite DB with dict-like row access."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def encode(self, texts: list[str], batch_size: int = 32, show_progress: bool = True) -> 'np.ndarray':
        """
        Encode texts to embeddings.

        Args:
            texts: List of texts to encode
            batch_size: Batch size for encoding
            show_progress: Show progress bar

        Returns:
            numpy array of shape (n_texts, dimension)
        """
        return self.model.encode(
            texts,
            batch_size=batch_size,
            show_progress_bar=show_progress,
            convert_to_numpy=True,
            normalize_embeddings=True  # Normalized => inner product == cosine similarity
        )

    def build_index(
        self,
        batch_size: int = 1000,
        min_text_length: int = 10,
        use_gpu: bool = False
    ) -> None:
        """
        Build FAISS index from all messages in database.

        Args:
            batch_size: Number of messages to process at once
            min_text_length: Minimum text length to index
            use_gpu: Use GPU acceleration if available
        """
        conn = self._get_connection()

        # Count messages that qualify for indexing.
        cursor = conn.execute(
            'SELECT COUNT(*) FROM messages WHERE length(text_plain) >= ?',
            (min_text_length,)
        )
        total = cursor.fetchone()[0]
        print(f"Building index for {total} messages...")

        # IndexFlatIP (Inner Product) because encode() normalizes embeddings,
        # making inner product equivalent to cosine similarity.
        self.index = faiss.IndexFlatIP(self.dimension)
        if use_gpu and faiss.get_num_gpus() > 0:
            print("Using GPU acceleration")
            self.index = faiss.index_cpu_to_gpu(
                faiss.StandardGpuResources(),
                0,
                self.index
            )

        self.id_map = []

        # Process in batches; ORDER BY id keeps OFFSET paging stable as long
        # as the table isn't modified concurrently.
        offset = 0
        while offset < total:
            cursor = conn.execute(
                '''
                SELECT id, text_plain FROM messages
                WHERE length(text_plain) >= ?
                ORDER BY id
                LIMIT ? OFFSET ?
                ''',
                (min_text_length, batch_size, offset)
            )
            rows = cursor.fetchall()
            if not rows:
                break

            ids = [row['id'] for row in rows]
            texts = [row['text_plain'] for row in rows]

            # Encode batch and append; id_map stays aligned with index order.
            embeddings = self.encode(texts, show_progress=False)
            self.index.add(embeddings)
            self.id_map.extend(ids)

            offset += len(rows)
            print(f"Indexed {offset}/{total} messages ({100*offset/total:.1f}%)")

        conn.close()

        # Persist so the next instantiation can load instead of rebuilding.
        self.save_index()
        print(f"Index built: {self.index.ntotal} vectors")

    def save_index(self) -> None:
        """Save FAISS index and ID map to disk."""
        if self.index is None:
            return

        # GPU indexes cannot be serialized directly; convert to CPU first.
        # On a CPU index the conversion raises, so fall back to the index as-is.
        if hasattr(faiss, 'index_gpu_to_cpu'):
            try:
                cpu_index = faiss.index_gpu_to_cpu(self.index)
            except Exception:  # bug fix: was a bare except (caught KeyboardInterrupt too)
                cpu_index = self.index
        else:
            cpu_index = self.index

        faiss.write_index(cpu_index, self.index_path)
        with open(self.id_map_path, 'wb') as f:
            pickle.dump(self.id_map, f)
        print(f"Index saved to {self.index_path}")

    def load_index(self) -> bool:
        """Load FAISS index and ID map from disk; return True on success."""
        try:
            self.index = faiss.read_index(self.index_path)
            with open(self.id_map_path, 'rb') as f:
                # NOTE(review): pickle.load on an untrusted .ids file executes
                # arbitrary code — only load index files you created yourself.
                self.id_map = pickle.load(f)
            print(f"Loaded index with {self.index.ntotal} vectors")
            return True
        except Exception as e:
            # Best-effort load: a missing/corrupt index just means rebuild.
            print(f"Could not load index: {e}")
            return False

    def search(
        self,
        query: str,
        limit: int = 10,
        min_score: float = 0.0
    ) -> list[tuple[int, float, str]]:
        """
        Search for semantically similar messages.

        Args:
            query: Search query text
            limit: Maximum results to return
            min_score: Minimum similarity score (0-1)

        Returns:
            List of (message_id, score, text) tuples

        Raises:
            RuntimeError: If no index has been built or loaded.
        """
        if self.index is None or self.index.ntotal == 0:
            raise RuntimeError("Index not built. Call build_index() first.")

        # Encode query (returns a (1, dimension) array).
        query_vector = self.encode([query], show_progress=False)

        # FAISS returns -1 indices when fewer than `limit` vectors exist.
        scores, indices = self.index.search(query_vector, limit)

        # Resolve message texts from the DB.
        conn = self._get_connection()
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx == -1 or score < min_score:
                continue

            message_id = self.id_map[idx]
            cursor = conn.execute(
                'SELECT text_plain FROM messages WHERE id = ?',
                (message_id,)
            )
            row = cursor.fetchone()
            if row:
                results.append((message_id, float(score), row['text_plain']))

        conn.close()
        return results

    def find_similar(
        self,
        message_id: int,
        limit: int = 10,
        exclude_same_user: bool = False
    ) -> list[tuple[int, float, str]]:
        """
        Find messages similar to a specific message.

        Args:
            message_id: ID of the reference message
            limit: Maximum results to return
            exclude_same_user: Exclude messages from same user

        Returns:
            List of (message_id, score, text) tuples; empty if the reference
            message does not exist.
        """
        conn = self._get_connection()

        # Look up the reference message's text and author.
        cursor = conn.execute(
            'SELECT text_plain, from_id FROM messages WHERE id = ?',
            (message_id,)
        )
        row = cursor.fetchone()
        if not row:
            conn.close()
            return []

        reference_text = row['text_plain']
        reference_user = row['from_id']
        conn.close()

        # Over-fetch so filtering (self-match / same user) still leaves enough.
        results = self.search(reference_text, limit=limit * 2)

        # Bug fix: the original opened a fresh DB connection per candidate row;
        # reuse a single connection for all author lookups.
        filtered = []
        conn = self._get_connection() if exclude_same_user else None
        try:
            for msg_id, score, text in results:
                if msg_id == message_id:
                    continue

                if exclude_same_user:
                    cursor = conn.execute(
                        'SELECT from_id FROM messages WHERE id = ?',
                        (msg_id,)
                    )
                    msg_row = cursor.fetchone()
                    if msg_row and msg_row['from_id'] == reference_user:
                        continue

                filtered.append((msg_id, score, text))
                if len(filtered) >= limit:
                    break
        finally:
            if conn is not None:
                conn.close()

        return filtered

    def cluster_messages(
        self,
        n_clusters: int = 10,
        sample_size: Optional[int] = None
    ) -> dict[int, list[int]]:
        """
        Cluster messages by semantic similarity using K-means.

        Args:
            n_clusters: Number of clusters
            sample_size: Number of messages to sample (None = all)

        Returns:
            Dict mapping cluster_id to list of message_ids

        Raises:
            RuntimeError: If no index has been built or loaded.
        """
        if self.index is None or self.index.ntotal == 0:
            raise RuntimeError("Index not built. Call build_index() first.")

        # Reconstruct stored vectors (optionally a random sample).
        n_vectors = self.index.ntotal
        if sample_size and sample_size < n_vectors:
            indices = np.random.choice(n_vectors, sample_size, replace=False)
            vectors = np.array([self.index.reconstruct(int(i)) for i in indices])
            ids = [self.id_map[i] for i in indices]
        else:
            vectors = np.array([self.index.reconstruct(i) for i in range(n_vectors)])
            ids = self.id_map

        # Train K-means on the embedding space.
        kmeans = faiss.Kmeans(self.dimension, n_clusters, niter=20, verbose=True)
        kmeans.train(vectors)

        # Assign each vector to its nearest centroid.
        _, assignments = kmeans.index.search(vectors, 1)

        # Group message ids by cluster.
        clusters: dict[int, list[int]] = {i: [] for i in range(n_clusters)}
        for msg_id, cluster_id in zip(ids, assignments.flatten()):
            clusters[int(cluster_id)].append(msg_id)

        return clusters

    @property
    def stats(self) -> dict:
        """Get index statistics."""
        return {
            'available': VECTOR_SEARCH_AVAILABLE,
            # Bug fix: 'model' previously reported the embedding dimension.
            'model': self.model_id if self.model else None,
            'dimension': self.dimension,
            'index_size': self.index.ntotal if self.index else 0,
            'index_path': self.index_path
        }
# Export appropriate class based on availability: callers import SemanticSearch
# and get either the real implementation or the failing placeholder.
SemanticSearch = VectorSearch if VECTOR_SEARCH_AVAILABLE else VectorSearchUnavailable
def check_dependencies() -> dict:
    """Return a name -> bool map showing which optional dependencies loaded."""
    status = {
        'numpy': NUMPY_AVAILABLE,
        'faiss': FAISS_AVAILABLE,
        'sentence_transformers': SENTENCE_TRANSFORMERS_AVAILABLE,
        'vector_search_available': VECTOR_SEARCH_AVAILABLE,
    }
    return status
if __name__ == '__main__':
    # Diagnostic entry point: report dependency status and basic usage.
    print("=== Vector Search Dependencies ===")
    for name, available in check_dependencies().items():
        marker = "โœ“" if available else "โœ—"
        print(f" {marker} {name}")

    if not VECTOR_SEARCH_AVAILABLE:
        print("\nTo enable vector search, install dependencies:")
        print(" pip install sentence-transformers faiss-cpu numpy")
    else:
        print("\nVector search is available!")
        print("Usage:")
        print(" vs = VectorSearch('telegram.db')")
        print(" vs.build_index() # One-time indexing")
        print(" results = vs.search('ืžื” ืงื•ืจื”?')")