import asyncio            # To run synchronous libraries in an async environment.
import concurrent.futures # Process pool for parallel chunk parsing.
import os                 # To handle file paths and create directories.
import tempfile           # Temporary files for downloaded PDFs.
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Optional
from urllib.parse import unquote, urlparse  # To get the filename from the URL.

import fitz   # PyMuPDF, used to split the source PDF into chunks.
import httpx  # An asynchronous HTTP client.
from dotenv import load_dotenv
from langchain_pymupdf4llm import PyMuPDF4LLMLoader
from llama_cloud_services import LlamaExtract
from llama_index.core import Document
from llama_index.readers.file import PyMuPDFReader
from pydantic import BaseModel, Field, HttpUrl

# Ensure required libraries are installed. You can install them using:
# pip install llama_cloud_services pydantic python-dotenv httpx pymupdf \
#     langchain-pymupdf4llm llama-index-readers-file

# Load environment variables (e.g. LLAMA_CLOUD_API_KEY) from a .env file.
load_dotenv()

# Global variable for the extractor agent
llama_extract_agent = None


class Insurance(BaseModel):
    """
    A Pydantic model that defines the data schema for extraction.
    The field description helps guide the AI model.
    """
    headings: list[str] = Field(description="An array of headings")


def initialize_llama_extract_agent():
    """Initialize the global LlamaExtract agent; requires LLAMA_CLOUD_API_KEY."""
    global llama_extract_agent
    if llama_extract_agent is None:
        print("Initializing LlamaExtract client and getting agent...")
        try:
            extractor = LlamaExtract()
            llama_extract_agent = extractor.get_agent(name="insurance-parser")
            print("LlamaExtract agent initialized.")
        except Exception as e:
            print(f"Error initializing LlamaExtract agent: {e}")
            llama_extract_agent = None # Ensure it's None if there was an error


def extract_schema_from_file(file_path: str) -> Optional[Insurance]:
    """Send a local file to LlamaCloud and return the extracted schema data."""
    if not os.path.exists(file_path):
        print(f"❌ Error: The file '{file_path}' was not found.")
        return None

    if llama_extract_agent is None:
        print("LlamaExtract agent not initialized. Attempting to initialize now.")
        initialize_llama_extract_agent()
        if llama_extract_agent is None:
            print("LlamaExtract agent failed to initialize. Cannot proceed with extraction.")
            return None

    print(f"πŸš€ Sending '{file_path}' to LlamaCloud for schema extraction...")

    try:
        result = llama_extract_agent.extract(file_path)

        if result and result.data:
            print("βœ… Extraction successful!")
            return result.data
        else:
            print("⚠️ Extraction did not return any data.")
            return None

    except Exception as e:
        print(f"\n❌ An error occurred during the API call: {e}")
        print("Please check your API key, network connection, and file format.")
        return None
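
# Example usage (a minimal sketch; "policy.pdf" is a placeholder path, and a
# LLAMA_CLOUD_API_KEY must be available in the environment):
#
#   initialize_llama_extract_agent()
#   extracted = extract_schema_from_file("policy.pdf")
#   if extracted:
#       print(extracted)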


def process_pdf_chunk(chunk_path: str) -> str:
    """
    Worker function for the ProcessPoolExecutor.
    
    It loads a single PDF chunk using PyMuPDF4LLMLoader and returns the
    extracted markdown content, prepending the original page number.
    
    Args:
        chunk_path: The file path to a temporary PDF chunk.
    Returns:
        A string containing the markdown content for the chunk.
    """
    try:
        # Load the document chunk using LangChain's loader
        loader = PyMuPDF4LLMLoader(chunk_path)
        documents = loader.load()
        
        page_contents = []
        for doc in documents:
            # Reconstruct the original page number from the filename for context
            original_page_in_doc = int(doc.metadata.get('page', -1))
            base_name = os.path.basename(chunk_path)
            # Filename format is "chunk_START-END.pdf"
            start_page_of_chunk = int(base_name.split('_')[1].split('-')[0])
            
            # The final page number is the start of the chunk + the page within the chunk doc
            actual_page_num = start_page_of_chunk + original_page_in_doc
            
            page_contents.append(f"\n## Page {actual_page_num}\n{doc.page_content.strip()}")

        return "".join(page_contents)
    except Exception as e:
        # Return an error message if a chunk fails to process
        return f"Error processing chunk {os.path.basename(chunk_path)}: {e}"


def pdf_to_markdown_parallel(pdf_path: str, output_path: Optional[str] = None, chunk_size: int = 10) -> str:
    """
    Splits a local PDF into chunks, processes them in parallel using multiple
    CPU cores, and then combines the results.
    This method is ideal for very large documents and runs entirely locally.
    Args:
        pdf_path: The file path to the local PDF.
        output_path: (Optional) The file path to save the output Markdown.
        chunk_size: The number of pages to include in each parallel chunk.
    Returns:
        A string containing the full markdown content of the PDF.
    """
    start_time = time.monotonic()

    if not os.path.exists(pdf_path):
        raise FileNotFoundError(f"❌ The file '{pdf_path}' was not found.")

    temp_dir = "temp_pdf_chunks"
    os.makedirs(temp_dir, exist_ok=True)
    chunk_paths = []
    markdown_text = ""

    try:
        # Step 1: Split the source PDF into smaller chunk files
        print(f"Splitting '{os.path.basename(pdf_path)}' into chunks of {chunk_size} pages...")
        doc = fitz.open(pdf_path)
        num_pages = len(doc)
        for i in range(0, num_pages, chunk_size):
            chunk_start = i
            chunk_end = min(i + chunk_size, num_pages)
            
            # Create a new blank PDF and insert pages from the source
            with fitz.open() as chunk_doc:
                chunk_doc.insert_pdf(doc, from_page=chunk_start, to_page=chunk_end - 1)
                # Naming convention includes page numbers for sorting
                chunk_path = os.path.join(temp_dir, f"chunk_{chunk_start + 1}-{chunk_end}.pdf")
                chunk_doc.save(chunk_path)
                chunk_paths.append(chunk_path)
        doc.close()
        print(f"βœ… Successfully created {len(chunk_paths)} PDF chunks.")

        # Step 2: Process the chunks in parallel
        print(f"Processing chunks in parallel using up to {os.cpu_count()} CPU cores...")
        results = {}
        with concurrent.futures.ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
            # Map each future to its corresponding chunk path
            future_to_chunk = {executor.submit(process_pdf_chunk, path): path for path in chunk_paths}

            for future in concurrent.futures.as_completed(future_to_chunk):
                chunk_path = future_to_chunk[future]
                try:
                    # Store the result from the completed future
                    results[chunk_path] = future.result()
                except Exception as e:
                    results[chunk_path] = f"Failed to process chunk {os.path.basename(chunk_path)}: {e}"

        # Step 3: Combine results in the correct order
        print("Combining processed chunks...")
        # Sort results based on the original chunk path list to maintain order
        sorted_results = [results[path] for path in chunk_paths]
        markdown_text = "\n\n".join(sorted_results)

        # Step 4: Save to file if an output path is provided
        if output_path:
            with open(output_path, "w", encoding="utf-8") as f:
                f.write(markdown_text)
            print(f"βœ… Markdown saved to: {output_path}")

    finally:
        # Step 5: Clean up temporary chunk files and directory
        for path in chunk_paths:
            if os.path.exists(path):
                os.remove(path)
        if os.path.exists(temp_dir):
            try:
                if not os.listdir(temp_dir):
                    os.rmdir(temp_dir)
            except OSError as e:
                print(f"Could not remove temp directory '{temp_dir}': {e}")

    end_time = time.monotonic()
    print(f"⏱️ Total processing time: {end_time - start_time:.2f} seconds")
    return markdown_text
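
# Example (synchronous entry point; "report.pdf" is a placeholder filename):
#
#   md = pdf_to_markdown_parallel("report.pdf", output_path="report.md", chunk_size=10)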


async def process_document(source: str, output_path: Optional[str] = None, chunk_size: int = 10) -> str:
    """
    High-level async function to process a PDF from a URL or local path.
    It downloads the file if a URL is provided, then uses the local parallel
    processor to convert it to markdown.
    Args:
        source: A local file path or a public URL to a PDF file.
        output_path: (Optional) The file path to save the output Markdown.
        chunk_size: The number of pages per chunk for parallel processing.
    Returns:
        A string containing the full markdown content of the PDF.
    """
    # Check if the source is a URL
    is_url = source.lower().startswith(('http://', 'https://'))

    if is_url:
        print(f"⬇️ Downloading from URL: {source}")
        temp_pdf_file_path = None
        try:
            # Download the file asynchronously
            async with httpx.AsyncClient() as client:
                response = await client.get(source, timeout=60.0, follow_redirects=True)
                response.raise_for_status()

            # Save the downloaded content to a temporary file
            with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.pdf') as temp_file:
                temp_file.write(response.content)
                temp_pdf_file_path = temp_file.name
            
            print(f"βœ… Download complete. Saved to temporary file: {temp_pdf_file_path}")
            
            # The parallel function is synchronous, so we run it in an executor
            # to avoid blocking the asyncio event loop.
            loop = asyncio.get_running_loop()
            markdown_content = await loop.run_in_executor(
                None,  # Use the default thread pool executor
                pdf_to_markdown_parallel,
                temp_pdf_file_path,
                output_path,
                chunk_size
            )
            return markdown_content
        finally:
            # Clean up the temporary file after processing
            if temp_pdf_file_path and os.path.exists(temp_pdf_file_path):
                os.remove(temp_pdf_file_path)
                print(f"πŸ—‘οΈ Cleaned up temporary file: {temp_pdf_file_path}")
    else:
        # If it's a local path, process it directly
        print(f"βš™οΈ Processing local file: {source}")
        loop = asyncio.get_running_loop()
        markdown_content = await loop.run_in_executor(
            None,
            pdf_to_markdown_parallel,
            source,
            output_path,
            chunk_size
        )
        return markdown_content
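
# Example (async entry point; the URL is a placeholder):
#
#   markdown = asyncio.run(process_document("https://example.com/sample.pdf"))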


# Define the batch size for parallel processing
BATCH_SIZE = 25

def process_page_batch(documents_batch: list[Document]) -> str:
    """
    Helper function to extract content from a batch of LlamaIndex Document objects
    and join them into a single string.
    """
    return "\n\n".join([d.get_content() for d in documents_batch])

async def download_and_parse_document_using_llama_index(doc_url: HttpUrl) -> str:
    """
    Asynchronously downloads a document, saves it to a local directory,
    and then parses it using PyMuPDFReader from LlamaIndex.
    Page text is joined in batches submitted to a ThreadPoolExecutor. Because
    this string work is pure Python and holds the GIL, the batching provides
    concurrency rather than true CPU parallelism.
    Args:
        doc_url: The Pydantic-validated URL of the document to process.
    Returns:
        A single string containing the document's extracted text.
    """
    print(f"Initiating download from: {doc_url}")
    try:
        # Create the local storage directory if it doesn't exist.
        LOCAL_STORAGE_DIR = "data/"
        os.makedirs(LOCAL_STORAGE_DIR, exist_ok=True)

        async with httpx.AsyncClient() as client:
            response = await client.get(str(doc_url), timeout=30.0, follow_redirects=True)
            response.raise_for_status() # Raise an exception for bad status codes (4xx or 5xx)
            doc_bytes = response.content
            print("Download successful.")

        # --- Logic to determine the local filename ---
        parsed_path = urlparse(str(doc_url)).path
        filename = unquote(os.path.basename(parsed_path))
        if not filename:
            # If the URL doesn't provide a filename, create a generic one.
            # Ensure it's a PDF if we're using PyMuPDFReader for PDF parsing.
            filename = "downloaded_document.pdf"
        local_file_path = Path(os.path.join(LOCAL_STORAGE_DIR, filename))

        # Save the downloaded document to the local file.
        with open(local_file_path, "wb") as f:
            f.write(doc_bytes)
        print(f"Document saved locally at: {local_file_path}")
        print("Parsing document with PyMuPDFReader...")

        # PyMuPDFReader parsing logic: loads the entire document and returns a list of Document objects (one per page)
        loader = PyMuPDFReader()
        docs0 = loader.load_data(file_path=local_file_path)

        # Measure time for parallel doc_text creation
        start_time_conversion = time.perf_counter()

        all_extracted_texts = []
        # Use ThreadPoolExecutor for parallel processing of page batches
        # The max_workers argument can be adjusted based on available CPU cores
        with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
            futures = []
            # Divide docs0 (list of pages) into batches of BATCH_SIZE
            for i in range(0, len(docs0), BATCH_SIZE):
                batch = docs0[i:i + BATCH_SIZE]
                # Submit each batch to the executor for processing
                futures.append(executor.submit(process_page_batch, batch))

            # Collect results as they complete
            # as_completed returns futures as they finish, maintaining responsiveness
            for future in as_completed(futures):
                all_extracted_texts.append(future.result())

        # Join all extracted texts from the batches into a single string
        doc_text = "\n\n".join(all_extracted_texts)

        end_time_conversion = time.perf_counter()
        elapsed_time_conversion = end_time_conversion - start_time_conversion
        print(f"Time taken for parallel doc_text creation: {elapsed_time_conversion:.6f} seconds.")

        # The extracted text is now in 'doc_text'
        if doc_text:
            print(f"Parsing complete. Extracted {len(doc_text)} characters.")
            # The downloaded file is intentionally kept in LOCAL_STORAGE_DIR.
            return doc_text
        else:
            raise ValueError("PyMuPDFReader did not extract any content.")

    except httpx.HTTPStatusError as e:
        print(f"Error downloading document: HTTP status error: {e}")
        raise
    except Exception as e:
        print(f"An unexpected error occurred during processing: {e}")
        raise
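

if __name__ == "__main__":
    # Minimal demo sketch. The URL below is a placeholder; point it at a real,
    # publicly reachable PDF before running. The function only calls str() on
    # its argument, so a plain URL string works here in place of an HttpUrl.
    sample_url = "https://example.com/sample.pdf"  # placeholder URL
    extracted_text = asyncio.run(download_and_parse_document_using_llama_index(sample_url))
    print(f"Extracted {len(extracted_text)} characters.")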