|
|
"""Module dealing specifically with loading files into Document objects. |
|
|
Contains the `load_file` function to load text, PDF, and markdown files. |
|
|
Uses Docling for advanced PDF parsing with OCR support for scanned PDFs. |
|
|
Falls back to PyMuPDF if Docling is not available. |
|
|
|
|
|
Supports multimodal document loading with automatic image extraction from PDFs. |
|
|
|
|
|
## For testing: |
|
|
- Run this file from `server` folder as: |
|
|
- `python -m llm_system.utils.loader` |
|
|
""" |
|
|
|
|
|
import os |
|
|
from typing import List, Optional, Dict, Any |
|
|
from pathlib import Path |
|
|
from datetime import datetime |
|
|
from dataclasses import dataclass, field |
|
|
import uuid |
|
|
|
|
|
from langchain_core.documents import Document |
|
|
from langchain_community.document_loaders import TextLoader, PyMuPDFLoader |
|
|
from langchain_community.document_loaders import UnstructuredMarkdownLoader |
|
|
import fitz |
|
|
from PIL import Image |
|
|
|
|
|
from logger import get_logger |
|
|
log = get_logger(name="doc_loader") |
|
|
|
|
|
|
|
|
# Docling is an optional dependency. DOCLING_AVAILABLE records whether the
# import succeeded so the rest of the module can choose between Docling and
# the PyMuPDF fallback at call time.
# The emoji prefixes below were mojibake in the file (the check mark even
# decoded to a line break that split the string literal); they are restored.
try:
    from docling.document_converter import DocumentConverter
    DOCLING_AVAILABLE = True
    log.info("✅ Docling library available - will use for PDF parsing with OCR support")
except ImportError:
    DOCLING_AVAILABLE = False
    log.warning("⚠️ Docling library not available - will fallback to PyMuPDF for PDFs")
|
|
|
|
|
|
|
|
# Image-extraction settings are read from the project config when it can be
# imported; otherwise the module falls back to the defaults defined here.
try:
    from llm_system.config import EXTRACT_IMAGES_FROM_PDF, IMAGE_OUTPUT_DIR
except ImportError:
    # Defaults used when llm_system.config is not importable.
    EXTRACT_IMAGES_FROM_PDF = True  # image extraction enabled
    IMAGE_OUTPUT_DIR = "server/user_uploads/extracted_images"  # relative path
|
|
|
|
|
|
|
|
@dataclass
class ImageContent:
    """Represents an image extracted from a document.

    Attributes:
        image_id: Unique identifier for the image
        image_path: Path to where the image is stored on disk. A ``str`` or
            any other ``os.PathLike`` value is converted to ``Path`` on
            construction.
        description: Text description of the image (optional)
        page_number: Page number where image was found
        position: Position on page (e.g., "top", "center", "bottom")
        metadata: Additional metadata (size, format, source PDF, etc.)
    """

    image_id: str
    image_path: Path
    description: str = ""
    page_number: int = 0
    position: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        """Ensure image_path is a Path object."""
        # Accept every path-like value, not only plain strings. Anything that
        # is not path-like (e.g. None) is left untouched, as before.
        if not isinstance(self.image_path, Path) and isinstance(
            self.image_path, (str, os.PathLike)
        ):
            self.image_path = Path(self.image_path)
