# intelligent-pid / pdf_processor.py
# Initial commit: add core files for P&ID processing
# (origin: commit 9847531, author msIntui)
import fitz # PyMuPDF
import os
import logging
from pathlib import Path
import numpy as np
from PIL import Image
import io
import cv2 # Add this import
from storage import StorageInterface
from typing import List, Dict, Tuple, Any
import json
from text_detection_combined import process_drawing
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DocumentProcessor:
    """Convert P&ID documents (PDF/PNG/JPG) into high-resolution page images.

    Each PDF page is rendered at ``target_dpi`` and saved in three variants,
    each pre-processed for a different downstream detector:

    * ``*_text.png``   — contrast-enhanced, denoised, adaptively thresholded
    * ``*_symbol.png`` — edge-preserving filtered and sharpened
    * ``*_line.png``   — Canny edges, dilated to bridge broken lines

    Images are persisted through the injected ``StorageInterface``.
    """

    def __init__(self, storage: StorageInterface):
        self.storage = storage
        self.logger = logging.getLogger(__name__)
        # Optimal processing parameters.
        self.target_dpi = 600       # render resolution (increased from 300 DPI)
        self.min_dimension = 2000   # minimum width/height in pixels
        self.max_dimension = 8000   # upper bound to cap memory at high DPI
        self.quality = 95           # JPEG quality (unused while output is PNG)

    def process_document(self, file_path: str, output_dir: str) -> list:
        """Process a document (PDF/PNG/JPG) and return paths to processed pages.

        Args:
            file_path: Path of the input document.
            output_dir: Directory that receives the processed images.

        Returns:
            List of paths of all images written.

        Raises:
            ValueError: If the file extension is not supported.
        """
        file_ext = Path(file_path).suffix.lower()
        if file_ext == '.pdf':
            return self._process_pdf(file_path, output_dir)
        if file_ext in ('.png', '.jpg', '.jpeg'):
            return self._process_image(file_path, output_dir)
        raise ValueError(f"Unsupported file format: {file_ext}")

    def _process_pdf(self, pdf_path: str, output_dir: str) -> list:
        """Render every PDF page and save text/symbol/line optimized variants.

        Also writes ``<stem>_processing_results.json`` describing each page
        (dimensions, output paths, DPI, zoom factor).

        Returns:
            Flat list of all saved image paths (three per page).
        """
        processed_pages = []
        processing_results = {}
        try:
            os.makedirs(output_dir, exist_ok=True)

            # Remove stale outputs left by a previous run of this document,
            # but never the source PDF itself.
            base_name = Path(pdf_path).stem
            for file in os.listdir(output_dir):
                if file.startswith(base_name) and file != os.path.basename(pdf_path):
                    stale_path = os.path.join(output_dir, file)
                    try:
                        if os.path.isfile(stale_path):
                            os.unlink(stale_path)
                    except OSError as e:
                        self.logger.error(f"Error deleting file {stale_path}: {e}")

            # Read the PDF directly since it's already in the results directory.
            with open(pdf_path, 'rb') as f:
                pdf_data = f.read()

            doc = fitz.open(stream=pdf_data, filetype="pdf")
            try:
                # PDF user space is 72 DPI; zoom scales it to the target DPI.
                zoom = self.target_dpi / 72
                matrix = fitz.Matrix(zoom, zoom)

                for page_num in range(len(doc)):
                    page = doc[page_num]

                    # Render the page at high resolution and decode with OpenCV.
                    pix = page.get_pixmap(matrix=matrix)
                    img_data = pix.tobytes()
                    nparr = np.frombuffer(img_data, np.uint8)
                    img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
                    if img is None:
                        # Fail fast with a clear message instead of a cryptic
                        # cvtColor error inside the optimizers.
                        raise ValueError(
                            f"Failed to decode rendered page {page_num + 1} of {pdf_path}"
                        )

                    base_filename = f"{base_name}_page_{page_num + 1}"

                    # Each optimizer starts with cvtColor, which allocates a new
                    # array and never mutates its input, so no copies are needed.
                    optimized_versions = {
                        'text': self._optimize_for_text(img),
                        'symbol': self._optimize_for_symbols(img),
                        'line': self._optimize_for_lines(img)
                    }
                    paths = {
                        'text': os.path.join(output_dir, f"{base_filename}_text.png"),
                        'symbol': os.path.join(output_dir, f"{base_filename}_symbol.png"),
                        'line': os.path.join(output_dir, f"{base_filename}_line.png")
                    }

                    for version_type, optimized_img in optimized_versions.items():
                        self._save_image(optimized_img, paths[version_type])
                        processed_pages.append(paths[version_type])

                    processing_results[str(page_num + 1)] = {
                        "page_number": page_num + 1,
                        "dimensions": {
                            "width": img.shape[1],
                            "height": img.shape[0]
                        },
                        "paths": paths,
                        "dpi": self.target_dpi,
                        "zoom_factor": zoom
                    }
            finally:
                # Always release PyMuPDF resources, even on a rendering error.
                doc.close()

            results_json_path = os.path.join(
                output_dir,
                f"{base_name}_processing_results.json"
            )
            with open(results_json_path, 'w') as f:
                json.dump(processing_results, f, indent=4)

            return processed_pages
        except Exception as e:
            self.logger.error(f"Error processing PDF: {str(e)}")
            raise

    def _process_image(self, image_path: str, output_dir: str) -> list:
        """Process a single raster image and return the saved output path(s)."""
        try:
            # Load through the storage abstraction (unlike PDFs, which are
            # read straight off disk by _process_pdf).
            image_data = self.storage.load_file(image_path)
            nparr = np.frombuffer(image_data, np.uint8)
            img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
            if img is None:
                raise ValueError(f"Failed to decode image: {image_path}")

            processed_img = self._optimize_image(img)

            output_path = os.path.join(
                output_dir,
                f"{Path(image_path).stem}_text.png"
            )
            self._save_image(processed_img, output_path)
            return [output_path]
        except Exception as e:
            self.logger.error(f"Error processing image: {str(e)}")
            raise

    def _optimize_image(self, img: np.ndarray) -> np.ndarray:
        """Optimize a BGR image for detection: CLAHE, denoise, Otsu binarize,
        then rescale into [min_dimension, max_dimension]."""
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

        # Local contrast enhancement.
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        enhanced = clahe.apply(gray)

        denoised = cv2.fastNlMeansDenoising(enhanced)

        # Global Otsu threshold on the denoised image.
        _, binary = cv2.threshold(denoised, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

        # Scale up small images to min_dimension but never past max_dimension,
        # preserving aspect ratio; leave already-large images untouched.
        height, width = binary.shape
        scale = min(self.max_dimension / max(width, height),
                    max(self.min_dimension / min(width, height), 1.0))
        if scale != 1.0:
            new_size = (int(width * scale), int(height * scale))
            resized = cv2.resize(binary, new_size, interpolation=cv2.INTER_LANCZOS4)
        else:
            resized = binary

        # Back to BGR so every pipeline output has a consistent channel layout.
        return cv2.cvtColor(resized, cv2.COLOR_GRAY2BGR)

    def _optimize_for_text(self, img: np.ndarray) -> np.ndarray:
        """Optimize a BGR image for text detection (CLAHE + denoise +
        Gaussian adaptive threshold)."""
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        enhanced = clahe.apply(gray)
        denoised = cv2.fastNlMeansDenoising(enhanced)
        # Adaptive thresholding separates text from uneven backgrounds.
        binary = cv2.adaptiveThreshold(denoised, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                       cv2.THRESH_BINARY, 11, 2)
        return cv2.cvtColor(binary, cv2.COLOR_GRAY2BGR)

    def _optimize_for_symbols(self, img: np.ndarray) -> np.ndarray:
        """Optimize a BGR image for symbol detection (bilateral filter +
        CLAHE + sharpening kernel)."""
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # Bilateral filter reduces noise while keeping symbol edges crisp.
        bilateral = cv2.bilateralFilter(gray, 9, 75, 75)
        clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
        enhanced = clahe.apply(bilateral)
        # Standard 3x3 sharpening kernel (center 9, neighbors -1; sums to 1).
        kernel = np.array([[-1, -1, -1],
                           [-1,  9, -1],
                           [-1, -1, -1]])
        sharpened = cv2.filter2D(enhanced, -1, kernel)
        return cv2.cvtColor(sharpened, cv2.COLOR_GRAY2BGR)

    def _optimize_for_lines(self, img: np.ndarray) -> np.ndarray:
        """Optimize a BGR image for line detection (blur + Canny + dilation)."""
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # Light blur suppresses noise before edge extraction.
        denoised = cv2.GaussianBlur(gray, (3, 3), 0)
        edges = cv2.Canny(denoised, 50, 150)
        # Dilation bridges small gaps in broken lines.
        kernel = np.ones((2, 2), np.uint8)
        dilated = cv2.dilate(edges, kernel, iterations=1)
        return cv2.cvtColor(dilated, cv2.COLOR_GRAY2BGR)

    def _save_image(self, img: np.ndarray, output_path: str):
        """Encode *img* as PNG and persist it through the storage backend.

        PNG is lossless at every compression level, so a moderate level
        shrinks files substantially at no quality cost (level 0 previously
        wrote uncompressed PNGs).
        """
        _, buffer = cv2.imencode('.png', img, [
            cv2.IMWRITE_PNG_COMPRESSION, 3
        ])
        self.storage.save_file(output_path, buffer.tobytes())
if __name__ == "__main__":
    from storage import StorageFactory

    # Initialize storage backend and processor.
    storage = StorageFactory.get_storage()
    processor = DocumentProcessor(storage)

    pdf_path = "samples/001.pdf"
    output_dir = "results"  # changed from "processed_pages" to "results"
    try:
        os.makedirs(output_dir, exist_ok=True)
        results = processor.process_document(
            file_path=pdf_path,
            output_dir=output_dir
        )

        # Print a per-file summary of what was produced.
        print("\nProcessing Results:")
        print(f"Output Directory: {os.path.abspath(output_dir)}")
        for page_path in results:
            size_mb = os.path.getsize(page_path) / (1024 * 1024)
            print(f"- {os.path.basename(page_path)} ({size_mb:.2f} MB)")

        # Total size of everything in the output directory, in MB.
        total_size = sum(os.path.getsize(os.path.join(output_dir, f))
                         for f in os.listdir(output_dir)) / (1024 * 1024)
        print(f"\nTotal output size: {total_size:.2f} MB")
    except Exception as e:
        logger.error(f"Error processing PDF: {str(e)}")
        raise