| | import json |
| | from copy import deepcopy |
| |
|
| | from dotenv import find_dotenv, load_dotenv |
| | from llama_index.core import StorageContext, VectorStoreIndex |
| | from llama_index.core.node_parser import SentenceSplitter |
| | from llama_index.core.schema import Document |
| | from llama_index.embeddings.huggingface import HuggingFaceEmbedding |
| | from llama_index.vector_stores.milvus import MilvusVectorStore |
| | from llama_index.vector_stores.milvus.utils import BGEM3SparseEmbeddingFunction |
| |
|
| | from src.agent_hackathon.consts import PROJECT_ROOT_DIR |
| | from src.agent_hackathon.logger import get_logger |
| |
|
| | logger = get_logger(log_name="create_vector_db", log_dir=PROJECT_ROOT_DIR / "logs") |
| |
|
| |
|
class VectorDBCreator:
    """Handles creation of a Milvus vector database from arXiv data."""

    def __init__(
        self,
        data_path: str,
        db_uri: str,
        embedding_model: str = "Qwen/Qwen3-Embedding-0.6B",
        chunk_size: int = 20_000,
        chunk_overlap: int = 0,
        vector_dim: int = 1024,
        insert_batch_size: int = 8192,
    ) -> None:
        """
        Initialize the VectorDBCreator.

        Args:
            data_path: Path to the JSON data file.
            db_uri: URI for the Milvus database.
            embedding_model: Name of the HuggingFace embedding model.
            chunk_size: Size of text chunks for splitting.
            chunk_overlap: Overlap between text chunks.
            vector_dim: Dimension of the dense embedding vectors.
            insert_batch_size: Batch size used when inserting into the index.
        """
        self.data_path = data_path
        self.db_uri = db_uri
        self.embedding_model = embedding_model
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.vector_dim = vector_dim
        self.insert_batch_size = insert_batch_size
        # Embeddings are computed on CPU; change `device` if a GPU is available.
        self.embed_model = HuggingFaceEmbedding(
            model_name=self.embedding_model, device="cpu"
        )
        self.sent_splitter = SentenceSplitter(
            chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap
        )
        logger.info("VectorDBCreator initialized.")

    def load_data(self) -> list[dict]:
        """
        Load and return data from the JSON file.

        Returns:
            List of dictionaries containing arXiv data.

        Raises:
            OSError: If the data file cannot be opened.
            json.JSONDecodeError: If the file is not valid JSON.
        """
        logger.info(f"Loading data from {self.data_path}")
        # Explicit encoding avoids platform-dependent locale defaults.
        with open(self.data_path, encoding="utf-8") as f:
            data = json.load(f)
        logger.info("Data loaded successfully.")
        # json.load already builds fresh objects, so the previous
        # deepcopy(x=data) was pure overhead (and leaned on CPython's
        # internal parameter name `x`); return the parsed data directly.
        return data

    def prepare_documents(self, data: list[dict]) -> list[Document]:
        """
        Convert raw data into a list of Document objects.

        Each record's "abstract" field becomes the document text; every other
        field is carried over as metadata. The input dictionaries are NOT
        modified (the previous implementation pop()-ed "abstract" out of the
        caller's dicts as a side effect).

        Args:
            data: List of dictionaries with arXiv data; each record must
                contain an "abstract" key.

        Returns:
            List of Document objects.

        Raises:
            KeyError: If a record is missing the "abstract" key.
        """
        logger.info("Preparing documents from data.")
        docs = [
            Document(
                text=d["abstract"],
                # Copy all non-abstract fields instead of mutating `d`.
                metadata={k: v for k, v in d.items() if k != "abstract"},
            )
            for d in data
        ]
        logger.info(f"Prepared {len(docs)} documents.")
        return docs

    def create_vector_store(self) -> MilvusVectorStore:
        """
        Create and return a MilvusVectorStore instance.

        Returns:
            Configured MilvusVectorStore with dense vectors of
            ``self.vector_dim`` dimensions and sparse (BGE-M3) embeddings
            enabled for hybrid retrieval.
        """
        logger.info(f"Creating MilvusVectorStore at {self.db_uri}")
        store = MilvusVectorStore(
            uri=self.db_uri,
            dim=self.vector_dim,
            enable_sparse=True,
            sparse_embedding_function=BGEM3SparseEmbeddingFunction(),
        )
        logger.info("MilvusVectorStore created.")
        return store

    def build_index(
        self, docs_list: list[Document], vector_store: MilvusVectorStore
    ) -> VectorStoreIndex:
        """
        Build and return a VectorStoreIndex from documents.

        Documents are split with the configured SentenceSplitter, embedded
        with the configured HuggingFace model, and inserted into the given
        vector store in batches of ``self.insert_batch_size``.

        Args:
            docs_list: List of Document objects.
            vector_store: MilvusVectorStore instance.

        Returns:
            VectorStoreIndex object backed by the given vector store.
        """
        logger.info("Building VectorStoreIndex.")
        storage_context = StorageContext.from_defaults(vector_store=vector_store)
        index = VectorStoreIndex.from_documents(
            documents=docs_list,
            storage_context=storage_context,
            embed_model=self.embed_model,
            transformations=[self.sent_splitter],
            show_progress=True,
            insert_batch_size=self.insert_batch_size,
        )
        logger.info("VectorStoreIndex built.")
        return index

    def run(self) -> None:
        """
        Execute the full pipeline: load data, prepare documents, create the
        vector store, and build the index.
        """
        logger.info("Running full vector DB creation pipeline.")
        data = self.load_data()
        docs_list = self.prepare_documents(data=data)
        vector_store = self.create_vector_store()
        self.build_index(docs_list=docs_list, vector_store=vector_store)
        logger.info("Pipeline finished.")
| |
|
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|