# NLP-RAG / vector_db.py
# (from merge of branch 'main' into Muddasir/BackendComplete, commit 1737e82, unverified)
import time
import re
import json
from pathlib import Path
from typing import Any, Dict, List
from pinecone import Pinecone, ServerlessSpec
# Added caching to reduce consecutive startup time
# --@Qamar
def slugify_technique(name):
    """Converts 'Sentence Splitter' to 'sentence-splitter' for Pinecone naming."""
    # Lowercase first, then collapse every run of non-alphanumerics into a
    # single hyphen, and drop any hyphens left at the ends.
    lowered = name.lower()
    hyphenated = re.sub(r'[^a-z0-9]+', '-', lowered)
    return hyphenated.strip('-')
def get_index_by_name(api_key: str, index_name: str):
    """
    Directly connects to a Pinecone index by its full string name.
    Useful for the API/Production side where the name is already known.
    """
    client = Pinecone(api_key=api_key)
    # Verify the index exists up-front rather than letting the SDK 404 later.
    known_names = {idx.name for idx in client.list_indexes()}
    if index_name not in known_names:
        raise ValueError(f"Index '{index_name}' does not exist in your Pinecone project.")
    print(f" Connecting to Index: {index_name}")
    return client.Index(index_name)
def get_pinecone_index(api_key, base_name, technique, dimension=384, metric="cosine",
                       ready_timeout=None):
    """
    Creates/Returns an index specifically for a technique.
    Example: 'arxiv-index-token'

    Args:
        api_key: Pinecone API key.
        base_name: Common prefix shared by all technique-specific indexes.
        technique: Human-readable technique name; slugified into the index name.
        dimension: Vector dimension used when a new index must be created.
        metric: Similarity metric used when a new index must be created.
        ready_timeout: Optional max seconds to wait for a freshly created index
            to report ready. None (the default) waits indefinitely, matching
            the previous behavior.

    Returns:
        A connected Pinecone index object for the technique-specific index.

    Raises:
        TimeoutError: If ready_timeout is set and the index is not ready in time.
    """
    pc = Pinecone(api_key=api_key)
    tech_slug = slugify_technique(technique)
    full_index_name = f"{base_name}-{tech_slug}"
    existing_indexes = [idx.name for idx in pc.list_indexes()]
    if full_index_name not in existing_indexes:
        print(f" Creating specialized index: {full_index_name}...")
        pc.create_index(
            name=full_index_name,
            dimension=dimension,
            metric=metric,
            spec=ServerlessSpec(cloud="aws", region="us-east-1")
        )
        # Poll until the serverless index reports ready. The optional deadline
        # keeps a provisioning failure from hanging the caller forever.
        deadline = None if ready_timeout is None else time.time() + ready_timeout
        while not pc.describe_index(full_index_name).status['ready']:
            if deadline is not None and time.time() > deadline:
                raise TimeoutError(
                    f"Index '{full_index_name}' was not ready after {ready_timeout}s"
                )
            time.sleep(1)
    # Use our new helper to return the index object
    return get_index_by_name(api_key, full_index_name)
def refresh_pinecone_index(index, final_chunks, batch_size=100):
    """
    Refreshes the specific index. Since index is now technique-specific,
    we just check if it's already populated.

    Returns True when vectors were upserted, False otherwise.
    """
    if not final_chunks:
        print("No chunks provided to refresh.")
        return False
    try:
        # Compare what the index already holds with what we were handed.
        existing = index.describe_index_stats().get('total_vector_count', 0)
        incoming = len(final_chunks)
        print(f" Index Stats -> Existing: {existing} | New Chunks: {incoming}")

        # Fully populated: nothing to do.
        if existing != 0 and existing >= incoming:
            print(f" Index is already populated with {existing} vectors. Ready for search.")
            return False

        # Either empty or short on vectors -> (re)upsert everything.
        if existing == 0:
            print(f"➕ Index is empty. Upserting {incoming} vectors...")
        else:
            print(f" Vector count mismatch ({existing} < {incoming}). Updating index...")
        upsert_to_pinecone(index, prepare_vectors_for_upsert(final_chunks), batch_size)
        return True
    except Exception as e:
        print(f" Error refreshing index: {e}")
        return False
# Utility functions remain the same as previous version
def prepare_vectors_for_upsert(final_chunks):
    """Shapes raw chunk dicts into the record format Pinecone's upsert expects."""
    def _as_record(chunk):
        # Missing metadata fields fall back to safe defaults so upsert never
        # sends None values.
        meta = chunk.get('metadata', {})
        return {
            'id': chunk['id'],
            'values': chunk['values'],
            'metadata': {
                'text': meta.get('text', ""),
                'title': meta.get('title', ""),
                'url': meta.get('url', ""),
                'chunk_index': meta.get('chunk_index', 0),
                'technique': meta.get('technique', "unknown"),
                'chunking_technique': meta.get('chunking_technique', "unknown")
            }
        }
    return [_as_record(chunk) for chunk in final_chunks]
def upsert_to_pinecone(index, chunks, batch_size=100):
    """Upserts prepared vector records into Pinecone in fixed-size batches."""
    start = 0
    total = len(chunks)
    while start < total:
        index.upsert(vectors=chunks[start:start + batch_size])
        start += batch_size
