# t1-tax-pdf-processor/app/api/processor_service.py
# Hamza4100's picture
# Upload 23 files
# aa8e38b verified
"""
PDF processing service that orchestrates extraction, mapping, and filling.
"""
import uuid
import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from app.config import settings
from app.extractor import PDFExtractor
from app.ocr import OCRProcessor
from app.mapper import FieldMapper
from app.filler import PDFFiller
from app.utils.logging import get_logger
from app.utils.exceptions import (
    PDFProcessorError,
    PDFExtractionError,
    OCRError,
    MappingError,
    PDFFillingError,
    FileValidationError,
)
# Module-level logger shared by everything in this file; obtained from the
# project's get_logger helper, passing this module's __name__.
logger = get_logger(__name__)
class ProcessingSession:
    """Mutable state for a single PDF processing session.

    A session tracks the uploaded source and template PDFs, the generated
    output PDF, the intermediate extraction/mapping results, and a status
    string together with accumulated error and warning messages.

    Args:
        session_id: Identifier of the session; also used as the name of the
            session's directory under ``settings.upload_dir``.
    """

    def __init__(self, session_id: str):
        self.session_id = session_id
        # Naive UTC timestamp. ``datetime.utcnow()`` is deprecated since
        # Python 3.12; this yields the same kind of value (UTC, tzinfo=None)
        # so comparisons with other naive datetimes keep working.
        self.created_at = datetime.now(timezone.utc).replace(tzinfo=None)
        # Paths are filled in as files are uploaded / produced.
        self.source_pdf: Optional[Path] = None
        self.template_pdf: Optional[Path] = None
        self.output_pdf: Optional[Path] = None
        # Intermediate results of the last processing run.
        self.extracted_data: Optional[dict] = None
        self.mapped_fields: Optional[dict] = None
        self.status: str = "initialized"
        # Fresh lists per instance so sessions never share messages.
        self.errors: list[str] = []
        self.warnings: list[str] = []

    @property
    def session_dir(self) -> Path:
        """Directory holding this session's uploads: ``upload_dir / session_id``."""
        return settings.upload_dir / self.session_id
