ramailkk commited on
Commit
04f14ca
·
0 Parent(s):

first commit

Browse files
Files changed (5) hide show
  1. config.yaml +37 -0
  2. data_processor.py +69 -0
  3. main.py +28 -0
  4. retriever.py +78 -0
  5. vector_db.py +33 -0
config.yaml ADDED
@@ -0,0 +1,37 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Pipeline Configuration for ArXiv RAG
2
+ project_name: "arxiv_cyber_advisor"
3
+
4
+ # Stage 1: Data Acquisition
5
+ data_ingestion:
6
+ category: "cs.AI"
7
+ limit: 20
8
+ save_local: true
9
+ raw_data_path: "data/raw_arxiv.csv"
10
+
11
+ # Stage 2: Processing & Embedding
12
+ embedding:
13
+ model_name: "all-MiniLM-L6-v2"
14
+ device: "cpu" # Change to "cuda" if testing on a GPU machine
15
+
16
+ chunking:
17
+ technique: "recursive"
18
+ chunk_size: 500
19
+ chunk_overlap: 50
20
+
21
+ # Stage 3: Vector Database (Pinecone)
22
+ vector_db:
23
+ index_name: "arxiv-index"
24
+ dimension: 384
25
+ metric: "cosine"
26
+
27
+ # Stage 4: Retrieval & Re-ranking
28
+ retrieval:
29
+ top_k_hybrid: 10
30
+ rerank_model: "cross-encoder/ms-marco-MiniLM-L-6-v2"
31
+ top_k_final: 3
32
+
33
+ # Stage 5: Generation (LLM)
34
+ llm:
35
+ model_id: "meta-llama/Meta-Llama-3-8B-Instruct"
36
+ max_new_tokens: 500
37
+ temperature: 0.1
data_processor.py ADDED
@@ -0,0 +1,69 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import arxiv
2
+ import pandas as pd
3
+ from langchain_text_splitters import RecursiveCharacterTextSplitter, CharacterTextSplitter
4
+ from sentence_transformers import SentenceTransformer
5
+
6
def fetch_arxiv_data(category="cs.AI", limit=10):
    """Fetches paper metadata and abstracts from arXiv.

    :param category: arXiv category code to query (e.g. "cs.AI").
    :param limit: maximum number of papers to retrieve.
    :return: pandas DataFrame with columns id, title, abstract, url.
    """
    client = arxiv.Client()
    search = arxiv.Search(
        query=f"cat:{category}",
        max_results=limit,
        sort_by=arxiv.SortCriterion.SubmittedDate,
    )

    # One record per paper; newlines in abstracts are flattened to spaces.
    records = [
        {
            "id": paper.entry_id.split('/')[-1],
            "title": paper.title,
            "abstract": paper.summary.replace('\n', ' '),
            "url": paper.pdf_url,
        }
        for paper in client.results(search)
    ]
    return pd.DataFrame(records)
25
+
26
def get_text_splitter(technique="recursive", chunk_size=500, chunk_overlap=50):
    """Returns a splitter based on the chosen technique and parameters.

    :param technique: "recursive" or "character".
    :param chunk_size: maximum characters per chunk.
    :param chunk_overlap: characters shared between adjacent chunks.
    :raises ValueError: if the technique is not supported.
    """
    # Lazily constructed so only the requested splitter is instantiated.
    builders = {
        "recursive": lambda: RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        ),
        "character": lambda: CharacterTextSplitter(
            separator="\n",
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        ),
    }
    if technique not in builders:
        raise ValueError(f"Technique '{technique}' not supported.")
    return builders[technique]()
