Spaces:
Paused
Paused
| """ | |
| MinerU Document Parser API | |
| A FastAPI service that wraps MinerU for parsing PDFs and images | |
| into LLM-ready markdown/JSON formats. | |
| Features: | |
| - Automatic chunking for PDFs above CHUNKING_THRESHOLD pages (CHUNK_SIZE pages per chunk, 10 by default) | |
| - Parallel processing of chunks for faster throughput | |
| - Automatic fallback to pipeline backend on GPU memory errors | |
| """ | |
| import asyncio | |
| import base64 | |
| import io | |
| import ipaddress | |
| import json | |
| import logging | |
| import os | |
| import re | |
| import secrets | |
| import shutil | |
| import socket | |
| import subprocess | |
| import tempfile | |
| import time | |
| import zipfile | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from pathlib import Path | |
| from typing import BinaryIO, Optional, Union | |
| from urllib.parse import urlparse | |
| from uuid import uuid4 | |
| import httpx | |
| from fastapi import Depends, FastAPI, File, Form, HTTPException, Request, UploadFile | |
| from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer | |
| from pydantic import BaseModel | |
# Root logging setup: timestamped, level-padded records at INFO and above.
_LOG_FORMAT = "%(asctime)s | %(levelname)-8s | %(message)s"
_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
logging.basicConfig(format=_LOG_FORMAT, datefmt=_LOG_DATE_FORMAT, level=logging.INFO)

# Module-wide logger.
logger = logging.getLogger("md-parser")
# Security: bearer tokens accepted by the API, read once at import time.
# Either (or both) may be unset; verify_token rejects requests when neither is set.
API_TOKEN = os.environ.get("API_TOKEN")
API_DEV_TOKEN = os.environ.get("API_DEV_TOKEN")

# Bearer-credential dependency consumed by verify_token.
security = HTTPBearer()
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
    """Verify the API token from the Authorization header.

    The presented bearer token is accepted when it matches either
    ``API_TOKEN`` or ``API_DEV_TOKEN``.

    Tokens are compared as UTF-8 bytes: ``secrets.compare_digest`` raises
    ``TypeError`` for ``str`` arguments containing non-ASCII characters, which
    would turn a malformed token into a 500 instead of a 401.

    Args:
        credentials: Bearer credentials extracted by the ``security`` dependency.

    Returns:
        The presented token, once validated.

    Raises:
        HTTPException: 500 when no token is configured on the server;
            401 when the presented token matches no configured token.
    """
    configured_tokens = [tok for tok in (API_TOKEN, API_DEV_TOKEN) if tok]
    if not configured_tokens:
        raise HTTPException(
            status_code=500,
            detail="No API tokens configured on server",
        )

    token = credentials.credentials
    presented = token.encode("utf-8")

    # Compare against every configured token (no early exit) so the amount of
    # work does not depend on which token, if any, matched.
    matches = [
        secrets.compare_digest(presented, expected.encode("utf-8"))
        for expected in configured_tokens
    ]
    if not any(matches):
        raise HTTPException(
            status_code=401,
            detail="Invalid API token",
        )
    return token
