ikram98ai committed
Commit 0c20b58 · 1 Parent(s): bce3364

refactoring get_vectorstore

src/app.py CHANGED
@@ -12,7 +12,7 @@ if str(_project_root) not in sys.path:
 
 from src.core.ingest import load_documents, get_chunks, ingest_documents
 from src.core.retrieval import generate, retrieval
-from src.core.index import MetaData
+from src.core.index import MetaData, get_vectorstore
 from src.core.synthetic_data import EVAL_QUERIES, SYNTHETIC_DOCUMENTS
 from src.core.eval import run_full_evaluation, save_results
 from src.core.eval import generate_summary_report, setup_test_data
@@ -49,9 +49,10 @@ def ingest_files(files:List[str], index_name:str, lang:Literal["en", "ja"], doma
         language=lang, domain=domain, section=section, topic=topic, doc_type=doc_type
     )
     try:
+        vectorstore = get_vectorstore(index_name)
         docs = load_documents(files)
         chunks = get_chunks(docs, filter_data)
-        message = ingest_documents(chunks, index_name)
+        message = ingest_documents(chunks, vectorstore)
     except Exception as e:
         message = f"Error during ingestion: {str(e)}"
         print(message)
@@ -77,8 +78,8 @@ def _rag_query(
     print(f"Active Filters: {active_filters.model_dump()}")
 
     ret_start_time = time.time()
-
-    docs = retrieval(question, index_name, active_filters)
+    vectorstore = get_vectorstore(index_name)
+    docs = retrieval(question, active_filters, vectorstore)
     retrieval_results = [doc.page_content + _add_metric(doc) for doc in docs]
     snippets_md = "\n\n---\n\n".join(retrieval_results)
 
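For orientation, a minimal sketch of the call pattern after this refactor: the caller builds the Milvus handle once with get_vectorstore and passes it into both ingestion and retrieval. The collection name, file path, and metadata values below are illustrative, not taken from the repo.

from src.core.index import MetaData, get_vectorstore
from src.core.ingest import load_documents, get_chunks, ingest_documents
from src.core.retrieval import retrieval

# Hypothetical metadata; the field names match the ingest_files() call above.
filter_data = MetaData(language="en", domain="legal", section="intro",
                       topic="contracts", doc_type="pdf")

vectorstore = get_vectorstore("demo")          # one handle, created by the caller
docs = load_documents(["./data/sample.txt"])   # hypothetical input file
chunks = get_chunks(docs, filter_data)
ingest_documents(chunks, vectorstore)          # handle passed down, not a collection name
hits = retrieval("What is the notice period?", filter_data, vectorstore)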
src/core/eval.py CHANGED
@@ -17,7 +17,7 @@ import numpy as np
 from langchain_core.documents import Document
 from langchain_openai import OpenAIEmbeddings
 from dotenv import load_dotenv, find_dotenv
-from .index import MetaData
+from .index import MetaData, get_vectorstore
 from .retrieval import retrieval, generate
 from .ingest import ingest_documents, get_chunks
 from .synthetic_data import SYNTHETIC_DOCUMENTS, EVAL_QUERIES, EvalQuery
@@ -28,7 +28,6 @@ load_dotenv()
 # Embedding model for semantic similarity
 emb_model = OpenAIEmbeddings(model="text-embedding-3-small", dimensions=1536)
 
-
 @dataclass
 class EvalResult:
     """Evaluation result for a single query"""
@@ -131,7 +130,8 @@ def evaluate_single_query(
 
     # Retrieval
     ret_start = time.time()
-    docs = retrieval(eval_query.query, eval_query.collection, filters)
+    vectorstore = get_vectorstore("eval_"+eval_query.collection)
+    docs = retrieval(eval_query.query, filters, vectorstore)
     ret_end = time.time()
     ret_latency = (ret_end - ret_start) * 1000 # Convert to ms
 
@@ -443,11 +443,9 @@ def setup_test_data(collections: List[str] = None):
             metadata = MetaData(**metadata)
             chunks = get_chunks([doc], metadata)
             documents.extend(chunks)
-
-        # vectorstore = get_vectorstore(collection_name)
-        # ids = [str(uuid.uuid4()) for _ in range(len(documents))]
-        # vectorstore.add_documents(documents, ids=ids)
-        ingest_documents(documents, collection_name)
+
+        vectorstore = get_vectorstore("eval_"+collection_name, drop_old=True)
+        ingest_documents(documents, vectorstore)
         tot_docs += len(docs)
         print(f"✓ Completed '{collection_name}' collection")
src/core/index.py CHANGED
@@ -26,12 +26,13 @@ emb_model = OpenAIEmbeddings(model="text-embedding-3-small", dimensions=1536)
 MILVUS_URI = os.getenv("MILVUS_URI","./data/rag_task.db")
 MILVUS_API_KEY = os.getenv("MILVUS_API_KEY","")
 
-def get_vectorstore(collection_name: str) -> Milvus:
+def get_vectorstore(collection_name: str, drop_old=False) -> Milvus:
     vectorstore = Milvus(
         embedding_function=emb_model,
         collection_name=collection_name,
         connection_args={"uri": MILVUS_URI,"token": MILVUS_API_KEY},
         index_params={"index_type": "FLAT", "metric_type": "L2"},
+        drop_old=drop_old,
     )
     # builtin_function=BM25BuiltInFunction(output_field_names="sparse"),
     # text_field="text",
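The new drop_old flag is forwarded straight to langchain_milvus.Milvus, which drops and recreates an existing collection of the same name when the flag is true. A small sketch of the two modes (collection names are illustrative):

from src.core.index import get_vectorstore

store = get_vectorstore("docs")                      # default: reuse the collection if it exists
fresh = get_vectorstore("eval_docs", drop_old=True)  # drop any existing "eval_docs", start empty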
src/core/ingest.py CHANGED
@@ -1,12 +1,12 @@
 from langchain_community.document_loaders import PDFMinerLoader,TextLoader
 from langchain_text_splitters import RecursiveCharacterTextSplitter
+from langchain_milvus import Milvus
 from langchain_core.documents import Document
 from langchain_openai import ChatOpenAI
 from dotenv import load_dotenv, find_dotenv
 from typing import List
 import uuid
 
-from src.core.index import get_vectorstore
 from .index import MetaData
 from .utils import mask_pii
 
@@ -59,14 +59,12 @@ def get_chunks(documents: List[Document], metadata: MetaData):
     return chunks
 
 
-def ingest_documents(docs: List[Document], collection_name: str, vectorstore = None):
+def ingest_documents(docs: List[Document], vectorstore:Milvus):
     """Ingest documents into the specified vectorstore collection."""
-    if vectorstore is None:
-        vectorstore = get_vectorstore(collection_name)
-
+
     ids = [str(uuid.uuid4()) for _ in range(len(docs))]
     vectorstore.add_documents(docs, ids=ids)
-    success_message = f"Ingested {len(docs)} documents into {collection_name} index."
+    success_message = f"Ingested {len(docs)} documents into {vectorstore.collection_name} index."
     print(success_message)
     return success_message
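With the collection_name parameter gone, the success message now reads the name off the handle instead (collection_name is an attribute of langchain_milvus.Milvus). A usage sketch with an illustrative document:

from langchain_core.documents import Document
from src.core.index import get_vectorstore
from src.core.ingest import ingest_documents

vs = get_vectorstore("demo")  # "demo" is illustrative
msg = ingest_documents([Document(page_content="hello")], vs)
# msg == "Ingested 1 documents into demo index."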
src/core/retrieval.py CHANGED
@@ -1,10 +1,10 @@
 from langchain_core.documents import Document
-from langchain_openai import ChatOpenAI
 from langchain_community.retrievers import BM25Retriever
+from langchain_openai import ChatOpenAI
+from langchain_milvus import Milvus
 from dotenv import load_dotenv, find_dotenv
 from typing import List
-from .index import get_vectorstore, MetaData
-
+from .index import MetaData
 find_dotenv()
 load_dotenv()
 
@@ -23,12 +23,11 @@ def reranker(query: str, docs: List[Document]) -> List[Document]:
 
 
 def retrieval(
-    query: str, collection_name: str, filter_data: MetaData
+    query: str, filter_data: MetaData, vectorstore: Milvus
 ) -> List[tuple[Document, float]]:
     """Retrieve relevant documents from the vector store based on the query and filters."""
-    vectorstore = get_vectorstore(collection_name)
     print(
-        f"RETRIEVAL query: {query[:40]}, for {collection_name} collection, with filters: {filter_data}"
+        f"RETRIEVAL query: {query[:40]}, for {vectorstore.collection_name} collection, with filters: {filter_data}"
    )
 
     filters = [f'language == "{filter_data.language}"']
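retrieval() still assembles Milvus boolean-expression filters from the MetaData fields; only the vectorstore now arrives as an argument. A hedged sketch of the filter strings this hunk starts building (how the list is ultimately joined and passed to the search call lies outside this diff):

from src.core.index import MetaData

filter_data = MetaData(language="en", domain="legal", section="intro",
                       topic="contracts", doc_type="pdf")  # hypothetical values
filters = [f'language == "{filter_data.language}"']
# presumably combined into a single Milvus expr string, e.g.:
expr = " and ".join(filters)  # -> 'language == "en"'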