class PDFProcessorService:
    """
    Main service class that coordinates PDF processing.

    It wires together the extractor, OCR processor, field mapper and PDF
    filler, and keeps an in-memory registry of ``ProcessingSession`` objects
    keyed by session id. Uploaded files live in ``settings.upload_dir /
    <session_id>``; filled PDFs are written to ``settings.output_dir``.
    """

    # The only upload kinds a session knows how to register.
    _ALLOWED_FILE_TYPES = ("source", "template")

    def __init__(self, mapping_config_path: Optional[Path] = None):
        """Create the collaborating components.

        Args:
            mapping_config_path: Optional path passed to ``FieldMapper`` as
                ``config_path``.
        """
        self.extractor = PDFExtractor()
        self.filler = PDFFiller()
        self.mapper = FieldMapper(config_path=mapping_config_path)
        # OCR is optional: any failure while setting it up is logged and the
        # service continues without OCR support.
        try:
            self.ocr = OCRProcessor()
            self.ocr_available = self.ocr.is_available()
        except Exception as e:
            logger.warning(f"OCR not available: {e}")
            self.ocr = None
            self.ocr_available = False
        # Track active sessions
        self.sessions: dict[str, ProcessingSession] = {}

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _is_valid_session_id(session_id: str) -> bool:
        """Return True only for canonical UUID strings as made by create_session.

        Session ids arrive from callers and are joined onto filesystem paths,
        so anything that is not exactly ``str(uuid.UUID(...))`` (for example
        ``".."`` or a value containing path separators) is rejected.
        """
        try:
            return str(uuid.UUID(session_id)) == session_id
        except (ValueError, AttributeError, TypeError):
            return False

    @staticmethod
    def _sanitize_filename(filename: str) -> str:
        """Reduce a client-supplied filename to a bare basename.

        Both ``/`` and ``\\`` are treated as separators and NUL bytes are
        dropped. Returns an empty string when nothing usable remains
        (empty input, ``"."`` or ``".."``).
        """
        cleaned = (filename or "").replace("\x00", "").replace("\\", "/")
        base = cleaned.rsplit("/", 1)[-1].strip()
        if base in ("", ".", ".."):
            return ""
        return base

    @staticmethod
    def _latest_upload(session_dir: Path, file_type: str) -> Optional[Path]:
        """Return the most recently modified ``<file_type>_*`` file, or None."""
        candidates = [p for p in session_dir.glob(f"{file_type}_*") if p.is_file()]
        return max(candidates, key=lambda p: p.stat().st_mtime, default=None)

    def _restore_session(self, session_id: str) -> Optional["ProcessingSession"]:
        """Rebuild a session from files on disk.

        The caller must already have validated ``session_id``. Uploaded files
        are recognised by the ``source_`` / ``template_`` prefixes written by
        ``save_uploaded_file``; the output is recognised by the
        ``filled_<session_id>.pdf`` name written by ``process``.

        Returns:
            The restored session, or None when no session directory exists.
        """
        session_dir = settings.upload_dir / session_id
        if not session_dir.is_dir():
            return None
        session = ProcessingSession(session_id)
        session.source_pdf = self._latest_upload(session_dir, "source")
        session.template_pdf = self._latest_upload(session_dir, "template")
        output_path = settings.output_dir / f"filled_{session_id}.pdf"
        if output_path.is_file():
            session.output_pdf = output_path
            session.status = "completed"
        return session

    # ------------------------------------------------------------------
    # Session management
    # ------------------------------------------------------------------
    def create_session(self) -> "ProcessingSession":
        """Create a new processing session with its own upload directory."""
        session_id = str(uuid.uuid4())
        session = ProcessingSession(session_id)
        session.session_dir.mkdir(parents=True, exist_ok=True)
        self.sessions[session_id] = session
        logger.info(f"Created new session: {session_id}")
        return session

    def get_session(self, session_id: str) -> "ProcessingSession":
        """Get an existing session, restoring it from disk if necessary.

        Args:
            session_id: Id previously returned by ``create_session``.

        Returns:
            The in-memory session, or one rebuilt from the session directory.

        Raises:
            FileValidationError: If the session is unknown, or the id is not
                a canonical UUID string (such ids are never looked up on
                disk, which prevents path traversal via the id).
        """
        session = self.sessions.get(session_id)
        if session is not None:
            return session
        if self._is_valid_session_id(session_id):
            session = self._restore_session(session_id)
            if session is not None:
                self.sessions[session_id] = session
                return session
        raise FileValidationError(f"Session not found: {session_id}")

    def save_uploaded_file(
        self,
        session: "ProcessingSession",
        file_content: bytes,
        filename: str,
        file_type: str
    ) -> Path:
        """
        Save an uploaded file to the session directory.

        Args:
            session: Processing session.
            file_content: File content as bytes.
            filename: Original filename; only its basename is used.
            file_type: 'source' or 'template'.

        Returns:
            Path to saved file.

        Raises:
            FileValidationError: If ``file_type`` is unknown, the filename is
                unusable, the extension is not allowed, or the file exceeds
                the configured size limit.
        """
        # Reject unknown kinds up front: they would otherwise be written to
        # disk without ever being registered on the session.
        if file_type not in self._ALLOWED_FILE_TYPES:
            raise FileValidationError(
                f"Invalid file type: {file_type!r}. Expected 'source' or 'template'"
            )
        # Strip any directory components from the client-supplied name so the
        # file can only ever land inside the session directory.
        base_name = self._sanitize_filename(filename)
        if not base_name:
            raise FileValidationError(f"Invalid filename: {filename!r}")
        # Validate extension
        ext = Path(base_name).suffix.lower()
        if ext not in settings.allowed_extensions:
            raise FileValidationError(
                f"Invalid file extension: {ext}. Allowed: {settings.allowed_extensions}"
            )
        # Validate file size
        size_mb = len(file_content) / (1024 * 1024)
        if size_mb > settings.max_file_size_mb:
            raise FileValidationError(
                f"File too large: {size_mb:.2f}MB. Max: {settings.max_file_size_mb}MB"
            )
        # Save file
        session.session_dir.mkdir(parents=True, exist_ok=True)
        safe_filename = f"{file_type}_{base_name}"
        file_path = session.session_dir / safe_filename
        file_path.write_bytes(file_content)
        if file_type == "source":
            session.source_pdf = file_path
        else:
            session.template_pdf = file_path
        logger.info(f"Saved {file_type} file: {file_path}")
        return file_path

    # ------------------------------------------------------------------
    # Processing
    # ------------------------------------------------------------------
    def process(
        self,
        session: "ProcessingSession",
        line_numbers: Optional[list[str]] = None,
        use_ocr: Optional[bool] = None,
        flatten: bool = False
    ) -> dict:
        """
        Process the uploaded PDFs.

        Args:
            session: Processing session.
            line_numbers: Specific line numbers to extract.
            use_ocr: Force OCR processing.
            flatten: Flatten output PDF form.

        Returns:
            Processing result dictionary with the keys ``status``,
            ``extracted_data``, ``mapped_fields`` and ``output_filename``.

        Raises:
            FileValidationError: If the source or template PDF is missing.
            Exception: Any error raised during extraction, mapping or
                filling is recorded on the session and re-raised unchanged.
        """
        if not session.source_pdf:
            raise FileValidationError("No source PDF uploaded")
        if not session.template_pdf:
            raise FileValidationError("No template PDF uploaded")
        session.status = "processing"
        try:
            # Step 1: Extract data from source PDF
            extracted = self._extract_data(session, line_numbers, use_ocr)
            session.extracted_data = extracted
            # Step 2: Map extracted values to form fields
            mapped = self.mapper.map_values(extracted["line_values"])
            session.mapped_fields = mapped
            # Step 3: Fill the template PDF
            output_filename = f"filled_{session.session_id}.pdf"
            output_path = settings.output_dir / output_filename
            # Make sure the destination exists before the filler writes to it.
            output_path.parent.mkdir(parents=True, exist_ok=True)
            self.filler.fill_form(
                template_path=session.template_pdf,
                output_path=output_path,
                field_values=mapped,
                flatten=flatten
            )
            session.output_pdf = output_path
            session.status = "completed"
            logger.info(f"Processing completed for session {session.session_id}")
            return {
                "status": "success",
                "extracted_data": extracted,
                "mapped_fields": mapped,
                "output_filename": output_filename
            }
        except Exception as e:
            # Record the failure on the session, then let the caller see the
            # original exception.
            session.status = "failed"
            session.errors.append(str(e))
            logger.error(f"Processing failed for session {session.session_id}: {e}")
            raise

    def _extract_data(
        self,
        session: "ProcessingSession",
        line_numbers: Optional[list[str]] = None,
        use_ocr: Optional[bool] = None
    ) -> dict:
        """Extract data from source PDF.

        Text extraction is used when the PDF has text and OCR was not
        requested. Otherwise OCR is used if available. If OCR is needed but
        unavailable, text extraction is used as a fallback and a warning is
        recorded on the session.

        Returns:
            Extraction result dict including ``extraction_method`` ("text",
            "ocr" or "text_fallback") and, when text is present, a
            ``text_preview`` of its first 500 characters.
        """
        # First try text extraction
        has_text = self.extractor.has_text_content(session.source_pdf)
        if has_text and not use_ocr:
            # Use text extraction
            result = self.extractor.extract_all_data(session.source_pdf, line_numbers)
            result["extraction_method"] = "text"
        elif self.ocr_available:
            # Use OCR
            logger.info("Using OCR for extraction")
            text = self.ocr.process_pdf(session.source_pdf)
            line_values = self.extractor.extract_line_values(text, line_numbers)
            result = {
                "text": text,
                "line_values": line_values,
                "has_text": False,
                "extraction_method": "ocr"
            }
        else:
            # OCR is unavailable. Distinguish "OCR was forced on a text PDF"
            # from "PDF has no text at all" so the warning is accurate.
            if has_text:
                session.warnings.append(
                    "OCR was requested but is not available; used text extraction instead"
                )
            else:
                session.warnings.append("PDF appears to be scanned but OCR is not available")
            result = self.extractor.extract_all_data(session.source_pdf, line_numbers)
            result["extraction_method"] = "text_fallback"
        # Add text preview (first 500 chars)
        if result.get("text"):
            result["text_preview"] = result["text"][:500]
        return result

    # ------------------------------------------------------------------
    # Accessors and mapping configuration
    # ------------------------------------------------------------------
    def get_output_path(self, session: "ProcessingSession") -> Optional[Path]:
        """Get the output PDF path for a session, or None if it does not exist."""
        if session.output_pdf and session.output_pdf.exists():
            return session.output_pdf
        return None

    def get_template_fields(self, session: "ProcessingSession") -> dict:
        """Get form fields from the template PDF.

        Raises:
            FileValidationError: If no template PDF has been uploaded.
        """
        if not session.template_pdf:
            raise FileValidationError("No template PDF uploaded")
        return self.filler.get_form_fields(session.template_pdf)

    def update_mapping(self, line_number: str, field_name: str, description: str = "") -> None:
        """Update field mapping by delegating to the mapper's ``add_mapping``."""
        self.mapper.add_mapping(line_number, field_name, description)

    def get_all_mappings(self) -> dict:
        """Get all current mappings."""
        return self.mapper.get_all_mappings()

    def save_mapping_config(self, path: Optional[Path] = None) -> None:
        """Save current mappings to config file.

        Args:
            path: Destination file; defaults to
                ``settings.config_dir / settings.default_mapping_file``.
        """
        if path is None:
            path = settings.config_dir / settings.default_mapping_file
        self.mapper.save_config(path)

    def cleanup_session(self, session_id: str) -> None:
        """Clean up session files.

        Removes the session's upload directory and forgets the in-memory
        session. Sessions that exist only on disk are cleaned up too, but
        only when ``session_id`` is a canonical UUID string, so a crafted id
        can never point the removal at another directory. Output PDFs in
        ``settings.output_dir`` are left in place. Unknown ids are ignored.
        """
        session = self.sessions.get(session_id)
        if session is not None:
            session_dir = session.session_dir
        elif self._is_valid_session_id(session_id):
            session_dir = settings.upload_dir / session_id
            if not session_dir.is_dir():
                return
        else:
            return
        if session_dir.exists():
            shutil.rmtree(session_dir)
        self.sessions.pop(session_id, None)
        logger.info(f"Cleaned up session: {session_id}")
# Global service instance, created once when this module is imported.
# It is constructed with no arguments, so mapping_config_path is None and
# PDFProcessorService.__init__ runs (including its OCR setup attempt) at
# import time.
processor_service = PDFProcessorService()