| from contextlib import asynccontextmanager | |
| def _check_model_cache() -> dict: | |
| """Check model cache status and return cache info.""" | |
| cache_info = {} | |
| cache_dirs = [ | |
| ("HuggingFace", os.environ.get("HF_HOME", "/home/user/.cache/huggingface")), | |
| ("Torch", os.environ.get("TORCH_HOME", "/home/user/.cache/torch")), | |
| ("ModelScope", os.environ.get("MODELSCOPE_CACHE", "/home/user/.cache/modelscope")), | |
| ] | |
| for name, path in cache_dirs: | |
| if os.path.exists(path): | |
| try: | |
| # Get directory size | |
| total_size = 0 | |
| file_count = 0 | |
| for dirpath, dirnames, filenames in os.walk(path): | |
| for f in filenames: | |
| fp = os.path.join(dirpath, f) | |
| total_size += os.path.getsize(fp) | |
| file_count += 1 | |
| size_mb = total_size / (1024 * 1024) | |
| cache_info[name] = {"size_mb": round(size_mb, 2), "files": file_count, "status": "cached"} | |
| except Exception as e: | |
| cache_info[name] = {"status": f"error: {e}"} | |
| else: | |
| cache_info[name] = {"status": "not found"} | |
| return cache_info | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup: verify MinerU is available and check model cache.

    Logs the effective configuration, probes the ``mineru`` CLI and reports the
    on-disk model caches before yielding control to the application; logs a
    shutdown notice afterwards. No check is fatal: a failed probe is logged as
    a warning and startup continues.

    Decorated with ``asynccontextmanager`` so that ``FastAPI(lifespan=...)``
    receives an async context manager factory instead of a bare async
    generator function.
    """
    banner = "=" * 60
    logger.info(banner)
    logger.info("Starting MD Parser API v1.4.0...")
    logger.info(f"Backend: {MINERU_BACKEND}")
    logger.info(f"Default language: {MINERU_LANG}")
    logger.info(f"Max file size: {MAX_FILE_SIZE_MB}MB")
    logger.info(f"Chunking: {CHUNK_SIZE} pages/chunk, threshold {CHUNKING_THRESHOLD} pages, {MAX_WORKERS} workers")

    try:
        # Verify mineru CLI is available
        result = subprocess.run(["mineru", "--version"], capture_output=True, text=True)
        logger.info(f"MinerU version: {result.stdout.strip()}")
    except Exception as e:
        logger.warning(f"MinerU check failed: {e}")

    # Check model cache status
    logger.info("-" * 40)
    logger.info("Model cache status:")
    cache_info = _check_model_cache()
    for name, info in cache_info.items():
        if info.get("status") == "cached":
            logger.info(f"  {name}: {info['size_mb']:.2f} MB ({info['files']} files) - CACHED")
        else:
            logger.warning(f"  {name}: {info.get('status', 'unknown')}")

    total_cached = sum(info.get("size_mb", 0) for info in cache_info.values() if info.get("status") == "cached")
    if total_cached > 0:
        logger.info(f"  Total cached: {total_cached:.2f} MB")
        logger.info("  Models are pre-loaded - no download needed at runtime")
    else:
        logger.warning("  No cached models found - first request may be slow")

    logger.info(banner)
    logger.info("MD Parser API ready to accept requests")
    logger.info(banner)

    yield

    logger.info("Shutting down MD Parser API...")
# ASGI application object; startup/shutdown logging is delegated to `lifespan`.
app = FastAPI(
    lifespan=lifespan,
    version="1.4.0",
    title="MD Parser API",
    description="Transform PDFs and images into markdown/JSON using MinerU",
)
| # Configuration from environment (optimized for A100 GPU) | |
| MINERU_BACKEND = os.getenv("MINERU_BACKEND", "pipeline") | |
| MINERU_LANG = os.getenv("MINERU_LANG", "en") | |
| MAX_FILE_SIZE_MB = int(os.getenv("MAX_FILE_SIZE_MB", "1024")) | |
| MAX_FILE_SIZE_BYTES = MAX_FILE_SIZE_MB * 1024 * 1024 | |
| # Chunking configuration | |
| CHUNK_SIZE = int(os.getenv("CHUNK_SIZE", "10")) # Pages per chunk | |
| # MAX_WORKERS: Number of parallel workers for chunk processing | |
| # - Default 3 for faster processing on A100 (80GB VRAM) | |
| # - If OOM occurs, automatically falls back to sequential (1 worker) | |
| MAX_WORKERS = int(os.getenv("MAX_WORKERS", "3")) | |
| CHUNKING_THRESHOLD = int(os.getenv("CHUNKING_THRESHOLD", "20")) # Min pages to enable chunking | |
| # Enable torch.compile for ~15% speedup if available | |
| if os.getenv("TORCH_COMPILE_ENABLED", "0") == "1": | |
| try: | |
| import torch | |
| torch.set_float32_matmul_precision('high') | |
| except Exception: | |
| pass | |
# Hostnames rejected outright by the SSRF check, before any DNS lookup.
BLOCKED_HOSTNAMES = {
    # Loopback alias
    "localhost",
    # Cloud metadata endpoints, by name
    "metadata",
    "metadata.google",
    "metadata.google.internal",
    # Cloud metadata endpoints, by address
    "169.254.169.254",  # AWS/GCP/Azure metadata service
    "fd00:ec2::254",  # AWS IPv6 metadata
}
def _reject_non_public_ip(ip: Union[ipaddress.IPv4Address, ipaddress.IPv6Address]) -> None:
    """Raise HTTPException(400) when *ip* falls in a range that must not be fetched."""
    # An IPv4-mapped IPv6 address (::ffff:a.b.c.d) reaches the embedded IPv4
    # host, so classify the embedded address rather than the wrapper.
    mapped = getattr(ip, "ipv4_mapped", None)
    if mapped is not None:
        ip = mapped

    # Block private, loopback, link-local, reserved and multicast ranges
    checks = (
        (ip.is_private, "Access to private IP addresses is not allowed."),
        (ip.is_loopback, "Access to loopback addresses is not allowed."),
        (ip.is_link_local, "Access to link-local addresses is not allowed."),
        (ip.is_reserved, "Access to reserved IP addresses is not allowed."),
        (ip.is_multicast, "Access to multicast addresses is not allowed."),
    )
    for blocked, detail in checks:
        if blocked:
            raise HTTPException(status_code=400, detail=detail)


def _validate_url(url: str) -> None:
    """
    Validate URL to prevent SSRF attacks.

    Checks, in order: the scheme is http/https, a hostname is present, the
    hostname is not in ``BLOCKED_HOSTNAMES`` and contains no suspicious
    substring, and every address the hostname resolves to (IPv4 and IPv6) is
    outside the private, loopback, link-local, reserved and multicast ranges.

    Raises HTTPException (400) if URL is invalid or points to internal/private
    resources.

    NOTE(review): this validates the addresses seen at check time only. Whatever
    downloads the URL afterwards presumably resolves the name again, so this
    check alone does not rule out DNS rebinding - confirm against the caller.
    """
    try:
        parsed = urlparse(url)
    except Exception as e:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid URL format: {str(e)}",
        )

    # Check scheme
    if parsed.scheme not in ("http", "https"):
        raise HTTPException(
            status_code=400,
            detail=f"Invalid URL scheme '{parsed.scheme}'. Only http and https are allowed.",
        )

    # Check hostname exists
    hostname = parsed.hostname
    if not hostname:
        raise HTTPException(
            status_code=400,
            detail="Invalid URL: missing hostname.",
        )

    # Check against blocked hostnames and suspicious substrings
    hostname_lower = hostname.lower()
    blocked_patterns = ["metadata", "internal", "localhost", "127.0.0.1", "::1"]
    if hostname_lower in BLOCKED_HOSTNAMES or any(
        pattern in hostname_lower for pattern in blocked_patterns
    ):
        raise HTTPException(
            status_code=400,
            detail="Access to internal/metadata services is not allowed.",
        )

    # Resolve every address family. gethostbyname() is IPv4-only and returns a
    # single record, so a host with an internal AAAA record (or several A
    # records) could otherwise pass with one harmless-looking address.
    try:
        addrinfo = socket.getaddrinfo(hostname, None, proto=socket.IPPROTO_TCP)
    except (socket.gaierror, UnicodeError) as exc:
        # UnicodeError: hostname cannot be IDNA-encoded for the lookup.
        raise HTTPException(
            status_code=400,
            detail=f"Could not resolve hostname: {hostname}",
        ) from exc
    if not addrinfo:
        raise HTTPException(
            status_code=400,
            detail=f"Could not resolve hostname: {hostname}",
        )

    for _family, _type, _proto, _canonname, sockaddr in addrinfo:
        # Drop any IPv6 zone suffix ("fe80::1%eth0") before parsing.
        address = sockaddr[0].split("%", 1)[0]
        try:
            ip = ipaddress.ip_address(address)
        except ValueError as e:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid IP address resolved: {str(e)}",
            )
        _reject_non_public_ip(ip)
| def _save_uploaded_file(input_path: Path, file_obj: BinaryIO) -> None: | |
| """Sync helper to save uploaded file to disk (runs in thread).""" | |
| with open(input_path, "wb") as f: | |
| shutil.copyfileobj(file_obj, f) | |
| def _save_downloaded_content(input_path: Path, content: bytes) -> None: | |
| """Sync helper to save downloaded content to disk (runs in thread).""" | |
| with open(input_path, "wb") as f: | |
| f.write(content) | |
| def _extract_images_as_zip(output_dir: Path, prefix: str = "") -> tuple[bytes, int]: | |
| """ | |
| Extract all images from output directory and return as a zip file bytes. | |
| Args: | |
| output_dir: Directory containing images (MinerU puts them in images/ subfolder) | |
| prefix: Optional prefix for image paths in the zip (e.g., "chunk_0/") | |
| Returns: | |
| Tuple of (zip_bytes, image_count) | |
| """ | |
| image_extensions = {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".tiff", ".webp"} | |
| zip_buffer = io.BytesIO() | |
| image_count = 0 | |
| with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf: | |
| for img_path in output_dir.glob("**/*"): | |
| if img_path.is_file() and img_path.suffix.lower() in image_extensions: | |
| try: | |
| # Use relative path from output_dir as path in zip | |
| relative_path = img_path.relative_to(output_dir) | |
| zip_path = f"{prefix}{relative_path}" if prefix else str(relative_path) | |
| zf.write(img_path, zip_path) | |
| image_count += 1 | |
| except Exception as e: | |
| logger.warning(f"Failed to add image {img_path} to zip: {e}") | |
| return zip_buffer.getvalue(), image_count | |
def _create_images_zip_base64(output_dir: Path, prefix: str = "") -> tuple[Optional[str], int]:
    """
    Zip the images under *output_dir* and return the archive base64-encoded.

    Returns:
        Tuple of (base64 zip string, image_count); the string is None and the
        count 0 when no image was found.
    """
    archive_bytes, found = _extract_images_as_zip(output_dir, prefix)
    if found != 0:
        encoded = base64.b64encode(archive_bytes)
        return encoded.decode("utf-8"), found
    return None, 0
class ParseResponse(BaseModel):
    """Response model for document parsing."""

    success: bool

    # Extracted content; which field is populated depends on the requested format.
    markdown: Optional[str] = None
    # A dict for a single-pass parse, a list when chunk outputs were merged.
    json_content: Optional[Union[dict, list]] = None

    # Base64-encoded zip archive containing all images, and how many it holds.
    images_zip: Optional[str] = None
    image_count: int = 0

    error: Optional[str] = None
    pages_processed: int = 0
    # Backend that actually ran; may differ from the requested one after a fallback.
    backend_used: Optional[str] = None
| # vLLM GPU memory error patterns that trigger fallback to pipeline | |
| VLLM_MEMORY_ERROR_PATTERNS = [ | |
| "Free memory on device cuda", | |
| "Decrease GPU memory utilization", | |
| "CUDA out of memory", | |
| "OutOfMemoryError", | |
| ] | |
| def _has_gpu_memory_error(output: str) -> bool: | |
| """Check if output contains GPU memory error patterns.""" | |
| for pattern in VLLM_MEMORY_ERROR_PATTERNS: | |
| if pattern in output: | |
| return True | |
| return False | |
def _log_process_tail(request_id: str, proc: subprocess.CompletedProcess, max_lines: int = 10) -> None:
    """Log the last *max_lines* lines of a finished process' stdout (info) and stderr (warning)."""
    if proc.stdout:
        for line in proc.stdout.strip().split('\n')[-max_lines:]:
            logger.info(f"[{request_id}] [stdout] {line}")
    if proc.stderr:
        for line in proc.stderr.strip().split('\n')[-max_lines:]:
            logger.warning(f"[{request_id}] [stderr] {line}")


