Spaces:
Paused
Paused
# File: data_ingestion.py | |
import arxiv | |
from typing import List, Dict, Any | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain_openai import OpenAIEmbeddings | |
from langchain_qdrant import Qdrant | |
from datasets import load_dataset, Dataset | |
from langchain_community.document_loaders import PyMuPDFLoader | |
from config import * | |
import logging | |
# Root-logger configuration used by every logging call in this module.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# Shared, module-level helpers reused by every ingestion call:
# the OpenAI embedding model and the text chunker
# (chunk_size=1000, chunk_overlap=200, measured by the splitter's
# default length function).
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
)
def _paper_metadata(paper: Any) -> Dict[str, Any]:
    """Flatten one arXiv search result into a plain dict of strings/lists.

    Timestamps are converted with ``isoformat()`` and authors are reduced to
    their names, so the dict contains only str and list-of-str values.
    """
    return {
        "title": paper.title,
        "authors": [author.name for author in paper.authors],
        "published": paper.published.isoformat(),
        "updated": paper.updated.isoformat(),
        "pdf_url": paper.pdf_url,
        "entry_id": paper.entry_id,
        "summary": paper.summary,
    }


def fetch_arxiv_metadata(query: str, max_results: int = 10) -> List[Dict[str, Any]]:
    """Search arXiv for ``query`` and return metadata for the matching papers.

    Args:
        query: arXiv search query string, passed to ``arxiv.Search`` unchanged.
        max_results: Upper bound on the number of papers requested; also used
            as the client's page size.

    Returns:
        One dict per paper with the keys ``title``, ``authors``, ``published``,
        ``updated``, ``pdf_url``, ``entry_id`` and ``summary``, in the order
        the client yields them.
    """
    logging.info(f"Fetching arXiv metadata for query: {query}")
    search = arxiv.Search(query=query, max_results=max_results)
    # delay_seconds / num_retries are passed straight through to arxiv.Client.
    client = arxiv.Client(page_size=max_results, delay_seconds=3, num_retries=3)
    papers = [_paper_metadata(paper) for paper in client.results(search)]
    logging.info(f"Fetched metadata for {len(papers)} papers")
    return papers
def process_pdf(pdf_url: str) -> str:
    """Load the PDF at ``pdf_url`` and return the text of all its pages.

    ``pdf_url`` is handed directly to ``PyMuPDFLoader``; callers in this module
    pass the ``pdf_url`` field produced by ``fetch_arxiv_metadata``. Page texts
    are joined with a single newline in the order the loader returns them.
    """
    logging.info(f"Processing PDF from URL: {pdf_url}")
    pages = PyMuPDFLoader(pdf_url).load()
    page_texts = [page.page_content for page in pages]
    return "\n".join(page_texts)
def ingest_documents(metadata_list: List[Dict[str, Any]]) -> str:
    """Index the PDFs of the given papers in Qdrant and append them to the HF dataset.

    For every entry in ``metadata_list`` the PDF at ``metadata["pdf_url"]`` is
    loaded with ``process_pdf``, split with the module-level ``text_splitter``
    and embedded with the module-level ``embeddings``. The chunks (with the
    paper metadata attached) are added to the Qdrant collection
    ``COLLECTION_NAME``, and rows ``{"text", "metadata", "embedding"}`` are
    appended to the "train" split of the Hugging Face dataset ``DATASET_NAME``,
    which is then pushed back to the Hub.

    A failure on one document is logged with its traceback and that document
    is skipped; the remaining documents are still processed.

    Args:
        metadata_list: Paper metadata dicts; each must contain ``"pdf_url"``.

    Returns:
        A summary with the number of documents successfully ingested and the
        number of chunks appended to the dataset.

    Raises:
        Errors from ``load_dataset`` / ``concatenate_datasets`` /
        ``push_to_hub`` are not caught here.
    """
    logging.info(f"Starting ingestion of {len(metadata_list)} documents")
    # Load the existing rows first so an unreachable dataset fails before any
    # embedding work is done. Requesting a split yields a ``Dataset`` (a bare
    # ``load_dataset`` call returns a ``DatasetDict``). "train" is the split a
    # plain ``Dataset.push_to_hub`` writes, so the load/push round-trip stays
    # on the same split.
    # NOTE(review): assumes the Hub dataset already exists with a "train" split.
    dataset = load_dataset(DATASET_NAME, split="train")

    # The vector store is created lazily from the first non-empty chunk batch:
    # building it from an empty document list fails, because the store embeds
    # the first text to determine the collection's vector size.
    qdrant = None
    new_data: List[Dict[str, Any]] = []
    ingested = 0
    for i, metadata in enumerate(metadata_list, start=1):
        try:
            pdf_text = process_pdf(metadata["pdf_url"])
            chunks = text_splitter.split_text(pdf_text)
            if not chunks:
                logging.warning(f"No text extracted from document {i}; skipping")
                continue
            # One batched embedding request per document instead of one
            # embed_query call per chunk. The vectors are stored in the dataset
            # rows; the Qdrant store embeds the texts itself via `embeddings`.
            vectors = embeddings.embed_documents(chunks)
            metadatas = [metadata] * len(chunks)
            if qdrant is None:
                qdrant = Qdrant.from_texts(
                    chunks,
                    embeddings,
                    metadatas=metadatas,
                    url=QDRANT_API_URL,
                    api_key=QDRANT_API_KEY,
                    collection_name=COLLECTION_NAME,
                )
            else:
                qdrant.add_texts(chunks, metadatas=metadatas)
            # Dataset rows are recorded only after the chunks reached Qdrant,
            # so a mid-document failure never leaves a partial set of rows.
            new_data.extend(
                {"text": chunk, "metadata": metadata, "embedding": vector}
                for chunk, vector in zip(chunks, vectors)
            )
            ingested += 1
            logging.info(f"Processed document {i}/{len(metadata_list)}")
        except Exception as e:
            # Deliberate best-effort per document: keep the traceback, move on.
            logging.exception(f"Error processing document {i}: {str(e)}")

    if new_data:
        from datasets import concatenate_datasets

        new_dataset = Dataset.from_list(new_data)
        # NOTE(review): concatenation requires the new rows' inferred features
        # to match the existing dataset's schema — confirm for older datasets.
        dataset = concatenate_datasets([dataset, new_dataset])
        dataset.push_to_hub(DATASET_NAME)
    else:
        logging.warning("No chunks produced; Hugging Face dataset left unchanged")

    result_message = (
        f"Ingested {ingested} documents, adding {len(new_data)} chunks to the dataset."
    )
    logging.info(result_message)
    return result_message
def run_ingestion_pipeline(query: str, max_results: int = 10) -> str:
    """Fetch arXiv papers for ``query`` and ingest them, returning a status string.

    Ordinary exceptions are not propagated: any ``Exception`` raised by the
    fetch or ingest step is logged with its traceback and reported as
    ``"Error in ingestion pipeline: <error>"``.

    Args:
        query: arXiv search query.
        max_results: Maximum number of papers to fetch.

    Returns:
        The summary from ``ingest_documents`` on success, a notice when the
        search returns no papers, or the error message described above.
    """
    try:
        metadata_list = fetch_arxiv_metadata(query, max_results)
        if not metadata_list:
            # Nothing to ingest: skip the Qdrant / Hugging Face round-trips.
            message = f"No arXiv papers found for query: {query}"
            logging.info(message)
            return message
        return ingest_documents(metadata_list)
    except Exception as e:
        error_message = f"Error in ingestion pipeline: {str(e)}"
        # logging.exception keeps the traceback that logging.error would drop.
        logging.exception(error_message)
        return error_message