| """ |
| ingest.py — Load documents from a directory, chunk them, embed them, push to Pinecone. |
| |
| Usage: |
| python ingest.py --dir ./docs |
| python ingest.py --dir ./docs --chunk-size 400 --chunk-overlap 50 |
| """ |
|
|
| import os |
| import uuid |
| import argparse |
| import logging |
| from pathlib import Path |
|
|
| from dotenv import load_dotenv |
|
|
| load_dotenv() |
|
|
| from pinecone import Pinecone, ServerlessSpec |
| from embedder import embed_texts |
|
|
# Logging is configured at import time so the INFO-level progress messages
# emitted by the functions below are shown.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Pinecone settings come from the environment. The API key has no default and
# is None when unset; the index name falls back to "llmops-rag".
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
PINECONE_INDEX = os.environ.get("PINECONE_INDEX", "llmops-rag")

# Vector dimension used when the index is created.
# NOTE(review): presumably has to equal the size of the vectors returned by
# embed_texts — confirm against the embedder.
EMBED_DIM = 384
|
|
|
|
def chunk_text(text: str, chunk_size: int = 400, overlap: int = 50) -> list[str]:
    """Split *text* into overlapping chunks of whitespace-separated words.

    The text is split on whitespace and re-joined with single spaces, so the
    sizes below are counted in words, not characters.

    Args:
        text: The text to split.
        chunk_size: Maximum number of words per chunk. Must be positive.
        overlap: Number of words shared between consecutive chunks. Must
            satisfy ``0 <= overlap < chunk_size``.

    Returns:
        The chunks in document order; an empty list when *text* contains no
        words.

    Raises:
        ValueError: If *chunk_size* or *overlap* is out of range.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    # overlap >= chunk_size would make the window step zero or negative and
    # the loop would never finish; a negative overlap would skip words.
    if not 0 <= overlap < chunk_size:
        raise ValueError(
            f"overlap must satisfy 0 <= overlap < chunk_size, "
            f"got overlap={overlap}, chunk_size={chunk_size}"
        )

    words = text.split()
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start : start + chunk_size]))
        # Stop once a window reaches the last word; another window would only
        # repeat the tail of the chunk that was just emitted.
        if start + chunk_size >= len(words):
            break
    return chunks
|
|
|
|
def load_documents(directory: str) -> list[dict]:
    """Load non-empty ``.txt`` and ``.md`` files found recursively under *directory*.

    Paths are visited in sorted order so repeated runs see the documents in
    the same order. The extension match ignores case. Files are decoded as
    UTF-8 with undecodable bytes dropped, and files that are empty after
    stripping surrounding whitespace are skipped.

    Args:
        directory: Root directory to search.

    Returns:
        A list of ``{"source": <path as str>, "text": <stripped content>}``
        dicts, one per loaded file.
    """
    wanted_suffixes = {".txt", ".md"}
    docs = []
    for path in sorted(Path(directory).rglob("*")):
        # rglob("*") also yields directories; one named e.g. "notes.md" would
        # otherwise reach read_text() and raise.
        if not path.is_file() or path.suffix.lower() not in wanted_suffixes:
            continue
        text = path.read_text(encoding="utf-8", errors="ignore").strip()
        if text:
            docs.append({"source": str(path), "text": text})
    logger.info(f"Loaded {len(docs)} documents from {directory}")
    return docs
|
|
|
|
def ensure_index(pc: Pinecone):
    """Create the index named by PINECONE_INDEX unless the client already lists it.

    A new index is created with dimension EMBED_DIM, the cosine metric and a
    serverless spec on AWS ``us-east-1``. Nothing is returned.
    """
    known_names = {entry.name for entry in pc.list_indexes()}
    if PINECONE_INDEX in known_names:
        logger.info(f"Index '{PINECONE_INDEX}' already exists.")
        return

    logger.info(f"Creating index '{PINECONE_INDEX}'...")
    serverless = ServerlessSpec(cloud="aws", region="us-east-1")
    pc.create_index(
        name=PINECONE_INDEX,
        dimension=EMBED_DIM,
        metric="cosine",
        spec=serverless,
    )
    logger.info("Index created.")
|
|
|
|
def ingest_documents(directory: str, chunk_size: int = 400, chunk_overlap: int = 50) -> int:
    """Chunk, embed and upsert every document under *directory* into Pinecone.

    Each vector id is derived from the document's source path and the chunk's
    position in that document, so ingesting the same files again reuses the
    same ids instead of generating new ones on every run.

    Args:
        directory: Root directory passed to ``load_documents``.
        chunk_size: Maximum number of words per chunk. Must be positive.
        chunk_overlap: Number of words shared by consecutive chunks. Must
            satisfy ``0 <= chunk_overlap < chunk_size``.

    Returns:
        The number of vectors upserted; 0 when no documents were found.

    Raises:
        EnvironmentError: If PINECONE_API_KEY is not set.
        ValueError: If *chunk_size* or *chunk_overlap* is out of range.
    """
    if not PINECONE_API_KEY:
        raise EnvironmentError("PINECONE_API_KEY not set")
    # Reject bad chunking parameters before any network call or index creation.
    if chunk_size <= 0 or not 0 <= chunk_overlap < chunk_size:
        raise ValueError(
            f"require chunk_size > 0 and 0 <= chunk_overlap < chunk_size, "
            f"got chunk_size={chunk_size}, chunk_overlap={chunk_overlap}"
        )

    pc = Pinecone(api_key=PINECONE_API_KEY)
    ensure_index(pc)
    index = pc.Index(PINECONE_INDEX)

    docs = load_documents(directory)
    if not docs:
        logger.warning("No documents found. Nothing ingested.")
        return 0

    all_ids, all_chunks, all_meta = [], [], []
    for doc in docs:
        source = doc["source"]
        pieces = chunk_text(doc["text"], chunk_size, chunk_overlap)
        for position, chunk in enumerate(pieces):
            # uuid5 is a pure function of its inputs, so the same
            # (source, position) pair maps to the same id on every run.
            all_ids.append(str(uuid.uuid5(uuid.NAMESPACE_URL, f"{source}#{position}")))
            all_chunks.append(chunk)
            all_meta.append({"source": source, "text": chunk})

    logger.info(f"Embedding {len(all_chunks)} chunks...")
    vectors = embed_texts(all_chunks)

    # Upsert in fixed-size batches of (id, vector, metadata) tuples.
    batch_size = 100
    total = 0
    for start in range(0, len(all_chunks), batch_size):
        stop = min(start + batch_size, len(all_chunks))
        batch_vectors = [
            (all_ids[j], vectors[j], all_meta[j])
            for j in range(start, stop)
        ]
        index.upsert(vectors=batch_vectors)
        total += len(batch_vectors)
        logger.info(f" Upserted {total}/{len(all_chunks)}")

    logger.info(f"Done. {total} vectors in Pinecone index '{PINECONE_INDEX}'.")
    return total
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: read the target directory and the chunking
    # options, then run a single ingestion pass.
    cli = argparse.ArgumentParser()
    cli.add_argument("--dir", default="./docs", help="Directory containing .txt/.md files")
    for flag, fallback in (("--chunk-size", 400), ("--chunk-overlap", 50)):
        cli.add_argument(flag, type=int, default=fallback)
    options = cli.parse_args()
    ingest_documents(
        options.dir,
        chunk_size=options.chunk_size,
        chunk_overlap=options.chunk_overlap,
    )