# file: document_processing.py

import os
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import List
from urllib.parse import unquote, urlparse

import httpx
from llama_index.core import Document as LlamaDocument
from llama_index.readers.file import PyMuPDFReader
from pydantic import HttpUrl

# Number of pages handled by each worker during parallel text extraction.
BATCH_SIZE = 25


def _process_page_batch(documents_batch: List[LlamaDocument]) -> str:
    """
    Extract the content from a batch of LlamaIndex Document objects and
    join it into a single string.
    """
    return "\n\n".join(d.get_content() for d in documents_batch)


async def ingest_and_parse_document(doc_url: HttpUrl) -> str:
    """
    Asynchronously download a document, save it locally, and extract its
    plain text using PyMuPDFReader, parallelizing extraction across page
    batches.

    Args:
        doc_url: The Pydantic-validated URL of the document to process.

    Returns:
        A single string containing the document's extracted text.

    Raises:
        httpx.HTTPStatusError: If the download fails with an HTTP error status.
        ValueError: If parsing yields no content.
    """
    print(f"Initiating download from: {doc_url}")

    LOCAL_STORAGE_DIR = "data/"
    os.makedirs(LOCAL_STORAGE_DIR, exist_ok=True)

    try:
        # Asynchronously download the document.
        async with httpx.AsyncClient() as client:
            response = await client.get(str(doc_url), timeout=30.0, follow_redirects=True)
            response.raise_for_status()
            doc_bytes = response.content
        print("Download successful.")

        # Derive a valid local filename from the URL path, falling back to a default.
        parsed_path = urlparse(str(doc_url)).path
        filename = unquote(os.path.basename(parsed_path)) or "downloaded_document.pdf"
        local_file_path = Path(LOCAL_STORAGE_DIR) / filename

        # Save the document locally. Note: this write and the parsing below
        # are blocking calls inside an async function; for heavy workloads,
        # consider offloading them with asyncio.to_thread so the event loop
        # stays responsive.
        with open(local_file_path, "wb") as f:
            f.write(doc_bytes)
        print(f"Document saved locally at: {local_file_path}")

        # Parse the document using LlamaIndex's PyMuPDFReader (one Document per page).
        print("Parsing document with PyMuPDFReader...")
        loader = PyMuPDFReader()
        docs_from_loader = loader.load_data(file_path=local_file_path)

        # Parallelize text extraction across page batches. Collect results in
        # submission order (not via as_completed, which yields futures in
        # completion order) so the extracted text preserves page order.
        start_time = time.perf_counter()
        with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
            futures = [
                executor.submit(_process_page_batch, docs_from_loader[i:i + BATCH_SIZE])
                for i in range(0, len(docs_from_loader), BATCH_SIZE)
            ]
            all_extracted_texts = [future.result() for future in futures]

        doc_text = "\n\n".join(all_extracted_texts)
        elapsed_time = time.perf_counter() - start_time
        print(f"Time taken for parallel text extraction: {elapsed_time:.4f} seconds.")

        if not doc_text.strip():
            raise ValueError("Document parsing yielded no content.")

        print(f"Parsing complete. Extracted {len(doc_text)} characters.")
        return doc_text

    except httpx.HTTPStatusError as e:
        print(f"Error downloading document: {e}")
        raise
    except Exception as e:
        print(f"An unexpected error occurred during document processing: {e}")
        raise
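

# --- Usage sketch ---
# A minimal sketch of driving the pipeline from a script entry point, not part
# of the original module. It assumes Pydantic v2 (for TypeAdapter) and uses a
# hypothetical placeholder URL; asyncio.run executes the coroutine to completion.

if __name__ == "__main__":
    import asyncio

    from pydantic import TypeAdapter

    # Hypothetical example URL; replace with a real document location.
    url = TypeAdapter(HttpUrl).validate_python("https://example.com/sample.pdf")
    text = asyncio.run(ingest_and_parse_document(url))
    print(text[:500])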