| import time |
| import re |
| import json |
| from pathlib import Path |
| from typing import Any, Dict, List |
| from pinecone import Pinecone, ServerlessSpec |
|
|
|
|
| |
| |
|
|
def slugify_technique(name):
    """Converts 'Sentence Splitter' to 'sentence-splitter' for Pinecone naming."""
    lowered = name.lower()
    # Collapse every run of characters outside [a-z0-9] into a single dash.
    dashed = re.sub(r'[^a-z0-9]+', '-', lowered)
    return dashed.strip('-')
|
|
def get_index_by_name(api_key: str, index_name: str):
    """
    Directly connects to a Pinecone index by its full string name.
    Useful for the API/Production side where the name is already known.

    Raises:
        ValueError: If no index with that name exists in the project.
    """
    client = Pinecone(api_key=api_key)

    known_names = {idx.name for idx in client.list_indexes()}
    if index_name not in known_names:
        raise ValueError(f"Index '{index_name}' does not exist in your Pinecone project.")

    print(f" Connecting to Index: {index_name}")
    return client.Index(index_name)
|
|
def get_pinecone_index(api_key, base_name, technique, dimension=384, metric="cosine", ready_timeout_s=300):
    """
    Creates (if needed) and returns a Pinecone index dedicated to one chunking technique.

    The index name is '<base_name>-<technique-slug>', e.g. 'arxiv-index-token'.

    Args:
        api_key: Pinecone API key.
        base_name: Common prefix shared by all technique-specific indexes.
        technique: Human-readable technique name (slugified for the index name).
        dimension: Embedding dimension used when a new index is created.
        metric: Similarity metric used when a new index is created.
        ready_timeout_s: Maximum seconds to wait for a newly created index
            to report ready before raising TimeoutError. Defaults to 300.

    Returns:
        A connected Pinecone Index object.

    Raises:
        TimeoutError: If a newly created index is not ready within ready_timeout_s.
    """
    pc = Pinecone(api_key=api_key)
    tech_slug = slugify_technique(technique)
    full_index_name = f"{base_name}-{tech_slug}"

    existing_indexes = {idx.name for idx in pc.list_indexes()}

    if full_index_name not in existing_indexes:
        print(f" Creating specialized index: {full_index_name}...")
        pc.create_index(
            name=full_index_name,
            dimension=dimension,
            metric=metric,
            spec=ServerlessSpec(cloud="aws", region="us-east-1")
        )

        # Poll until the index reports ready; the original loop could spin
        # forever if provisioning stalls, so enforce a deadline.
        deadline = time.time() + ready_timeout_s
        while not pc.describe_index(full_index_name).status['ready']:
            if time.time() > deadline:
                raise TimeoutError(
                    f"Index '{full_index_name}' was not ready after {ready_timeout_s}s."
                )
            time.sleep(1)

    return get_index_by_name(api_key, full_index_name)
|
|
def refresh_pinecone_index(index, final_chunks, batch_size=100):
    """
    Ensures the technique-specific index holds all of `final_chunks`.

    Upserts only when the live vector count is below the number of chunks
    provided; an already-populated index is left untouched.

    Args:
        index: Connected Pinecone Index.
        final_chunks: Chunk dicts with 'id', 'values' and 'metadata' keys.
        batch_size: Upsert batch size.

    Returns:
        True if an upsert was performed, False otherwise (including on error).
    """
    if not final_chunks:
        print("No chunks provided to refresh.")
        return False

    try:
        stats = index.describe_index_stats()
        current_count = stats.get('total_vector_count', 0)
        expected_count = len(final_chunks)

        print(f" Index Stats -> Existing: {current_count} | New Chunks: {expected_count}")

        if current_count >= expected_count:
            print(f" Index is already populated with {current_count} vectors. Ready for search.")
            return False

        # Empty and partially-populated indexes take the same upsert path;
        # only the log message differs (the original duplicated this code).
        if current_count == 0:
            print(f"➕ Index is empty. Upserting {expected_count} vectors...")
        else:
            print(f" Vector count mismatch ({current_count} < {expected_count}). Updating index...")

        vectors = prepare_vectors_for_upsert(final_chunks)
        upsert_to_pinecone(index, vectors, batch_size)
        return True

    except Exception as e:
        # Best-effort refresh: report and signal failure rather than crash.
        print(f" Error refreshing index: {e}")
        return False
|
|
| |
def prepare_vectors_for_upsert(final_chunks):
    """
    Converts chunk dicts into the payload shape Pinecone's upsert expects,
    filling any missing metadata fields with safe defaults.
    """
    def _as_vector(entry):
        # Absent keys default to "" / 0 / "unknown" so upsert never fails
        # on sparse metadata.
        source = entry.get('metadata', {})
        return {
            'id': entry['id'],
            'values': entry['values'],
            'metadata': {
                'text': source.get('text', ""),
                'title': source.get('title', ""),
                'url': source.get('url', ""),
                'chunk_index': source.get('chunk_index', 0),
                'technique': source.get('technique', "unknown"),
                'chunking_technique': source.get('chunking_technique', "unknown"),
            },
        }

    return [_as_vector(entry) for entry in final_chunks]
|
|
def upsert_to_pinecone(index, chunks, batch_size=100):
    """Sends prepared vectors to Pinecone in batches of at most `batch_size`."""
    for start in range(0, len(chunks), batch_size):
        end = start + batch_size
        index.upsert(vectors=chunks[start:end])