def _run_mineru(
    input_path: Path,
    output_dir: Path,
    backend: str,
    lang: str,
    start_page: int,
    end_page: Optional[int],
    request_id: str,
) -> tuple[subprocess.CompletedProcess, str]:
    """
    Run the MinerU CLI on *input_path*, writing results into *output_dir*.

    If the output of a non-pipeline backend contains a GPU memory error marker,
    *output_dir* is emptied and the run is retried once with the ``pipeline``
    backend. This function blocks for up to 600 seconds per attempt and takes
    no lock, so callers decide how many runs execute concurrently.

    Args:
        input_path: Document to parse.
        output_dir: Directory MinerU writes into.
        backend: Requested MinerU backend.
        lang: OCR language code passed to ``-l``.
        start_page: First page (passed to ``-s`` only when > 0).
        end_page: Last page (passed to ``-e`` only when not None).
        request_id: Identifier prefixed to every log line.

    Returns:
        Tuple of (completed process of the last attempt, backend actually used).
        A non-zero return code is NOT raised; callers must check ``returncode``.

    Raises:
        subprocess.TimeoutExpired: when an attempt exceeds 600 seconds.
        OSError: when the ``mineru`` executable cannot be started.
    """
    def build_cmd(use_backend: str) -> list[str]:
        cmd = [
            "mineru",
            "-p", str(input_path),
            "-o", str(output_dir),
            "-b", use_backend,
            "-l", lang,
        ]
        if start_page > 0:
            cmd.extend(["-s", str(start_page)])
        if end_page is not None:
            cmd.extend(["-e", str(end_page)])
        return cmd

    def run_and_log(cmd: list[str], label: str) -> subprocess.CompletedProcess:
        # One timed attempt; *label* distinguishes the fallback run in the logs.
        started = time.time()
        finished = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
        elapsed = time.time() - started
        logger.info(f"[{request_id}] {label} completed in {elapsed:.2f}s")
        logger.info(f"[{request_id}] Return code: {finished.returncode}")
        _log_process_tail(request_id, finished)
        return finished

    # First attempt with requested backend
    cmd = build_cmd(backend)
    logger.info(f"[{request_id}] Starting MinerU processing...")
    logger.info(f"[{request_id}] Command: {' '.join(cmd)}")
    logger.info(f"[{request_id}] Backend: {backend}")
    proc = run_and_log(cmd, "MinerU")

    combined_output = (proc.stdout or "") + (proc.stderr or "")
    if backend == "pipeline" or not _has_gpu_memory_error(combined_output):
        return proc, backend

    # GPU memory error on a non-pipeline backend: fall back to pipeline.
    logger.warning(f"[{request_id}] GPU memory error detected with {backend}, falling back to pipeline...")

    # Clear output directory so partial results of the failed run are not
    # mistaken for output of the retry.
    for entry in output_dir.glob("*"):
        if entry.is_file():
            entry.unlink()
        elif entry.is_dir():
            shutil.rmtree(entry)

    fallback_cmd = build_cmd("pipeline")
    logger.info(f"[{request_id}] Retrying with pipeline backend...")
    logger.info(f"[{request_id}] Command: {' '.join(fallback_cmd)}")
    proc = run_and_log(fallback_cmd, "MinerU (pipeline fallback)")
    return proc, "pipeline"
def _get_pdf_page_count(input_path: Path) -> int:
    """Return the page count reported by ``pdfinfo`` for *input_path*, or 0 when it cannot be determined."""
    pdfinfo_cmd = ["pdfinfo", str(input_path)]
    try:
        completed = subprocess.run(pdfinfo_cmd, capture_output=True, text=True, timeout=30)
        if completed.returncode == 0:
            # First line of the report that starts with "Pages:", if any
            pages_line = next(
                (row for row in completed.stdout.split('\n') if row.startswith('Pages:')),
                None,
            )
            if pages_line is not None:
                return int(pages_line.split(':')[1].strip())
    except Exception as e:
        logger.warning(f"Failed to get PDF page count: {e}")
    return 0