|
|
|
|
|
|
|
|
def _extract_images_with_docling(pdf_path: str, pdf_name: str, image_dir: Path) -> List[ImageContent]:
    """Save each Docling picture block that carries an image as a PNG in *image_dir*.

    Best-effort: a failure on a single block is logged and skipped, and a
    failure of the Docling conversion itself is logged and yields an empty
    list so the caller can fall back to PyMuPDF.
    """
    # NOTE(review): the leading "π" in this and other log messages is the
    # remains of an emoji lost to mojibake; the original glyph is unrecoverable.
    log.info("π Attempting to extract images using Docling...")
    extracted: List[ImageContent] = []

    try:
        doc = DocumentConverter().convert(pdf_path).document

        # NOTE(review): relies on the Docling document exposing `body.blocks`;
        # when it does not, nothing is extracted here - confirm against the
        # installed Docling version.
        if hasattr(doc, 'body') and hasattr(doc.body, 'blocks'):
            log.debug(f"Scanning {len(doc.body.blocks)} Docling blocks for pictures...")
            for block_idx, block in enumerate(doc.body.blocks):
                block_type = type(block).__name__
                log.debug(f"Block {block_idx}: {block_type}")

                if 'Picture' not in block_type:
                    continue

                try:
                    picture = getattr(block, 'image', None)
                    if picture is None:
                        continue

                    image_id = f"img_docling_{block_idx:03d}_{uuid.uuid4().hex[:8]}"
                    image_path = image_dir / f"{image_id}.png"

                    picture.save(str(image_path), format='PNG')
                    log.info(f"✅ Extracted image via Docling: {image_path}")

                    # NOTE(review): page_number is treated as 0-based and
                    # shifted by one - confirm Docling's convention.
                    page_num = getattr(block, 'page_number', 0)

                    extracted.append(
                        ImageContent(
                            image_id=image_id,
                            image_path=image_path,
                            page_number=page_num + 1,
                            position="middle",
                            metadata={
                                "source_pdf": pdf_name,
                                "extracted_at": datetime.now().isoformat(),
                                "format": "PNG",
                                "extractor": "docling",
                                "size": (picture.width, picture.height) if hasattr(picture, 'width') else (0, 0),
                            },
                        )
                    )
                except Exception as e:
                    log.debug(f"Could not extract Docling picture block {block_idx}: {e}")
                    continue
    except Exception as e:
        log.warning(f"⚠️ Docling image extraction failed: {e}, falling back to PyMuPDF")
        return []

    if extracted:
        log.info(f"✅ Docling extracted {len(extracted)} images")
    else:
        log.debug("Docling found no extractable picture blocks, falling back to PyMuPDF")
    return extracted


def _extract_images_with_pymupdf(pdf_path: str, pdf_name: str, image_dir: Path, images: List[ImageContent]) -> None:
    """Save every embedded image of the PDF as a PNG in *image_dir* using PyMuPDF.

    Results are appended to *images* as they are produced, so images saved
    before an unexpected error are still reported to the caller. The PDF
    handle is always closed, even when extraction fails part-way.
    """
    log.info("π Extracting images using PyMuPDF...")
    pdf_document = fitz.open(pdf_path)
    try:
        log.info(f"Opened PDF with {pdf_document.page_count} pages")

        for page_num in range(pdf_document.page_count):
            image_list = pdf_document[page_num].get_images(full=True)

            if not image_list:
                log.debug(f"No images found on page {page_num}")
                continue

            log.info(f"Found {len(image_list)} images on page {page_num}")

            for img_index, img in enumerate(image_list):
                try:
                    xref = img[0]
                    pix = fitz.Pixmap(pdf_document, xref)

                    # PNG output supports only gray/RGB data. Convert anything
                    # that is not already RGB; this now includes CMYK, which
                    # the previous `< 4` test left unconverted so those images
                    # failed to save and were skipped.
                    if pix.n - pix.alpha != 3:
                        pix = fitz.Pixmap(fitz.csRGB, pix)

                    image_id = f"img_{page_num:03d}_{img_index:02d}_{uuid.uuid4().hex[:8]}"
                    image_path = image_dir / f"{image_id}.png"

                    pix.save(str(image_path))
                    log.info(f"✅ Saved image: {image_path}")

                    images.append(
                        ImageContent(
                            image_id=image_id,
                            image_path=image_path,
                            page_number=page_num + 1,
                            position="middle",
                            metadata={
                                "source_pdf": pdf_name,
                                "extracted_at": datetime.now().isoformat(),
                                "format": "PNG",
                                "extractor": "pymupdf",
                                "size": (pix.width, pix.height),
                            },
                        )
                    )
                except Exception as e:
                    log.warning(f"Failed to extract image {img_index} on page {page_num}: {e}")
                    continue
    finally:
        # Release the file handle on both the success and the error path.
        pdf_document.close()


