Spaces:
Sleeping
Sleeping
File size: 15,566 Bytes
0b314ed |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 |
import httpx # An asynchronous HTTP client.
import os # To handle file paths and create directories.
import asyncio # To run synchronous libraries in an async environment.
from urllib.parse import unquote, urlparse # To get the filename from the URL.
import uuid # To generate unique filenames if needed.
from pydantic import HttpUrl
from langchain_pymupdf4llm import PyMuPDF4LLMLoader
import json
import re
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
import os
import time
import fitz # PyMuPDF
import httpx
import tempfile
import asyncio
import concurrent.futures
from typing import Optional
from pathlib import Path
import os
import argparse
from typing import Optional
from urllib.parse import urlparse, unquote
from llama_index.readers.file import PyMuPDFReader
from llama_index.core import Document # Make sure Document is imported if used elsewhere
from pydantic import HttpUrl # Assuming pydantic is installed for HttpUrl type hint
from concurrent.futures import ThreadPoolExecutor, as_completed
# Ensure required libraries are installed.
# You can install them using:
# pip install llama_cloud_services pydantic python-dotenv
from llama_cloud_services import LlamaExtract
from pydantic import BaseModel, Field
from dotenv import load_dotenv
import tempfile
from pathlib import Path
from llama_index.readers.file import PDFReader
from llama_index.readers.file import PyMuPDFReader
import PyPDF2
# Module-level cache for the LlamaExtract agent. It starts as None, is
# populated by initialize_llama_extract_agent(), and is read by
# extract_schema_from_file().
llama_extract_agent = None
class Insurance(BaseModel):
    """
    A Pydantic model to define the data schema for extraction.

    The field description helps guide the AI model.
    """

    # Declared as a single string although the description speaks of an array.
    # NOTE(review): confirm the intended type against the "insurance-parser"
    # agent's schema before changing it.
    headings: str = Field(description="An array of headings")
def initialize_llama_extract_agent():
    """
    Populate the module-level ``llama_extract_agent`` on first use.

    Does nothing when an agent is already cached. Otherwise a LlamaExtract
    client is created and the agent named "insurance-parser" is fetched from
    it. Any failure is printed and leaves the global set to None.
    """
    global llama_extract_agent

    if llama_extract_agent is not None:
        # Already initialized; keep the cached agent.
        return

    print("Initializing LlamaExtract client and getting agent...")
    try:
        llama_extract_agent = LlamaExtract().get_agent(name="insurance-parser")
        print("LlamaExtract agent initialized.")
    except Exception as exc:
        print(f"Error initializing LlamaExtract agent: {exc}")
        # Make sure callers see "not initialized" after a failed attempt.
        llama_extract_agent = None
def extract_schema_from_file(file_path: str) -> Optional[Insurance]:
    """
    Send a local file to the LlamaExtract agent and return the extracted data.

    The agent is initialized lazily when it has not been set up yet.

    Args:
        file_path: Path to the local file to extract from.

    Returns:
        The ``data`` attribute of the extraction result, or None when the file
        does not exist, the agent cannot be initialized, the result carries no
        data, or the extraction call raises.
    """
    if not os.path.exists(file_path):
        print(f"❌ Error: The file '{file_path}' was not found.")
        return None

    if llama_extract_agent is None:
        print("LlamaExtract agent not initialized. Attempting to initialize now.")
        initialize_llama_extract_agent()
        # Initialization reports its own error and leaves the global as None.
        if llama_extract_agent is None:
            print("LlamaExtract agent failed to initialize. Cannot proceed with extraction.")
            return None

    print(f"🚀 Sending '{file_path}' to LlamaCloud for schema extraction...")
    try:
        result = llama_extract_agent.extract(file_path)
        if result and result.data:
            print("✅ Extraction successful!")
            return result.data
        print("⚠️ Extraction did not return any data.")
        return None
    except Exception as e:
        # Best-effort API boundary: report the failure instead of propagating.
        print(f"\n❌ An error occurred during the API call: {e}")
        print("Please check your API key, network connection, and file format.")
        return None
def process_pdf_chunk(chunk_path: str) -> str:
    """
    Convert one temporary PDF chunk to markdown (ProcessPoolExecutor worker).

    The chunk is loaded with PyMuPDF4LLMLoader and each loaded document is
    emitted under a "## Page N" heading, where N is rebuilt from the chunk's
    file name and the document's ``page`` metadata.

    Args:
        chunk_path: The file path to a temporary PDF chunk, named
            "chunk_START-END.pdf".

    Returns:
        The markdown for the chunk, or an error message string when anything
        goes wrong while processing it.
    """
    try:
        pages = PyMuPDF4LLMLoader(chunk_path).load()
        sections = []
        for page in pages:
            # Position of this page inside the chunk document (-1 if absent).
            offset_in_chunk = int(page.metadata.get('page', -1))
            # START in "chunk_START-END.pdf" is the chunk's first page number.
            chunk_name = os.path.basename(chunk_path)
            first_page_of_chunk = int(chunk_name.split('_')[1].split('-')[0])
            page_number = first_page_of_chunk + offset_in_chunk
            sections.append(f"\n## Page {page_number}\n{page.page_content.strip()}")
        return "".join(sections)
    except Exception as exc:
        # A failed chunk is reported in-band so the other chunks still combine.
        return f"Error processing chunk {os.path.basename(chunk_path)}: {exc}"