def _process_single_chunk(
    chunk_id: int,
    input_path: Path,
    chunk_output_dir: Path,
    backend: str,
    lang: str,
    start_page: int,
    end_page: int,
    request_id: str,
    include_images: bool = False,
) -> dict:
    """Process a single chunk of pages and return its result as a dict.

    Runs MinerU for pages ``start_page``..``end_page`` into *chunk_output_dir*,
    then reads the first markdown file and the first JSON file (excluding
    ``*_content_list*``) found under it. Never raises: any exception is caught
    and reported in the returned dict.

    Returns:
        On success: ``chunk_id``, ``success=True``, ``markdown``,
        ``json_content``, ``images_zip_bytes``, ``image_count``,
        ``backend_used``, ``pages``, ``start_page``, ``end_page``,
        ``has_content``.
        On failure: ``chunk_id``, ``success=False``, ``error``,
        ``backend_used``, ``pages``.
    """
    chunk_request_id = f"{request_id}-c{chunk_id}"
    logger.info(f"[{chunk_request_id}] Processing chunk {chunk_id}: pages {start_page}-{end_page}")
    try:
        chunk_output_dir.mkdir(parents=True, exist_ok=True)
        proc, backend_used = _run_mineru(
            input_path=input_path,
            output_dir=chunk_output_dir,
            backend=backend,
            lang=lang,
            start_page=start_page,
            end_page=end_page,
            request_id=chunk_request_id,
        )

        if proc.returncode != 0:
            logger.error(f"[{chunk_request_id}] Chunk {chunk_id} failed with code {proc.returncode}")
            stderr_text = proc.stderr or ""
            error_detail = stderr_text[:500] if stderr_text else "No stderr"
            # OOM is detected downstream by scanning this error string
            # (_has_oom_error_in_results). The 500-character cut can drop the
            # marker, so re-attach the first one found in the full output.
            full_output = (proc.stdout or "") + stderr_text
            for pattern in VLLM_MEMORY_ERROR_PATTERNS:
                if pattern in full_output and pattern not in error_detail:
                    error_detail = f"{error_detail} [GPU memory error: {pattern}]"
                    break
            return {
                "chunk_id": chunk_id,
                "success": False,
                "error": f"MinerU failed (code {proc.returncode}): {error_detail}",
                "backend_used": backend_used,
                "pages": end_page - start_page + 1,
            }

        # Read chunk output - list all files for debugging
        all_files = list(chunk_output_dir.glob("**/*"))
        logger.info(f"[{chunk_request_id}] Output files: {[str(f) for f in all_files[:20]]}")

        md_files = list(chunk_output_dir.glob("**/*.md"))
        markdown_content = ""
        if md_files:
            markdown_content = md_files[0].read_text(encoding="utf-8")
            logger.info(f"[{chunk_request_id}] Found markdown: {md_files[0]}")

        json_content = None
        json_files = [f for f in chunk_output_dir.glob("**/*.json") if "_content_list" not in f.name]
        if json_files:
            try:
                json_content = json.loads(json_files[0].read_text(encoding="utf-8"))
            except json.JSONDecodeError as e:
                # Keep going with markdown only, but leave a trace of the bad file.
                logger.warning(f"[{chunk_request_id}] Could not parse JSON output {json_files[0]}: {e}")

        # Extract images from chunk output (only if requested)
        chunk_images_zip = None
        chunk_image_count = 0
        if include_images:
            zip_bytes, chunk_image_count = _extract_images_as_zip(chunk_output_dir)
            # Only keep zip bytes if we actually have images
            if chunk_image_count > 0:
                chunk_images_zip = zip_bytes

        logger.info(f"[{chunk_request_id}] Chunk {chunk_id} completed: {len(markdown_content)} chars markdown, json={'yes' if json_content else 'no'}, images={chunk_image_count}")

        # Check if we got any content - empty output might indicate a problem
        has_content = bool(markdown_content.strip()) or bool(json_content)
        if not has_content:
            logger.warning(f"[{chunk_request_id}] Chunk {chunk_id} produced no content (pages {start_page}-{end_page})")

        return {
            "chunk_id": chunk_id,
            "success": True,  # MinerU succeeded, even if content is empty (e.g., blank pages)
            "markdown": markdown_content,
            "json_content": json_content,
            "images_zip_bytes": chunk_images_zip,
            "image_count": chunk_image_count,
            "backend_used": backend_used,
            "pages": end_page - start_page + 1,
            "start_page": start_page,
            "end_page": end_page,
            "has_content": has_content,
        }
    except Exception as e:
        # Worker boundary: report the failure in the result instead of raising
        # into the thread pool.
        logger.error(f"[{chunk_request_id}] Chunk {chunk_id} exception: {e}")
        return {
            "chunk_id": chunk_id,
            "success": False,
            "error": str(e),
            "backend_used": backend,
            "pages": 0,
        }
| def _has_oom_error_in_results(chunk_results: list) -> bool: | |
| """Check if any chunk failed due to OOM error.""" | |
| for r in chunk_results: | |
| if not r["success"]: | |
| error_msg = r.get("error", "") | |
| if any(pattern in error_msg for pattern in VLLM_MEMORY_ERROR_PATTERNS): | |
| return True | |
| return False | |
| def _process_chunks_with_workers( | |
| chunks: list, | |
| input_path: Path, | |
| base_output_dir: Path, | |
| chunk_backend: str, | |
| lang: str, | |
| request_id: str, | |
| num_workers: int, | |
| include_images: bool = False, | |
| ) -> list: | |
| """Process chunks with specified number of workers.""" | |
| chunk_results = [] | |
| with ThreadPoolExecutor(max_workers=num_workers) as executor: | |
| futures = {} | |
| for cid, cstart, cend in chunks: | |
| chunk_output_dir = base_output_dir / f"chunk_{cid}" | |
| # Clean up any previous attempt | |
| if chunk_output_dir.exists(): | |
| shutil.rmtree(chunk_output_dir) | |
| future = executor.submit( | |
| _process_single_chunk, | |
| cid, | |
| input_path, | |
| chunk_output_dir, | |
| chunk_backend, | |
| lang, | |
| cstart, | |
| cend, | |
| request_id, | |
| include_images, | |
| ) | |
| futures[future] = cid | |
| for future in as_completed(futures): | |
| result = future.result() | |
| chunk_results.append(result) | |
| return chunk_results | |
def _plan_chunk_ranges(first_page: int, last_page: int, chunk_size: int) -> list:
    """Split the inclusive page range into (chunk_id, start_page, end_page) tuples."""
    step = max(1, chunk_size)  # a non-positive size would never advance
    return [
        (chunk_id, chunk_start, min(chunk_start + step - 1, last_page))
        for chunk_id, chunk_start in enumerate(range(first_page, last_page + 1, step))
    ]


def _merge_chunk_image_zips(chunk_results: list) -> tuple[Optional[str], int]:
    """Merge per-chunk image zips into one base64 zip; returns (zip or None, image count)."""
    merged_buffer = io.BytesIO()
    image_total = 0
    with zipfile.ZipFile(merged_buffer, 'w', zipfile.ZIP_DEFLATED) as merged_zf:
        for r in chunk_results:
            if not (r["success"] and r.get("images_zip_bytes")):
                continue
            with zipfile.ZipFile(io.BytesIO(r["images_zip_bytes"]), 'r') as chunk_zf:
                for name in chunk_zf.namelist():
                    # Prefix with the chunk id so equal names from different chunks don't collide
                    merged_zf.writestr(f"chunk_{r['chunk_id']}/{name}", chunk_zf.read(name))
                    image_total += 1
    if image_total == 0:
        return None, 0
    return base64.b64encode(merged_buffer.getvalue()).decode("utf-8"), image_total


