# intelligent-pid / text_detection_combined.py
# msIntui
# Initial commit: Add core files for P&ID processing
# 9847531
import os
import json
import io
from PIL import Image, ImageDraw, ImageFont
import numpy as np
from doctr.models import ocr_predictor
import pytesseract
import easyocr
from storage import StorageInterface
import re
import logging
from pathlib import Path
import cv2
import traceback
# Module-level logger shared by the detect_* helpers below; without this
# binding they could only log when the file is executed as a script.
logger = logging.getLogger(__name__)

# Initialize models
# Both handles are pre-bound to None so that a failed load leaves well-defined
# names behind (callers can test ``is None``) instead of raising NameError on
# first use.
doctr_model = None
easyocr_reader = None
try:
    doctr_model = ocr_predictor(pretrained=True)
    easyocr_reader = easyocr.Reader(['en'])
    logging.info("All OCR models loaded successfully")
except Exception as e:
    # Best effort: the module stays importable even when a model cannot load.
    logging.error(f"Error loading OCR models: {e}")
# Combined patterns from all approaches
# Maps a text-type label to the regular expression that identifies it.
# classify_text() walks this dict in insertion order and returns the first
# label whose pattern matches, so earlier entries take priority over later,
# broader ones (e.g. 'Pipe_Connector' accepts any run of 1-5 digits).
TEXT_PATTERNS = {
    # 1-5 digits, hyphen, 2-4 uppercase letters, hyphen, 1-3 digits
    'Line_Number': r"(?:\d{1,5}[-](?:[A-Z]{2,4})[-]\d{1,3})",
    # 1-3 uppercase letters, hyphen, 1-4 uppercase letters/digits, hyphen, 1-3 digits
    'Equipment_Tag': r"(?:[A-Z]{1,3}[-][A-Z0-9]{1,4}[-]\d{1,3})",
    # 2-3 digits, hyphen, 2-4 uppercase letters, hyphen, 2-3 digits
    'Instrument_Tag': r"(?:\d{2,3}[-][A-Z]{2,4}[-]\d{2,3})",
    # 1-2 uppercase letters, hyphen, exactly 3 digits
    'Valve_Number': r"(?:[A-Z]{1,2}[-]\d{3})",
    # 1-2 digits followed by a double-quote character
    'Pipe_Size': r"(?:\d{1,2}[\"])",
    # the literal words FROM or TO
    'Flow_Direction': r"(?:FROM|TO)",
    # one of a fixed list of service keywords
    'Service_Description': r"(?:STEAM|WATER|AIR|GAS|DRAIN)",
    # digits[-letters]-digits, or letters-digits
    'Process_Instrument': r"(?:[0-9]{2,3}(?:-[A-Z]{2,3})?-[0-9]{2,3}|[A-Z]{2,3}-[0-9]{2,3})",
    # 'N' followed by 1-2 digits, or the literal 'MH'
    'Nozzle': r"(?:N[0-9]{1,2}|MH)",
    # 1-5 digits, or 1-2 uppercase letters followed by 2-5 digits
    'Pipe_Connector': r"(?:[0-9]{1,5}|[A-Z]{1,2}[0-9]{2,5})"
}
def detect_text_combined(image, confidence_threshold=0.3):
    """Run Tesseract, EasyOCR and DocTR on ``image`` and fuse their output.

    Every raw detection is tagged with the engine that produced it
    (``'source'``), overlapping detections are collapsed by
    ``merge_overlapping_detections``, and the survivors whose confidence is at
    least ``confidence_threshold`` are labelled with a ``'text_type'`` from
    ``classify_text``.

    Returns:
        A list of detection dicts (mutated in place with ``'source'`` and
        ``'text_type'``).
    """
    # Engines are queried in this fixed order; the order determines which
    # detection anchors a merge group.
    engines = (
        ('tesseract', detect_with_tesseract),
        ('easyocr', detect_with_easyocr),
        ('doctr', detect_with_doctr),
    )

    tagged = []
    for engine_name, run_engine in engines:
        for detection in run_engine(image):
            detection['source'] = engine_name
            tagged.append(detection)

    accepted = []
    for detection in merge_overlapping_detections(tagged):
        if detection['confidence'] >= confidence_threshold:
            detection['text_type'] = classify_text(detection['text'])
            accepted.append(detection)
    return accepted