def _split_pdf_into_chunks(pdf_path: str, chunk_dir: str, chunk_size: int) -> list[str]:
    """Write consecutive ``chunk_size``-page slices of ``pdf_path`` into ``chunk_dir``."""
    chunk_paths = []
    with fitz.open(pdf_path) as source_doc:
        num_pages = len(source_doc)
        for chunk_start in range(0, num_pages, chunk_size):
            chunk_end = min(chunk_start + chunk_size, num_pages)
            # The name carries 1-based START-END page numbers; the worker
            # reads START back to rebuild absolute page numbers.
            chunk_path = os.path.join(chunk_dir, f"chunk_{chunk_start + 1}-{chunk_end}.pdf")
            # Create a new blank PDF and insert the page range from the source.
            with fitz.open() as chunk_doc:
                chunk_doc.insert_pdf(source_doc, from_page=chunk_start, to_page=chunk_end - 1)
                chunk_doc.save(chunk_path)
            chunk_paths.append(chunk_path)
    return chunk_paths


def _convert_chunks_in_parallel(chunk_paths: list[str]) -> list[str]:
    """Run ``process_pdf_chunk`` on every chunk and return results in chunk order."""
    results = {}
    with concurrent.futures.ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        future_to_chunk = {executor.submit(process_pdf_chunk, path): path for path in chunk_paths}
        for future in concurrent.futures.as_completed(future_to_chunk):
            chunk_path = future_to_chunk[future]
            try:
                results[chunk_path] = future.result()
            except Exception as e:
                # Keep going: one failed chunk is reported in place of its text.
                results[chunk_path] = f"Failed to process chunk {os.path.basename(chunk_path)}: {e}"
    # Futures finish in arbitrary order; re-read them in the original order.
    return [results[path] for path in chunk_paths]


def pdf_to_markdown_parallel(pdf_path: str, output_path: Optional[str] = None, chunk_size: int = 10) -> str:
    """
    Split a local PDF into chunks, convert them in parallel, and combine them.

    The chunks are written to a private temporary directory, converted to
    markdown in worker processes, and joined in page order. The temporary
    directory is removed afterwards, whether or not processing succeeded.

    Args:
        pdf_path: The file path to the local PDF.
        output_path: (Optional) The file path to save the output Markdown.
        chunk_size: The number of pages to include in each parallel chunk.

    Returns:
        A string containing the full markdown content of the PDF.

    Raises:
        FileNotFoundError: If ``pdf_path`` does not exist.
        ValueError: If ``chunk_size`` is smaller than 1.
    """
    start_time = time.monotonic()
    if not os.path.exists(pdf_path):
        raise FileNotFoundError(f"❌ The file '{pdf_path}' was not found.")
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

    # A per-call directory keeps concurrent invocations from overwriting each
    # other's identically named chunk files.
    with tempfile.TemporaryDirectory(prefix="pdf_chunks_") as temp_dir:
        # Step 1: Split the source PDF into smaller chunk files
        print(f"Splitting '{os.path.basename(pdf_path)}' into chunks of {chunk_size} pages...")
        chunk_paths = _split_pdf_into_chunks(pdf_path, temp_dir, chunk_size)
        print(f"✅ Successfully created {len(chunk_paths)} PDF chunks.")

        # Step 2: Process the chunks in parallel
        print(f"Processing chunks in parallel using up to {os.cpu_count()} CPU cores...")
        chunk_markdown = _convert_chunks_in_parallel(chunk_paths)

    # Step 3: Combine results in the correct order
    print("Combining processed chunks...")
    markdown_text = "\n\n".join(chunk_markdown)

    # Step 4: Save to file if an output path is provided
    if output_path:
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(markdown_text)
        print(f"✅ Markdown saved to: {output_path}")

    end_time = time.monotonic()
    print(f"⏱️ Total processing time: {end_time - start_time:.2f} seconds")
    return markdown_text