def extract_images_from_pdf(pdf_path: str, output_dir: Optional[str] = None, user_id: str = "") -> List[ImageContent]:
    """Extract images from a PDF file and save them to disk.

    Attempts to use Docling's image extraction first (when Docling is
    installed) and falls back to PyMuPDF when Docling is unavailable, fails,
    or yields no images. Images are written as PNG files to
    ``<output_dir>/<user_id>/<sanitised pdf name>/``.

    Args:
        pdf_path: Path to the PDF file
        output_dir: Directory to save extracted images (default: IMAGE_OUTPUT_DIR)
        user_id: User ID for organizing images

    Returns:
        List of ImageContent objects with paths and metadata. The list is
        empty when extraction is disabled in the config, and may be partial
        or empty when an error occurs (errors are logged, never raised).
    """
    if not EXTRACT_IMAGES_FROM_PDF:
        log.debug("Image extraction disabled in config")
        return []

    if output_dir is None:
        output_dir = IMAGE_OUTPUT_DIR

    images: List[ImageContent] = []
    pdf_name = Path(pdf_path).stem

    try:
        # Keep only characters that are safe in a directory name.
        pdf_name_safe = "".join(c if c.isalnum() or c in ('-', '_') else '_' for c in pdf_name)

        user_image_dir = Path(output_dir) / user_id / pdf_name_safe
        user_image_dir.mkdir(parents=True, exist_ok=True)
        log.info(f"Created image output directory: {user_image_dir}")

        if DOCLING_AVAILABLE:
            docling_images = _extract_images_with_docling(pdf_path, pdf_name, user_image_dir)
            if docling_images:
                images.extend(docling_images)
                return images

        _extract_images_with_pymupdf(pdf_path, pdf_name, user_image_dir, images)
        log.info(f"✅ Extracted {len(images)} images from PDF")

    except Exception as e:
        log.error(f"❌ Error extracting images from PDF: {e}")
        import traceback
        log.error(traceback.format_exc())

    return images