def generate_detailed_summary(results):
    """Aggregate classified detections into a nested statistics dict.

    Args:
        results: iterable of detection dicts carrying ``'source'``,
            ``'confidence'``, ``'text_type'``, ``'text'`` and ``'bbox'``.

    Returns:
        A dict with ``'total_detections'``, per-type statistics (``'by_type'``,
        one entry per key of ``TEXT_PATTERNS``), per-engine statistics
        (``'by_source'``), a confidence histogram (``'confidence_ranges'``) and
        the flat list ``'detected_items'``. Detections whose type is not a key
        of ``TEXT_PATTERNS`` are counted per source and listed in
        ``'detected_items'`` but do not contribute to ``'by_type'``.
    """
    engines = ('tesseract', 'easyocr', 'doctr')
    # (inclusive lower bound, bucket label), checked from highest to lowest;
    # anything below the last bound lands in '<0.5'.
    buckets = (
        (0.9, '0.9-1.0'),
        (0.8, '0.8-0.9'),
        (0.7, '0.7-0.8'),
        (0.6, '0.6-0.7'),
        (0.5, '0.5-0.6'),
    )

    summary = {
        'total_detections': len(results),
        'by_type': {
            pattern_type: {
                'count': 0,
                'avg_confidence': 0.0,
                'by_source': {engine: 0 for engine in engines},
                'items': [],
            }
            for pattern_type in TEXT_PATTERNS
        },
        'by_source': {
            engine: {
                'count': 0,
                'by_type': {pattern_type: 0 for pattern_type in TEXT_PATTERNS},
                'avg_confidence': 0.0,
            }
            for engine in engines
        },
        'confidence_ranges': {label: 0 for _, label in buckets},
        'detected_items': [],
    }
    summary['confidence_ranges']['<0.5'] = 0

    confidences_by_engine = {engine: [] for engine in engines}

    for detection in results:
        engine = detection['source']
        confidence = detection['confidence']
        kind = detection['text_type']

        # Per-engine tally.
        engine_stats = summary['by_source'][engine]
        engine_stats['count'] += 1
        confidences_by_engine[engine].append(confidence)

        # Histogram bucket: first bound the confidence reaches.
        bucket = next(
            (label for lower, label in buckets if confidence >= lower),
            '<0.5',
        )
        summary['confidence_ranges'][bucket] += 1

        # Per-type tally, only for types that have a pattern entry.
        if kind in summary['by_type']:
            kind_stats = summary['by_type'][kind]
            kind_stats['count'] += 1
            kind_stats['by_source'][engine] += 1
            engine_stats['by_type'][kind] += 1
            kind_stats['items'].append({
                'text': detection['text'],
                'confidence': confidence,
                'source': engine,
                'bbox': detection['bbox'],
            })

        # Flat listing of every detection, typed or not.
        summary['detected_items'].append({
            'text': detection['text'],
            'type': kind,
            'confidence': confidence,
            'source': engine,
            'bbox': detection['bbox'],
        })

    # Mean confidence per engine (left at 0.0 when the engine found nothing).
    for engine, confidences in confidences_by_engine.items():
        if confidences:
            summary['by_source'][engine]['avg_confidence'] = sum(confidences) / len(confidences)

    # Mean confidence per text type (left at 0.0 when the type never occurred).
    for kind_stats in summary['by_type'].values():
        items = kind_stats['items']
        if items:
            kind_stats['avg_confidence'] = sum(item['confidence'] for item in items) / len(items)

    return summary