def _process_chunked(
    input_path: Path,
    base_output_dir: Path,
    backend: str,
    lang: str,
    start_page: int,
    end_page: Optional[int],
    total_pages: int,
    request_id: str,
    output_format: str,
    include_images: bool = False,
) -> ParseResponse:
    """Process a PDF in parallel chunks and combine results.

    The page range is split into chunks of ``CHUNK_SIZE`` pages which are
    processed with ``MAX_WORKERS`` workers. If any chunk fails with an
    out-of-memory error, the failed chunks are retried sequentially (one
    worker); chunks that already succeeded are kept as they are.

    Args:
        input_path: PDF to parse.
        base_output_dir: Directory under which per-chunk directories are created.
        backend: Requested MinerU backend.
        lang: OCR language code.
        start_page: First page to process (0-indexed).
        end_page: Last page to process, or None for the last page of the document.
        total_pages: Page count of the document (0 when unknown).
        request_id: Identifier prefixed to every log line.
        output_format: ``"json"`` merges JSON outputs; anything else merges markdown.
        include_images: Whether to collect extracted images into the response.

    Returns:
        A ``ParseResponse``. ``success`` is False only when at least one chunk
        failed and no content was produced at all; a partial failure is
        reported with ``success=True`` and a note in ``error``.
    """
    # Calculate actual end page; never plan chunks past the end of the document
    # (only when the page count is known, i.e. > 0).
    actual_end = end_page if end_page is not None else total_pages - 1
    if total_pages > 0:
        actual_end = min(actual_end, total_pages - 1)

    chunks = _plan_chunk_ranges(start_page, actual_end, CHUNK_SIZE)

    # Use requested backend for chunked processing
    # OOM protection will automatically fall back to sequential if needed
    chunk_backend = backend
    logger.info(f"[{request_id}] Splitting into {len(chunks)} chunks of up to {CHUNK_SIZE} pages each")
    logger.info(f"[{request_id}] Backend: {chunk_backend}, workers: {MAX_WORKERS}")

    # Process chunks - start with configured workers, fall back to sequential on OOM
    current_workers = MAX_WORKERS
    chunk_results = _process_chunks_with_workers(
        chunks, input_path, base_output_dir, chunk_backend, lang, request_id, current_workers, include_images
    )

    if current_workers > 1 and _has_oom_error_in_results(chunk_results):
        logger.warning(f"[{request_id}] OOM detected with {current_workers} workers, retrying sequentially (1 worker)")
        # Only failed chunks need another run: results of successful chunks are
        # already held in memory, so re-running them would just repeat GPU work.
        failed_ids = {r["chunk_id"] for r in chunk_results if not r["success"]}
        retry_chunks = [chunk for chunk in chunks if chunk[0] in failed_ids]
        for cid, _, _ in retry_chunks:
            chunk_dir = base_output_dir / f"chunk_{cid}"
            if chunk_dir.exists():
                shutil.rmtree(chunk_dir)
        retried_results = _process_chunks_with_workers(
            retry_chunks, input_path, base_output_dir, chunk_backend, lang, request_id, 1, include_images
        )
        chunk_results = [r for r in chunk_results if r["success"]] + retried_results

    # Sort by chunk_id to maintain page order
    chunk_results.sort(key=lambda x: x["chunk_id"])

    # Check for failures and empty chunks
    errors = ""
    failed_chunks = [r for r in chunk_results if not r["success"]]
    if failed_chunks:
        errors = "; ".join([f"Chunk {r['chunk_id']}: {r.get('error', 'Unknown')}" for r in failed_chunks])
        logger.error(f"[{request_id}] {len(failed_chunks)} chunks failed: {errors}")

    empty_chunks = [r for r in chunk_results if r["success"] and not r.get("has_content", True)]
    if empty_chunks:
        empty_ranges = [f"pages {r['start_page']}-{r['end_page']}" for r in empty_chunks]
        logger.warning(f"[{request_id}] {len(empty_chunks)} chunks had no content: {', '.join(empty_ranges)}")

    # Combine results
    total_pages_processed = sum(r.get("pages", 0) for r in chunk_results if r["success"])
    # Sorted so the joined value is stable; fall back to the requested backend
    # when no chunk succeeded instead of reporting an empty string.
    backends_used = sorted({r.get("backend_used", backend) for r in chunk_results if r["success"]})
    backend_used = ",".join(backends_used) if backends_used else backend

    # Combine images from all chunks into a single zip (with chunk prefixes to avoid collisions)
    combined_images_zip, total_image_count = _merge_chunk_image_zips(chunk_results)
    if total_image_count > 0:
        logger.info(f"[{request_id}] Combined {total_image_count} images from all chunks into zip")

    if output_format == "json":
        # Combine JSON content (merge arrays or create array of results)
        combined_json = []
        for r in chunk_results:
            if r["success"] and r.get("json_content"):
                jc = r["json_content"]
                if isinstance(jc, list):
                    combined_json.extend(jc)
                else:
                    combined_json.append(jc)
        has_content = bool(combined_json)
        content_fields = {"json_content": combined_json if combined_json else None}
    else:
        # Combine markdown content
        combined_markdown = []
        for r in chunk_results:
            if r["success"] and r.get("markdown"):
                # Add page separator for clarity
                if combined_markdown:
                    combined_markdown.append(f"\n\n<!-- Chunk {r['chunk_id']} (pages {r['start_page']}-{r['end_page']}) -->\n\n")
                combined_markdown.append(r["markdown"])
        has_content = bool(combined_markdown)
        content_fields = {"markdown": "".join(combined_markdown) if combined_markdown else None}

    if failed_chunks and not has_content:
        return ParseResponse(
            success=False,
            error=f"All chunks failed: {errors}",
            pages_processed=0,
            backend_used=backend_used,
        )

    return ParseResponse(
        success=True,
        images_zip=combined_images_zip,
        image_count=total_image_count,
        pages_processed=total_pages_processed,
        backend_used=backend_used,
        error=f"{len(failed_chunks)} chunks failed" if failed_chunks else None,
        **content_fields,
    )
class HealthResponse(BaseModel):
    """Health check response."""

    # Service state and release identifier
    status: str
    version: str

    # Effective processing configuration
    backend: str
    chunk_size: int
    chunking_threshold: int
    max_workers: int
class URLParseRequest(BaseModel):
    """Request model for URL-based parsing."""

    # Location of the document to fetch
    url: str

    # Parsing options
    output_format: str = "markdown"
    lang: str = MINERU_LANG
    # Override backend: pipeline, hybrid-auto-engine (None keeps the configured one)
    backend: Optional[str] = None

    # Page window
    start_page: int = 0
    end_page: Optional[int] = None

    # Include base64-encoded images in response
    include_images: bool = False
async def health_check() -> HealthResponse:
    """Health check endpoint."""
    # NOTE(review): no route decorator is visible on this function in the
    # reviewed text - confirm it is registered on `app`.
    # Static status plus the effective chunking/backend configuration.
    snapshot = {
        "status": "healthy",
        "version": "1.4.0",
        "backend": MINERU_BACKEND,
        "chunk_size": CHUNK_SIZE,
        "chunking_threshold": CHUNKING_THRESHOLD,
        "max_workers": MAX_WORKERS,
    }
    return HealthResponse(**snapshot)
