armeta_hackaton / pipeline.py
bekzhanK1's picture
Optimize pipeline: load models once per PDF instead of per page for 10x speed improvement
26dd2fe
#!/usr/bin/env python3
"""
Unified Pipeline for Document Processing
Runs QR code detection, signature detection, and stamp detection in sequence
and combines all results into a single JSON file.
"""
import sys
import json
import argparse
import cv2
import numpy as np
import tempfile
from pathlib import Path
from typing import Optional, Dict, Any, List
# Try to import PyMuPDF for PDF processing
try:
import fitz # PyMuPDF
PDF_SUPPORT = True
except ImportError:
PDF_SUPPORT = False
print("Warning: PyMuPDF not installed. PDF support disabled.")
print("Install with: pip install PyMuPDF")
# Add subdirectories to path for imports
sys.path.insert(0, str(Path(__file__).parent))
# Import detection functions
from qr.qr_extraction import process_image_no_save as process_qr
from signature.inference import detect_signatures
from stamp_detector.detect import detect_stamps_no_save
# Import for model loading
from ultralytics import YOLO
import os
def pdf_to_images(pdf_path: str, dpi: int = 200) -> List[np.ndarray]:
"""
Convert PDF pages to images.
Args:
pdf_path: Path to PDF file
dpi: Resolution for conversion (default: 200)
Returns:
List of images as numpy arrays (BGR format for OpenCV)
"""
if not PDF_SUPPORT:
raise ImportError("PyMuPDF is required for PDF processing. Install with: pip install PyMuPDF")
doc = fitz.open(pdf_path)
images = []
for page_num in range(len(doc)):
page = doc[page_num]
# Convert to image with specified DPI
mat = fitz.Matrix(dpi / 72, dpi / 72) # 72 is default DPI
pix = page.get_pixmap(matrix=mat)
# Convert to numpy array
img_data = pix.tobytes("ppm")
# Use cv2 to decode PPM
nparr = np.frombuffer(img_data, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if img is not None:
images.append(img)
doc.close()
return images
def _load_signature_model(signature_model_path: Optional[str] = None):
"""Load signature model once for reuse."""
from huggingface_hub import hf_hub_download
if signature_model_path and Path(signature_model_path).exists():
model_path = signature_model_path
else:
local_model_path = Path("yolov8s.pt")
if local_model_path.exists():
model_path = str(local_model_path)
else:
try:
hf_token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_TOKEN")
model_path = hf_hub_download(
repo_id="tech4humans/yolov8s-signature-detector",
filename="yolov8s.pt",
token=hf_token
)
except Exception as e:
raise RuntimeError(f"Failed to load signature model: {e}")
print("πŸ“₯ Loading signature model...")
model = YOLO(model_path)
print("βœ“ Signature model loaded")
return model
def _load_stamp_model(stamp_model_path: str = "stamp_detector/stamp_model.pt"):
"""Load stamp model once for reuse."""
if not Path(stamp_model_path).exists():
default_path = Path("stamp_detector/stamp_model.pt")
if default_path.exists():
stamp_model_path = str(default_path)
else:
raise FileNotFoundError(f"Stamp model not found: {stamp_model_path}")
print("πŸ“₯ Loading stamp model...")
model = YOLO(stamp_model_path)
print("βœ“ Stamp model loaded")
return model
def process_pdf_pipeline(
pdf_path: str,
output_dir: str = "pipeline_outputs",
stamp_model_path: str = "stamp_detector/stamp_model.pt",
stamp_conf: float = 0.25,
dpi: int = 200,
save_intermediate: bool = False,
signature_model_path: Optional[str] = None
) -> Dict[str, Any]:
"""
Process a PDF file by converting each page to an image and running the pipeline.
Args:
pdf_path: Path to PDF file
output_dir: Directory for output files
stamp_model_path: Path to stamp model
stamp_conf: Confidence threshold for stamp detection
dpi: DPI for PDF to image conversion
save_intermediate: Whether to save intermediate results
signature_model_path: Path to signature model (optional, will auto-download if not provided)
Returns:
Combined results dictionary for all pages
"""
pdf_path = Path(pdf_path)
output_dir = Path(output_dir)
output_dir.mkdir(exist_ok=True)
if not pdf_path.exists():
raise FileNotFoundError(f"PDF not found: {pdf_path}")
if not PDF_SUPPORT:
raise ImportError("PyMuPDF is required for PDF processing. Install with: pip install PyMuPDF")
print(f"\n{'='*70}")
print(f"Processing PDF: {pdf_path.name}")
print(f"{'='*70}\n")
# Load models once before processing pages
print("πŸ”„ Loading models (this happens once for all pages)...")
try:
signature_model = _load_signature_model(signature_model_path)
stamp_model = _load_stamp_model(stamp_model_path)
except Exception as e:
print(f"βœ— Error loading models: {str(e)}")
raise
# Convert PDF to images
print(f"\nπŸ“„ Converting PDF pages to images (DPI: {dpi})...")
try:
page_images = pdf_to_images(str(pdf_path), dpi=dpi)
print(f"βœ“ Converted {len(page_images)} page(s) to images\n")
except Exception as e:
raise RuntimeError(f"Failed to convert PDF to images: {e}")
# Process each page
all_pages = []
temp_dir = Path(tempfile.mkdtemp())
try:
for page_num, img in enumerate(page_images, 1):
print(f"\n{'='*70}")
print(f"Processing Page {page_num}/{len(page_images)}")
print(f"{'='*70}\n")
# Save temporary image for processing
temp_img_path = temp_dir / f"page_{page_num}.jpg"
cv2.imwrite(str(temp_img_path), img)
# Process the page with pre-loaded models
try:
page_result = process_image_pipeline(
str(temp_img_path),
output_dir=output_dir,
signature_model=signature_model,
stamp_model=stamp_model,
stamp_conf=stamp_conf,
save_intermediate=save_intermediate
)
# Add page number to result
page_result["page_number"] = page_num
page_result["image"] = f"{pdf_path.stem}_page_{page_num}.jpg"
all_pages.append(page_result)
except Exception as e:
print(f"βœ— Error processing page {page_num}: {str(e)}")
all_pages.append({
"page_number": page_num,
"image": f"{pdf_path.stem}_page_{page_num}.jpg",
"error": str(e)
})
finally:
# Clean up temporary directory
import shutil
shutil.rmtree(temp_dir, ignore_errors=True)
# Create combined summary
summary = {
"total_pages": len(all_pages),
"total_qr_codes": sum(p.get("summary", {}).get("qr_codes", 0) for p in all_pages),
"total_signatures": sum(p.get("summary", {}).get("signatures", 0) for p in all_pages),
"total_stamps": sum(p.get("summary", {}).get("stamps", 0) for p in all_pages),
"total_detections": sum(p.get("summary", {}).get("total", 0) for p in all_pages)
}
result = {
"pdf": pdf_path.name,
"pdf_path": str(pdf_path),
"summary": summary,
"pages": all_pages
}
print(f"\n{'='*70}")
print("PDF PROCESSING COMPLETE")
print(f"{'='*70}")
print(f"Total Pages: {summary['total_pages']}")
print(f"QR Codes: {summary['total_qr_codes']}")
print(f"Signatures: {summary['total_signatures']}")
print(f"Stamps: {summary['total_stamps']}")
print(f"Total: {summary['total_detections']}")
print(f"{'='*70}\n")
return result
def process_image_pipeline(
image_path: str,
output_dir: str = "pipeline_outputs",
qr_model_path: Optional[str] = None,
signature_model_path: Optional[str] = None,
stamp_model_path: str = "stamp_detector/stamp_model.pt",
stamp_conf: float = 0.25,
save_intermediate: bool = False,
signature_model: Optional[Any] = None,
stamp_model: Optional[Any] = None
) -> Dict[str, Any]:
"""
Process a single image through all three detection models.
Args:
image_path: Path to input image
output_dir: Directory for output files
qr_model_path: Path to QR model (not used, kept for compatibility)
signature_model_path: Path to signature model (optional)
stamp_model_path: Path to stamp model
stamp_conf: Confidence threshold for stamp detection
save_intermediate: Whether to save intermediate results
Returns:
Combined results dictionary
"""
image_path = Path(image_path)
output_dir = Path(output_dir)
output_dir.mkdir(exist_ok=True)
if not image_path.exists():
raise FileNotFoundError(f"Image not found: {image_path}")
print(f"\n{'='*70}")
print(f"Processing: {image_path.name}")
print(f"{'='*70}\n")
# Get image dimensions once (will be used to consolidate)
img_sample = cv2.imread(str(image_path))
if img_sample is None:
raise ValueError(f"Could not read image: {image_path}")
img_height, img_width = img_sample.shape[:2]
# Initialize result structure with consolidated image info
result = {
"image": image_path.name,
"image_dimensions": {
"width": img_width,
"height": img_height
},
"qr_codes": [],
"signatures": [],
"stamps": []
}
# Step 1: QR Code Detection
print("πŸ”· Step 1/3: QR Code Detection")
print("-" * 70)
try:
qr_result = process_qr(str(image_path))
if qr_result and qr_result.get("qr_codes", {}).get("items"):
result["qr_codes"] = qr_result["qr_codes"]["items"]
print(f"βœ“ Found {len(result['qr_codes'])} QR code(s)")
else:
print("βœ“ No QR codes detected")
except Exception as e:
print(f"βœ— Error in QR detection: {str(e)}")
result["qr_error"] = str(e)
# Step 2: Signature Detection
print(f"\nπŸ”· Step 2/3: Signature Detection")
print("-" * 70)
try:
# Use pre-loaded model if provided, otherwise load on demand
if signature_model is None:
if signature_model_path:
signature_model = _load_signature_model(signature_model_path)
else:
signature_model = _load_signature_model()
sig_result = detect_signatures(
str(image_path),
model=signature_model, # Use pre-loaded model
output_dir=None, # Don't save
signatures_dir=None, # Don't save
save_crops=False # Don't save crops
)
if sig_result and sig_result.get("signatures"):
# Clean up signature items (remove cropped_path if present, keep only essential data)
cleaned_signatures = []
for sig in sig_result["signatures"]:
cleaned_sig = {
"id": sig.get("signature_id"),
"confidence": sig.get("confidence"),
"bbox": sig.get("bbox")
}
cleaned_signatures.append(cleaned_sig)
result["signatures"] = cleaned_signatures
print(f"βœ“ Found {len(result['signatures'])} signature(s)")
else:
print("βœ“ No signatures detected")
except Exception as e:
print(f"βœ— Error in signature detection: {str(e)}")
result["signature_error"] = str(e)
# Step 3: Stamp Detection
print(f"\nπŸ”· Step 3/3: Stamp Detection")
print("-" * 70)
try:
# Use pre-loaded model if provided, otherwise load on demand
if stamp_model is None:
if not Path(stamp_model_path).exists():
raise FileNotFoundError(f"Stamp model not found: {stamp_model_path}")
stamp_model = _load_stamp_model(stamp_model_path)
stamp_result = detect_stamps_no_save(
str(image_path),
model_path=stamp_model_path,
conf=stamp_conf,
model=stamp_model # Pass pre-loaded model
)
if stamp_result and stamp_result.get("detections"):
# Clean up stamp items (keep only essential data, remove normalized bbox)
cleaned_stamps = []
for stamp in stamp_result["detections"]:
cleaned_stamp = {
"confidence": stamp.get("confidence"),
"bbox": stamp.get("bbox")
}
cleaned_stamps.append(cleaned_stamp)
result["stamps"] = cleaned_stamps
print(f"βœ“ Found {len(result['stamps'])} stamp(s)")
else:
print("βœ“ No stamps detected")
except Exception as e:
print(f"βœ— Error in stamp detection: {str(e)}")
result["stamp_error"] = str(e)
# Create summary
result["summary"] = {
"qr_codes": len(result.get("qr_codes", [])),
"signatures": len(result.get("signatures", [])),
"stamps": len(result.get("stamps", [])),
"total": len(result.get("qr_codes", [])) + len(result.get("signatures", [])) + len(result.get("stamps", []))
}
print(f"\n{'='*70}")
print("SUMMARY")
print(f"{'='*70}")
print(f"QR Codes: {result['summary']['qr_codes']}")
print(f"Signatures: {result['summary']['signatures']}")
print(f"Stamps: {result['summary']['stamps']}")
print(f"Total: {result['summary']['total']}")
print(f"{'='*70}\n")
return result
def process_folder_pipeline(
input_folder: str,
output_dir: str = "pipeline_outputs",
stamp_model_path: str = "stamp_detector/stamp_model.pt",
stamp_conf: float = 0.25,
save_intermediate: bool = False
) -> Dict[str, Any]:
"""
Process all images in a folder through the pipeline.
Args:
input_folder: Folder containing input images
output_dir: Directory for output files
stamp_model_path: Path to stamp model
stamp_conf: Confidence threshold for stamp detection
save_intermediate: Whether to save intermediate results
Returns:
Combined results for all images
"""
input_folder = Path(input_folder)
if not input_folder.exists():
raise FileNotFoundError(f"Input folder not found: {input_folder}")
# Supported image formats
image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif', '.webp'}
image_files = [f for f in input_folder.iterdir()
if f.is_file() and f.suffix.lower() in image_extensions]
if not image_files:
print(f"No image files found in '{input_folder}'")
return {"images": [], "summary": {}}
print(f"\n{'='*70}")
print(f"Found {len(image_files)} image(s) to process")
print(f"{'='*70}\n")
all_results = []
for i, image_file in enumerate(image_files, 1):
print(f"\n[{i}/{len(image_files)}]")
try:
result = process_image_pipeline(
str(image_file),
output_dir=output_dir,
stamp_model_path=stamp_model_path,
stamp_conf=stamp_conf,
save_intermediate=save_intermediate
)
all_results.append(result)
except Exception as e:
print(f"βœ— Error processing {image_file.name}: {str(e)}")
all_results.append({
"image": image_file.name,
"image_path": str(image_file),
"error": str(e)
})
# Create summary
summary = {
"total_images": len(all_results),
"total_qr_codes": sum(r.get("summary", {}).get("qr_codes", 0) for r in all_results),
"total_signatures": sum(r.get("summary", {}).get("signatures", 0) for r in all_results),
"total_stamps": sum(r.get("summary", {}).get("stamps", 0) for r in all_results),
"total_detections": sum(r.get("summary", {}).get("total", 0) for r in all_results)
}
final_result = {
"summary": summary,
"images": all_results
}
# Save combined JSON
output_dir = Path(output_dir)
output_dir.mkdir(exist_ok=True)
json_path = output_dir / "pipeline_results.json"
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(final_result, f, indent=2, ensure_ascii=False)
print(f"\n{'='*70}")
print("PIPELINE COMPLETE")
print(f"{'='*70}")
print(f"Processed: {summary['total_images']} image(s)")
print(f"QR Codes: {summary['total_qr_codes']}")
print(f"Signatures: {summary['total_signatures']}")
print(f"Stamps: {summary['total_stamps']}")
print(f"Total: {summary['total_detections']}")
print(f"\nResults saved to: {json_path}")
print(f"{'='*70}\n")
return final_result
def main():
parser = argparse.ArgumentParser(
description="Unified pipeline for QR code, signature, and stamp detection"
)
parser.add_argument(
"input",
help="Input image file, PDF file, or folder containing images"
)
parser.add_argument(
"--output",
default="pipeline_outputs",
help="Output directory (default: pipeline_outputs)"
)
parser.add_argument(
"--stamp-model",
default="stamp_detector/stamp_model.pt",
help="Path to stamp model (default: stamp_detector/stamp_model.pt)"
)
parser.add_argument(
"--stamp-conf",
type=float,
default=0.25,
help="Confidence threshold for stamp detection (default: 0.25)"
)
parser.add_argument(
"--save-intermediate",
action="store_true",
help="Save intermediate results from each detection step"
)
parser.add_argument(
"--dpi",
type=int,
default=200,
help="DPI for PDF to image conversion (default: 200)"
)
args = parser.parse_args()
input_path = Path(args.input)
if input_path.is_file():
# Check if it's a PDF
if input_path.suffix.lower() == '.pdf':
if not PDF_SUPPORT:
print("Error: PyMuPDF is required for PDF processing.")
print("Install with: pip install PyMuPDF")
sys.exit(1)
# Process PDF
result = process_pdf_pipeline(
str(input_path),
output_dir=args.output,
stamp_model_path=args.stamp_model,
stamp_conf=args.stamp_conf,
dpi=args.dpi,
save_intermediate=args.save_intermediate
)
# Save JSON
output_dir = Path(args.output)
output_dir.mkdir(exist_ok=True)
json_path = output_dir / f"{input_path.stem}_pipeline_result.json"
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(result, f, indent=2, ensure_ascii=False)
print(f"Results saved to: {json_path}")
else:
# Process single image
result = process_image_pipeline(
str(input_path),
output_dir=args.output,
stamp_model_path=args.stamp_model,
stamp_conf=args.stamp_conf,
save_intermediate=args.save_intermediate
)
# Save JSON
output_dir = Path(args.output)
output_dir.mkdir(exist_ok=True)
json_path = output_dir / f"{input_path.stem}_pipeline_result.json"
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(result, f, indent=2, ensure_ascii=False)
print(f"Results saved to: {json_path}")
elif input_path.is_dir():
# Process folder
process_folder_pipeline(
str(input_path),
output_dir=args.output,
stamp_model_path=args.stamp_model,
stamp_conf=args.stamp_conf,
save_intermediate=args.save_intermediate
)
else:
print(f"Error: '{args.input}' is not a valid file or directory")
sys.exit(1)
if __name__ == "__main__":
main()