|
|
|
|
|
|
|
|
def load_file(user_id: str, file_path: str) -> tuple[bool, List[Document], str]:
    """Load a file and return its content as a list of Document objects. Usually one document per page.

    Supported types are txt, md and pdf; the extension is matched
    case-insensitively. PDFs are parsed with Docling when it is available
    (a single Document holding the markdown export) and with PyMuPDFLoader
    otherwise or when Docling fails. For PDFs, images are automatically
    extracted and attached to each Document's metadata.

    Args:
        user_id (str): The ID of the user who is loading the file.
        file_path (str): The absolute path to the file to be loaded.

    Returns:
        tuple[bool, List[Document], str]: A tuple containing:
            - bool: True if the file was loaded successfully, False otherwise.
            - List[Document]: A list of Document objects containing the file's content.
            - str: Message indicating the result of the loading operation.
    """
    # NOTE(review): the leading "π" in several log messages is the remains of
    # an emoji lost to mojibake; the original glyph is unrecoverable.
    log.info(f"π load_file() starting - file_path: {file_path}, user_id: {user_id}")
    file_extension = file_path.split('.')[-1].lower()
    log.info(f"π File extension detected: {file_extension}")

    if file_extension not in ('txt', 'pdf', 'md'):
        log.error(f"❌ Unsupported file type: {file_extension}.")
        return False, [], f"Unsupported file type: {file_extension}. Supported types are: txt, pdf, md."

    # Dispatch on the normalised extension. The validation above is
    # case-insensitive, so matching on the raw path would send e.g.
    # "NOTES.TXT" down the PDF branch.
    is_pdf = file_extension == 'pdf'
    file_content: Optional[List[Document]] = None
    loader = None

    if file_extension == 'txt':
        log.info("π Loading as TXT file")
        loader = TextLoader(file_path, encoding='utf-8')

    elif file_extension == 'md':
        log.info("π Loading as Markdown file")
        loader = UnstructuredMarkdownLoader(file_path)

    else:
        if DOCLING_AVAILABLE:
            log.info("π Loading PDF using Docling (with OCR support for scanned PDFs)")
            try:
                docling_doc = DocumentConverter().convert(file_path)
                markdown_text = docling_doc.document.export_to_markdown()

                # Docling yields the whole PDF as one markdown Document.
                file_content = [
                    Document(
                        page_content=markdown_text,
                        metadata={
                            "source": os.path.basename(file_path),
                            "file_path": file_path,
                            "loader": "docling",
                        },
                    )
                ]
                log.info(f"✅ Docling successfully parsed PDF: {len(markdown_text)} chars extracted")
            except Exception as e:
                log.warning(f"⚠️ Docling parsing failed: {e}, falling back to PyMuPDF")
                file_content = None

        if file_content is None:
            log.info("π Loading as PDF file using PyMuPDFLoader")
            loader = PyMuPDFLoader(file_path, extract_images=False)

    # Content is still missing for txt/md files and for PDFs that Docling did
    # not handle; run the selected LangChain loader.
    if file_content is None:
        log.info("⏳ Executing loader.load()...")
        try:
            file_content = loader.load()
            log.info(f"✅ loader.load() completed, got {len(file_content)} pages/documents")
        except Exception as e:
            log.error(f"❌ loader.load() failed with exception: {e}")
            import traceback
            log.error(f"Traceback: {traceback.format_exc()}")
            return False, [], f"Error loading file: {e}"

    extracted_images = []
    if is_pdf:
        log.info("🖼️ Extracting images from PDF...")
        extracted_images = extract_images_from_pdf(file_path, user_id=user_id)
        log.info(f"Found {len(extracted_images)} images")

    for doc in file_content:
        doc.metadata['user_id'] = user_id

        if extracted_images:
            # Every document receives the full image list of the PDF.
            doc.metadata['images'] = [
                {
                    'image_id': img.image_id,
                    'image_path': str(img.image_path),
                    'page_number': img.page_number,
                    'position': img.position,
                    'metadata': img.metadata,
                }
                for img in extracted_images
            ]
            log.info(f"Attached {len(extracted_images)} images to document metadata")

        # Store only file names in the metadata, not full paths.
        if 'file_path' in doc.metadata:
            doc.metadata['file_path'] = os.path.basename(doc.metadata['file_path'])

        if 'source' in doc.metadata:
            source = doc.metadata['source']
            # Sources that look like URLs are kept untouched.
            if not ("www." in source or "http" in source):
                doc.metadata['source'] = os.path.basename(source)

    if not file_content:
        log.error(f"No content found in the file: {file_path}")
        # An empty file is reported with status True, unchanged from the
        # previous behaviour that callers may rely on.
        return True, [], f"No content found in the file: {file_path}"

    log.info(f"Loaded {len(file_content)} documents from {file_path} for user {user_id} (with {len(extracted_images)} images).")
    return True, file_content, f"Loaded {len(file_content)} documents with {len(extracted_images)} images."
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: load one file and print a short summary.
    # An optional first command-line argument selects the file to load;
    # without it the original developer-local sample path is used.
    import sys

    default_path = "/Users/neetikasaxena/Documents/sanchit/sample_code/chat-with-your-data/test_data/resume_sanchit_imo_health.pdf"
    target_path = sys.argv[1] if len(sys.argv) > 1 else default_path

    print(os.getcwd())
    try:
        status, docs, message = load_file(
            user_id="test_user",
            file_path=target_path,
        )

        print(status)
        print(message)
        print(len(docs))

        # Show at most the first three documents.
        for doc in docs[:3]:
            print("\n")
            print(repr(doc))

    except Exception as e:
        print(f"Error loading file: {e}")
|
|
|