# Some methods for loading chunks back from Pinecone with local caching to speed up BM25 initialization
def _sanitize_index_name(index_name: str) -> str:
    """Makes an index name safe for use in a cache filename; never returns ''."""
    cleaned = re.sub(r'[^a-zA-Z0-9._-]+', '-', index_name)
    return cleaned.strip('-') or 'default-index'
def _chunk_cache_path(cache_dir: str, index_name: str) -> Path:
    """Returns the BM25 chunk-cache file path, creating the cache dir if needed."""
    root = Path(cache_dir)
    root.mkdir(parents=True, exist_ok=True)
    return root / f"bm25_chunks_{_sanitize_index_name(index_name)}.json"
def _read_chunk_cache(path: Path) -> Dict[str, Any]:
    """Loads and parses the cached BM25 chunk payload from disk."""
    return json.loads(path.read_text(encoding="utf-8"))
def _write_chunk_cache(path: Path, payload: Dict[str, Any]) -> None:
    """Serializes the BM25 chunk payload to disk as UTF-8 JSON."""
    path.write_text(json.dumps(payload), encoding="utf-8")
def load_chunks_with_local_cache(
    index,
    index_name: str,
    cache_dir: str = ".cache",
    batch_size: int = 100,
    force_refresh: bool = False,
) -> tuple[List[Dict[str, Any]], str]:
    """
    Loads chunk metadata for BM25, preferring a local JSON cache over a full
    Pinecone scan. The cache is considered fresh when its stored vector count
    matches the index's current count. Returns (chunks, source) where source
    is "cache" or "pinecone".
    """
    cache_file = _chunk_cache_path(cache_dir=cache_dir, index_name=index_name)
    live_count = index.describe_index_stats().get("total_vector_count", 0)

    if cache_file.exists() and not force_refresh:
        try:
            payload = _read_chunk_cache(cache_file)
            count_in_cache = payload.get("meta", {}).get("vector_count", -1)
            chunks_in_cache = payload.get("chunks", [])
            # Fresh and non-empty -> use it and skip the Pinecone scan entirely.
            if chunks_in_cache and count_in_cache == live_count:
                print(
                    f" Loaded BM25 chunk cache: {cache_file} "
                    f"(chunks={len(chunks_in_cache)}, vectors={count_in_cache})"
                )
                return chunks_in_cache, "cache"
            print(
                " BM25 cache stale or empty. "
                f"cache_vectors={count_in_cache}, pinecone_vectors={live_count}. Refreshing..."
            )
        except Exception as e:
            # Corrupt/unreadable cache is non-fatal; fall through to a rebuild.
            print(f" Failed to read BM25 cache ({cache_file}): {e}. Refreshing from Pinecone...")

    fresh_chunks = load_chunks_from_pinecone(index=index, batch_size=batch_size)
    try:
        _write_chunk_cache(
            cache_file,
            {
                "meta": {
                    "index_name": index_name,
                    "vector_count": live_count,
                    "updated_at_epoch_s": int(time.time()),
                },
                "chunks": fresh_chunks,
            },
        )
        print(f" Saved BM25 chunk cache: {cache_file} (chunks={len(fresh_chunks)})")
    except Exception as e:
        # A failed cache write should never break the caller; we still have the data.
        print(f" Failed to write BM25 cache ({cache_file}): {e}")
    return fresh_chunks, "pinecone"
def load_chunks_from_pinecone(index, batch_size: int = 100) -> List[Dict[str, Any]]:
    """
    Scans the Pinecone index to retrieve all text metadata for the BM25 corpus.

    Args:
        index: A connected Pinecone index object.
        batch_size: How many vector IDs to list/fetch per request.

    Returns:
        A list of {'id', 'metadata'} dicts, one per vector that has non-empty
        'text' metadata. Duplicate IDs across namespaces are skipped.
    """
    # NOTE: the original annotation was list[dict[str, any]] — builtin `any`,
    # not typing.Any — fixed to match the typing imports used elsewhere here.
    stats = index.describe_index_stats()
    namespaces = list(stats.get('namespaces', {}).keys())
    # If no namespaces are explicitly named, Pinecone uses an empty string for the default
    if not namespaces:
        namespaces = [""]
    all_chunks: List[Dict[str, Any]] = []
    seen_ids = set()
    print(f"Loading vectors for BM25 from namespaces: {namespaces}")
    for ns in namespaces:
        # Pinecone's list() generator returns batches of IDs
        for id_batch in index.list(namespace=ns, limit=batch_size):
            if not id_batch:
                continue
            # Fetch the actual content (metadata) for this batch of IDs
            fetched = index.fetch(ids=id_batch, namespace=ns)
            vectors = getattr(fetched, "vectors", {})
            for vector_id, vector_data in vectors.items():
                if vector_id in seen_ids:
                    continue
                seen_ids.add(vector_id)
                # Safely extract metadata; guard against an explicit None
                # metadata attribute, which would crash .get() below.
                metadata = getattr(vector_data, "metadata", None) or {}
                text = metadata.get("text")
                if not text:
                    # Vectors without text are useless for the BM25 corpus.
                    continue
                all_chunks.append({
                    "id": vector_id,
                    "metadata": {
                        "text": text,
                        "title": metadata.get("title", "Untitled"),
                        "url": metadata.get("url", ""),
                        "chunk_index": metadata.get("chunk_index", 0)
                    }
                })
        print(f" Finished namespace: '{ns if ns else 'default'}'")
    print(f"Total chunks loaded into memory: {len(all_chunks)}")
    return all_chunks