chatbot / scripts /vector_rag_manager.py
Moderator404's picture
Upload 39 files
41bd0b8 verified
Raw
History Blame Contribute Delete
32.9 kB
"""
Vector RAG Manager for Too Many Cables
Implements true RAG with embeddings and vector database using ChromaDB
"""
import os
import json
import uuid
import hashlib
import logging
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any
# Disable all external telemetry BEFORE importing chromadb to avoid PostHog issues
os.environ.setdefault('ANONYMIZED_TELEMETRY', 'False')
os.environ.setdefault('CHROMA_TELEMETRY_ENABLED', 'false')
os.environ.setdefault('CHROMADB_TELEMETRY_DISABLED', 'true')
os.environ.setdefault('POSTHOG_DISABLED', 'true')
# Set HuggingFace cache directories BEFORE importing transformers/sentence-transformers
os.environ['TRANSFORMERS_CACHE'] = '/app/data/transformers_cache'
os.environ['HF_HOME'] = '/app/data/huggingface_cache'
os.environ['HF_DATASETS_CACHE'] = '/app/data/huggingface_cache'
os.environ['SENTENCE_TRANSFORMERS_HOME'] = '/app/data/sentence_transformers_cache'
import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer
import numpy as np
from .knowledge_base_manager import KnowledgeBaseManager
class VectorRAGManager:
def __init__(self,
knowledge_base_path: str = "knowledge_base",
vector_db_path: str = "vector_db",
embedding_model: str = "all-MiniLM-L6-v2"):
"""Initialize Vector RAG Manager with ChromaDB and embeddings"""
self.kb_path = Path(knowledge_base_path)
self.vector_db_path = Path(vector_db_path)
self.embedding_model_name = embedding_model
# Create directories
self.kb_path.mkdir(exist_ok=True)
self.vector_db_path.mkdir(exist_ok=True)
# Initialize knowledge base manager
self.kb_manager = KnowledgeBaseManager(knowledge_base_path)
# Initialize ChromaDB client (explicitly disable telemetry)
self.chroma_client = chromadb.PersistentClient(
path=str(self.vector_db_path),
settings=Settings(
anonymized_telemetry=False,
allow_reset=True
)
)
# Initialize embedding model
logging.info(f"Loading embedding model: {embedding_model}")
self.embedding_model = SentenceTransformer(embedding_model)
logging.info(f"Embedding model loaded. Dimension: {self.embedding_model.get_sentence_embedding_dimension()}")
# Collection name for document chunks
self.collection_name = "tmc_documents"
# Document chunking parameters (token-based for better quality)
# Reduced from 300 to 200 tokens for more focused, relevant chunks
self.chunk_size_tokens = 200 # target tokens per chunk (smaller = more focused)
self.chunk_overlap_tokens = 30 # ~15% overlap (maintains context)
# Approximate token conversion (rough estimate: 1 token ≈ 4 characters)
self.chunk_size = self.chunk_size_tokens * 4 # ~800 chars
self.chunk_overlap = self.chunk_overlap_tokens * 4 # ~120 chars
# Initialize or get collection
self._init_collection()
def _init_collection(self):
"""Initialize or get the ChromaDB collection"""
try:
self.collection = self.chroma_client.get_collection(
name=self.collection_name
)
logging.info(f"Loaded existing collection '{self.collection_name}' with {self.collection.count()} documents")
except Exception:
# Collection doesn't exist, create it (handle older/newer API differences)
try:
self.collection = self.chroma_client.create_collection(
name=self.collection_name,
metadata={"description": "Too Many Cables knowledge base embeddings"}
)
except Exception:
# Fallback without metadata
self.collection = self.chroma_client.create_collection(
name=self.collection_name
)
logging.info(f"Created new collection '{self.collection_name}'")
def estimate_tokens(self, text: str) -> int:
"""Rough token estimation: ~1 token per 4 characters for English text"""
return max(1, len(text) // 4)
def normalize_content(self, content: str) -> str:
"""
Clean and normalize content before chunking
Removes boilerplate, headers/footers, nav menus, TOCs, contact blocks
"""
import re
# Remove common boilerplate patterns
boilerplate_patterns = [
r'Copyright \d{4}.*?All rights reserved\.?',
r'© \d{4}.*?(?:\n|$)',
r'Contact us:.*?(?:\n\n|$)',
r'For more information.*?(?:\n\n|$)',
r'Visit our website.*?(?:\n\n|$)',
r'Call us at.*?(?:\n\n|$)',
r'Email:.*?(?:\n\n|$)',
r'Phone:.*?(?:\n\n|$)',
r'Address:.*?(?:\n\n|$)',
r'Table of Contents.*?(?:\n\n|\n(?=[A-Z]))',
r'TOC:.*?(?:\n\n|\n(?=[A-Z]))',
r'Navigation:.*?(?:\n\n|$)',
r'Home \| Products \| Support.*?(?:\n|$)',
r'Back to top.*?(?:\n|$)',
r'Print this page.*?(?:\n|$)',
r'Share:.*?(?:\n|$)',
r'Related articles:.*?(?:\n\n|$)',
r'See also:.*?(?:\n\n|$)',
r'Last updated:.*?(?:\n|$)',
r'Page \d+ of \d+.*?(?:\n|$)',
]
cleaned_content = content
for pattern in boilerplate_patterns:
cleaned_content = re.sub(pattern, '', cleaned_content, flags=re.IGNORECASE | re.DOTALL)
# Remove excessive whitespace and normalize line breaks
cleaned_content = re.sub(r'\n\s*\n\s*\n+', '\n\n', cleaned_content) # Max 2 consecutive newlines
cleaned_content = re.sub(r'[ \t]+', ' ', cleaned_content) # Normalize spaces
cleaned_content = re.sub(r'\n ', '\n', cleaned_content) # Remove leading spaces on lines
# Remove repetitive elements (like repeated contact info)
lines = cleaned_content.split('\n')
deduped_lines = []
seen_lines = set()
for line in lines:
line_clean = line.strip().lower()
# Skip very short lines or highly repetitive content
if len(line_clean) < 10:
deduped_lines.append(line)
continue
# Check for repetitive patterns
if line_clean not in seen_lines:
deduped_lines.append(line)
seen_lines.add(line_clean)
elif len(seen_lines) < 50: # Allow some repetition in small docs
deduped_lines.append(line)
return '\n'.join(deduped_lines).strip()
def chunk_text(self, text: str, chunk_size_tokens: Optional[int] = None,
chunk_overlap_tokens: Optional[int] = None) -> List[str]:
"""
Split text into overlapping chunks optimized for RAG retrieval
Uses token-based sizing (200-400 tokens) with 10-15% overlap
Enhanced with product manual structure awareness
"""
target_tokens = chunk_size_tokens or self.chunk_size_tokens
overlap_tokens = chunk_overlap_tokens or self.chunk_overlap_tokens
# Convert to character estimates
target_chars = target_tokens * 4
overlap_chars = overlap_tokens * 4
if self.estimate_tokens(text) <= target_tokens:
return [text]
chunks = []
start = 0
while start < len(text):
end = start + target_chars
# Try to end at optimal boundaries (product sections > sentences > paragraphs > words)
if end < len(text):
# Priority 1: Product section boundaries (for product manuals)
product_boundaries = [
'\n### TMC-', # Product headers like ### TMC-HDMI-8K-10FT
'\n## Product ', # Section headers like ## Product Overview
'\n## Compatibility', # Other major sections
'\n## Installation',
'\n## Troubleshooting'
]
best_end = end
# Look for product boundaries within last 30% of chunk
search_range = int(target_chars * 0.3)
for i in range(max(end - search_range, start), end):
for boundary in product_boundaries:
if text[i:].startswith(boundary):
best_end = i
break
if best_end != end:
break
# Priority 2: Sentence boundaries within reasonable range
if best_end == end:
sentence_ends = ['. ', '! ', '? ', '.\n', '!\n', '?\n']
# Look for sentence endings within last 20% of chunk
search_range = int(target_chars * 0.2)
for i in range(max(end - search_range, start), end):
for sentence_end in sentence_ends:
if text[i:i+len(sentence_end)] == sentence_end:
best_end = i + len(sentence_end)
break
if best_end != end:
break
# Priority 3: Paragraph boundaries if no sentence found
if best_end == end:
search_range = int(target_chars * 0.2)
for i in range(max(end - search_range, start), end):
if text[i:i+2] == '\n\n':
best_end = i + 2
break
# Priority 4: Word boundaries as fallback
if best_end == end:
for i in range(end - 1, max(end - 50, start), -1):
if text[i] == ' ':
best_end = i + 1
break
end = best_end
chunk = text[start:end].strip()
if chunk and len(chunk) > 50: # Only include meaningful chunks
# Verify token count is reasonable
chunk_tokens = self.estimate_tokens(chunk)
if chunk_tokens >= 50: # Minimum chunk size
chunks.append(chunk)
# Move start position with overlap
start = end - overlap_chars
if start >= len(text):
break
return chunks
def generate_embedding(self, text: str) -> List[float]:
"""Generate embedding for a text using sentence transformer"""
embedding = self.embedding_model.encode(text, convert_to_tensor=False)
return embedding.tolist()
def index_documents(self, force_reindex: bool = False) -> Dict[str, int]:
"""
Index all documents in the knowledge base into ChromaDB
"""
logging.info("Starting document indexing...")
# Scan knowledge base
self.kb_manager.scan_documents()
# Check if we need to reindex
if not force_reindex:
current_count = self.collection.count()
total_docs = sum(len(cat['documents']) for cat in self.kb_manager.documents.values())
if current_count > 0:
logging.info(f"Collection already has {current_count} chunks. Use force_reindex=True to rebuild.")
return {"existing_chunks": current_count, "documents": total_docs}
# Clear existing collection if force reindex
if force_reindex and self.collection.count() > 0:
logging.info("Clearing existing collection for reindexing...")
self.chroma_client.delete_collection(self.collection_name)
self.collection = self.chroma_client.create_collection(
name=self.collection_name,
metadata={"description": "Too Many Cables knowledge base embeddings"}
)
# Process all documents
stats = {"documents": 0, "chunks": 0, "categories": {}}
for category, cat_info in self.kb_manager.documents.items():
stats["categories"][category] = 0
for doc_info in cat_info["documents"]:
logging.info(f"Processing: {doc_info['title']}")
# Load document content
content = self.kb_manager.load_document_content(doc_info['path'])
if not content:
continue
# Normalize and clean content before chunking
normalized_content = self.normalize_content(content)
if not normalized_content or len(normalized_content) < 100:
logging.warning(f"Skipping {doc_info['title']} - content too short after normalization")
continue
# Chunk the normalized document
chunks = self.chunk_text(normalized_content)
# Process each chunk
chunk_ids = []
chunk_texts = []
chunk_embeddings = []
chunk_metadatas = []
for i, chunk in enumerate(chunks):
chunk_id = f"{doc_info['filename']}_{i}"
chunk_ids.append(chunk_id)
chunk_texts.append(chunk)
# Generate embedding
embedding = self.generate_embedding(chunk)
chunk_embeddings.append(embedding)
# Create metadata
metadata = {
"document_title": doc_info['title'],
"document_path": doc_info['path'],
"category": category,
"chunk_index": i,
"total_chunks": len(chunks),
"document_filename": doc_info['filename']
}
chunk_metadatas.append(metadata)
# Add chunks to collection
if chunk_ids:
self.collection.add(
ids=chunk_ids,
embeddings=chunk_embeddings,
documents=chunk_texts,
metadatas=chunk_metadatas
)
stats["chunks"] += len(chunk_ids)
stats["categories"][category] += len(chunk_ids)
stats["documents"] += 1
logging.info(f"Indexing complete! Processed {stats['documents']} documents into {stats['chunks']} chunks")
return stats
def calculate_relevance_score(self, query: str, chunk: str, metadata: Dict) -> float:
"""
Calculate enhanced relevance score using multiple signals
"""
query_lower = query.lower()
chunk_lower = chunk.lower()
score = 0.0
# Exact phrase matches (high value)
for word in query_lower.split():
if len(word) > 3: # Skip short words
if word in chunk_lower:
score += 0.3
# Bonus for multiple occurrences
score += 0.1 * (chunk_lower.count(word) - 1)
# Category relevance bonus
category = metadata.get('category', '').lower()
if 'policy' in query_lower and 'policies' in category:
score += 0.2
elif 'manual' in query_lower and 'manual' in category:
score += 0.2
elif 'faq' in query_lower and 'faq' in category:
score += 0.2
# Policy-specific query matching (warranty, return, shipping)
policy_types = {
'warranty': ['warranty', 'guarantee', 'defect', 'lifetime', 'coverage', 'claim'],
'return': ['return', 'refund', 'money-back', 'exchange'],
'shipping': ['shipping', 'delivery', 'freight', 'ship']
}
# Detect policy type in query
detected_policy = None
for policy_type, keywords in policy_types.items():
if any(kw in query_lower for kw in keywords):
detected_policy = policy_type
break
# Boost chunks that match the detected policy type
if detected_policy:
policy_keywords = policy_types[detected_policy]
matches = sum(1 for kw in policy_keywords if kw in chunk_lower)
score += 0.20 * matches # Increased from 0.15 to boost correct policy type more
# Penalize chunks about OTHER policy types (stronger penalty)
for other_type, other_keywords in policy_types.items():
if other_type != detected_policy:
# Count how many keywords from the WRONG policy appear
wrong_matches = sum(1 for kw in other_keywords[:3] if kw in chunk_lower)
if wrong_matches > 0:
score -= 0.25 * wrong_matches # Much stronger penalty
# Penalize boilerplate/footer content more aggressively
boilerplate_indicators = [
'contact our customer service',
'email:',
'phone:',
'mailing address',
'policy effective',
'subject to change',
'@toomanycables.com',
'live chat:',
'social media:',
'[corporate'
]
boilerplate_count = sum(1 for indicator in boilerplate_indicators if indicator in chunk_lower)
if boilerplate_count > 0:
score -= 0.25 * boilerplate_count # Stronger penalty, scales with amount of boilerplate
# Product type matching
product_terms = {
'usb-c': ['usb-c', 'usbc', 'type-c'],
'hdmi': ['hdmi', '4k', '8k', 'display'],
'lightning': ['lightning', 'iphone', 'apple'],
'usb-a': ['usb-a', 'usba', 'standard usb']
}
for product, variants in product_terms.items():
if any(variant in query_lower for variant in variants):
if any(variant in chunk_lower for variant in variants):
score += 0.25
# Length penalty for very long chunks (prefer concise answers)
if len(chunk) > 800:
score -= 0.1
# Chunk position bonus (earlier chunks often have key info)
chunk_index = metadata.get('chunk_index', 0)
if chunk_index == 0:
score += 0.1
elif chunk_index == 1:
score += 0.05
return score
def semantic_search(self, query: str, n_results: int = 5) -> List[Dict[str, Any]]:
"""
Perform semantic search using ChromaDB
"""
if self.collection.count() == 0:
logging.warning("Collection is empty. Run index_documents() first.")
return []
# Generate query embedding
query_embedding = self.generate_embedding(query)
# Search in ChromaDB
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=min(n_results, self.collection.count()),
include=["documents", "metadatas", "distances"]
)
# Format results
formatted_results = []
if results['documents'] and results['documents'][0]:
documents = results['documents'][0]
metadatas = results['metadatas'][0]
distances = results['distances'][0]
for i, (doc, metadata, distance) in enumerate(zip(documents, metadatas, distances)):
# ChromaDB uses L2 (squared Euclidean) distance by default
# For normalized embeddings, we can convert L2 to cosine similarity:
# cosine_similarity = 1 - (L2_distance^2 / 2)
# But distances returned might already be squared, so let's use inverse distance as similarity
# Lower distance = higher similarity
# For L2 distance on normalized vectors: distance typically ranges 0-2
# Convert to similarity score where 0 distance = 1.0 similarity
if distance < 0.0001: # Perfect match
similarity = 1.0
else:
# For L2 distance on normalized vectors: similarity ≈ 1 - (distance²/2)
# But ChromaDB already returns squared distance, so: similarity = 1 - (distance/2)
similarity = max(0.0, 1.0 - (distance / 2.0))
formatted_results.append({
"document": doc,
"metadata": metadata,
"similarity": similarity,
"distance": distance, # Keep original distance for debugging
"rank": i + 1
})
return formatted_results
def retrieve_and_rerank(self, query: str, initial_k: int = 20, final_k: int = 5,
similarity_threshold: float = 0.30) -> List[Dict[str, Any]]:
"""
Retrieve top-k candidates and re-rank to final top results
This is the core optimization for better RAG quality
Args:
query: Search query
initial_k: Number of candidates to retrieve initially
final_k: Number of final results to return
similarity_threshold: Minimum similarity score (0.20 for L2 distance conversion)
"""
# Step 1: Retrieve top 20 candidates using vector similarity
candidates = self.semantic_search(query, n_results=initial_k)
if not candidates:
return []
# Filter by similarity threshold before reranking
candidates = [c for c in candidates if c['similarity'] >= similarity_threshold]
if not candidates:
logging.info(f"No candidates meet similarity threshold {similarity_threshold}")
return []
logging.debug(f"Filtered to {len(candidates)} candidates meeting threshold {similarity_threshold}")
# Step 2: Re-rank using enhanced relevance scoring
for candidate in candidates:
# Combine vector similarity with relevance scoring
relevance_score = self.calculate_relevance_score(
query,
candidate['document'],
candidate['metadata']
)
# Weighted combination: 70% vector similarity + 30% relevance features
candidate['final_score'] = (0.7 * candidate['similarity']) + (0.3 * relevance_score)
candidate['relevance_score'] = relevance_score
# Debug logging for top candidates
if candidate['similarity'] > 0.40:
doc_title = candidate['metadata'].get('document_title', 'Unknown')[:30]
content_preview = candidate['document'][:50].replace('\n', ' ')
logging.debug(f"Rerank: '{doc_title}' sim={candidate['similarity']:.3f}, rel={relevance_score:.3f}, final={candidate['final_score']:.3f}, preview='{content_preview}'")
# Step 3: Re-sort by final score and take top results
reranked = sorted(candidates, key=lambda x: x['final_score'], reverse=True)
# Step 4: Diversity filtering - avoid too many chunks from same document
final_results = []
seen_documents = set()
for result in reranked:
doc_title = result['metadata']['document_title']
# Allow max 2 chunks per document in final results
doc_count = sum(1 for r in final_results if r['metadata']['document_title'] == doc_title)
if doc_count < 2 or len(final_results) < final_k // 2:
final_results.append(result)
seen_documents.add(doc_title)
if len(final_results) >= final_k:
break
return final_results
def retrieve_and_rerank_filtered(self, query: str, target_categories: List[str],
initial_k: int = 20, final_k: int = 5,
similarity_threshold: float = 0.30) -> List[Dict[str, Any]]:
"""
Category-filtered retrieve and rerank for section routing
Args:
query: Search query
target_categories: Categories to filter by
initial_k: Number of candidates to retrieve initially
final_k: Number of final results to return
similarity_threshold: Minimum similarity score (0.20 for L2 distance conversion)
"""
if self.collection.count() == 0:
logging.warning("Collection is empty. Run index_documents() first.")
return []
# Generate query embedding
query_embedding = self.generate_embedding(query)
# Search with larger initial pool for filtering
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=min(initial_k * 2, self.collection.count()), # Get more for filtering
include=["documents", "metadatas", "distances"]
)
# Filter by target categories first
category_filtered = []
if results['documents'] and results['documents'][0]:
documents = results['documents'][0]
metadatas = results['metadatas'][0]
distances = results['distances'][0]
for doc, metadata, distance in zip(documents, metadatas, distances):
if metadata.get('category', '').lower() in [cat.lower() for cat in target_categories]:
# Use same similarity calculation as semantic_search
if distance < 0.0001:
similarity = 1.0
else:
similarity = max(0.0, 1.0 - (distance / 2.0))
# Apply similarity threshold to filter out low-similarity documents
if similarity >= similarity_threshold:
category_filtered.append({
"document": doc,
"metadata": metadata,
"similarity": similarity
})
else:
# Log filtered documents at debug level
doc_title = metadata.get('document_title', 'Unknown')[:40]
logging.debug(f"Filtered out '{doc_title}': similarity={similarity:.3f} < threshold {similarity_threshold}")
# If we don't have enough results from target categories, fall back to general search
if len(category_filtered) < final_k:
logging.info(f"Only {len(category_filtered)} results in target categories meet threshold, falling back to general search")
return self.retrieve_and_rerank(query, initial_k, final_k, similarity_threshold)
# Apply reranking to filtered results
for candidate in category_filtered[:initial_k]: # Limit to initial_k for reranking
relevance_score = self.calculate_relevance_score(
query,
candidate['document'],
candidate['metadata']
)
candidate['final_score'] = (0.7 * candidate['similarity']) + (0.3 * relevance_score)
candidate['relevance_score'] = relevance_score
# Sort by final score and apply diversity filtering
reranked = sorted(category_filtered[:initial_k], key=lambda x: x['final_score'], reverse=True)
# Log top results for debugging
if reranked:
logging.info(f"Top result: similarity={reranked[0]['similarity']:.3f}, final_score={reranked[0]['final_score']:.3f}")
# Diversity filtering
final_results = []
seen_documents = set()
for result in reranked:
doc_title = result['metadata']['document_title']
doc_count = sum(1 for r in final_results if r['metadata']['document_title'] == doc_title)
if doc_count < 2 or len(final_results) < final_k // 2:
final_results.append(result)
seen_documents.add(doc_title)
if len(final_results) >= final_k:
break
return final_results
def get_relevant_context(self, query: str, n_results: int = 5,
similarity_threshold: float = 0.20) -> str:
"""
Get relevant context for a query using semantic search
Args:
query: Search query
n_results: Maximum number of results to return
similarity_threshold: Minimum similarity score (0.20 for L2 distance conversion)
"""
# Perform semantic search
results = self.semantic_search(query, n_results)
# Filter by similarity threshold
relevant_results = [
r for r in results
if r["similarity"] >= similarity_threshold
]
if not relevant_results:
return ""
# Build context string
context_parts = []
total_length = 0
max_context_length = 2000
# Group chunks by document to avoid repetition
doc_chunks = {}
for result in relevant_results:
doc_title = result["metadata"]["document_title"]
if doc_title not in doc_chunks:
doc_chunks[doc_title] = []
doc_chunks[doc_title].append(result)
# Build context from best chunks per document
for doc_title, chunks in doc_chunks.items():
# Sort chunks by similarity
chunks.sort(key=lambda x: x["similarity"], reverse=True)
# Take best chunk from this document
best_chunk = chunks[0]
doc_header = f"\n--- {doc_title} (Category: {best_chunk['metadata']['category']}) ---\n"
chunk_content = best_chunk["document"]
combined_length = len(doc_header) + len(chunk_content)
if total_length + combined_length > max_context_length:
# Truncate to fit
remaining_space = max_context_length - total_length - len(doc_header)
if remaining_space > 100:
chunk_content = chunk_content[:remaining_space] + "..."
context_parts.append(doc_header + chunk_content)
break
context_parts.append(doc_header + chunk_content)
total_length += combined_length
return "".join(context_parts)
def get_collection_stats(self) -> Dict[str, Any]:
"""Get statistics about the vector database"""
try:
count = self.collection.count()
# Get sample of metadata to analyze categories
if count > 0:
sample_results = self.collection.get(limit=min(count, 100))
categories = {}
if sample_results["metadatas"]:
for metadata in sample_results["metadatas"]:
cat = metadata.get("category", "unknown")
categories[cat] = categories.get(cat, 0) + 1
return {
"total_chunks": count,
"categories": categories,
"embedding_dimension": self.embedding_model.get_sentence_embedding_dimension(),
"collection_name": self.collection_name
}
else:
return {
"total_chunks": 0,
"categories": {},
"embedding_dimension": self.embedding_model.get_sentence_embedding_dimension(),
"collection_name": self.collection_name
}
except Exception as e:
return {"error": str(e)}
# Production ready - no test code included