|
|
| |
|
|
def _sanitize_index_name(index_name: str) -> str:
    """Make an index name safe for use in a cache filename.

    Runs of characters outside [a-zA-Z0-9._-] become a single '-'; an
    all-invalid name falls back to 'default-index'.
    """
    cleaned = re.sub(r'[^a-zA-Z0-9._-]+', '-', index_name).strip('-')
    return cleaned if cleaned else 'default-index'
|
|
|
|
def _chunk_cache_path(cache_dir: str, index_name: str) -> Path:
    """Return the BM25 cache file path for an index, creating the cache dir."""
    directory = Path(cache_dir)
    directory.mkdir(parents=True, exist_ok=True)
    filename = f"bm25_chunks_{_sanitize_index_name(index_name)}.json"
    return directory / filename
|
|
|
|
def _read_chunk_cache(path: Path) -> Dict[str, Any]:
    """Load a cached BM25 chunk payload from a JSON file on disk."""
    return json.loads(path.read_text(encoding="utf-8"))
|
|
|
|
def _write_chunk_cache(path: Path, payload: Dict[str, Any]) -> None:
    """Serialize the BM25 chunk payload to a JSON file on disk."""
    path.write_text(json.dumps(payload), encoding="utf-8")
|
|
|
|
def load_chunks_with_local_cache(
    index,
    index_name: str,
    cache_dir: str = ".cache",
    batch_size: int = 100,
    force_refresh: bool = False,
) -> tuple[List[Dict[str, Any]], str]:
    """Return BM25 chunks for an index, preferring a local JSON cache.

    The cache is considered valid when its recorded vector count matches
    the live Pinecone count and it is non-empty; otherwise chunks are
    re-fetched from Pinecone and the cache is rewritten (best-effort).

    Returns:
        (chunks, source) where source is "cache" or "pinecone".
    """
    cache_file = _chunk_cache_path(cache_dir=cache_dir, index_name=index_name)
    live_count = index.describe_index_stats().get("total_vector_count", 0)

    if not force_refresh and cache_file.exists():
        try:
            payload = _read_chunk_cache(cache_file)
            recorded_count = payload.get("meta", {}).get("vector_count", -1)
            cached_chunks = payload.get("chunks", [])

            if recorded_count == live_count and cached_chunks:
                print(
                    f" Loaded BM25 chunk cache: {cache_file} "
                    f"(chunks={len(cached_chunks)}, vectors={recorded_count})"
                )
                return cached_chunks, "cache"

            print(
                " BM25 cache stale or empty. "
                f"cache_vectors={recorded_count}, pinecone_vectors={live_count}. Refreshing..."
            )
        except Exception as e:
            # A corrupt/unreadable cache is not fatal — fall through to Pinecone.
            print(f" Failed to read BM25 cache ({cache_file}): {e}. Refreshing from Pinecone...")

    fresh_chunks = load_chunks_from_pinecone(index=index, batch_size=batch_size)

    try:
        _write_chunk_cache(
            cache_file,
            {
                "meta": {
                    "index_name": index_name,
                    "vector_count": live_count,
                    "updated_at_epoch_s": int(time.time()),
                },
                "chunks": fresh_chunks,
            },
        )
        print(f" Saved BM25 chunk cache: {cache_file} (chunks={len(fresh_chunks)})")
    except Exception as e:
        # Cache write failures are non-fatal; the fresh chunks are still returned.
        print(f" Failed to write BM25 cache ({cache_file}): {e}")

    return fresh_chunks, "pinecone"
|
|
|
|
def load_chunks_from_pinecone(index, batch_size: int = 100) -> List[Dict[str, Any]]:
    """
    Scans the Pinecone index to retrieve all text metadata for the BM25 corpus.

    Vectors without non-empty 'text' metadata are skipped; duplicate IDs
    (possible across namespaces) are returned only once.

    Args:
        index: Connected Pinecone Index.
        batch_size: Page size used when listing vector IDs.

    Returns:
        List of {'id': ..., 'metadata': {...}} dicts.
        (Fixed: the original annotated this as dict[str, any], using the
        builtin `any` function instead of typing.Any.)
    """
    stats = index.describe_index_stats()
    namespaces = list(stats.get('namespaces', {}).keys())

    # An index with no named namespaces still has the default ("") namespace.
    if not namespaces:
        namespaces = [""]

    all_chunks: List[Dict[str, Any]] = []
    seen_ids = set()

    print(f"Loading vectors for BM25 from namespaces: {namespaces}")

    for ns in namespaces:
        for id_batch in index.list(namespace=ns, limit=batch_size):
            if not id_batch:
                continue

            fetched = index.fetch(ids=id_batch, namespace=ns)
            vectors = getattr(fetched, "vectors", {}) or {}

            for vector_id, vector_data in vectors.items():
                if vector_id in seen_ids:
                    continue
                seen_ids.add(vector_id)

                # Pinecone can return None metadata for vectors upserted
                # without any; normalize so .get() never explodes.
                metadata = getattr(vector_data, "metadata", {}) or {}
                text = metadata.get("text")

                if not text:
                    continue

                all_chunks.append({
                    "id": vector_id,
                    "metadata": {
                        "text": text,
                        "title": metadata.get("title", "Untitled"),
                        "url": metadata.get("url", ""),
                        "chunk_index": metadata.get("chunk_index", 0)
                    }
                })

        print(f" Finished namespace: '{ns if ns else 'default'}'")

    print(f"Total chunks loaded into memory: {len(all_chunks)}")
    return all_chunks