async def parse_document(
    file: UploadFile = File(..., description="PDF or image file to parse"),
    output_format: str = Form(
        default="markdown", description="Output format: markdown or json"
    ),
    lang: str = Form(default=MINERU_LANG, description="OCR language code"),
    start_page: int = Form(default=0, description="Starting page (0-indexed)"),
    end_page: Optional[int] = Form(default=None, description="Ending page (None=all)"),
    backend: Optional[str] = Form(default=None, description="Override backend: pipeline, hybrid-auto-engine"),
    include_images: bool = Form(default=False, description="Include base64-encoded images in response"),
    _token: str = Depends(verify_token),
) -> ParseResponse:
    """
    Parse a document file (PDF or image) and return extracted content.
    Supports:
    - PDF files (.pdf)
    - Images (.png, .jpg, .jpeg, .tiff, .bmp)
    """
    # NOTE(review): no route decorator is visible on this function in the
    # reviewed text - confirm it is registered on `app`.
    #
    # Flow: validate size and extension (HTTPException 413 / 400), save the
    # upload into a temp directory, then parse either in chunks (PDFs with more
    # than CHUNKING_THRESHOLD effective pages) or in a single MinerU pass.
    # Any processing error is returned as ParseResponse(success=False) rather
    # than raised. All blocking work (subprocesses, file I/O) is pushed to
    # worker threads so the event loop keeps serving other requests while a
    # document is parsed.
    request_id = str(uuid4())[:8]
    start_time = time.time()
    separator = '=' * 50

    logger.info(f"[{request_id}] {separator}")
    logger.info(f"[{request_id}] New parse request received")
    logger.info(f"[{request_id}] Filename: {file.filename}")
    logger.info(f"[{request_id}] Output format: {output_format}")
    logger.info(f"[{request_id}] Language: {lang}")
    # `end_page or 'end'` would report an explicit end_page of 0 as 'end'.
    logger.info(f"[{request_id}] Page range: {start_page} to {end_page if end_page is not None else 'end'}")

    # Validate file size
    file.file.seek(0, 2)
    file_size = file.file.tell()
    file.file.seek(0)
    file_size_mb = file_size / (1024 * 1024)
    logger.info(f"[{request_id}] File size: {file_size_mb:.2f} MB")
    if file_size > MAX_FILE_SIZE_BYTES:
        logger.error(f"[{request_id}] File too large: {file_size_mb:.2f} MB > {MAX_FILE_SIZE_MB} MB")
        raise HTTPException(
            status_code=413,
            detail=f"File size exceeds maximum allowed size of {MAX_FILE_SIZE_MB}MB",
        )

    # Validate file type
    allowed_extensions = {".pdf", ".png", ".jpg", ".jpeg", ".tiff", ".bmp"}
    file_ext = Path(file.filename).suffix.lower() if file.filename else ""
    if file_ext not in allowed_extensions:
        logger.error(f"[{request_id}] Unsupported file type: {file_ext}")
        raise HTTPException(
            status_code=400,
            # Sorted so the message is stable (set iteration order is not).
            detail=f"Unsupported file type. Allowed: {', '.join(sorted(allowed_extensions))}",
        )
    logger.info(f"[{request_id}] File type: {file_ext}")

    # Create temp directory for processing
    temp_dir = tempfile.mkdtemp()
    logger.info(f"[{request_id}] Created temp directory: {temp_dir}")
    try:
        # Save uploaded file (run blocking I/O in thread)
        input_path = Path(temp_dir) / f"input{file_ext}"
        await asyncio.to_thread(_save_uploaded_file, input_path, file.file)
        logger.info(f"[{request_id}] Saved file to: {input_path}")

        # Create output directory
        output_dir = Path(temp_dir) / "output"
        output_dir.mkdir(exist_ok=True)

        use_backend = backend if backend else MINERU_BACKEND

        # Check if chunking should be used (PDF only, sufficient pages)
        total_pages = 0
        use_chunking = False
        if file_ext == ".pdf":
            total_pages = await asyncio.to_thread(_get_pdf_page_count, input_path)
            logger.info(f"[{request_id}] PDF has {total_pages} pages")
            # Calculate effective page range
            effective_end = end_page if end_page is not None else total_pages - 1
            effective_pages = effective_end - start_page + 1
            if effective_pages > CHUNKING_THRESHOLD:
                use_chunking = True
                logger.info(f"[{request_id}] Chunking enabled: {effective_pages} pages > {CHUNKING_THRESHOLD} threshold")

        if use_chunking:
            # Process in parallel chunks (off the event loop)
            parse_result = await asyncio.to_thread(
                _process_chunked,
                input_path=input_path,
                base_output_dir=output_dir,
                backend=use_backend,
                lang=lang,
                start_page=start_page,
                end_page=end_page,
                total_pages=total_pages,
                request_id=request_id,
                output_format=output_format,
                include_images=include_images,
            )
        else:
            # Process normally (single pass, off the event loop)
            logger.info(f"[{request_id}] Processing without chunking")
            proc, backend_used = await asyncio.to_thread(
                _run_mineru,
                input_path=input_path,
                output_dir=output_dir,
                backend=use_backend,
                lang=lang,
                start_page=start_page,
                end_page=end_page,
                request_id=request_id,
            )
            if proc.returncode != 0:
                logger.error(f"[{request_id}] MinerU failed with code {proc.returncode}")
                if proc.stderr:
                    for line in proc.stderr.strip().split('\n'):
                        logger.error(f"[{request_id}] [stderr] {line}")
                raise RuntimeError(f"MinerU failed (code {proc.returncode}): {proc.stderr}")

            # Read output
            logger.info(f"[{request_id}] Reading output files...")
            parse_result = await asyncio.to_thread(
                _read_parse_output, output_dir, output_format, proc.stdout, proc.stderr, request_id, include_images
            )
            parse_result.backend_used = backend_used
            if backend_used != use_backend:
                logger.info(f"[{request_id}] Note: Fell back from {use_backend} to {backend_used} due to GPU memory constraints")

        total_duration = time.time() - start_time
        logger.info(f"[{request_id}] {separator}")
        if parse_result.success:
            logger.info(f"[{request_id}] Request completed successfully")
        else:
            # Chunked processing reports total failure through the result, not an exception.
            logger.warning(f"[{request_id}] Request completed with errors: {parse_result.error}")
        logger.info(f"[{request_id}] Pages processed: {parse_result.pages_processed}")
        logger.info(f"[{request_id}] Total time: {total_duration:.2f}s")
        if parse_result.pages_processed > 0:
            logger.info(f"[{request_id}] Speed: {parse_result.pages_processed / total_duration:.2f} pages/sec")
        logger.info(f"[{request_id}] {separator}")
        return parse_result
    except Exception as e:
        # Endpoint boundary: report the failure in the response body.
        total_duration = time.time() - start_time
        logger.error(f"[{request_id}] {separator}")
        logger.error(f"[{request_id}] Request failed after {total_duration:.2f}s")
        logger.error(f"[{request_id}] Error: {type(e).__name__}: {str(e)}")
        logger.error(f"[{request_id}] {separator}")
        return ParseResponse(
            success=False,
            error=f"{type(e).__name__}: {str(e)}",
        )
    finally:
        # Cleanup temp directory
        shutil.rmtree(temp_dir, ignore_errors=True)
        logger.info(f"[{request_id}] Cleaned up temp directory")