41
+
42
def process_to_chunks(df, model_name='all-MiniLM-L6-v2', technique="recursive", chunk_size=500, chunk_overlap=50, device=None):
    """Splits abstracts into chunks and generates embeddings with custom parameters.

    :param df: DataFrame with columns id, title, abstract, url (as produced
        by fetch_arxiv_data).
    :param model_name: SentenceTransformer model used for embedding.
    :param technique: chunking technique name, see get_text_splitter.
    :param chunk_size: maximum characters per chunk.
    :param chunk_overlap: characters shared between adjacent chunks.
    :param device: optional device for the embedding model ("cpu"/"cuda").
        Defaults to None (library picks). Added so the `embedding.device`
        setting in config.yaml can actually take effect; backward compatible.
    :return: list of dicts shaped for Pinecone upsert:
        {"id": ..., "values": [...], "metadata": {"title", "text", "url"}}.
    """
    # Initialize the specific model requested.
    print(f"🔧 Initializing Model: {model_name}...")
    model = SentenceTransformer(model_name, device=device)

    # Initialize the specific splitter requested.
    splitter = get_text_splitter(technique, chunk_size, chunk_overlap)

    processed_chunks = []
    for _, row in df.iterrows():
        for i, text in enumerate(splitter.split_text(row['abstract'])):
            processed_chunks.append({
                # Chunk ids stay unique per paper: "<paper-id>-chunk-<n>".
                "id": f"{row['id']}-chunk-{i}",
                "values": model.encode(text).tolist(),
                "metadata": {
                    "title": row['title'],
                    "text": text,
                    "url": row['url'],
                },
            })
    return processed_chunks
main.py ADDED
@@ -0,0 +1,28 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import yaml
2
+ from data_processor import fetch_arxiv_data, process_to_chunks
3
+
4
def load_config():
    """Load pipeline settings from config.yaml into a dictionary."""
    with open("config.yaml", "r") as handle:
        raw_text = handle.read()
    return yaml.safe_load(raw_text)
7
+
8
def main():
    """Run Stages 1-2 of the pipeline using parameters from config.yaml."""
    config = load_config()

    # Stage 1: pull recent paper abstracts from arXiv.
    raw_data = fetch_arxiv_data(
        category=config['data_ingestion']['category'],
        limit=config['data_ingestion']['limit']
    )

    # Stage 2: chunk and embed. `technique` is now forwarded — previously
    # config['chunking']['technique'] was read into the config but ignored.
    final_chunks = process_to_chunks(
        raw_data,
        model_name=config['embedding']['model_name'],
        technique=config['chunking']['technique'],
        chunk_size=config['chunking']['chunk_size'],
        chunk_overlap=config['chunking']['chunk_overlap']
    )

    print(f"✅ Pipeline finished with {len(final_chunks)} chunks.")

if __name__ == "__main__":
    main()