async def process_document(source: str, output_path: Optional[str] = None, chunk_size: int = 10) -> str:
    """
    Convert a PDF given as a URL or a local path to markdown.

    A URL is downloaded to a temporary file first, and that file is removed
    once processing finishes. The conversion itself is delegated to
    ``pdf_to_markdown_parallel``.

    Args:
        source: A local file path or a public URL to a PDF file.
        output_path: (Optional) The file path to save the output Markdown.
        chunk_size: The number of pages per chunk for parallel processing.

    Returns:
        A string containing the full markdown content of the PDF.

    Raises:
        httpx.HTTPStatusError: If the download returns a 4xx/5xx status.
    """
    # The parallel function is synchronous, so it runs in the default thread
    # pool executor to avoid blocking the asyncio event loop.
    loop = asyncio.get_running_loop()

    if not source.lower().startswith(("http://", "https://")):
        # If it's a local path, process it directly
        print(f"⚙️ Processing local file: {source}")
        return await loop.run_in_executor(
            None, pdf_to_markdown_parallel, source, output_path, chunk_size
        )

    print(f"⬇️ Downloading from URL: {source}")
    temp_pdf_file_path = None
    try:
        # Download the file asynchronously
        async with httpx.AsyncClient() as client:
            response = await client.get(source, timeout=60.0, follow_redirects=True)
            response.raise_for_status()

        # Save the downloaded content to a temporary file
        with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.pdf') as temp_file:
            # Record the path before writing so that a failed write is still
            # cleaned up by the finally clause below.
            temp_pdf_file_path = temp_file.name
            temp_file.write(response.content)
        print(f"✅ Download complete. Saved to temporary file: {temp_pdf_file_path}")

        return await loop.run_in_executor(
            None, pdf_to_markdown_parallel, temp_pdf_file_path, output_path, chunk_size
        )
    finally:
        # Clean up the temporary file after processing
        if temp_pdf_file_path and os.path.exists(temp_pdf_file_path):
            os.remove(temp_pdf_file_path)
            print(f"🗑️ Cleaned up temporary file: {temp_pdf_file_path}")
# Number of parsed pages grouped into each batch that
# download_and_parse_document_using_llama_index hands to its thread pool.
BATCH_SIZE = 25
def process_page_batch(documents_batch: list[Document]) -> str:
    """
    Concatenate the content of a batch of LlamaIndex Document objects.

    Each document's ``get_content()`` result is taken in order and the pieces
    are joined with a blank line between them.
    """
    separator = "\n\n"
    page_texts = (document.get_content() for document in documents_batch)
    return separator.join(page_texts)
def _local_filename_from_url(url: str) -> str:
    """Derive a directory-free local file name from the URL path, with a PDF fallback."""
    # Decode before taking the basename so percent-encoded separators
    # (e.g. "%2F") cannot smuggle directory components into the name.
    decoded_path = unquote(urlparse(url).path).replace("\\", "/")
    filename = os.path.basename(decoded_path)
    if filename in ("", ".", ".."):
        # The URL doesn't provide a usable name; PyMuPDFReader expects a PDF.
        return "downloaded_document.pdf"
    return filename


async def download_and_parse_document_using_llama_index(doc_url: HttpUrl) -> str:
    """
    Download a document, save it under ``data/``, and extract its text.

    The file is parsed with PyMuPDFReader from LlamaIndex. The resulting
    documents are grouped into batches of ``BATCH_SIZE`` whose contents are
    collected through a ThreadPoolExecutor and joined in document order. The
    saved local file is not deleted.

    Args:
        doc_url: The Pydantic-validated URL of the document to process.

    Returns:
        A single string containing the document's extracted text.

    Raises:
        httpx.HTTPStatusError: If the download returns a 4xx/5xx status.
        ValueError: If no text could be extracted from the document.
    """
    print(f"Initiating download from: {doc_url}")
    try:
        # Create the local storage directory if it doesn't exist.
        LOCAL_STORAGE_DIR = "data/"
        os.makedirs(LOCAL_STORAGE_DIR, exist_ok=True)

        async with httpx.AsyncClient() as client:
            response = await client.get(str(doc_url), timeout=30.0, follow_redirects=True)
            response.raise_for_status()  # Raise an exception for bad status codes (4xx or 5xx)
            doc_bytes = response.content
        print("Download successful.")

        # Save the downloaded document to the local file.
        local_file_path = Path(LOCAL_STORAGE_DIR) / _local_filename_from_url(str(doc_url))
        with open(local_file_path, "wb") as f:
            f.write(doc_bytes)
        print(f"Document saved locally at: {local_file_path}")

        print("Parsing document with PyMuPDFReader...")
        loader = PyMuPDFReader()
        docs0 = loader.load_data(file_path=local_file_path)

        # Measure time for parallel doc_text creation
        start_time_conversion = time.perf_counter()
        batches = [docs0[i:i + BATCH_SIZE] for i in range(0, len(docs0), BATCH_SIZE)]
        with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
            # Executor.map yields results in submission order, so the batches
            # are joined in document order regardless of which finishes first.
            all_extracted_texts = list(executor.map(process_page_batch, batches))
        doc_text = "\n\n".join(all_extracted_texts)
        elapsed_time_conversion = time.perf_counter() - start_time_conversion
        print(f"Time taken for parallel doc_text creation: {elapsed_time_conversion:.6f} seconds.")

        if not doc_text:
            raise ValueError("PyMuPDFReader did not extract any content.")
        print(f"Parsing complete. Extracted {len(doc_text)} characters.")
        return doc_text
    except httpx.HTTPStatusError as e:
        print(f"Error downloading document: HTTP status error: {e}")
        raise
    except Exception as e:
        print(f"An unexpected error occurred during processing: {e}")
        raise