async def parse_document_from_url(
    request: URLParseRequest,
    _token: str = Depends(verify_token),
) -> ParseResponse:
    """
    Parse a document from a URL.

    Downloads the file and processes it through MinerU.

    The URL is validated before the download, and every redirect target is
    validated again before it is followed. The body is streamed and the
    download is aborted as soon as it exceeds MAX_FILE_SIZE_BYTES.

    Args:
        request: URL, output format, language, page range, backend and
            image options for this parse.
        _token: Result of the bearer-token dependency; unused in the body.

    Returns:
        The ParseResponse produced from MinerU's output, or a
        ParseResponse with success=False when the download or the
        processing fails.

    Raises:
        HTTPException: 413 when the download exceeds MAX_FILE_SIZE_MB.
            Any HTTPException raised by `_validate_url` is propagated too.
    """
    request_id = str(uuid4())[:8]
    start_time = time.time()
    # `end_page or 'end'` would report an explicit end_page of 0 as 'end'.
    end_page_label = request.end_page if request.end_page is not None else "end"
    logger.info(f"[{request_id}] {'='*50}")
    logger.info(f"[{request_id}] New URL parse request received")
    logger.info(f"[{request_id}] URL: {request.url}")
    logger.info(f"[{request_id}] Output format: {request.output_format}")
    logger.info(f"[{request_id}] Language: {request.lang}")
    logger.info(f"[{request_id}] Page range: {request.start_page} to {end_page_label}")

    # Validate URL to prevent SSRF attacks
    logger.info(f"[{request_id}] Validating URL...")
    _validate_url(request.url)
    logger.info(f"[{request_id}] URL validation passed")

    temp_dir = tempfile.mkdtemp()
    logger.info(f"[{request_id}] Created temp directory: {temp_dir}")
    try:
        # Download file from URL (redirects validated hop by hop, size capped)
        logger.info(f"[{request_id}] Downloading file from URL...")
        download_start = time.time()
        content, raw_content_type = await _download_with_validated_redirects(
            request.url, request_id
        )
        download_duration = time.time() - download_start
        file_size_mb = len(content) / (1024 * 1024)
        logger.info(f"[{request_id}] Download completed in {download_duration:.2f}s")
        logger.info(f"[{request_id}] File size: {file_size_mb:.2f} MB")

        # Determine file extension from URL path, Content-Type header, or default to .pdf
        allowed_extensions = {".pdf", ".png", ".jpg", ".jpeg", ".tiff", ".bmp"}
        # urlparse drops both the query string and any #fragment from the path.
        file_ext = Path(urlparse(request.url).path).suffix.lower()
        if file_ext not in allowed_extensions:
            # Try Content-Type header
            content_type = raw_content_type.lower()
            ct_map = {
                "application/pdf": ".pdf",
                "image/png": ".png",
                "image/jpeg": ".jpg",
                "image/tiff": ".tiff",
                "image/bmp": ".bmp",
            }
            file_ext = next((v for k, v in ct_map.items() if k in content_type), ".pdf")
            logger.info(f"[{request_id}] URL suffix not recognized, using: {file_ext} (from content-type: {content_type})")
        logger.info(f"[{request_id}] File type: {file_ext}")

        # Save downloaded file (run blocking I/O in thread)
        input_path = Path(temp_dir) / f"input{file_ext}"
        await asyncio.to_thread(_save_downloaded_content, input_path, content)
        logger.info(f"[{request_id}] Saved file to: {input_path}")

        # Create output directory
        output_dir = Path(temp_dir) / "output"
        output_dir.mkdir(exist_ok=True)
        use_backend = request.backend if request.backend else MINERU_BACKEND

        # Check if chunking should be used (PDF only, sufficient pages)
        total_pages = 0
        use_chunking = False
        if file_ext == ".pdf":
            total_pages = _get_pdf_page_count(input_path)
            logger.info(f"[{request_id}] PDF has {total_pages} pages")
            # Calculate effective page range
            effective_end = request.end_page if request.end_page is not None else total_pages - 1
            effective_pages = effective_end - request.start_page + 1
            if effective_pages > CHUNKING_THRESHOLD:
                use_chunking = True
                logger.info(f"[{request_id}] Chunking enabled: {effective_pages} pages > {CHUNKING_THRESHOLD} threshold")

        # NOTE(review): the MinerU helpers below are called synchronously from
        # this coroutine; if they block (they appear to run MinerU to
        # completion - confirm), the event loop is stalled for that long.
        if use_chunking:
            # Process in parallel chunks
            parse_result = _process_chunked(
                input_path=input_path,
                base_output_dir=output_dir,
                backend=use_backend,
                lang=request.lang,
                start_page=request.start_page,
                end_page=request.end_page,
                total_pages=total_pages,
                request_id=request_id,
                output_format=request.output_format,
                include_images=request.include_images,
            )
        else:
            # Process normally (single pass)
            logger.info(f"[{request_id}] Processing without chunking")
            proc, backend_used = _run_mineru(
                input_path=input_path,
                output_dir=output_dir,
                backend=use_backend,
                lang=request.lang,
                start_page=request.start_page,
                end_page=request.end_page,
                request_id=request_id,
            )
            if proc.returncode != 0:
                logger.error(f"[{request_id}] MinerU failed with code {proc.returncode}")
                if proc.stderr:
                    for line in proc.stderr.strip().split('\n'):
                        logger.error(f"[{request_id}] [stderr] {line}")
                raise RuntimeError(f"MinerU failed (code {proc.returncode}): {proc.stderr}")
            # Read output
            logger.info(f"[{request_id}] Reading output files...")
            parse_result = _read_parse_output(output_dir, request.output_format, proc.stdout, proc.stderr, request_id, request.include_images)
            parse_result.backend_used = backend_used
            if backend_used != use_backend:
                logger.info(f"[{request_id}] Note: Fell back from {use_backend} to {backend_used} due to GPU memory constraints")

        total_duration = time.time() - start_time
        logger.info(f"[{request_id}] {'='*50}")
        logger.info(f"[{request_id}] Request completed successfully")
        logger.info(f"[{request_id}] Pages processed: {parse_result.pages_processed}")
        logger.info(f"[{request_id}] Total time: {total_duration:.2f}s")
        if parse_result.pages_processed > 0:
            logger.info(f"[{request_id}] Speed: {parse_result.pages_processed / total_duration:.2f} pages/sec")
        logger.info(f"[{request_id}] {'='*50}")
        return parse_result
    except HTTPException as e:
        # Deliberate HTTP errors (413, rejected redirect target) must reach the
        # client with their status code instead of being folded into a
        # success=False body by the generic handler below.
        total_duration = time.time() - start_time
        logger.error(f"[{request_id}] Request rejected after {total_duration:.2f}s: {e.status_code} {e.detail}")
        raise
    except httpx.HTTPError as e:
        total_duration = time.time() - start_time
        logger.error(f"[{request_id}] Download failed after {total_duration:.2f}s: {str(e)}")
        return ParseResponse(
            success=False,
            error=f"Failed to download file from URL: {str(e)}",
        )
    except Exception as e:
        total_duration = time.time() - start_time
        logger.error(f"[{request_id}] {'='*50}")
        logger.error(f"[{request_id}] Request failed after {total_duration:.2f}s")
        logger.error(f"[{request_id}] Error: {type(e).__name__}: {str(e)}")
        logger.error(f"[{request_id}] {'='*50}")
        return ParseResponse(
            success=False,
            # Include the exception type so exceptions with an empty message
            # still yield a non-empty, diagnosable error string.
            error=f"{type(e).__name__}: {str(e)}",
        )
    finally:
        # Cleanup temp directory
        shutil.rmtree(temp_dir, ignore_errors=True)
        logger.info(f"[{request_id}] Cleaned up temp directory")