def process_drawing(image_path, results_dir, storage=None):
    """Run all OCR engines on one drawing and write annotated outputs.

    The image is read with OpenCV, passed to the Tesseract, EasyOCR and DocTR
    helpers, and every detection is drawn onto a copy of the image. The
    annotated image (``<stem>_detected_texts.jpg``) and a JSON report
    (``<stem>_detected_texts.json``) are written into ``results_dir``, which
    is created if it does not exist.

    Args:
        image_path: path of the image file to process.
        results_dir: directory receiving the annotated image and JSON report.
        storage: accepted for interface compatibility; not used by this
            function (files are written directly to the local filesystem).

    Returns:
        ``(info, summary)`` where ``info`` holds ``'image_path'``,
        ``'json_path'`` and ``'results'``, and ``summary`` holds
        ``'total_detections'``, ``'by_source'`` and ``'by_type'`` statistics.
        On any failure the error is logged and ``(None, None)`` is returned.
    """
    log = logging.getLogger(__name__)
    try:
        # Read image using cv2 (returns None rather than raising on failure)
        image = cv2.imread(image_path)
        if image is None:
            raise ValueError(f"Could not read image from {image_path}")

        # Draw on a copy so the pixels handed to the OCR engines stay clean
        annotated_image = image.copy()

        text_results = {
            'file_name': image_path,
            'detections': []
        }
        type_keys = (
            'equipment_tag', 'line_number', 'instrument_tag', 'valve_number',
            'pipe_size', 'flow_direction', 'service_description',
            'process_instrument', 'nozzle', 'pipe_connector', 'other',
        )
        text_summary = {
            'total_detections': 0,
            'by_source': {
                'tesseract': {'count': 0, 'avg_confidence': 0.0},
                'easyocr': {'count': 0, 'avg_confidence': 0.0},
                'doctr': {'count': 0, 'avg_confidence': 0.0}
            },
            'by_type': {
                key: {'count': 0, 'avg_confidence': 0.0} for key in type_keys
            }
        }

        # Run OCR with different engines and remember where each hit came from
        all_detections = []
        all_detections.extend((res, 'tesseract') for res in detect_with_tesseract(image))
        all_detections.extend((res, 'easyocr') for res in detect_with_easyocr(image))
        all_detections.extend((res, 'doctr') for res in detect_with_doctr(image))

        for detection, source in all_detections:
            confidence = detection['confidence']

            # classify_text labels are the TEXT_PATTERNS keys (e.g.
            # 'Line_Number'); the summary uses their lowercase form, and
            # anything unrecognised is counted under 'other'.
            text_type = classify_text(detection['text'])
            type_key = text_type.lower()
            if type_key not in text_summary['by_type']:
                type_key = 'other'

            text_results['detections'].append({
                'text': detection['text'],
                'bbox': detection['bbox'],
                'confidence': confidence,
                'source': source,
                'text_type': text_type
            })

            # Accumulate confidence sums; they are turned into means below
            text_summary['total_detections'] += 1
            source_stats = text_summary['by_source'][source]
            source_stats['count'] += 1
            source_stats['avg_confidence'] += confidence
            type_stats = text_summary['by_type'][type_key]
            type_stats['count'] += 1
            type_stats['avg_confidence'] += confidence

            # Draw detection on image
            x1, y1, x2, y2 = detection['bbox']
            cv2.rectangle(annotated_image, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
            cv2.putText(annotated_image, detection['text'], (int(x1), int(y1) - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)

        # Convert the accumulated sums into averages
        for group in ('by_source', 'by_type'):
            for stats in text_summary[group].values():
                if stats['count'] > 0:
                    stats['avg_confidence'] /= stats['count']

        # Save results with new naming convention
        os.makedirs(results_dir, exist_ok=True)
        base_name = Path(image_path).stem
        text_result_image_path = os.path.join(results_dir, f"{base_name}_detected_texts.jpg")
        text_result_json_path = os.path.join(results_dir, f"{base_name}_detected_texts.json")

        # cv2.imwrite reports failure through its return value
        success = cv2.imwrite(text_result_image_path, annotated_image)
        if not success:
            raise ValueError(f"Failed to save image to {text_result_image_path}")

        with open(text_result_json_path, 'w', encoding='utf-8') as f:
            json.dump({
                'file_name': image_path,
                'summary': text_summary,
                'detections': text_results['detections']
            }, f, indent=4, ensure_ascii=False)

        return {
            'image_path': text_result_image_path,
            'json_path': text_result_json_path,
            'results': text_results
        }, text_summary
    except Exception as e:
        # Top-level boundary: report with traceback and signal failure to the
        # caller through the (None, None) sentinel.
        log.exception(f"Error in process_drawing: {str(e)}")
        return None, None
def detect_with_tesseract(image):
    """Detect text using Tesseract OCR.

    Args:
        image: image object accepted by ``pytesseract.image_to_data``.

    Returns:
        A list of dicts with ``'text'``, ``'bbox'`` (``[x1, y1, x2, y2]``) and
        ``'confidence'`` (Tesseract's 0-100 score scaled to 0-1). Entries with
        a score of 30 or less, or with blank text, are dropped. Returns ``[]``
        if Tesseract raises.
    """
    # Configure Tesseract for technical drawings
    custom_config = r'--oem 3 --psm 11 -c tessedit_char_whitelist="ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-.()" -c tessedit_write_images=true -c textord_heavy_nr=true -c textord_min_linesize=3'
    try:
        data = pytesseract.image_to_data(
            image,
            config=custom_config,
            output_type=pytesseract.Output.DICT
        )
        # The DICT output holds parallel lists; walk them in lockstep.
        rows = zip(
            data['text'], data['conf'],
            data['left'], data['top'], data['width'], data['height'],
        )
        results = []
        for raw_text, raw_conf, x, y, w, h in rows:
            conf = float(raw_conf)
            if conf > 30:  # Lower confidence threshold for technical text
                text = raw_text.strip()
                if text:
                    results.append({
                        'text': text,
                        'bbox': [x, y, x + w, y + h],
                        'confidence': conf / 100.0
                    })
        return results
    except Exception as e:
        # Look the logger up here: a global ``logger`` is only bound when the
        # file runs as a script, so relying on it would raise NameError on
        # import-time use and hide the real error.
        logging.getLogger(__name__).error(f"Tesseract error: {str(e)}")
        return []
def detect_with_easyocr(image):
    """Detect text using EasyOCR.

    Args:
        image: anything ``np.array`` can convert to an image array.

    Returns:
        A list of dicts with ``'text'``, ``'bbox'`` (the axis-aligned
        ``[x1, y1, x2, y2]`` box enclosing the points EasyOCR reports) and
        ``'confidence'``. Returns ``[]`` when the reader is unavailable or
        EasyOCR raises.
    """
    if easyocr_reader is None:
        return []
    try:
        detections = easyocr_reader.readtext(
            np.array(image),
            paragraph=False,
            height_ths=2.0,
            width_ths=2.0,
            contrast_ths=0.2,
            text_threshold=0.5
        )
        parsed_results = []
        for corners, text, conf in detections:
            # Collapse the reported corner points to their bounding rectangle.
            xs = [point[0] for point in corners]
            ys = [point[1] for point in corners]
            parsed_results.append({
                'text': text,
                'bbox': [int(min(xs)), int(min(ys)), int(max(xs)), int(max(ys))],
                'confidence': conf
            })
        return parsed_results
    except Exception as e:
        # Look the logger up here: a global ``logger`` is only bound when the
        # file runs as a script, so relying on it would raise NameError.
        logging.getLogger(__name__).error(f"EasyOCR error: {str(e)}")
        return []
def detect_with_doctr(image):
    """Detect text using DocTR.

    Args:
        image: anything ``np.array`` can convert to an image array.

    Returns:
        A list of dicts with ``'text'``, ``'bbox'`` (``[x1, y1, x2, y2]`` in
        pixels) and ``'confidence'`` (0.5 when the exported word carries no
        confidence). Returns ``[]`` when the model is unavailable or DocTR
        raises.
    """
    try:
        # Mirror the EasyOCR helper: no model, no detections. Kept inside the
        # try so an unbound model name is also reported instead of crashing.
        if doctr_model is None:
            return []

        image_np = np.array(image)
        # Word geometry is scaled by (width, height) to obtain pixel
        # coordinates; the factor is the same for every word.
        height, width = image_np.shape[:2]
        scale = np.array([width, height])

        # Get predictions
        doc = doctr_model([image_np]).export()

        # Parse results
        results = []
        for page in doc['pages']:
            for block in page['blocks']:
                for line in block['lines']:
                    for word in line['words']:
                        points = np.array(word['geometry']) * scale
                        x1, y1 = points.min(axis=0)
                        x2, y2 = points.max(axis=0)
                        results.append({
                            'text': word['value'],
                            'bbox': [int(x1), int(y1), int(x2), int(y2)],
                            'confidence': word.get('confidence', 0.5)
                        })
        return results
    except Exception as e:
        # Look the logger up here: a global ``logger`` is only bound when the
        # file runs as a script, so relying on it would raise NameError.
        logging.getLogger(__name__).error(f"DocTR error: {str(e)}")
        return []
def merge_overlapping_detections(results, iou_threshold=0.5):
    """Collapse detections whose boxes overlap into a single detection.

    Detections are visited in order. Each not-yet-grouped detection becomes
    the anchor of a group that absorbs every later, not-yet-grouped detection
    whose IoU with the anchor is strictly greater than ``iou_threshold``. Each
    group contributes its highest-confidence member (first one on ties).

    Args:
        results: sequence of dicts with ``'bbox'`` (``[x1, y1, x2, y2]``) and
            ``'confidence'``.
        iou_threshold: overlap ratio above which two boxes are merged.

    Returns:
        A list with one detection per group, in anchor order.
    """
    if not results:
        return []

    def calculate_iou(box1, box2):
        # Corners of the intersection rectangle.
        left = max(box1[0], box2[0])
        top = max(box1[1], box2[1])
        right = min(box1[2], box2[2])
        bottom = min(box1[3], box2[3])
        if right < left or bottom < top:
            return 0.0
        overlap = (right - left) * (bottom - top)
        first_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
        second_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
        combined = first_area + second_area - overlap
        if combined > 0:
            return overlap / combined
        return 0

    merged = []
    consumed = set()
    for anchor_idx, anchor in enumerate(results):
        if anchor_idx in consumed:
            continue
        consumed.add(anchor_idx)
        group = [anchor]
        # Every index before the anchor is already consumed, so only later
        # detections can still join this group.
        for other_idx, other in enumerate(results[anchor_idx + 1:], start=anchor_idx + 1):
            if other_idx in consumed:
                continue
            if calculate_iou(anchor['bbox'], other['bbox']) > iou_threshold:
                group.append(other)
                consumed.add(other_idx)
        if len(group) == 1:
            merged.append(anchor)
        else:
            # Keep the detection with highest confidence
            merged.append(max(group, key=lambda det: det['confidence']))
    return merged
def classify_text(text):
    """Return the first ``TEXT_PATTERNS`` label whose regex matches ``text``.

    The text is stripped, upper-cased and has all whitespace removed before
    matching. Patterns are tried in dict order with ``re.match``, which
    anchors at the start only, so trailing characters after a match are
    accepted. Falsy input or no match yields ``'Unknown'``.
    """
    if not text:
        return 'Unknown'

    # Clean and normalize text
    normalized = re.sub(r'\s+', '', text.strip().upper())

    return next(
        (
            label
            for label, pattern in TEXT_PATTERNS.items()
            if re.match(pattern, normalized)
        ),
        'Unknown',
    )
def annotate_image(image, results):
    """Draw every detection's box and caption onto ``image`` and return it.

    Non-RGB images are converted to RGB first (drawing then happens on the
    converted copy); RGB images are drawn on directly. Each caption reads
    ``"<text> (<confidence>)"`` plus ``" [<type>]"`` for known types, and is
    placed 20 pixels above the box on a white background.
    """
    # Outline/caption colour per text type; unlisted types use 'Unknown'.
    palette = {
        'Line_Number': "#FF0000",  # Bright Red
        'Equipment_Tag': "#00FF00",  # Bright Green
        'Instrument_Tag': "#0000FF",  # Bright Blue
        'Valve_Number': "#FFA500",  # Bright Orange
        'Pipe_Size': "#FF00FF",  # Bright Magenta
        'Process_Instrument': "#00FFFF",  # Bright Cyan
        'Nozzle': "#FFFF00",  # Yellow
        'Pipe_Connector': "#800080",  # Purple
        'Unknown': "#FF4444"  # Light Red
    }

    # Convert image to RGB mode to ensure color support
    if image.mode != 'RGB':
        image = image.convert('RGB')

    canvas = ImageDraw.Draw(image)

    # Prefer Arial; fall back to Pillow's built-in font when it is missing.
    try:
        label_font = ImageFont.truetype("arial.ttf", 20)
    except IOError:
        label_font = ImageFont.load_default()

    for detection in results:
        kind = detection.get('text_type', 'Unknown')
        colour = palette.get(kind, palette['Unknown'])
        box = detection['bbox']

        # Bounding box
        canvas.rectangle(box, outline=colour, width=3)

        # Caption text
        caption = f"{detection['text']} ({detection['confidence']:.2f})"
        if kind != 'Unknown':
            caption += f" [{kind}]"

        # White backing rectangle, then the caption on top of it
        caption_origin = (box[0], box[1] - 20)
        backing = canvas.textbbox(caption_origin, caption, font=label_font)
        canvas.rectangle(backing, fill="#FFFFFF")
        canvas.text(caption_origin, caption, fill=colour, font=label_font)

    return image
def save_annotated_image(image, path, storage):
    """Encode ``image`` as an uncompressed PNG and hand the bytes to ``storage``.

    The image is serialised in memory (``optimize=False``,
    ``compress_level=0``) and the resulting bytes are passed to
    ``storage.save_file(path, data)``.
    """
    with io.BytesIO() as buffer:
        image.save(buffer, format='PNG', optimize=False, compress_level=0)
        png_bytes = buffer.getvalue()
    storage.save_file(path, png_bytes)
if __name__ == "__main__":
    # Manual smoke test: process one hard-coded page and print the summary.
    from storage import StorageFactory
    import logging

    # Configure logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    # Initialize storage
    storage = StorageFactory.get_storage()

    # Test file paths
    file_path = "processed_pages/10219-1-DG-BC-00011.01-REV_A_page_1_text.png"
    result_path = "results"

    try:
        # Ensure result directory exists
        os.makedirs(result_path, exist_ok=True)

        # Process the drawing
        logger.info(f"Processing file: {file_path}")
        results, summary = process_drawing(file_path, result_path, storage)

        # process_drawing reports failure by returning (None, None); stop
        # here instead of failing on a None subscript below.
        if results is None or summary is None:
            logger.error(f"Processing failed for file: {file_path}")
            raise SystemExit(1)

        # Print detailed results
        print("\n=== DETAILED DETECTION RESULTS ===")
        print(f"\nTotal Detections: {summary['total_detections']}")

        print("\nBreakdown by Text Type:")
        print("-" * 50)
        for text_type, stats in summary['by_type'].items():
            if stats['count'] > 0:
                print(f"\n{text_type}:")
                print(f" Count: {stats['count']}")
                print(f" Average Confidence: {stats['avg_confidence']:.2f}")
                # Per-type item lists are optional in the summary.
                items = stats.get('items', [])
                if items:
                    print(" Items:")
                    for item in items:
                        print(f" - {item['text']} (conf: {item['confidence']:.2f}, source: {item['source']})")

        print("\nBreakdown by OCR Engine:")
        print("-" * 50)
        # Each by_source value is a stats dict; print its detection count.
        for source, source_stats in summary['by_source'].items():
            print(f"{source}: {source_stats['count']} detections")

        # The confidence histogram is optional in the summary.
        confidence_ranges = summary.get('confidence_ranges', {})
        if confidence_ranges:
            print("\nConfidence Distribution:")
            print("-" * 50)
            for range_name, count in confidence_ranges.items():
                print(f"{range_name}: {count} detections")

        # Print output paths
        print("\nOutput Files:")
        print("-" * 50)
        print(f"Annotated Image: {results['image_path']}")
        print(f"JSON Results: {results['json_path']}")
    except Exception as e:
        logger.error(f"Error processing file: {e}")
        raise