#!/usr/bin/env python3
"""
MonkeyOCR 3B Gradio App for MacBook M4 Pro with MPS Acceleration
Optimized for local deployment with Apple Silicon GPU acceleration
"""
import os
import sys
import tempfile
import shutil
from pathlib import Path
import base64
import re
import uuid
import subprocess
from typing import Optional, Tuple
import gradio as gr
import torch
from PIL import Image
from pdf2image import convert_from_path
from loguru import logger
# Apply PyTorch patch for doclayout_yolo compatibility
from torch_patch import patch_torch_load
patch_torch_load()
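# (Note) `patch_torch_load` is expected to work around the `weights_only=True`
# default that newer PyTorch releases use for `torch.load`, which breaks loading
# of doclayout_yolo checkpoints. A minimal sketch of such a patch, assuming that
# is what torch_patch does (the real module may differ):
#
#     import functools
#     _original_load = torch.load
#     def patch_torch_load():
#         torch.load = functools.partial(_original_load, weights_only=False)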
# Add MonkeyOCR to path
sys.path.append("./MonkeyOCR")
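# (Assumes the MonkeyOCR repository has been cloned into ./MonkeyOCR next to this script.)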
try:
    from magic_pdf.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader
    from magic_pdf.data.dataset import PymuDocDataset, ImageDataset
    from magic_pdf.model.doc_analyze_by_custom_model_llm import doc_analyze_llm
    from magic_pdf.model.custom_model import MonkeyOCR
except ImportError as e:
    logger.error(f"Failed to import MonkeyOCR modules: {e}")
    logger.info("Please ensure MonkeyOCR is properly installed")
    sys.exit(1)
# Global model instance
model_instance = None
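
# The model is created lazily by initialize_model() on the first request and
# then reused for all subsequent requests. The YAML config is expected to
# contain at least a top-level `device` key (see the CPU fallback below);
# other keys follow the MonkeyOCR config schema. A minimal illustrative
# example (an assumption, not the full schema):
#
#     device: mps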
def initialize_model(config_path: str = "model_configs_mps.yaml") -> MonkeyOCR:
    """Initialize MonkeyOCR model with MPS optimization"""
    global model_instance

    if model_instance is None:
        logger.info("Initializing MonkeyOCR model with MPS acceleration...")

        # Check if MPS is available
        if not torch.backends.mps.is_available():
            logger.warning("MPS not available, falling back to CPU")
            # Modify config to use CPU
            import yaml
            with open(config_path, 'r') as f:
                config = yaml.safe_load(f)
            config['device'] = 'cpu'
            with open(config_path, 'w') as f:
                yaml.dump(config, f)
        else:
            logger.info("MPS is available and will be used for acceleration")
            # Set environment variables for optimal MPS performance
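            # Setting PYTORCH_MPS_HIGH_WATERMARK_RATIO to 0.0 removes the MPS
            # allocator's upper memory limit, letting large models allocate
            # beyond the default watermark (at the cost of possible memory
            # pressure on the system).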
            os.environ['PYTORCH_MPS_HIGH_WATERMARK_RATIO'] = '0.0'

        try:
            model_instance = MonkeyOCR(config_path)
            logger.info("Model initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize model: {e}")
            raise

    return model_instance


def render_latex_table_to_image(latex_content: str, temp_dir: str) -> str:
    """Render LaTeX table to image and return HTML img tag"""
    try:
        # Extract tabular environment content
        pattern = r"(\\begin\{tabular\}.*?\\end\{tabular\})"
        matches = re.findall(pattern, latex_content, re.DOTALL)
        if matches:
            table_content = matches[0]
        elif '\\begin{tabular}' in latex_content:
            if '\\end{tabular}' not in latex_content:
                table_content = latex_content + '\n\\end{tabular}'
            else:
                table_content = latex_content
        else:
            return latex_content

        # Build complete LaTeX document
        full_latex = r"""
\documentclass{article}
\usepackage[utf8]{inputenc}
\usepackage{booktabs}
\usepackage{bm}
\usepackage{multirow}
\usepackage{array}
\usepackage{colortbl}
\usepackage[table]{xcolor}
\usepackage{amsmath}
\usepackage{amssymb}
\usepackage{graphicx}
\usepackage{geometry}
\usepackage{makecell}
\usepackage[active,tightpage]{preview}
\PreviewEnvironment{tabular}
\begin{document}
""" + table_content + r"""
\end{document}
"""

        # Generate unique filename
        unique_id = str(uuid.uuid4())[:8]
        tex_path = os.path.join(temp_dir, f"table_{unique_id}.tex")
        pdf_path = os.path.join(temp_dir, f"table_{unique_id}.pdf")
        png_path = os.path.join(temp_dir, f"table_{unique_id}.png")

        # Write tex file
        with open(tex_path, "w", encoding="utf-8") as f:
            f.write(full_latex)

        # Compile LaTeX to PDF
        result = subprocess.run(
            ["pdflatex", "-interaction=nonstopmode", "-output-directory", temp_dir, tex_path],
            timeout=20,
            capture_output=True,
            text=True
        )
        if result.returncode != 0 or not os.path.exists(pdf_path):
            logger.warning("LaTeX compilation failed, returning original content")
            return f"<pre>{latex_content}</pre>"

        # Convert PDF to PNG
        images = convert_from_path(pdf_path, dpi=300)
        images[0].save(png_path, "PNG")

        # Convert to base64
        with open(png_path, "rb") as f:
            img_data = f.read()
        img_base64 = base64.b64encode(img_data).decode("utf-8")

        # Clean up temporary files
        for file_path in [tex_path, pdf_path, png_path]:
            if os.path.exists(file_path):
                os.remove(file_path)

        return f'<img src="data:image/png;base64,{img_base64}" style="max-width:100%;height:auto;">'
    except Exception as e:
        logger.warning(f"LaTeX rendering error: {e}")
        return f"<pre>{latex_content}</pre>"


def process_document(file_path: str) -> Tuple[str, str]:
    """Process document and return markdown content and layout PDF path"""
    if not file_path:
        return "", ""

    try:
        model = initialize_model()
        parent_path = os.path.dirname(file_path)
        full_name = os.path.basename(file_path)
        name = '.'.join(full_name.split(".")[:-1])

        # Create output directories
        local_image_dir = os.path.join(parent_path, "markdown", "images")
        local_md_dir = os.path.join(parent_path, "markdown")
        os.makedirs(local_image_dir, exist_ok=True)
        os.makedirs(local_md_dir, exist_ok=True)

        image_dir = os.path.basename(local_image_dir)
        image_writer = FileBasedDataWriter(local_image_dir)
        md_writer = FileBasedDataWriter(local_md_dir)
        reader = FileBasedDataReader(parent_path)

        # Read file data
        data_bytes = reader.read(full_name)

        # Create dataset based on file type
        if full_name.split(".")[-1].lower() in ['jpg', 'jpeg', 'png']:
            ds = ImageDataset(data_bytes)
        else:
            ds = PymuDocDataset(data_bytes)

        # Process document with threading-based timeout
        logger.info("Processing document with MonkeyOCR...")
        import threading
        import time
        from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError

        def process_with_model():
            overall_start_time = time.time()

            # Step 1: Document Analysis
            analysis_start_time = time.time()
            logger.info("Starting document analysis...")
            infer_result = ds.apply(doc_analyze_llm, MonkeyOCR_model=model)
            logger.info(f"PROFILE: Document analysis (doc_analyze_llm) took {time.time() - analysis_start_time:.2f}s")

            # Step 2: OCR and Layout Processing
            ocr_start_time = time.time()
            logger.info("Starting OCR and layout processing...")
            pipe_result = infer_result.pipe_ocr_mode(image_writer, MonkeyOCR_model=model)
            logger.info(f"PROFILE: OCR/Layout (pipe_ocr_mode) took {time.time() - ocr_start_time:.2f}s")

            logger.info(f"PROFILE: Total model processing took {time.time() - overall_start_time:.2f}s")
            return infer_result, pipe_result
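        # Note: the future timeout below only stops *waiting* for the result;
        # the worker thread itself cannot be forcibly cancelled and may keep
        # running in the background until the model call finishes.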
        # Use ThreadPoolExecutor with timeout
        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(process_with_model)
            try:
                infer_result, pipe_result = future.result(timeout=300)  # 5 minute timeout
            except FutureTimeoutError:
                logger.error("Processing timed out after 5 minutes")
                raise TimeoutError("Document processing timed out. Please try with a smaller document or simpler layout.")

        # Generate layout PDF
        layout_pdf_path = os.path.join(parent_path, f"{name}_layout.pdf")
        pipe_result.draw_layout(layout_pdf_path)

        # Generate markdown
        pipe_result.dump_md(md_writer, f"{name}.md", image_dir)
        md_content_ori = FileBasedDataReader(local_md_dir).read(f"{name}.md").decode("utf-8")

        # Process markdown content (render LaTeX tables and convert images to base64)
        temp_dir = tempfile.mkdtemp()
        try:
            # Process HTML-wrapped LaTeX tables
            def replace_html_latex_table(match):
                html_content = match.group(1)
                if '\\begin{tabular}' in html_content:
                    return render_latex_table_to_image(html_content, temp_dir)
                else:
                    return match.group(0)

            md_content = re.sub(r'<html>(.*?)</html>', replace_html_latex_table, md_content_ori, flags=re.DOTALL)

            # Convert local image links to base64
            def replace_image_with_base64(match):
                img_path = match.group(1)
                if not os.path.isabs(img_path):
                    full_img_path = os.path.join(local_md_dir, img_path)
                else:
                    full_img_path = img_path
                try:
                    if os.path.exists(full_img_path):
                        with open(full_img_path, "rb") as f:
                            img_data = f.read()
                        img_base64 = base64.b64encode(img_data).decode("utf-8")
                        ext = os.path.splitext(full_img_path)[1].lower()
                        mime_type = "image/jpeg" if ext in ['.jpg', '.jpeg'] else f"image/{ext[1:]}"
                        return f'<img src="data:{mime_type};base64,{img_base64}" style="max-width:100%;height:auto;">'
                    else:
                        return match.group(0)
                except Exception:
                    return match.group(0)

            md_content = re.sub(r'!\[.*?\]\(([^)]+)\)', replace_image_with_base64, md_content)
        finally:
            if os.path.exists(temp_dir):
                shutil.rmtree(temp_dir, ignore_errors=True)

        logger.info("Document processing completed successfully")
        return md_content, layout_pdf_path

    except Exception as e:
        logger.error(f"Error processing document: {e}")
        return f"Error processing document: {str(e)}", ""


def parse_document(file) -> Tuple[str, Optional[str]]:
    """Parse uploaded document and return results"""
    if file is None:
        return "Please upload a document first.", None

    try:
        # gr.File(type="filepath") passes the upload as a plain path string;
        # older Gradio versions pass a tempfile-like object with a .name
        # attribute, so handle both.
        file_path = file if isinstance(file, str) else file.name

        # Process the document
        markdown_content, layout_pdf_path = process_document(file_path)

        if not markdown_content:
            return "Failed to process document.", None

        return markdown_content, layout_pdf_path if os.path.exists(layout_pdf_path) else None
    except Exception as e:
        logger.error(f"Error in parse_document: {e}")
        return f"Error: {str(e)}", None


def create_gradio_interface():
    """Create and configure Gradio interface"""
    # Custom CSS for better appearance
    css = """
    .gradio-container {
        max-width: 1200px !important;
    }
    .markdown-content {
        max-height: 600px;
        overflow-y: auto;
        border: 1px solid #ddd;
        padding: 10px;
        border-radius: 5px;
    }
    """

    with gr.Blocks(
        title="MonkeyOCR 3B - Local MPS Demo",
        css=css,
        theme=gr.themes.Soft()
    ) as demo:
        gr.Markdown("""
# 🐡 MonkeyOCR 3B - Local Demo (Apple Silicon MPS)

**Optimized for MacBook M4 Pro with 48GB RAM**

Upload a PDF or image document to extract structured content with state-of-the-art accuracy.
The model runs locally using Apple's Metal Performance Shaders for GPU acceleration.

**Supported formats:** PDF, PNG, JPG, JPEG
""")

        with gr.Row():
            with gr.Column(scale=1):
                file_input = gr.File(
                    label="📄 Upload Document",
                    file_types=[".pdf", ".png", ".jpg", ".jpeg"],
                    type="filepath"
                )

                parse_btn = gr.Button(
                    "🚀 Parse Document",
                    variant="primary",
                    size="lg"
                )

                gr.Markdown("""
**Tips:**
- Larger documents may take a few minutes to process
- The model excels at formulas, tables, and complex layouts
- Processing speed: ~0.84 pages/second on M4 Pro
""")

            with gr.Column(scale=2):
                markdown_output = gr.Markdown(
                    label="📝 Extracted Content",
                    elem_classes=["markdown-content"]
                )

                layout_pdf_output = gr.File(
                    label="📊 Layout Analysis (PDF)",
                    visible=False
                )

        # Event handlers
        parse_btn.click(
            fn=parse_document,
            inputs=[file_input],
            outputs=[markdown_output, layout_pdf_output],
            show_progress=True
        )

        # Show layout PDF when available
        def show_layout_pdf(pdf_path):
            if pdf_path:
                return gr.update(visible=True, value=pdf_path)
            return gr.update(visible=False)

        layout_pdf_output.change(
            fn=show_layout_pdf,
            inputs=[layout_pdf_output],
            outputs=[layout_pdf_output]
        )

    return demo


def main():
    """Main function to run the Gradio app"""
    logger.info("Starting MonkeyOCR 3B Gradio App...")

    # Check system requirements
    if not torch.backends.mps.is_available():
        logger.warning("MPS not available. The app will run on CPU which may be slower.")
    else:
        logger.info("MPS is available. GPU acceleration enabled.")

    # Create and launch the interface
    demo = create_gradio_interface()

    # Launch with appropriate settings
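    # With these settings the UI is served at http://localhost:7861 and is also
    # reachable from other machines on the local network (server_name is
    # 0.0.0.0); no public Gradio link is created while share=False.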
    demo.launch(
        server_name="0.0.0.0",  # Allow external access
        server_port=7861,       # Use different port to avoid conflicts
        share=False,            # Set to True if you want a public link
        show_error=True,
        quiet=False
    )


if __name__ == "__main__":
    main()