async def _download_with_validated_redirects(
    url: str,
    request_id: str,
    max_redirects: int = 5,
) -> tuple[bytes, str]:
    """Download `url`, validating every redirect hop and capping the size.

    Automatic redirect following is disabled so that each redirect target
    can be passed through `_validate_url` before it is requested. The body
    is streamed and the download stops once MAX_FILE_SIZE_BYTES is exceeded.

    Args:
        url: Already-validated URL to fetch.
        request_id: Identifier used to prefix log lines.
        max_redirects: Maximum number of redirects to follow.

    Returns:
        A `(content, content_type)` tuple, where `content_type` is the raw
        Content-Type header value ("" when absent).

    Raises:
        HTTPException: 413 when the body exceeds MAX_FILE_SIZE_BYTES, plus
            any HTTPException raised by `_validate_url` for a redirect target.
        httpx.HTTPError: On transport errors, non-success status codes, or
            when `max_redirects` is exceeded.
    """
    current_url = url
    async with httpx.AsyncClient(timeout=60.0, follow_redirects=False) as client:
        for _hop in range(max_redirects + 1):
            async with client.stream("GET", current_url) as response:
                location = response.headers.get("location")
                if response.is_redirect and location:
                    # Resolve relative Location values against the current URL.
                    next_url = str(response.url.join(location))
                    logger.info(f"[{request_id}] Redirected to: {next_url}")
                    _validate_url(next_url)
                    current_url = next_url
                    continue

                response.raise_for_status()

                chunks = []
                received = 0
                async for chunk in response.aiter_bytes():
                    received += len(chunk)
                    if received > MAX_FILE_SIZE_BYTES:
                        logger.error(f"[{request_id}] File too large: download exceeded {MAX_FILE_SIZE_MB} MB")
                        raise HTTPException(
                            status_code=413,
                            detail=f"File size exceeds maximum allowed size of {MAX_FILE_SIZE_MB}MB",
                        )
                    chunks.append(chunk)
                return b"".join(chunks), response.headers.get("content-type", "")

    raise httpx.TooManyRedirects(f"Exceeded maximum of {max_redirects} redirects")
def _read_parse_output(output_dir: Path, output_format: str, stdout: str = "", stderr: str = "", request_id: str = "", include_images: bool = False) -> ParseResponse:
    """Read the parsed output from MinerU output directory.

    Args:
        output_dir: Directory searched recursively for `.md` and `.json` files.
        output_format: When "json" and a JSON document was loaded, the JSON
            is returned; otherwise markdown is returned if present.
        stdout: Process stdout, quoted (first 500 chars) in the error
            reported when no output files exist.
        stderr: Process stderr, quoted (first 500 chars) in error reports.
        request_id: Optional identifier used to prefix log lines.
        include_images: When True, images found under `output_dir` are
            bundled through `_create_images_zip_base64`.

    Returns:
        A ParseResponse with success=True and either `json_content` or
        `markdown` set, or success=False with an explanatory `error`.
    """
    log_prefix = f"[{request_id}] " if request_id else ""

    # List all files in output directory for debugging
    all_files = []
    for root, _dirs, names in os.walk(output_dir):
        all_files.extend(os.path.join(root, name) for name in names)
    logger.info(f"{log_prefix}Output directory contents: {len(all_files)} files")
    for f in all_files:
        logger.info(f"{log_prefix} - {f}")

    # Find markdown/JSON files recursively. glob() yields entries in
    # filesystem order, so sort to make the "[0]" picks below deterministic.
    md_files = sorted(output_dir.glob("**/*.md"))
    json_files_all = sorted(output_dir.glob("**/*.json"))
    logger.info(f"{log_prefix}Found {len(md_files)} markdown files, {len(json_files_all)} JSON files")
    if not md_files and not json_files_all:
        logger.error(f"{log_prefix}No output files found!")
        return ParseResponse(
            success=False,
            error=f"No output files found. All files: {all_files}. Stdout: {stdout[:500]}. Stderr: {stderr[:500]}",
        )

    # Read markdown output
    markdown_content = None
    if md_files:
        markdown_content = md_files[0].read_text(encoding="utf-8")
        logger.info(f"{log_prefix}Markdown content length: {len(markdown_content)} chars")

    # Read JSON output (prefer non-content-list files)
    json_content = None
    main_json_files = [f for f in json_files_all if "_content_list" not in f.name]
    if main_json_files:
        try:
            json_content = json.loads(main_json_files[0].read_text(encoding="utf-8"))
            logger.info(f"{log_prefix}JSON content loaded from: {main_json_files[0].name}")
        except json.JSONDecodeError as e:
            logger.warning(f"{log_prefix}Failed to parse JSON: {e}")

    # Count pages from content list if available. This is best-effort
    # metadata: a malformed content list must not fail an otherwise
    # successful parse, so non-dict items are skipped and an unhashable
    # page_idx (TypeError) is logged rather than propagated.
    pages_processed = 0
    content_list_files = [f for f in json_files_all if "_content_list" in f.name]
    if content_list_files:
        try:
            content_list = json.loads(
                content_list_files[0].read_text(encoding="utf-8")
            )
            if isinstance(content_list, list):
                pages_processed = len(
                    {
                        item.get("page_idx", 0)
                        for item in content_list
                        if isinstance(item, dict)
                    }
                )
            logger.info(f"{log_prefix}Pages processed: {pages_processed}")
        except (json.JSONDecodeError, TypeError) as e:
            logger.warning(f"{log_prefix}Failed to count pages: {e}")

    # Extract images from output directory (only if requested)
    images_zip = None
    image_count = 0
    if include_images:
        images_zip, image_count = _create_images_zip_base64(output_dir)
        if image_count > 0:
            logger.info(f"{log_prefix}Extracted {image_count} images into zip")

    if output_format == "json" and json_content:
        logger.info(f"{log_prefix}Returning JSON output")
        return ParseResponse(
            success=True,
            json_content=json_content,
            images_zip=images_zip,
            image_count=image_count,
            pages_processed=pages_processed,
        )
    elif markdown_content:
        logger.info(f"{log_prefix}Returning markdown output")
        return ParseResponse(
            success=True,
            markdown=markdown_content,
            images_zip=images_zip,
            image_count=image_count,
            pages_processed=pages_processed,
        )
    else:
        logger.error(f"{log_prefix}No usable output generated")
        return ParseResponse(
            success=False,
            error=f"No output generated. MD files: {[str(f) for f in md_files]}. JSON files: {[str(f) for f in json_files_all]}. Stderr: {stderr[:500]}",
        )
if __name__ == "__main__":
    # Script entry point: uvicorn is imported only when the module is run
    # directly, then used to serve the FastAPI app on all interfaces.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=7860)