retriever.py ADDED
@@ -0,0 +1,78 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import numpy as np
2
+ from rank_bm25 import BM25Okapi
3
+ from sentence_transformers import CrossEncoder
4
+
5
class HybridRetriever:
    """Hybrid semantic + BM25 retriever with optional re-ranking."""

    def __init__(self, final_chunks, embed_model, rerank_model_name='cross-encoder/ms-marco-MiniLM-L-6-v2'):
        """
        Initializes the search engines.
        :param final_chunks: The list of chunk dictionaries with metadata.
        :param embed_model: The SentenceTransformer model object used for query embedding.
        :param rerank_model_name: Cross-encoder model used for re-ranking.
        """
        self.final_chunks = final_chunks
        self.embed_model = embed_model
        self.rerank_model = CrossEncoder(rerank_model_name)

        # Initialize BM25 locally over whitespace-tokenized, lowercased chunk text.
        self.tokenized_corpus = [chunk['metadata']['text'].lower().split() for chunk in final_chunks]
        self.bm25 = BM25Okapi(self.tokenized_corpus)

    def _rrf_score(self, semantic_results, bm25_results, k=60):
        """
        Reciprocal Rank Fusion (RRF): score(chunk) = sum over lists of 1 / (k + rank),
        with rank starting at 1. Returns chunks sorted by fused score, best first.
        """
        scores = {}

        for rank, chunk in enumerate(semantic_results):
            scores[chunk] = scores.get(chunk, 0) + 1 / (k + rank + 1)

        for rank, chunk in enumerate(bm25_results):
            scores[chunk] = scores.get(chunk, 0) + 1 / (k + rank + 1)

        # Sort by fused score, descending.
        sorted_chunks = sorted(scores.items(), key=lambda x: x[1], reverse=True)
        return [item[0] for item in sorted_chunks]

    def search(self, query, index, top_k=10, mode="all", rerank_type="cross-encoder", top_k_final=3):
        """
        :param query: free-text user query.
        :param index: vector index (Pinecone-style .query API) for the semantic leg.
        :param top_k: candidates fetched per retrieval leg.
        :param mode: "semantic", "bm25", or "all"
        :param rerank_type: "cross-encoder", "rrf", or "none"
        :param top_k_final: number of chunks returned (was hard-coded to 3;
            default preserves the old behavior).
        """
        semantic_chunks = []
        bm25_chunks = []

        # A. Semantic Search
        if mode in ["semantic", "all"]:
            query_vector = self.embed_model.encode(query).tolist()
            res = index.query(vector=query_vector, top_k=top_k, include_metadata=True)
            semantic_chunks = [match['metadata']['text'] for match in res['matches']]

        # B. Keyword Search
        if mode in ["bm25", "all"]:
            tokenized_query = query.lower().split()
            bm25_scores = self.bm25.get_scores(tokenized_query)
            top_indices = np.argsort(bm25_scores)[::-1][:top_k]
            bm25_chunks = [self.final_chunks[i]['metadata']['text'] for i in top_indices]

        # C. Combination and Re-Ranking
        if mode == "semantic":
            combined = semantic_chunks
        elif mode == "bm25":
            combined = bm25_chunks
        elif rerank_type == "rrf":
            # Mode is "all" with RRF fusion.
            return self._rrf_score(semantic_chunks, bm25_chunks)[:top_k_final]
        else:
            # Deduplicate while preserving rank order. The previous
            # list(set(...)) produced a nondeterministic candidate order,
            # which leaked into the results when rerank_type == "none".
            combined = list(dict.fromkeys(semantic_chunks + bm25_chunks))

        # D. Cross-Encoder Re-Ranking
        if rerank_type == "cross-encoder" and len(combined) > 0:
            pairs = [[query, chunk] for chunk in combined]
            scores = self.rerank_model.predict(pairs)
            results = sorted(zip(combined, scores), key=lambda x: x[1], reverse=True)
            return [res[0] for res in results[:top_k_final]]

        return combined[:top_k_final]
vector_db.py ADDED
@@ -0,0 +1,33 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import time
2
+ from pinecone import Pinecone, ServerlessSpec
3
+
4
def get_pinecone_index(api_key, index_name, dimension=384, metric="cosine"):
    """Initializes Pinecone and returns the index object, creating it if necessary.

    :param api_key: Pinecone API key.
    :param index_name: name of the index to fetch or create.
    :param dimension: embedding dimension used when creating a new index.
    :param metric: similarity metric used when creating a new index.
    """
    pc = Pinecone(api_key=api_key)

    # Only create the index when it is not already present.
    known_names = {idx.name for idx in pc.list_indexes()}
    if index_name not in known_names:
        print(f"Creating new Pinecone index: {index_name}...")
        pc.create_index(
            name=index_name,
            dimension=dimension,
            metric=metric,
            spec=ServerlessSpec(cloud="aws", region="us-east-1"),
        )
        # Block until the freshly created index reports ready.
        while not pc.describe_index(index_name).status['ready']:
            time.sleep(1)

    return pc.Index(index_name)
24
+
25
def upsert_to_pinecone(index, chunks, batch_size=100):
    """Upserts chunks to Pinecone in manageable batches.

    :param index: Pinecone index (any object exposing .upsert(vectors=...)).
    :param chunks: list of vector dicts to upload.
    :param batch_size: maximum vectors per upsert call.
    """
    print(f"Uploading {len(chunks)} chunks to Pinecone...")

    # Walk the list a window at a time so each request stays small.
    total = len(chunks)
    start = 0
    while start < total:
        index.upsert(vectors=chunks[start:start + batch_size])
        start += batch_size

    print("✅ Upsert complete.")