"""
Core processing utilities for DocGenie document generation pipeline.
Integrated functionality (All 19 Stages):
- Stage 1-2: Seed selection, LLM prompting, response processing, PDF rendering, bbox extraction
- Stage 3: Handwriting & visual element synthesis (WordStylist diffusion, stamps, barcodes, logos)
- Stage 4: Image finalization & OCR (pdf2image, Microsoft Document Intelligence)
- Stage 5: Dataset packaging (bbox normalization, GT verification, analysis, debug viz)
References the generation folder for core pipeline logic.
"""
import asyncio
import base64
import json
import pathlib
import tempfile
import time
import uuid
import re
from typing import List, Tuple, Optional, Dict, Any
from io import BytesIO
import requests
import httpx
from PIL import Image
from pdf2image import convert_from_path
from bs4 import BeautifulSoup
from playwright.async_api import async_playwright
import fitz # PyMuPDF for PDF processing
from docgenie.generation.constants import BS_PARSER, HANDWRITING_CLASS_NAME, VISUAL_ELEMENT_TYPE_SYNONYMS
from docgenie.generation.pipeline_01.claude_batching import ClaudeBatchedClient, create_message
from docgenie.generation.pipeline_03_process_response import (
extract_html_documents_from_text,
extract_gt,
)
from docgenie.generation.pipeline_03.css import (
increase_handwriting_font_size,
unmark_visual_elements,
)
from docgenie.generation.pipeline_04_render_pdf_and_extract_geos import (
render_pdf_async,
preprocess_html_for_pdf,
)
from docgenie.generation.pipeline_04.extract_bbox import extract_bboxes_from_pdf
# Stage 3 imports - we implement simplified versions directly in this file
# The full pipeline functions are available but require SynDatasetDefinition
# For API use, we extract elements directly from HTML/CSS
from docgenie.generation.utils.pdfjs import MEASURE_DIMENSIONS
from docgenie.generation.utils.stamp import create_stamp
from docgenie import ENV
# Import config for handwriting service URL
from .config import settings
async def download_image_to_base64(url: str) -> str:
"""
Download image or PDF from URL and convert to base64 JPEG.
    If the URL points to a PDF, the first page is converted to an image.
Args:
url: Image or PDF URL
Returns:
Base64-encoded JPEG image string
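
    Example (illustrative; call from async code, the URL is hypothetical):
        b64 = await download_image_to_base64("https://example.com/scan.pdf")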
"""
    # Use httpx so the download does not block the event loop in this async function
    async with httpx.AsyncClient() as client:
        response = await client.get(url, timeout=30)
        response.raise_for_status()
content_type = response.headers.get('Content-Type', '').lower()
is_pdf = 'application/pdf' in content_type or url.lower().endswith('.pdf')
if is_pdf:
# Handle PDF: convert first page to image
print(f" 📄 Detected PDF, converting first page to image: {url[:80]}...")
# Load PDF from bytes
pdf_document = fitz.open(stream=response.content, filetype="pdf")
if len(pdf_document) == 0:
raise ValueError("PDF has no pages")
# Render first page to image at high DPI
page = pdf_document[0]
# Use 300 DPI for high quality (matrix zoom factor = DPI/72)
zoom = 300 / 72
mat = fitz.Matrix(zoom, zoom)
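        # e.g. at zoom ≈ 4.17, an A4 page (595 x 842 pt) rasterizes to roughly 2480 x 3508 px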
pix = page.get_pixmap(matrix=mat)
# Convert pixmap to PIL Image
img_data = pix.tobytes("png")
img = Image.open(BytesIO(img_data))
pdf_document.close()
print(f" ✓ Converted PDF to image: {img.size[0]}x{img.size[1]}px")
else:
# Handle regular image
img = Image.open(BytesIO(response.content))
# Convert to RGB if necessary
if img.mode != 'RGB':
img = img.convert('RGB')
# Save as JPEG in memory
buffer = BytesIO()
img.save(buffer, format='JPEG', quality=95)
buffer.seek(0)
# Encode to base64
img_base64 = base64.b64encode(buffer.read()).decode('utf-8')
return img_base64
def download_seed_images(urls: List[str]) -> List[str]:
"""
Download multiple seed images/PDFs and convert to base64 (synchronous version for worker).
    If a URL points to a PDF, its first page is converted to an image.
Implements retry logic for transient HTTP errors (503, 502, 504, 429).
Args:
urls: List of image or PDF URLs
Returns:
List of base64-encoded JPEG image strings
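
    Example (illustrative; URLs are hypothetical):
        images = download_seed_images(["https://example.com/a.jpg", "https://example.com/b.pdf"])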
"""
images = []
for url in urls:
# Retry logic for transient HTTP errors
max_retries = 3
response = None
for attempt in range(max_retries):
try:
response = requests.get(url, timeout=30)
response.raise_for_status()
break # Success, exit retry loop
except requests.exceptions.HTTPError as e:
# Retry on transient server errors
if e.response.status_code in [502, 503, 504, 429]:
if attempt < max_retries - 1:
wait_time = 2 * (2 ** attempt) # Exponential backoff: 2s, 4s, 8s
print(f" ⚠️ HTTP {e.response.status_code} error downloading seed image, retrying in {wait_time}s (attempt {attempt + 1}/{max_retries})...")
time.sleep(wait_time)
continue
# Non-retryable error or last attempt
raise
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
if attempt < max_retries - 1:
wait_time = 2 * (2 ** attempt)
print(f" ⚠️ Network error downloading seed image, retrying in {wait_time}s (attempt {attempt + 1}/{max_retries}): {e}")
time.sleep(wait_time)
continue
raise
if response is None:
raise Exception(f"Failed to download seed image after {max_retries} attempts")
content_type = response.headers.get('Content-Type', '').lower()
is_pdf = 'application/pdf' in content_type or url.lower().endswith('.pdf')
if is_pdf:
# Handle PDF: convert first page to image
print(f" 📄 Detected PDF, converting first page to image: {url[:80]}...")
# Load PDF from bytes
pdf_document = fitz.open(stream=response.content, filetype="pdf")
if len(pdf_document) == 0:
raise ValueError("PDF has no pages")
# Render first page to image at high DPI
page = pdf_document[0]
# Use 300 DPI for high quality (matrix zoom factor = DPI/72)
zoom = 300 / 72
mat = fitz.Matrix(zoom, zoom)
pix = page.get_pixmap(matrix=mat)
# Convert pixmap to PIL Image
img_data = pix.tobytes("png")
img = Image.open(BytesIO(img_data))
pdf_document.close()
print(f" ✓ Converted PDF to image: {img.size[0]}x{img.size[1]}px")
else:
# Handle regular image
img = Image.open(BytesIO(response.content))
# Convert to RGB if necessary
if img.mode != 'RGB':
img = img.convert('RGB')
# Save as JPEG in memory
buffer = BytesIO()
img.save(buffer, format='JPEG', quality=95)
buffer.seek(0)
# Encode to base64
img_base64 = base64.b64encode(buffer.read()).decode('utf-8')
images.append(img_base64)
return images
def build_prompt(
language: str,
doc_type: str,
gt_type: str,
gt_format: str,
num_solutions: int,
num_seed_images: int,
prompt_template_path: pathlib.Path,
enable_visual_elements: bool = True,
    visual_element_types: Optional[List[str]] = None
) -> str:
"""
Build the system prompt by injecting parameters into template.
Args:
language: Language for documents
doc_type: Type of documents
gt_type: Ground truth type description
gt_format: Ground truth format specification
num_solutions: Number of documents to generate
num_seed_images: Number of seed images provided
prompt_template_path: Path to prompt template file
enable_visual_elements: Whether to include visual element instructions
visual_element_types: List of allowed visual element types
Returns:
Formatted prompt string
"""
template = prompt_template_path.read_text(encoding='utf-8')
# Handle dynamic Visual Placeholders block
# Define placeholder block pattern
ve_block_pattern = r"## Visual Placeholders \(if document type requires\)\n(.*?)\n\n"
if not enable_visual_elements or not visual_element_types:
# Remove the whole block
template = re.sub(ve_block_pattern, "", template, flags=re.DOTALL)
# Also remove the checklist item
template = template.replace("- [ ] Visual elements are semantically coherent\n", "")
else:
# Update the block with specific types
types_str = ", ".join(visual_element_types)
# Example mapping
EXAMPLES = {
"stamp": '- Example: `<div data-placeholder="stamp" data-content="APPROVED 2024-03-15" style="position:absolute;top:50mm;right:20mm;width:35mm;height:35mm;z-index:10;"></div>`',
"logo": '- Example: `<div data-placeholder="logo" data-content="ACME Corp Logo" style="width:150mm;height:100mm;"></div>`',
"figure": '- Example: `<div data-placeholder="figure" data-content="Sales Chart 2023" style="width:120mm;height:80mm;"></div>`',
"barcode": '- Example: `<div data-placeholder="barcode" data-content="SKU-12345678" style="width:60mm;height:25mm;"></div>`',
"photo": '- Example: `<div data-placeholder="photo" data-content="Customer Portrait" style="width:40mm;height:50mm;"></div>`'
}
# Select examples
selected_examples = []
for t in visual_element_types:
if t in EXAMPLES:
selected_examples.append(EXAMPLES[t])
if len(selected_examples) >= 2:
break
# Fallback if somehow no types matched (shouldn't happen with valid types)
if len(selected_examples) == 0:
selected_examples = [EXAMPLES["logo"], EXAMPLES["stamp"]]
new_block = [
"## Visual Placeholders (if document type requires)",
"- Insert `<div data-placeholder=\"type\" style=\"...\">` for non-text elements at appropriate positions",
f"- Valid types are: {types_str}",
"- Add data-content attribute with actual content description",
"- For stamps, use `position:absolute;z-index:10;` and specify 'top' and 'right'" if "stamp" in visual_element_types else None,
"- Always provide appropiate dimensions",
]
# Add the selected examples (either 1 or 2)
new_block.extend(selected_examples)
# Filter out None and join
new_block_str = "\n".join([line for line in new_block if line is not None]) + "\n\n"
template = re.sub(ve_block_pattern, new_block_str, template, flags=re.DOTALL)
# Inject parameters into template
prompt = template.format(
language=language,
doc_type=doc_type,
gt_type=gt_type,
gt_format=gt_format,
num_solutions=num_solutions,
num_seed_images=num_seed_images
)
return prompt
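
# Usage sketch for build_prompt (illustrative values; the template path is hypothetical):
#
#     prompt = build_prompt(
#         language="German",
#         doc_type="invoice",
#         gt_type="key-value pairs",
#         gt_format="JSON",
#         num_solutions=2,
#         num_seed_images=1,
#         prompt_template_path=pathlib.Path("prompts/template.txt"),
#         visual_element_types=["stamp", "logo"],
#     )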
async def call_claude_api_direct(
prompt: str,
seed_images_base64: List[str],
api_key: str,
model: str = "claude-sonnet-4-5-20250929",
max_tokens: int = 16384
) -> str:
"""
Call Claude API directly (non-batched) with prompt and seed images.
Used for API endpoint for immediate synchronous responses.
Args:
prompt: System prompt
seed_images_base64: List of base64-encoded seed images
api_key: Anthropic API key
model: Claude model name
max_tokens: Maximum tokens for response
Returns:
Raw LLM response text
"""
import anthropic
client = anthropic.Anthropic(api_key=api_key)
# Build message using the same format as batched client
message_content = create_message(prompt=prompt, images_base64=seed_images_base64)
    # Call the Claude API
message = client.messages.create(
model=model,
max_tokens=max_tokens,
messages=[message_content],
)
# Extract text response
response_text = ""
for block in message.content:
if block.type == "text":
response_text += block.text
return response_text
def extract_html_documents_from_response(response_text: str) -> List[str]:
"""
Extract individual HTML documents from LLM response.
Uses pipeline_03 function for consistency.
Args:
response_text: Raw LLM response
Returns:
List of HTML document strings
"""
# Use the pipeline function for HTML extraction
return extract_html_documents_from_text(text=response_text)
def extract_ground_truth(html: str) -> Tuple[Optional[dict], str]:
"""
Extract ground truth JSON from HTML and return cleaned HTML.
Uses pipeline_03 function for consistency.
Args:
html: HTML document with embedded GT
Returns:
Tuple of (ground_truth_dict, html_without_gt)
"""
# Use the pipeline function
raw_json, html_clean, soup = extract_gt(html=html)
if raw_json:
try:
gt_dict = json.loads(raw_json)
return gt_dict, html_clean
except json.JSONDecodeError:
return None, html
return None, html
def extract_css_from_html(html: str) -> Tuple[str, str]:
"""
Extract CSS from HTML and return both separately.
Args:
html: HTML document
Returns:
Tuple of (css_string, html_string)
"""
soup = BeautifulSoup(html, BS_PARSER)
css_parts = []
# Extract from <style> tags
for style_tag in soup.find_all("style"):
if style_tag.string:
css_parts.append(style_tag.string)
# Extract inline styles (optional - for completeness)
for tag in soup.find_all(style=True):
css_parts.append(f"{tag.name} {{ {tag['style']} }}")
css = "\n".join(css_parts)
return css, html
# preprocess_html_for_pdf is now imported from pipeline_04_render_pdf_and_extract_geos
async def render_html_to_pdf(
html: str,
output_pdf_path: pathlib.Path,
timeout_seconds: int = 60
) -> Tuple[pathlib.Path, float, float, List[dict]]:
"""
Render HTML to PDF using Playwright with automatic size detection.
Also extracts element geometries for handwriting and visual elements.
Matches pipeline_04 rendering logic.
Args:
html: HTML content to render
output_pdf_path: Path where PDF should be saved
timeout_seconds: Timeout for rendering
Returns:
Tuple of (pdf_path, width_mm, height_mm, geometries)
- geometries: List of dicts with element positions, classes, and metadata
"""
# Preprocess HTML using pipeline function
html = preprocess_html_for_pdf(html)
soup = BeautifulSoup(html, BS_PARSER)
# Apply handwriting and visual element processing
soup = increase_handwriting_font_size(soup, dbg=False)
soup = unmark_visual_elements(soup)
prep_html = soup.prettify()
# Create temporary HTML file
with tempfile.NamedTemporaryFile(
mode='w',
suffix='.html',
delete=False,
encoding='utf-8'
) as tmp_html:
tmp_html.write(prep_html)
tmp_html_path = tmp_html.name
try:
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
page = await browser.new_page()
# Load HTML
await page.goto(
f"file://{tmp_html_path}",
wait_until="domcontentloaded"
)
await page.emulate_media(media="screen")
# Auto-detect dimensions
dimensions = await page.evaluate(MEASURE_DIMENSIONS)
page_width_px = dimensions["width"]
page_height_px = dimensions["height"]
# Set viewport
await page.set_viewport_size({
"width": page_width_px,
"height": page_height_px
})
await page.wait_for_timeout(30)
# Extract geometries BEFORE generating PDF (matches pipeline_04)
# Define selectors for handwriting and visual elements
selector_map = {
"handwriting": ".handwritten",
"visual_element": "[data-placeholder]",
"layout_element": r'[class*="LE-"]'
}
            # Use json.dumps to properly escape quotes in selectors
selector_map_js = json.dumps(selector_map)
# JavaScript geometry extraction (from pipeline_04)
geo_eval_script = f"""
() => {{
const data = [];
const selectorMap = {selector_map_js};
const processedElements = new Map();
// First pass: collect all elements and their matching selectors
Object.entries(selectorMap).forEach(([label, selector]) => {{
document.querySelectorAll(selector).forEach(el => {{
if (!processedElements.has(el)) {{
processedElements.set(el, []);
}}
processedElements.get(el).push(label);
}});
}});
// Second pass: create geometry data for each unique element
processedElements.forEach((selectorTypes, el) => {{
const rect = el.getBoundingClientRect();
const computed = window.getComputedStyle(el);
// Get text content
let text = '';
if (el.tagName.toLowerCase() === 'input') {{
text = (el.value || '').trim();
}} else {{
text = (el.innerText || el.textContent || '').trim();
}}
data.push({{
id: el.id || null,
tag: el.tagName.toLowerCase(),
classes: el.className || null,
rect: {{
x: rect.x,
y: rect.y,
width: rect.width,
height: rect.height
}},
visibility: computed.visibility,
dataContent: el.getAttribute('data-content') || null,
dataPlaceholder: el.getAttribute('data-placeholder') || null,
style: el.getAttribute('style') || null,
text: text,
selectorTypes: selectorTypes
}});
}});
return data;
}}
"""
geometries = await page.evaluate(geo_eval_script)
print(f" 🔍 Extracted {len(geometries)} geometries from rendered DOM")
# Debug: Show what was found
hw_geos = [g for g in geometries if "handwriting" in g.get("selectorTypes", [])]
ve_geos = [g for g in geometries if "visual_element" in g.get("selectorTypes", [])]
if hw_geos:
print(f" - Found {len(hw_geos)} handwriting elements in DOM")
if ve_geos:
print(f" - Found {len(ve_geos)} visual element placeholders in DOM")
if not hw_geos and not ve_geos:
print(f" - ⚠️ No handwriting or visual elements found in DOM")
# Generate PDF
page_width_inches = page_width_px / 96
page_height_inches = page_height_px / 96
await page.pdf(
path=str(output_pdf_path),
width=f"{page_width_inches}in",
height=f"{page_height_inches}in",
margin={
"top": "0",
"bottom": "0",
"left": "0",
"right": "0"
},
print_background=True,
display_header_footer=False,
prefer_css_page_size=False,
scale=1.0
)
await browser.close()
# Convert to mm
width_mm = page_width_inches * 25.4
height_mm = page_height_inches * 25.4
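            # e.g. a 794 x 1123 px page is 8.27 x 11.70 in, i.e. roughly 210 x 297 mm (A4)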
return output_pdf_path, width_mm, height_mm, geometries
finally:
# Clean up temp file
pathlib.Path(tmp_html_path).unlink(missing_ok=True)
def extract_bboxes_from_rendered_pdf(
pdf_path: pathlib.Path
) -> List[dict]:
"""
Extract bounding boxes from rendered PDF.
Args:
pdf_path: Path to PDF file
Returns:
List of bounding box dictionaries
"""
from docgenie.generation.models import OCRBox
# Extract word-level bboxes
word_bboxes = extract_bboxes_from_pdf(
pdf_path=pdf_path,
level="word"
)
# Convert OCRBox objects to dict format
# OCRBox has: x0, y0, x2, y2, text, block_no, line_no, word_no
bbox_list = []
for bbox in word_bboxes:
bbox_list.append({
"text": bbox.text,
"x": bbox.x0,
"y": bbox.y0,
"width": bbox.width, # x2 - x0
"height": bbox.height, # y2 - y0
"block_no": bbox.block_no,
"line_no": bbox.line_no,
"word_no": bbox.word_no,
"page": 0 # Single page documents only
})
return bbox_list
def pdf_to_base64(pdf_path: pathlib.Path) -> str:
"""
Convert PDF file to base64 string.
Args:
pdf_path: Path to PDF file
Returns:
Base64-encoded PDF
"""
with open(pdf_path, 'rb') as f:
pdf_bytes = f.read()
return base64.b64encode(pdf_bytes).decode('utf-8')
def validate_html_structure(html: str) -> Tuple[bool, str]:
"""
Validate HTML structure (pipeline_06 style validation).
Args:
html: HTML content to validate
Returns:
Tuple of (is_valid, error_message)
"""
try:
soup = BeautifulSoup(html, BS_PARSER)
# Check for required tags
if not soup.find('html'):
return False, "Missing <html> tag"
if not soup.find('head'):
return False, "Missing <head> tag"
if not soup.find('body'):
return False, "Missing <body> tag"
# Check for minimum content
body = soup.find('body')
if body and len(body.get_text(strip=True)) < 10:
return False, "Body content too short"
return True, ""
except Exception as e:
return False, f"HTML parsing error: {str(e)}"
def validate_pdf(pdf_path: pathlib.Path) -> Tuple[bool, str]:
"""
Validate PDF file (pipeline_06 style validation).
Args:
pdf_path: Path to PDF file
Returns:
Tuple of (is_valid, error_message)
"""
try:
from PyPDF2 import PdfReader
if not pdf_path.exists():
return False, "PDF file does not exist"
# Check file size
file_size = pdf_path.stat().st_size
if file_size == 0:
return False, "PDF file is empty"
if file_size > 50 * 1024 * 1024: # 50MB limit
return False, f"PDF file too large: {file_size / (1024*1024):.1f}MB"
# Check page count
with open(pdf_path, 'rb') as f:
reader = PdfReader(f)
num_pages = len(reader.pages)
if num_pages == 0:
return False, "PDF has no pages"
if num_pages > 1:
return False, f"PDF has {num_pages} pages (expected 1)"
return True, ""
except Exception as e:
return False, f"PDF validation error: {str(e)}"
def validate_bboxes(bboxes: List[dict], min_bbox_count: int = 0) -> Tuple[bool, str]:
"""
Validate bounding boxes (pipeline_06 style validation).
Args:
bboxes: List of bounding box dictionaries
min_bbox_count: Minimum number of bboxes required
Returns:
Tuple of (is_valid, error_message)
"""
if len(bboxes) < min_bbox_count:
return False, f"Only {len(bboxes)} bboxes found (minimum {min_bbox_count} required)"
for i, bbox in enumerate(bboxes):
# Check required fields
required_fields = ['text', 'x', 'y', 'width', 'height']
for field in required_fields:
if field not in bbox:
return False, f"BBox {i} missing required field: {field}"
# Check dimensions
if bbox['width'] <= 0 or bbox['height'] <= 0:
return False, f"BBox {i} has invalid dimensions: {bbox['width']}x{bbox['height']}"
return True, ""
# ============================================================================
# STAGE 3: Feature Synthesis (Handwriting & Visual Elements)
# ============================================================================
async def call_handwriting_service_batch(
texts_with_metadata: List[Dict]
) -> List[Dict]:
"""
Call RunPod handwriting service with TRUE batch processing for cost efficiency.
Sends all texts in ONE request to activate only ONE worker, significantly reducing costs.
Cost comparison for 10 texts:
- OLD (parallel): 10 workers × 18s = 180 worker-seconds
- NEW (batched): 1 worker × 190s = 190 worker-seconds BUT only 1 worker activation fee
For RunPod pricing with activation overhead, batching is ~40-60% cheaper.
Args:
texts_with_metadata: List of dicts with keys: text, author_id, hw_id
Returns:
List of dicts with keys: hw_id, image_base64, text, author_id, width, height
"""
if not texts_with_metadata:
return []
max_retries = settings.HANDWRITING_SERVICE_MAX_RETRIES
timeout = settings.HANDWRITING_SERVICE_TIMEOUT
# Calculate appropriate timeout: ~18s per text + 30s buffer
# For large batches, increase timeout proportionally
num_texts = len(texts_with_metadata)
batch_timeout = max(timeout, num_texts * 20 + 30) # 20s per text + buffer
# Prepare headers
headers = {"Content-Type": "application/json"}
if settings.RUNPOD_API_KEY:
headers["Authorization"] = f"Bearer {settings.RUNPOD_API_KEY}"
print(f" Processing {num_texts} texts in ONE batch (1 worker activation)...")
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(timeout=batch_timeout) as client:
# Build RunPod BATCH request format
runpod_request = {
"input": {
"texts": [
{
"text": item["text"],
"author_id": item["author_id"],
"hw_id": item.get("hw_id", f"hw_{i}")
}
for i, item in enumerate(texts_with_metadata)
],
"apply_blur": settings.HANDWRITING_APPLY_BLUR
}
}
response = await client.post(
settings.HANDWRITING_SERVICE_URL,
json=runpod_request,
headers=headers
)
response.raise_for_status()
result = response.json()
# Parse RunPod response format
# Handle cases where /runsync returns before completion
job_status = result.get("status")
if job_status == "IN_PROGRESS":
# RunPod's /runsync can return IN_PROGRESS for long jobs
# Poll the status endpoint until completion
job_id = result.get("id")
if not job_id:
raise Exception("RunPod job IN_PROGRESS but no job ID provided")
print(f" ⏳ Job {job_id} still processing, polling status...")
# Extract base URL and construct status endpoint
# URL format: https://api.runpod.ai/v2/{endpoint_id}/runsync
# Status format: https://api.runpod.ai/v2/{endpoint_id}/status/{job_id}
base_url = settings.HANDWRITING_SERVICE_URL.replace("/runsync", "")
status_url = f"{base_url}/status/{job_id}"
# Poll with exponential backoff
max_polls = 30 # Max 30 polls
poll_delay = 5 # Start with 5 seconds
for poll_attempt in range(max_polls):
await asyncio.sleep(poll_delay)
status_response = await client.get(status_url, headers=headers)
status_response.raise_for_status()
result = status_response.json()
job_status = result.get("status")
print(f" ⏳ Poll {poll_attempt + 1}/{max_polls}: {job_status}")
if job_status == "COMPLETED":
print(f" ✅ Job completed after {poll_attempt + 1} polls")
break
elif job_status == "FAILED":
raise Exception(f"RunPod job failed: {result.get('error', 'Unknown error')}")
elif job_status not in ["IN_PROGRESS", "IN_QUEUE"]:
raise Exception(f"Unknown job status: {job_status}")
# Increase delay slightly (cap at 10s)
poll_delay = min(poll_delay + 1, 10)
else:
raise Exception(f"Job did not complete after {max_polls} status checks")
if job_status != "COMPLETED":
raise Exception(f"RunPod job not completed: {job_status}")
output = result.get("output", {})
if "error" in output:
raise Exception(f"RunPod error: {output['error']}")
# Extract images from batch response
images = output.get("images", [])
if not images:
raise Exception("No images in batch response")
# Format results
all_results = [
{
"hw_id": img.get("hw_id"),
"text": img.get("text"),
"author_id": img.get("author_id"),
"image_base64": img.get("image_base64"),
"width": img.get("width"),
"height": img.get("height")
}
for img in images
]
print(f" → Batch complete: {len(all_results)}/{num_texts} texts generated successfully")
return all_results
except httpx.TimeoutException as e:
if attempt < max_retries - 1:
                wait_time = 10 * (attempt + 1)  # Linear backoff: 10s, 20s, 30s
print(f" ⚠️ Timeout on attempt {attempt + 1}/{max_retries}, retrying in {wait_time}s...")
await asyncio.sleep(wait_time)
continue
else:
print(f" ❌ Batch failed after {max_retries} retries: {e}")
return []
except Exception as e:
if attempt < max_retries - 1:
wait_time = 5 * (attempt + 1)
print(f" ⚠️ Error on attempt {attempt + 1}/{max_retries}: {e}, retrying in {wait_time}s...")
await asyncio.sleep(wait_time)
continue
else:
print(f" ❌ Batch failed: {e}")
return []
return []
async def generate_visual_element_images(
visual_elements: list[dict],
seed: Optional[int] = None,
assets_dir: Optional[pathlib.Path] = None
) -> dict:
"""
Generate visual element images (stamps, logos, barcodes, photos, figures).
Args:
visual_elements: List of visual element definitions with type, content, rect
seed: Random seed for reproducible selection (default: None)
Returns:
Dict {ve_id: base64_png} of generated images
"""
    import random
    import io
if seed is not None:
random.seed(seed)
visual_element_images = {}
# Cache prefab directories
logo_prefabs = None
photo_prefabs = None
figure_prefabs = None
def get_logo_prefabs():
nonlocal logo_prefabs
if logo_prefabs is None:
logo_dir = ENV.VISUAL_ELEMENT_PREFABS_DIR / "logo"
logo_prefabs = list(logo_dir.glob("*.png")) + list(logo_dir.glob("*.jpg"))
return logo_prefabs
def get_photo_prefabs():
nonlocal photo_prefabs
if photo_prefabs is None:
photo_dir = ENV.VISUAL_ELEMENT_PREFABS_DIR / "photo"
photo_prefabs = list(photo_dir.glob("*.png")) + list(photo_dir.glob("*.jpg"))
return photo_prefabs
def get_figure_prefabs():
nonlocal figure_prefabs
if figure_prefabs is None:
figure_dir = ENV.VISUAL_ELEMENT_PREFABS_DIR / "figure"
figure_prefabs = list(figure_dir.glob("*.png")) + list(figure_dir.glob("*.jpg"))
return figure_prefabs
for ve in visual_elements:
ve_id = ve.get('id', 'unknown')
ve_type = ve.get('type', 'unknown')
content = ve.get('content', '')
rect = ve.get('rect', {})
width = rect.get('width', 100)
height = rect.get('height', 100)
rotation = ve.get('rotation', 0)
try:
img = None
if ve_type == 'stamp':
# Select stamp: from assets_dir if available, else generate
if assets_dir:
stamp_files = list(assets_dir.glob("stamp_*"))
if stamp_files:
selected_stamp = random.choice(stamp_files)
img = Image.open(selected_stamp).convert("RGBA")
if not img: # Fallback to generation
img = create_stamp(
text=content if content else "STAMP",
width=width,
height=height,
rot_angle=None # Rotation applied during insertion
)
elif ve_type == 'logo':
# Select logo: from assets_dir if available, else from prefabs
if assets_dir:
logo_files = list(assets_dir.glob("logo_*"))
if logo_files:
selected_logo = random.choice(logo_files)
img = Image.open(selected_logo).convert("RGBA")
if not img: # Fallback to prefabs
logos = get_logo_prefabs()
if logos:
selected_logo = random.choice(logos)
img = Image.open(selected_logo).convert("RGBA")
elif ve_type == 'barcode':
# Generate Code128 barcode
try:
from barcode import Code128
from barcode.writer import ImageWriter
# Validate barcode content
barcode_content = content.strip() if content and content.strip().isdigit() else str(random.randint(100000000000, 999999999999))
# Configure barcode writer
writer = ImageWriter()
writer.set_options({
"module_width": 0.3,
"module_height": 15.0,
"quiet_zone": 6.5,
"font_size": 7,
"text_distance": 5,
"background": "rgba(255, 255, 255, 0)",
"foreground": "black",
})
code128 = Code128(barcode_content, writer=writer)
buffer = io.BytesIO()
code128.write(buffer, options={"format": "PNG"})
buffer.seek(0)
img = Image.open(buffer).convert("RGBA")
except ImportError:
print(f" ⚠ 'python-barcode' not installed, skipping barcode {ve_id}")
except Exception as e:
print(f" ⚠ Barcode generation failed for {ve_id}: {e}")
elif ve_type == 'photo':
# Select photo: from assets_dir if available, else from prefabs
if assets_dir:
photo_files = list(assets_dir.glob("photo_*"))
if photo_files:
selected_photo = random.choice(photo_files)
img = Image.open(selected_photo).convert("RGBA")
if not img: # Fallback to prefabs
photos = get_photo_prefabs()
if photos:
selected_photo = random.choice(photos)
img = Image.open(selected_photo).convert("RGBA")
elif ve_type in ['figure', 'chart', 'diagram']:
# Select figure: from assets_dir if available, else from prefabs
if assets_dir:
figure_files = list(assets_dir.glob("figure_*"))
if figure_files:
selected_figure = random.choice(figure_files)
img = Image.open(selected_figure).convert("RGBA")
if not img: # Fallback to prefabs
figures = get_figure_prefabs()
if figures:
selected_figure = random.choice(figures)
img = Image.open(selected_figure).convert("RGBA")
# Convert to base64 if successfully generated
if img:
buffer = io.BytesIO()
img.save(buffer, format="PNG")
buffer.seek(0)
img_b64 = base64.b64encode(buffer.read()).decode('utf-8')
visual_element_images[ve_id] = img_b64
except Exception as e:
print(f" ⚠ Failed to generate visual element {ve_id} (type: {ve_type}): {e}")
continue
return visual_element_images
async def process_stage3_complete(
pdf_path: pathlib.Path,
geometries: list[dict],
ground_truth: dict,
bboxes_raw: list[dict],
page_width_mm: float,
page_height_mm: float,
enable_handwriting: bool = False,
handwriting_ratio: float = 0.5,
enable_visual_elements: bool = False,
    visual_element_types: Optional[list[str]] = None,
seed: Optional[int] = None,
assets_dir: Optional[pathlib.Path] = None
) -> tuple[str, list[dict], list[dict], dict, dict, pathlib.Path | None, pathlib.Path | None]:
"""
Process complete Stage 3 pipeline (stages 07-11) using browser-extracted geometries.
- Extract handwriting definitions from geometries (from DOM, not HTML parsing)
- Extract visual element definitions from geometries
    - Generate handwriting images (via the RunPod handwriting service if enabled)
- Create visual element images
- Render second-pass PDF with handwriting and visual elements
- Convert final PDF to base64 image
Args:
geometries: List of element geometries extracted from browser DOM
Returns:
tuple: (final_image_base64, handwriting_regions, visual_elements, handwriting_images, visual_element_images, pdf_with_handwriting_path, pdf_final_path)
- final_image_base64: Base64 PNG of final document
- handwriting_regions: List of handwriting metadata dicts
- visual_elements: List of visual element metadata dicts
- handwriting_images: Dict {hw_id: base64_png} for individual tokens
- visual_element_images: Dict {ve_id: base64_png} for individual elements
- pdf_with_handwriting_path: Path to PDF after handwriting insertion (or None)
- pdf_final_path: Path to final PDF after all modifications (or None)
"""
    import random
handwriting_regions = []
visual_elements = []
print(f" 🔍 Processing {len(geometries)} geometries from DOM")
# Step 2: Extract handwriting definitions (pipeline_07) - map geometries to word bboxes
if enable_handwriting:
# Convert bboxes_raw dicts to OCRBox objects for matching
from docgenie.generation.models import OCRBox
from docgenie.generation.constants import BBOX_TO_GEO_MATCHING_THRESHOLD
from docgenie.generation.utils.bboxes import is_in_rect
# Build OCRBox list from bboxes_raw
word_bboxes = []
for bbox_dict in bboxes_raw:
word_bboxes.append(OCRBox(
x0=bbox_dict['x'],
y0=bbox_dict['y'],
x2=bbox_dict['x'] + bbox_dict['width'],
y2=bbox_dict['y'] + bbox_dict['height'],
text=bbox_dict['text'],
block_no=bbox_dict.get('block_no', 0), # Default if not present
line_no=bbox_dict.get('line_no', 0),
word_no=bbox_dict.get('word_no', 0)
))
# Filter geometries for handwriting elements
hw_geometries = [g for g in geometries if "handwriting" in g.get("selectorTypes", [])]
print(f" - Found {len(hw_geometries)} handwriting geometries")
taken_bbox_indices = set()
for i, geo in enumerate(hw_geometries):
classes_str = geo.get('classes', '')
classes = classes_str.split() if classes_str else []
# Extract author ID
other_classes = [c for c in classes if c != 'handwritten']
valid_author_ids = [c for c in other_classes if c.startswith("author")]
author_id = valid_author_ids[0] if valid_author_ids else None
# Random selection based on handwriting_ratio
if seed is not None:
random.seed(seed + i)
if random.random() > handwriting_ratio:
continue
text_content = geo.get('text', '').strip()
if not text_content:
continue
is_signature = 'signature' in classes
# Convert browser coordinates (96 DPI) to PDF points (72 DPI)
# Playwright renders at 96 DPI, PyMuPDF extracts at 72 DPI
# Conversion factor: 72/96 = 0.75
rect_browser = geo.get('rect', {})
dpi_scale = 72.0 / 96.0 # 0.75
rect = {
'x': rect_browser.get('x', 0) * dpi_scale,
'y': rect_browser.get('y', 0) * dpi_scale,
'width': rect_browser.get('width', 0) * dpi_scale,
'height': rect_browser.get('height', 0) * dpi_scale
}
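            # e.g. an element at x=96 px in the browser lands at x=72 pt in the PDF (96 * 0.75)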
# Map geometry to word bboxes (like pipeline_07 find_bbox_indices)
words = text_content.split()
n = len(words)
matched_bboxes = []
for j in range(len(word_bboxes) - n + 1):
slice_texts = [b.text for b in word_bboxes[j : j + n]]
if slice_texts == words:
start, stop = j, j + n
if (start, stop) not in taken_bbox_indices:
# Check if bboxes are within geometry rect
start_in_rect = is_in_rect(
rect=rect,
bbox=word_bboxes[start],
threshold=BBOX_TO_GEO_MATCHING_THRESHOLD
)
stop_in_rect = is_in_rect(
rect=rect,
bbox=word_bboxes[stop - 1],
threshold=BBOX_TO_GEO_MATCHING_THRESHOLD
)
if start_in_rect and stop_in_rect:
matched_bboxes = word_bboxes[start:stop]
taken_bbox_indices.add((start, stop))
break
if not matched_bboxes:
print(f" - ⚠️ No bbox match for hw{i}: '{text_content[:30]}'")
continue
handwriting_regions.append({
'id': f'hw{i}',
'text': text_content,
'author_id': author_id,
'is_signature': is_signature,
'rect': rect,
'bboxes': [b.as_string() for b in matched_bboxes],
'classes': classes_str
})
print(f" - Selected {len(handwriting_regions)} handwriting regions (ratio: {handwriting_ratio})")
# Step 3: Extract visual element definitions (pipeline_08) - from geometries
if enable_visual_elements:
# Filter geometries for visual element placeholders
ve_geometries = [g for g in geometries if "visual_element" in g.get("selectorTypes", [])]
print(f" - Found {len(ve_geometries)} visual element geometries")
for i, geo in enumerate(ve_geometries):
data_type = geo.get('dataPlaceholder', '')
data_content = geo.get('dataContent', '')
# Normalize type using synonyms (e.g., "chart" -> "figure")
normalized_type = VISUAL_ELEMENT_TYPE_SYNONYMS.get(data_type, data_type)
# Filter by requested types
if visual_element_types and normalized_type not in visual_element_types:
print(f" ⚠️ Filtered out visual element type '{data_type}' (normalized to '{normalized_type}', not in requested types: {visual_element_types})")
continue
# Use rect from geometry
rect_px = geo.get('rect', {})
px_to_mm = 25.4 / 96
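            # e.g. a 378 px-wide placeholder is roughly 100 mm wide (378 * 25.4 / 96 ≈ 100.0)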
rect = {
'x': rect_px.get('x', 0) * px_to_mm,
'y': rect_px.get('y', 0) * px_to_mm,
'width': rect_px.get('width', 0) * px_to_mm,
'height': rect_px.get('height', 0) * px_to_mm
}
# Extract rotation if present in style
rotation = 0
style = geo.get('style', '')
if style and 'rotate' in style:
rotation = extract_rotation_from_style(style)
visual_elements.append({
'id': f've{i}',
'type': normalized_type, # Use normalized type (e.g., "figure" not "chart")
'content': data_content,
'rect': rect,
'rotation': rotation
})
print(f" - Selected {len(visual_elements)} visual elements")
# Step 4: Generate handwriting images (pipeline_09)
handwriting_images = {}
# DEBUG: Show why handwriting service may not be called
print(f"\n 🔍 DEBUG - Handwriting Service Check:")
print(f" - enable_handwriting: {enable_handwriting}")
print(f" - handwriting_regions count: {len(handwriting_regions)}")
print(f" - HANDWRITING_SERVICE_ENABLED: {settings.HANDWRITING_SERVICE_ENABLED}")
print(f" - HANDWRITING_SERVICE_URL: {settings.HANDWRITING_SERVICE_URL}")
if enable_handwriting and handwriting_regions and settings.HANDWRITING_SERVICE_ENABLED:
print(f" ✅ Handwriting service check PASSED - preparing batch request...")
# Map author strings to numeric style IDs (matches original pipeline behavior)
# Original uses WRITER_STYLES list from constants.py
from docgenie.generation.constants import WRITER_STYLES
# Create deterministic mapping: author_id string → numeric style ID
def map_author_to_style_id(author_id_str: str, seed_val: Optional[int] = None) -> int:
"""
Map author ID string (like 'author1') to numeric style ID (0-656).
Matches original pipeline's style selection logic.
"""
if not author_id_str or not author_id_str.startswith('author'):
# Fallback: random from WRITER_STYLES
return random.choice(WRITER_STYLES)
try:
# Parse number from "authorN"
author_num = int(author_id_str.replace('author', ''))
# Use modulo to map to WRITER_STYLES indices
style_idx = author_num % len(WRITER_STYLES)
return WRITER_STYLES[style_idx]
except ValueError:
# If parsing fails, random selection
return random.choice(WRITER_STYLES)
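        # Example (illustrative; assumes WRITER_STYLES holds 657 style IDs):
        #   'author660' selects WRITER_STYLES[660 % 657] == WRITER_STYLES[3]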
# Prepare batch request for handwriting service
texts_to_generate = []
for i, hw_region in enumerate(handwriting_regions):
author_id_str = hw_region.get('author_id')
text = hw_region.get('text', '')
print(f" - Region {i+1}: author_id='{author_id_str}', text='{text[:30]}...'")
# Only generate if we have a valid author_id
if author_id_str is not None:
# Convert author string to numeric style ID
style_id = map_author_to_style_id(author_id_str, seed)
print(f" → Mapped to style_id={style_id}")
# Group bboxes by block/line (like pipeline_12)
bboxes_str = hw_region.get('bboxes', [])
if not bboxes_str:
print(f" → ⚠️ Skipped (no bboxes)")
continue
# Parse bbox strings and group by (block_no, line_no)
from collections import defaultdict
from docgenie.generation.utils.bboxes import read_syn_dataset_bbox_str
grouped_bboxes = defaultdict(list)
for bbox_str in bboxes_str:
bbox = read_syn_dataset_bbox_str(bbox_str)
grouped_bboxes[(bbox.block_no, bbox.line_no)].append(bbox)
# Generate one image per word (WordStylist doesn't support spaces)
for (block_no, line_no), bbox_group in grouped_bboxes.items():
# Process each word individually
for word_idx, bbox in enumerate(bbox_group):
word_text = bbox.text
# Filter to only letters (WordStylist only supports A-Z, a-z, no spaces)
filtered_text = ''.join(c for c in word_text if c.isalpha())
# Skip if no valid text remains after filtering
if not filtered_text:
continue
texts_to_generate.append({
'text': filtered_text,
'author_id': style_id,
'hw_id': f"{hw_region['id']}_b{block_no}_l{line_no}_w{word_idx}"
})
print(f" → {len(grouped_bboxes)} block/line groups")
else:
print(f" → ⚠️ Skipped (no author_id)")
print(f" - Prepared {len(texts_to_generate)} texts for generation")
if texts_to_generate:
try:
print(f" - Calling RunPod handwriting service at {settings.HANDWRITING_SERVICE_URL}...")
# Call RunPod handwriting service
results = await call_handwriting_service_batch(texts_to_generate)
print(f" - ✅ Received {len(results)} handwriting images")
# Store generated images
for result in results:
handwriting_images[result['hw_id']] = result['image_base64']
except Exception as e:
print(f" - ❌ Handwriting service call failed: {e}")
import traceback
traceback.print_exc()
# If handwriting is explicitly enabled, fail the entire generation
# Don't produce documents without handwriting when user requested it
raise Exception(f"Handwriting generation failed: {e}") from e
else:
print(f" - ⚠️ No texts to generate (all regions missing author_id)")
else:
print(f" ❌ Handwriting service check FAILED - skipping generation")
# Step 5: Create visual element images (pipeline_10)
visual_element_images = {}
if enable_visual_elements and visual_elements:
try:
visual_element_images = await generate_visual_element_images(
visual_elements,
seed=seed,
assets_dir=assets_dir
)
print(f" ✓ Generated {len(visual_element_images)} visual element images")
except Exception as e:
print(f" ⚠ Visual element generation failed: {e}")
# Continue without visual elements
# Step 6: Insert handwriting images into PDF (pipeline_12)
doc = fitz.open(pdf_path)
page = doc[0]
pdf_with_handwriting_path = None
pdf_final_path = None
if handwriting_images:
print(f" 🖊️ Inserting {len(handwriting_images)} handwriting images into PDF...")
from docgenie.generation.constants import (
FIXED_HANDWRITING_X_OFFSET,
MAX_HANDWRITING_RAND_X_OFFSET_LEFT,
MAX_HANDWRITING_RAND_X_OFFSET_RIGHT,
MAX_HANDWRITING_RAND_Y_OFFSET_UP,
MAX_HANDWRITING_RAND_Y_OFFSET_DOWN,
PIPELINE_04_3_SCALE_UP_FACTOR
)
scale_up = PIPELINE_04_3_SCALE_UP_FACTOR # 3x upscaling
from docgenie.generation.utils.bboxes import read_syn_dataset_bbox_str
# Step 6a: White out original text in handwriting regions (matches pipeline_11)
# This replaces the "make text transparent" step from original pipeline
print(f" - Whitening out original text regions...")
for hw_region in handwriting_regions:
bboxes_str = hw_region.get('bboxes', [])
if not bboxes_str:
continue
# Draw white rectangles over each word bbox
for bbox_str in bboxes_str:
bbox = read_syn_dataset_bbox_str(bbox_str)
# Draw white filled rectangle to hide original text
rect = fitz.Rect(bbox.x0, bbox.y0, bbox.x2, bbox.y2)
page.draw_rect(rect, color=(1, 1, 1), fill=(1, 1, 1))
print(f" - Inserting handwriting images...")
# Process each handwriting region
for hw_region in handwriting_regions:
hw_id = hw_region['id']
rect = hw_region['rect']
bboxes_str = hw_region.get('bboxes', [])
if not bboxes_str:
continue
# Parse bboxes and group by block/line
from collections import defaultdict
grouped_bboxes = defaultdict(list)
for bbox_str in bboxes_str:
bbox = read_syn_dataset_bbox_str(bbox_str)
grouped_bboxes[(bbox.block_no, bbox.line_no)].append(bbox)
# Insert images for each individual word
for (block_no, line_no), bbox_group in grouped_bboxes.items():
for word_idx, bbox in enumerate(bbox_group):
img_id = f"{hw_id}_b{block_no}_l{line_no}_w{word_idx}"
if img_id not in handwriting_images:
continue
try:
# Decode base64 image
img_data = base64.b64decode(handwriting_images[img_id])
img = Image.open(BytesIO(img_data))
# Get bbox dimensions for this word
bbox_w = bbox.x2 - bbox.x0
bbox_h = bbox.y2 - bbox.y0
# Resize with aspect ratio preservation (matches pipeline_12)
iw, ih = img.size
scale = min(bbox_w / iw, bbox_h / ih)
new_w = int(iw * scale * scale_up)
new_h = int(ih * scale * scale_up)
img_resized = img.resize((new_w, new_h), Image.Resampling.LANCZOS).convert("RGBA")
# Convert to bytes for PyMuPDF
img_bytes_io = BytesIO()
img_resized.save(img_bytes_io, format="PNG")
img_bytes = img_bytes_io.getvalue()
# Calculate position with random offsets
y_padding = 50
offset_x = random.randint(
-MAX_HANDWRITING_RAND_X_OFFSET_LEFT,
MAX_HANDWRITING_RAND_X_OFFSET_RIGHT
) + FIXED_HANDWRITING_X_OFFSET
offset_y = random.randint(
-MAX_HANDWRITING_RAND_Y_OFFSET_UP,
MAX_HANDWRITING_RAND_Y_OFFSET_DOWN
)
# Position at word bbox location
x0_pos = bbox.x0 + offset_x
y0_pos = bbox.y0 + offset_y - y_padding
x2_pos = min(x0_pos + img_resized.width / scale_up, bbox.x2) + offset_x
y2_pos = min(y0_pos + img_resized.height / scale_up, bbox.y2) + offset_y + 2 * y_padding
# Insert image into PDF
rect_fitz = fitz.Rect(x0_pos, y0_pos, x2_pos, y2_pos)
page.insert_image(rect_fitz, stream=img_bytes)
print(f" - ✓ Inserted {img_id} at ({x0_pos:.1f}, {y0_pos:.1f})")
except Exception as e:
print(f" - ⚠️ Failed to insert {img_id}: {e}")
import traceback
traceback.print_exc()
print(f" ✓ Handwriting insertion complete")
# Save modified PDF with handwriting (matches pipeline_12)
pdf_with_handwriting_path = pdf_path.parent / f"{pdf_path.stem}_with_handwriting.pdf"
doc.save(pdf_with_handwriting_path)
print(f" - Saved PDF with handwriting: {pdf_with_handwriting_path.name}")
doc.close()
# Reopen modified PDF for visual element insertion
doc = fitz.open(pdf_with_handwriting_path)
page = doc[0]
# Step 6b: Insert visual elements into PDF (pipeline_13)
if visual_element_images and visual_elements:
print(f" 🎨 Inserting {len(visual_element_images)} visual elements into PDF...")
from docgenie.generation.constants import PIPELINE_04_3_SCALE_UP_FACTOR
scale_up = PIPELINE_04_3_SCALE_UP_FACTOR # 3x upscaling
for ve in visual_elements:
ve_id = ve['id']
if ve_id not in visual_element_images:
print(f" - ⚠️ Skipping {ve_id}: image not generated")
continue
try:
# Decode base64 image
img_data = base64.b64decode(visual_element_images[ve_id])
img = Image.open(BytesIO(img_data))
# Get rect from visual element definition
rect = ve['rect']
bbox_width = rect['width'] # Already in mm
bbox_height = rect['height']
# Convert mm to points (1 mm = 72/25.4 pt)
mm_to_pt = 72 / 25.4
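                # e.g. a 35 mm-wide stamp box is about 99.2 pt wide (35 * 72 / 25.4)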
bbox_w_pt = bbox_width * mm_to_pt
bbox_h_pt = bbox_height * mm_to_pt
x0_pt = rect['x'] * mm_to_pt
y0_pt = rect['y'] * mm_to_pt
# Resize with aspect ratio preservation (matches pipeline_13)
iw, ih = img.size
scale = min(bbox_w_pt / iw, bbox_h_pt / ih)
new_w = int(iw * scale * scale_up)
new_h = int(ih * scale * scale_up)
img_resized = img.resize((new_w, new_h), Image.Resampling.LANCZOS).convert("RGBA")
                # Create a high-res canvas (white, fully transparent alpha) sized to the bbox
final_img = Image.new(
"RGBA",
(int(bbox_w_pt * scale_up), int(bbox_h_pt * scale_up)),
(255, 255, 255, 0)
)
# Paste resized image centered
offset_x = (int(bbox_w_pt * scale_up) - new_w) // 2
offset_y = (int(bbox_h_pt * scale_up) - new_h) // 2
final_img.paste(img_resized, (offset_x, offset_y), mask=img_resized)
# Convert to bytes for PyMuPDF
img_bytes_io = BytesIO()
final_img.save(img_bytes_io, format="PNG")
img_bytes = img_bytes_io.getvalue()
# Insert image into PDF at specified position
rect_fitz = fitz.Rect(x0_pt, y0_pt, x0_pt + bbox_w_pt, y0_pt + bbox_h_pt)
page.insert_image(rect_fitz, stream=img_bytes)
print(f" - ✓ Inserted {ve_id} ({ve['type']}) at ({x0_pt:.1f}, {y0_pt:.1f})")
except Exception as e:
print(f" - ⚠️ Failed to insert {ve_id}: {e}")
import traceback
traceback.print_exc()
print(f" ✓ Visual element insertion complete")
# Save modified PDF with visual elements
# If handwriting was already added, this is the final PDF (both modifications)
# Otherwise, this is just the visual elements PDF
if pdf_with_handwriting_path:
# Both handwriting and visual elements were added
pdf_final_path = pdf_path.parent / f"{pdf_path.stem}_final.pdf"
doc.save(pdf_final_path)
print(f" - Saved final PDF (with handwriting + visual elements): {pdf_final_path.name}")
else:
# Only visual elements were added
pdf_with_ve_only = pdf_path.parent / f"{pdf_path.stem}_with_visual_elements.pdf"
doc.save(pdf_with_ve_only)
print(f" - Saved PDF with visual elements: {pdf_with_ve_only.name}")
pdf_final_path = pdf_with_ve_only
doc.close()
# Reopen for final image rendering
doc = fitz.open(pdf_final_path)
page = doc[0]
# Step 7: Convert final PDF to image
# Render at high DPI for quality
    pix = page.get_pixmap(matrix=fitz.Matrix(3, 3))  # 3x scale = 216 DPI (3 x 72)
img_bytes = pix.tobytes("png")
# Convert to base64
final_image_b64 = base64.b64encode(img_bytes).decode('utf-8')
doc.close()
# Return both PDF paths (for dataset exporter)
return final_image_b64, handwriting_regions, visual_elements, handwriting_images, visual_element_images, pdf_with_handwriting_path, pdf_final_path
def extract_rect_from_style(style: str, page_width_mm: float, page_height_mm: float) -> dict:
"""Extract position and dimensions from inline CSS style."""
rect = {'x': 0, 'y': 0, 'width': 0, 'height': 0}
# Parse CSS properties
for prop in style.split(';'):
if ':' not in prop:
continue
key, value = prop.split(':', 1)
key = key.strip().lower()
value = value.strip()
# Extract numeric value and unit
match = re.match(r'([-\d.]+)(mm|cm|px)?', value)
if not match:
continue
num_val = float(match.group(1))
unit = match.group(2) or 'mm'
# Convert to mm
if unit == 'cm':
num_val *= 10
elif unit == 'px':
num_val *= 0.2645833333 # 96 DPI to mm
# Map CSS properties to rect
if key in ('left', 'x'):
rect['x'] = num_val
elif key in ('top', 'y'):
rect['y'] = num_val
elif key == 'width':
rect['width'] = num_val
elif key == 'height':
rect['height'] = num_val
return rect
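
# Example (illustrative): extract_rect_from_style("left:10mm;top:2cm;width:96px;height:50mm", 210, 297)
# returns {'x': 10.0, 'y': 20.0, 'width': 25.4, 'height': 50.0} (96 px at 96 DPI is exactly 25.4 mm).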
def extract_rotation_from_style(style: str) -> float:
"""Extract 2D rotation angle from CSS transform property."""
match = re.search(r'rotate\(\s*([-+]?\d*\.?\d+)\s*deg\s*\)', style)
if match:
return float(match.group(1))
return 0.0
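
# Example (illustrative): extract_rotation_from_style("transform: rotate(-12.5deg)") returns -12.5.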
# ==================== Stages 14-15: Image Finalization & OCR ====================
def run_local_tesseract_ocr(image: Image.Image) -> dict:
"""
Run Tesseract OCR locally on image.
Args:
image: PIL Image to OCR
Returns:
dict: OCR results in Microsoft OCR format
"""
try:
import pytesseract
# Get OCR data with bounding boxes
data = pytesseract.image_to_data(
image,
lang=settings.OCR_TESSERACT_LANG,
config=settings.OCR_TESSERACT_CONFIG,
output_type=pytesseract.Output.DICT
)
# Convert to Microsoft OCR format
words = []
for i in range(len(data['text'])):
text = data['text'][i].strip()
if text: # Only include non-empty text
words.append({
'text': text,
'confidence': float(data['conf'][i]) / 100.0 if data['conf'][i] != -1 else 0.0,
'geo': [
int(data['left'][i]),
int(data['top'][i]),
int(data['width'][i]),
int(data['height'][i])
]
})
return {
'angle': 0,
'imageWidth': image.width,
'imageHeight': image.height,
'words': words
}
except ImportError:
raise RuntimeError(
"pytesseract not installed. Install with: uv pip install pytesseract\n"
"Also ensure Tesseract OCR is installed on your system:\n"
" Ubuntu/Debian: sudo apt-get install tesseract-ocr\n"
" macOS: brew install tesseract\n"
" Windows: Download from https://github.com/UB-Mannheim/tesseract/wiki"
)
except Exception as e:
print(f"Error running local Tesseract OCR: {e}")
raise
async def call_ocr_service(
image: Image.Image,
ocr_url: str = None,
engine: str = "microsoft_di",
timeout: int = 30,
use_local: bool = None
) -> dict:
"""
Call OCR service on image (Stage 15: Perform OCR).
Supports both local Tesseract OCR and remote OCR services.
Args:
image: PIL Image to OCR
ocr_url: OCR service URL (defaults to settings.OCR_SERVICE_URL)
engine: OCR engine to use
timeout: Request timeout in seconds
use_local: Force local/remote mode (None = use settings.OCR_USE_LOCAL)
Returns:
dict: OCR results in Microsoft OCR format
"""
# Determine if using local or remote OCR
if use_local is None:
use_local = settings.OCR_USE_LOCAL
# Local Tesseract OCR
if use_local:
print(" Using local Tesseract OCR...")
return run_local_tesseract_ocr(image)
# Remote OCR service
if ocr_url is None:
ocr_url = settings.OCR_SERVICE_URL
try:
# Convert image to bytes
buffer = BytesIO()
image.save(buffer, format="PNG")
buffer.seek(0)
image_bytes = buffer.getvalue()
# Call OCR service
endpoint = f"{ocr_url}/v1/sync/ocr/{engine}"
async with httpx.AsyncClient(timeout=timeout) as client:
            files = {'image': ('image.png', image_bytes, 'image/png')}
headers = {'accept': 'application/json'}
response = await client.post(endpoint, headers=headers, files=files)
response.raise_for_status()
data = response.json()
# Extract first page results
if 'ocr' in data and 'pages' in data['ocr'] and len(data['ocr']['pages']) > 0:
return data['ocr']['pages'][0]
else:
raise ValueError("Invalid OCR response format")
except Exception as e:
print(f"Error calling OCR service: {e}")
raise
async def render_pdf_to_image(
pdf_path: pathlib.Path,
dpi: int = 300
) -> tuple[Image.Image, str]:
"""
Convert PDF to high-quality image (Stage 14: Render Image).
Uses pdf2image (poppler) for high-quality conversion matching original pipeline.
Args:
pdf_path: Path to PDF file
dpi: DPI for rendering (default: 300, matching pipeline constant)
Returns:
tuple: (PIL Image, base64-encoded PNG string)
"""
try:
# Use pdf2image (same as original pipeline)
# This uses poppler under the hood for high-quality rendering
images = convert_from_path(pdf_path, dpi=dpi)
if not images:
raise ValueError("PDF conversion resulted in no images")
if len(images) > 1:
print(f"Warning: PDF has {len(images)} pages, using first page only")
img = images[0]
# Convert to base64
buffer = BytesIO()
img.save(buffer, format="PNG")
buffer.seek(0)
img_base64 = base64.b64encode(buffer.read()).decode('utf-8')
return img, img_base64
except Exception as e:
print(f"Error converting PDF to image: {e}")
raise
def convert_ocr_to_api_format(ocr_page: dict) -> dict:
"""
Convert Microsoft OCR format to API OCRResult schema.
Args:
ocr_page: OCR page result from Microsoft OCR service
Returns:
dict: OCR results in API format
"""
words = []
for word_data in ocr_page.get('words', []):
geo = word_data['geo'] # [x, y, width, height]
words.append({
'text': word_data['text'],
'confidence': word_data['confidence'],
'x': geo[0],
'y': geo[1],
'width': geo[2],
'height': geo[3]
})
lines = []
for line_data in ocr_page.get('lines', []):
geo = line_data['geo']
# Extract words for this line (if available)
line_words = []
# Note: Microsoft OCR doesn't provide word-to-line mapping
# We'll just include the line text
lines.append({
'text': line_data['text'],
'confidence': line_data['confidence'],
'x': geo[0],
'y': geo[1],
'width': geo[2],
'height': geo[3],
'words': line_words
})
return {
'image_width': ocr_page['imageWidth'],
'image_height': ocr_page['imageHeight'],
'angle': ocr_page.get('angle', 0.0),
'words': words,
'lines': lines
}
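
# Example (illustrative): a word entry {'text': 'Total', 'confidence': 0.98, 'geo': [120, 340, 60, 18]}
# maps to {'text': 'Total', 'confidence': 0.98, 'x': 120, 'y': 340, 'width': 60, 'height': 18}.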
async def process_stage4_ocr(
pdf_path: pathlib.Path,
enable_ocr: bool = False,
dpi: int = 300
) -> tuple[Optional[str], Optional[dict]]:
"""
Process Stage 4: Image Finalization & OCR.
This corresponds to:
- pipeline_14: Render PDF to high-quality image
- pipeline_15: Perform OCR on final image
Args:
pdf_path: Path to final PDF (after Stage 3 if enabled)
enable_ocr: Whether to run OCR
dpi: DPI for image rendering
Returns:
tuple: (image_base64, ocr_results_dict)
"""
image_base64 = None
ocr_results = None
try:
# Stage 14: Render PDF to image
img, image_base64 = await render_pdf_to_image(pdf_path, dpi=dpi)
print(f" ✓ Stage 14: Rendered image {img.size[0]}x{img.size[1]} @ {dpi} DPI")
# Stage 15: Perform OCR (if enabled and service available)
if enable_ocr and settings.OCR_SERVICE_ENABLED:
try:
ocr_page = await call_ocr_service(
img,
timeout=settings.OCR_SERVICE_TIMEOUT
)
ocr_results = convert_ocr_to_api_format(ocr_page)
print(f" ✓ Stage 15: OCR complete - {len(ocr_results['words'])} words, {len(ocr_results['lines'])} lines")
except Exception as e:
print(f" ⚠ Stage 15: OCR failed - {str(e)}")
# Continue without OCR
elif enable_ocr:
print(f" ⚠ Stage 15: OCR requested but service not enabled (OCR_SERVICE_ENABLED=false)")
return image_base64, ocr_results
except Exception as e:
print(f" ⚠ Stage 4 processing failed: {str(e)}")
return None, None
# ==================== Stages 16-18: Dataset Packaging ====================
async def normalize_bboxes_stage16(
document_id: str,
pdf_path: str,
ocr_results: Optional[Dict[str, Any]],
scale: str = "0-1"
) -> Tuple[Optional[List[Dict]], Optional[List[Dict]]]:
"""
Stage 16: Normalize bounding boxes to [0,1] scale.
Reuses logic from pipeline_16_normalize_bboxes.py
Args:
document_id: Unique document identifier
pdf_path: Path to PDF file
ocr_results: OCR results from Stage 15
scale: Normalization scale ("0-1" or "0-1000")
Returns:
Tuple of (word_level_bboxes, segment_level_bboxes)
"""
try:
print(f"\\n Stage 16: Normalizing bounding boxes...")
if not ocr_results or not ocr_results.get('words'):
print(f" ⚠ Stage 16: No OCR results to normalize")
return None, None
# Get image dimensions from OCR results
img_w_px = ocr_results.get('image_width', 0)
img_h_px = ocr_results.get('image_height', 0)
if img_w_px == 0 or img_h_px == 0:
print(f" ⚠ Stage 16: Invalid image dimensions")
return None, None
# Normalize word-level bboxes
normalized_words = []
for word in ocr_results.get('words', []):
# Convert pixel coordinates to normalized [0,1]
x0_norm = word['x'] / img_w_px
y0_norm = word['y'] / img_h_px
x2_norm = (word['x'] + word['width']) / img_w_px
y2_norm = (word['y'] + word['height']) / img_h_px
# If scale is 0-1000, multiply by 1000
if scale == "0-1000":
x0_norm *= 1000
y0_norm *= 1000
x2_norm *= 1000
y2_norm *= 1000
normalized_words.append({
'text': word['text'],
'x0': x0_norm,
'y0': y0_norm,
'x2': x2_norm,
'y2': y2_norm,
'block_no': None,
'line_no': None,
'word_no': None
})
# Normalize line-level (segment) bboxes
normalized_segments = []
for line in ocr_results.get('lines', []):
x0_norm = line['x'] / img_w_px
y0_norm = line['y'] / img_h_px
x2_norm = (line['x'] + line['width']) / img_w_px
y2_norm = (line['y'] + line['height']) / img_h_px
if scale == "0-1000":
x0_norm *= 1000
y0_norm *= 1000
x2_norm *= 1000
y2_norm *= 1000
normalized_segments.append({
'text': line['text'],
'x0': x0_norm,
'y0': y0_norm,
'x2': x2_norm,
'y2': y2_norm,
'block_no': None,
'line_no': None,
'word_no': None
})
print(f" ✓ Stage 16: Normalized {len(normalized_words)} words, {len(normalized_segments)} segments")
return normalized_words, normalized_segments
except Exception as e:
print(f" ⚠ Stage 16: BBox normalization failed - {str(e)}")
return None, None
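# Worked example of the normalization above (made-up pixel values): a word at
# x=100, y=200, width=50, height=12 on a 1000x1400 px image maps to
#   x0 = 100 / 1000          -> 0.10
#   y0 = 200 / 1400          -> ~0.1429
#   x2 = (100 + 50) / 1000   -> 0.15
#   y2 = (200 + 12) / 1400   -> ~0.1514
# or, under scale="0-1000", to roughly (100.0, 142.9, 150.0, 151.4).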
async def verify_ground_truth_stage17(
document_id: str,
ground_truth: Optional[Dict],
layout_elements: Optional[List[Dict]],
similarity_cutoff: float = 0.8
) -> Optional[Dict]:
"""
Stage 17: Verify and prepare ground truth annotations.
Simplified version of pipeline_17_gt_preparation_verification.py
Args:
document_id: Unique document identifier
ground_truth: Ground truth data from Stage 2
layout_elements: Layout/visual elements
similarity_cutoff: Similarity threshold for fuzzy matching
Returns:
GT verification result dict
"""
try:
print(f"\\n Stage 17: Verifying ground truth...")
if not ground_truth:
print(f" ⚠ Stage 17: No ground truth to verify")
return {
'passed': False,
'skipped': True,
'confirmed_keys': [],
'similarities': []
}
# Basic validation - check if GT has required structure
confirmed_keys = list(ground_truth.keys()) if isinstance(ground_truth, dict) else []
# For DocVQA-style GT, verify question-answer pairs
valid_pairs = 0
similarities = []
if isinstance(ground_truth, dict):
for question, answer in ground_truth.items():
if question and answer and isinstance(question, str) and isinstance(answer, str):
valid_pairs += 1
# Simplified similarity - just check both exist
similarities.append(1.0)
passed = valid_pairs > 0
result = {
'passed': passed,
'skipped': False,
'confirmed_keys': confirmed_keys,
'similarities': similarities,
'num_layout_elements': len(layout_elements) if layout_elements else 0,
'valid_labels': True
}
print(f" ✓ Stage 17: GT verification {'passed' if passed else 'failed'} - {valid_pairs} valid pairs")
return result
except Exception as e:
print(f" ⚠ Stage 17: GT verification failed - {str(e)}")
return {
'passed': False,
'skipped': False,
'confirmed_keys': [],
'similarities': []
}
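# Illustrative DocVQA-style input and result (async context; values are made up):
#
#   gt = {'What is the total?': '42.00', 'Who signed?': 'J. Doe'}
#   res = await verify_ground_truth_stage17('doc-1', gt, layout_elements=None)
#   res['passed'], len(res['similarities'])   # -> (True, 2)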
async def analyze_document_stage18(
document_id: str,
has_handwriting: bool,
has_visual_elements: bool,
has_ocr: bool,
gt_verification: Optional[Dict],
page_count: int = 1
) -> Dict:
"""
Stage 18: Generate document analysis and statistics.
Simplified version of pipeline_18_analyze.py
Args:
document_id: Unique document identifier
has_handwriting: Whether document has handwriting
has_visual_elements: Whether document has visual elements
has_ocr: Whether OCR was performed
gt_verification: GT verification results
page_count: Number of pages
Returns:
Analysis statistics dict
"""
try:
print(f"\\n Stage 18: Analyzing document...")
# Document validation checks
errors = []
if page_count != 1:
errors.append("is_multipage")
if not gt_verification or not gt_verification.get('passed'):
errors.append("gt_verification_failed")
if not has_ocr:
errors.append("missing_ocr")
is_valid = len(errors) == 0
stats = {
'total_documents': 1,
'valid_documents': 1 if is_valid else 0,
'error_counts': {error: 1 for error in errors},
'has_handwriting': 1 if has_handwriting else 0,
'has_visual_elements': 1 if has_visual_elements else 0,
'has_ocr': 1 if has_ocr else 0,
'multipage_count': 1 if page_count != 1 else 0,
'token_usage': None # Not tracked at single-doc level
}
print(f" ✓ Stage 18: Analysis complete - {'valid' if is_valid else 'has errors'}")
return stats
except Exception as e:
print(f" ⚠ Stage 18: Analysis failed - {str(e)}")
return {
'total_documents': 1,
'valid_documents': 0,
'error_counts': {'analysis_error': 1},
'has_handwriting': 0,
'has_visual_elements': 0,
'has_ocr': 0,
'multipage_count': 0
}
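# Illustrative outcome of the checks above: a single-page document with OCR but a
# failed GT verification collects errors == ['gt_verification_failed'], so the
# returned stats contain 'valid_documents': 0 and
# 'error_counts': {'gt_verification_failed': 1}.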
async def create_debug_visualization_stage19(
document_id: str,
image_base64: Optional[str],
normalized_bboxes: Optional[List[Dict]],
show_text: bool = True,
bbox_color: Tuple[int, int, int] = (255, 0, 0)
) -> Optional[Dict]:
"""
Stage 19: Create debug visualization with bbox overlays.
Simplified version of pipeline_19_create_debug_data.py
Args:
document_id: Unique document identifier
image_base64: Base64-encoded image
normalized_bboxes: Normalized bounding boxes
show_text: Whether to show text labels
bbox_color: RGB color for bboxes
Returns:
Debug visualization dict with overlay image
"""
try:
print(f"\\n Stage 19: Creating debug visualization...")
if not image_base64 or not normalized_bboxes:
print(f" ⚠ Stage 19: Missing image or bboxes")
return None
# Decode image
img_data = base64.b64decode(image_base64)
img = Image.open(BytesIO(img_data))
# Import drawing utilities
from PIL import ImageDraw, ImageFont
# Create drawing context
draw = ImageDraw.Draw(img)
img_w, img_h = img.size
# Draw bounding boxes
num_drawn = 0
for bbox in normalized_bboxes[:100]: # Limit to 100 boxes for performance
# Un-normalize coordinates
x0 = bbox['x0'] * img_w
y0 = bbox['y0'] * img_h
x2 = bbox['x2'] * img_w
y2 = bbox['y2'] * img_h
# Draw rectangle
draw.rectangle([x0, y0, x2, y2], outline=bbox_color, width=2)
# Optionally draw text
if show_text and bbox.get('text'):
text = bbox['text'][:20] # Truncate long text
try:
# Try to use a small font
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 10)
except OSError:
# Font file is missing; fall back to PIL's built-in bitmap font
font = ImageFont.load_default()
draw.text((x0, y0 - 12), text, fill=bbox_color, font=font)
num_drawn += 1
# Convert back to base64
buffer = BytesIO()
img.save(buffer, format="PNG")
overlay_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
result = {
'bbox_overlay_base64': overlay_base64,
'visual_elements_overlay_base64': None, # Would require additional processing
'handwriting_overlay_base64': None
}
print(f" ✓ Stage 19: Debug visualization created - {num_drawn} boxes drawn")
return result
except Exception as e:
print(f" ⚠ Stage 19: Debug visualization failed - {str(e)}")
import traceback
traceback.print_exc()
return None
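# Illustrative: drawing un-normalizes with the inverse of Stage 16, so a box
# (x0=0.10, y0=0.1429, x2=0.15, y2=0.1514) on a 1000x1400 px image is rendered
# at roughly the pixel rectangle (100.0, 200.1, 150.0, 212.0).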
async def process_stage5_complete(
document_id: str,
pdf_path: str,
image_base64: Optional[str],
ocr_results: Optional[Dict],
ground_truth: Optional[Dict],
has_handwriting: bool,
has_visual_elements: bool,
layout_elements: Optional[List[Dict]],
enable_bbox_normalization: bool = False,
enable_gt_verification: bool = False,
enable_analysis: bool = False,
enable_debug_visualization: bool = False,
) -> Dict[str, Any]:
"""
Process Stage 5: Dataset Packaging (Stages 16-19).
Args:
document_id: Unique document identifier
pdf_path: Path to PDF file
image_base64: Base64-encoded final image
ocr_results: OCR results from Stage 15
ground_truth: Ground truth from Stage 2
has_handwriting: Whether handwriting was generated
has_visual_elements: Whether visual elements were generated
layout_elements: Layout/visual element metadata
enable_*: Feature flags for each sub-stage
Returns:
Dict with all Stage 5 results
"""
results = {
'normalized_bboxes_word': None,
'normalized_bboxes_segment': None,
'gt_verification': None,
'analysis_stats': None,
'debug_visualization': None
}
try:
print(f"\\n========== Stage 5: Dataset Packaging ==========")
# Stage 16: Normalize bboxes
if enable_bbox_normalization:
norm_words, norm_segments = await normalize_bboxes_stage16(
document_id=document_id,
pdf_path=pdf_path,
ocr_results=ocr_results,
scale=settings.BBOX_NORMALIZATION_SCALE
)
results['normalized_bboxes_word'] = norm_words
results['normalized_bboxes_segment'] = norm_segments
# Stage 17: Verify GT
if enable_gt_verification:
gt_verification = await verify_ground_truth_stage17(
document_id=document_id,
ground_truth=ground_truth,
layout_elements=layout_elements,
similarity_cutoff=settings.GT_VERIFICATION_SIMILARITY_CUTOFF
)
results['gt_verification'] = gt_verification
# Stage 18: Analysis
if enable_analysis:
analysis_stats = await analyze_document_stage18(
document_id=document_id,
has_handwriting=has_handwriting,
has_visual_elements=has_visual_elements,
has_ocr=ocr_results is not None,
gt_verification=results.get('gt_verification'),
page_count=1
)
results['analysis_stats'] = analysis_stats
# Stage 19: Debug visualization
if enable_debug_visualization and image_base64:
# Use normalized bboxes if available
bboxes_for_viz = results.get('normalized_bboxes_word') or results.get('normalized_bboxes_segment')
if bboxes_for_viz:
# Parse color from config
color_str = settings.DEBUG_BBOX_COLOR_RGB
try:
r, g, b = map(int, color_str.split(','))
bbox_color = (r, g, b)
except (ValueError, AttributeError):
bbox_color = (255, 0, 0) # Red default
debug_viz = await create_debug_visualization_stage19(
document_id=document_id,
image_base64=image_base64,
normalized_bboxes=bboxes_for_viz,
show_text=settings.DEBUG_SHOW_TEXT_IN_BBOX,
bbox_color=bbox_color
)
results['debug_visualization'] = debug_viz
print(f" ✓ Stages 16-18: Dataset packaging complete\\n")
return results
except Exception as e:
print(f" ⚠ Stages 16-18 processing failed: {str(e)}")
import traceback
traceback.print_exc()
return results
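# Illustrative end-to-end Stage 5 call (async context; identifiers are made up):
#
#   results = await process_stage5_complete(
#       document_id="doc-1",
#       pdf_path="/tmp/doc.pdf",
#       image_base64=image_b64,
#       ocr_results=ocr,
#       ground_truth={'What is the total?': '42.00'},
#       has_handwriting=False,
#       has_visual_elements=True,
#       layout_elements=None,
#       enable_bbox_normalization=True,
#       enable_gt_verification=True,
#   )
#   results['normalized_bboxes_word']   # list of normalized word dicts, or None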
# ==================== Dataset Export ====================
async def export_to_msgpack(
document_id: str,
image_path: Optional[str],
image_base64: Optional[str],
words: List[str],
word_bboxes: List[List[float]],
segment_bboxes: Optional[List[List[float]]],
ground_truth: Optional[Dict],
output_path: pathlib.Path,
image_width: Optional[int] = None,
image_height: Optional[int] = None
) -> Optional[pathlib.Path]:
"""
Export document data to msgpack format.
This creates a simple msgpack file containing the document data in a format
compatible with DocGenie's dataset infrastructure.
Args:
document_id: Unique document identifier
image_path: Path to document image (if available)
image_base64: Base64-encoded image (if no image_path)
words: List of word strings
word_bboxes: Word-level bounding boxes (normalized [0,1])
segment_bboxes: Segment-level bounding boxes (normalized [0,1])
ground_truth: Ground truth annotations
output_path: Output msgpack file path
image_width: Image width in pixels
image_height: Image height in pixels
Returns:
Path to the created msgpack file, or None if the export failed or was skipped
"""
try:
from datadings.writer import FileWriter
print(f"\\n========== Msgpack Export ==========")
print(f" Exporting document {document_id} to msgpack format...")
# Prepare document data
doc_data = {
"key": document_id,
"sample_id": document_id,
"words": words,
"word_bboxes": word_bboxes, # Should already be normalized [0,1]
}
# Add segment bboxes if available
if segment_bboxes:
doc_data["segment_level_bboxes"] = segment_bboxes
else:
# Fallback: use word bboxes as segment bboxes
doc_data["segment_level_bboxes"] = word_bboxes
# Add image dimensions if available
if image_width and image_height:
doc_data["image_width"] = image_width
doc_data["image_height"] = image_height
# Add image path if available
if image_path:
doc_data["image_file_path"] = str(image_path)
# Process ground truth annotations
if ground_truth:
# Extract classification label if exists
if "label" in ground_truth:
doc_data["label"] = ground_truth["label"]
# Extract entity labels (for NER/token classification)
if "entities" in ground_truth:
entities = ground_truth["entities"]
if entities:
# Create word-level labels (default "O" for outside)
word_labels = ["O"] * len(words)
# Map entities to words
for entity in entities:
entity_text = entity.get("text", "")
entity_label = entity.get("label", "ENTITY")
# Heuristic matching: label any word that appears in the entity text.
# This is set membership, not span alignment, so a word repeated
# elsewhere in the document can be mislabeled.
entity_words = set(entity_text.split())
for i, word in enumerate(words):
if word in entity_words:
word_labels[i] = f"B-{entity_label}" if i == 0 or word_labels[i - 1] == "O" else f"I-{entity_label}"
doc_data["word_labels"] = word_labels
# Extract QA pairs (for extractive QA)
if "questions" in ground_truth:
qa_pairs = []
for qa in ground_truth["questions"]:
qa_pair = {
"question": qa.get("question", ""),
"answers": qa.get("answers", []),
"question_id": qa.get("id", "")
}
qa_pairs.append(qa_pair)
doc_data["qa_pairs"] = qa_pairs
# Extract layout annotations (for document layout analysis)
if "layout_elements" in ground_truth:
layout_elements = ground_truth["layout_elements"]
annotated_objects = []
for elem in layout_elements:
obj = {
"label": elem.get("label", "text"),
"bbox": elem.get("bbox", [0, 0, 1, 1]), # Normalized bbox
"score": elem.get("score", 1.0)
}
annotated_objects.append(obj)
doc_data["annotated_objects"] = annotated_objects
# Ensure output directory exists
output_path.parent.mkdir(parents=True, exist_ok=True)
# Write to msgpack file
with FileWriter(output_path, overwrite=True) as writer:
writer.write(doc_data)
print(f" ✓ Msgpack exported: {output_path}")
print(f" - Words: {len(words)}")
print(f" - Word BBoxes: {len(word_bboxes)}")
print(f" - Segment BBoxes: {len(doc_data['segment_level_bboxes'])}")
if "word_labels" in doc_data:
print(f" - Labels: {len(doc_data['word_labels'])}")
if "qa_pairs" in doc_data:
print(f" - QA Pairs: {len(doc_data['qa_pairs'])}")
return output_path
except ImportError:
print(f" ⚠ Warning: 'datadings' package not available. Msgpack export skipped.")
print(f" Install with: pip install datadings")
return None
except Exception as e:
print(f" ⚠ Msgpack export failed: {str(e)}")
import traceback
traceback.print_exc()
return None
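# Illustrative export call (async context; all values are made up):
#
#   out = await export_to_msgpack(
#       document_id="doc-1",
#       image_path="/tmp/doc-1.png",
#       image_base64=None,
#       words=["Total", "42.00"],
#       word_bboxes=[[0.10, 0.14, 0.15, 0.15], [0.16, 0.14, 0.21, 0.15]],
#       segment_bboxes=None,            # falls back to word_bboxes
#       ground_truth={"label": "invoice"},
#       output_path=pathlib.Path("/tmp/doc-1.msgpack"),
#       image_width=1000,
#       image_height=1400,
#   )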
def save_individual_tokens_to_disk(
handwriting_images: dict,
visual_element_images: dict,
output_dir: pathlib.Path,
doc_id: str
) -> dict:
"""
Save individual handwriting tokens and visual elements to disk.
Used for 'dataset' and 'complete' output detail levels.
Args:
handwriting_images: Dict {hw_id: base64_png}
visual_element_images: Dict {ve_id: base64_png}
output_dir: Base output directory
doc_id: Document ID for folder naming
Returns:
dict with paths to saved files
"""
saved_files = {
'handwriting_tokens': [],
'visual_elements': []
}
# Save handwriting tokens
if handwriting_images:
hw_dir = output_dir / doc_id / "handwriting_tokens"
hw_dir.mkdir(parents=True, exist_ok=True)
for hw_id, img_b64 in handwriting_images.items():
img_bytes = base64.b64decode(img_b64)
img_path = hw_dir / f"{hw_id}.png"
img_path.write_bytes(img_bytes)
saved_files['handwriting_tokens'].append(str(img_path.relative_to(output_dir)))
# Save visual elements
if visual_element_images:
ve_dir = output_dir / doc_id / "visual_elements"
ve_dir.mkdir(parents=True, exist_ok=True)
for ve_id, img_b64 in visual_element_images.items():
img_bytes = base64.b64decode(img_b64)
img_path = ve_dir / f"{ve_id}.png"
img_path.write_bytes(img_bytes)
saved_files['visual_elements'].append(str(img_path.relative_to(output_dir)))
return saved_files
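# Resulting on-disk layout (illustrative ids):
#
#   <output_dir>/<doc_id>/handwriting_tokens/hw_0.png
#   <output_dir>/<doc_id>/visual_elements/ve_0.png
#
# The returned dict lists the paths relative to output_dir, e.g.
#   {'handwriting_tokens': ['<doc_id>/handwriting_tokens/hw_0.png'],
#    'visual_elements': ['<doc_id>/visual_elements/ve_0.png']}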
def create_token_mapping_json(
handwriting_regions: list[dict],
handwriting_images: dict,
visual_elements: list[dict],
visual_element_images: dict
) -> dict:
"""
Create mapping JSON for ML dataset creation.
Includes style IDs, positions, and image references.
Args:
handwriting_regions: List of handwriting metadata
handwriting_images: Dict of handwriting images
visual_elements: List of visual element metadata
visual_element_images: Dict of visual element images
Returns:
dict with complete token mapping
"""
mapping = {
'handwriting': {
'tokens': [],
'total_count': len(handwriting_regions)
},
'visual_elements': {
'items': [],
'total_count': len(visual_elements)
}
}
# Add handwriting token info
for hw_region in handwriting_regions:
hw_id = hw_region.get('id', 'unknown')
token_info = {
'id': hw_id,
'text': hw_region.get('text', ''),
'author_id': hw_region.get('author_id'),
'is_signature': hw_region.get('is_signature', False),
'rect': hw_region.get('rect', {}),
'has_image': hw_id in handwriting_images,
'image_filename': f"{hw_id}.png" if hw_id in handwriting_images else None
}
mapping['handwriting']['tokens'].append(token_info)
# Add visual element info
for ve in visual_elements:
ve_id = ve.get('id', 'unknown')
ve_info = {
'id': ve_id,
'type': ve.get('type', 'unknown'),
'content': ve.get('content'),
'rect': ve.get('rect', {}),
'has_image': ve_id in visual_element_images,
'image_filename': f"{ve_id}.png" if ve_id in visual_element_images else None
}
mapping['visual_elements']['items'].append(ve_info)
return mapping
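# Example mapping entry (illustrative values):
#
#   create_token_mapping_json(
#       handwriting_regions=[{'id': 'hw_0', 'text': 'J. Doe', 'is_signature': True}],
#       handwriting_images={'hw_0': '<base64 png>'},
#       visual_elements=[],
#       visual_element_images={},
#   )['handwriting']['tokens'][0]
#   # -> {'id': 'hw_0', 'text': 'J. Doe', 'author_id': None, 'is_signature': True,
#   #     'rect': {}, 'has_image': True, 'image_filename': 'hw_0.png'}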
def extract_all_bboxes_from_pdf(pdf_path: pathlib.Path) -> Dict[str, List[dict]]:
"""
Extract both word-level and character-level bounding boxes from PDF.
This is a high-priority feature for ML datasets as it provides:
- Word-level bboxes: Ground truth text positions from PDF
- Character-level bboxes: Fine-grained localization for character recognition
Args:
pdf_path: Path to PDF file
Returns:
Dictionary with 'word' and 'char' keys containing bbox lists
"""
from docgenie.generation.pipeline_04.extract_bbox import extract_bboxes_from_pdf
# Extract word-level bboxes
word_bboxes_raw = extract_bboxes_from_pdf(
pdf_path=pdf_path,
level="word"
)
# Extract character-level bboxes
char_bboxes_raw = extract_bboxes_from_pdf(
pdf_path=pdf_path,
level="char"
)
# Convert OCRBox objects to dict format
word_bboxes = []
for bbox in word_bboxes_raw:
word_bboxes.append({
"text": bbox.text,
"x": bbox.x0,
"y": bbox.y0,
"width": bbox.width,
"height": bbox.height,
"bbox": [bbox.x0, bbox.y0, bbox.x2, bbox.y2],
"block_no": bbox.block_no,
"line_no": bbox.line_no,
"word_no": bbox.word_no,
"page": 0
})
char_bboxes = []
for bbox in char_bboxes_raw:
char_bboxes.append({
"text": bbox.text,
"x": bbox.x0,
"y": bbox.y0,
"width": bbox.width,
"height": bbox.height,
"bbox": [bbox.x0, bbox.y0, bbox.x2, bbox.y2],
"block_no": bbox.block_no,
"line_no": bbox.line_no,
"word_no": bbox.word_no,
"page": 0
})
return {
"word": word_bboxes,
"char": char_bboxes
}
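# Illustrative result shape (values are made up; coordinates come from the PDF
# text layer, typically in PDF points):
#
#   bboxes = extract_all_bboxes_from_pdf(pathlib.Path("/tmp/doc.pdf"))
#   bboxes['word'][0]
#   # -> {'text': 'Total', 'x': 72.0, 'y': 90.5, 'width': 28.3, 'height': 10.1,
#   #     'bbox': [72.0, 90.5, 100.3, 100.6], 'block_no': 0, 'line_no': 0,
#   #     'word_no': 0, 'page': 0}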
def extract_raw_annotations_from_geometries(geometries: List[dict]) -> List[dict]:
"""
Extract raw layout annotations (bounding boxes) from geometries.
This is a high-priority feature for ML datasets as it provides:
- Layout bounding boxes before any normalization
- Shows original coordinate space from HTML rendering
- Useful for debugging annotation processing pipeline
Args:
geometries: List of geometry dictionaries from HTML rendering
Returns:
List of layout annotation dictionaries with bbox coordinates
"""
annotations = []
for geom in geometries:
# Only extract layout elements (class starts with "LE-")
class_name = geom.get('class', '')
if not class_name.startswith('LE-'):
continue
# Extract bbox from rect
rect = geom.get('rect', {})
if not rect:
continue
annotation = {
'class': class_name,
'type': 'layout_element',
'bbox': {
'x': rect.get('x', 0),
'y': rect.get('y', 0),
'width': rect.get('width', 0),
'height': rect.get('height', 0)
},
'text': geom.get('text', ''),
'attributes': geom.get('attributes', {})
}
# Compute x2, y2 for convenience
annotation['bbox']['x2'] = annotation['bbox']['x'] + annotation['bbox']['width']
annotation['bbox']['y2'] = annotation['bbox']['y'] + annotation['bbox']['height']
annotations.append(annotation)
return annotations
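# Example with a single made-up geometry from the HTML renderer:
#
#   geoms = [{'class': 'LE-title', 'text': 'Invoice',
#             'rect': {'x': 10, 'y': 20, 'width': 300, 'height': 40}}]
#   extract_raw_annotations_from_geometries(geoms)[0]['bbox']
#   # -> {'x': 10, 'y': 20, 'width': 300, 'height': 40, 'x2': 310, 'y2': 60}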