Spaces:
Build error
Build error
| import cv2 | |
| import json | |
| import uuid | |
| import os | |
| import logging | |
| from ultralytics import YOLO | |
| from tqdm import tqdm | |
| from storage import StorageInterface | |
| import numpy as np | |
| from typing import Tuple, List, Dict, Any | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
# Constants
# Weight files for every YOLO model to run; detections from all models are
# pooled and deduplicated by merge_detections().
MODEL_PATHS = {
    "model1": "models/Intui_SDM_41.pt",
    "model2": "models/Intui_SDM_20.pt"  # Add your second model path here
}
MAX_DIMENSION = 1280  # Longest image side (px) used both for resizing and as YOLO imgsz
CONFIDENCE_THRESHOLDS = [0.1, 0.3, 0.5, 0.7, 0.9]  # Candidate thresholds evaluated per model
TEXT_COLOR = (0, 0, 255)  # BGR red for annotation text
BOX_COLOR = (255, 0, 0)  # BGR blue for boxes (previous comment said red; (255,0,0) is blue in BGR)
BG_COLOR = (255, 255, 255, 0.6)  # Semi-transparent white text background; NOTE(review): unused in this file
THICKNESS = 1  # Thin text stroke thickness
BOX_THICKNESS = 2  # Box line thickness
MIN_FONT_SCALE = 0.2  # Lower clamp for adaptive font scale
MAX_FONT_SCALE = 1.0  # Upper clamp for adaptive font scale
TEXT_PADDING = 20  # Padding (px) between text elements and around label rectangles
OVERLAP_THRESHOLD = 0.3  # Threshold for detecting text overlap; NOTE(review): appears unused here
def preprocess_image_for_symbol_detection(image_cv: np.ndarray) -> np.ndarray:
    """Produce an edge-emphasized BGR image to aid symbol detection.

    Pipeline: grayscale -> histogram equalization (contrast boost) ->
    bilateral filter (edge-preserving smoothing) -> Canny edge map ->
    back to 3-channel BGR so downstream code sees a color image.
    """
    as_gray = cv2.cvtColor(image_cv, cv2.COLOR_BGR2GRAY)
    contrast_boosted = cv2.equalizeHist(as_gray)
    smoothed = cv2.bilateralFilter(contrast_boosted, 9, 75, 75)
    edge_map = cv2.Canny(smoothed, 100, 200)
    return cv2.cvtColor(edge_map, cv2.COLOR_GRAY2BGR)
def evaluate_detections(detections_list: List[Dict[str, Any]]) -> int:
    """Return a quality score for *detections_list*.

    The score is currently just the number of detections; a higher count is
    treated as better by the threshold search in the caller.
    """
    score = len(detections_list)
    return score
def resize_image_with_aspect_ratio(image_cv: np.ndarray, max_dimension: int) -> Tuple[np.ndarray, int, int]:
    """Shrink *image_cv* so its longest side does not exceed *max_dimension*.

    Aspect ratio is preserved.  Images already within bounds are returned
    unchanged (same object).  Returns (image, width, height) where width and
    height describe the returned image.
    """
    height, width = image_cv.shape[:2]
    longest_side = max(width, height)
    if longest_side <= max_dimension:
        return image_cv, width, height
    scale = max_dimension / float(longest_side)
    target_w = int(width * scale)
    target_h = int(height * scale)
    resized = cv2.resize(image_cv, (target_w, target_h), interpolation=cv2.INTER_LINEAR)
    return resized, target_w, target_h
def merge_detections(all_detections: List[Dict]) -> List[Dict]:
    """
    Deduplicate detections pooled from several models.

    Two detections are considered duplicates when they share the same
    original label and their boxes overlap with IoU > 0.5; only the
    highest-confidence one of each duplicate group is kept.

    Note: sorts *all_detections* in place (descending confidence) as a
    side effect.
    """
    if not all_detections:
        return []

    def _iou(a, b):
        """Intersection-over-union of two [x1, y1, x2, y2] boxes."""
        ix1, iy1 = max(a[0], b[0]), max(a[1], b[1])
        ix2, iy2 = min(a[2], b[2]), min(a[3], b[3])
        inter = max(0, ix2 - ix1) * max(0, iy2 - iy1)
        area_a = (a[2] - a[0]) * (a[3] - a[1])
        area_b = (b[2] - b[0]) * (b[3] - b[1])
        denom = area_a + area_b - inter
        return inter / denom if denom > 0 else 0

    # Highest confidence first, so an earlier entry always beats a later one.
    all_detections.sort(key=lambda d: d['confidence'], reverse=True)

    suppressed = set()
    for i, det in enumerate(all_detections):
        if i in suppressed:
            continue
        for j in range(i + 1, len(all_detections)):
            if j in suppressed:
                continue
            other = all_detections[j]
            if (other['original_label'] == det['original_label'] and
                    _iou(det['bbox'], other['bbox']) > 0.5):
                # i precedes j in the sorted list, so i has >= confidence.
                suppressed.add(j)
                logging.info(f"Removing duplicate detection of {det['original_label']} with lower confidence "
                             f"({other['confidence']:.2f} < {det['confidence']:.2f})")

    return [d for k, d in enumerate(all_detections) if k not in suppressed]
| def calculate_font_scale(image_width: int, bbox_width: int) -> float: | |
| """ | |
| Calculate appropriate font scale based on image and bbox dimensions. | |
| """ | |
| base_scale = 0.7 # Increased base scale for better visibility | |
| # Adjust font size based on image width and bbox width | |
| width_ratio = image_width / MAX_DIMENSION | |
| bbox_ratio = bbox_width / image_width | |
| # Calculate adaptive scale with increased multipliers | |
| adaptive_scale = base_scale * max(width_ratio, 0.5) * max(bbox_ratio * 6, 0.7) | |
| # Ensure font scale stays within reasonable bounds | |
| return min(max(adaptive_scale, MIN_FONT_SCALE), MAX_FONT_SCALE) | |
def check_overlap(rect1, rect2):
    """Return True when two (x1, y1, x2, y2) rectangles intersect or touch."""
    ax1, ay1, ax2, ay2 = rect1
    bx1, by1, bx2, by2 = rect2
    horizontally_apart = ax2 < bx1 or ax1 > bx2
    vertically_apart = ay2 < by1 or ay1 > by2
    return not (horizontally_apart or vertically_apart)
def draw_annotation(
    image: np.ndarray,
    bbox: List[int],
    text: str,
    confidence: float,
    model_source: str,
    existing_annotations: List[tuple] = None
) -> None:
    """
    Draw annotation with no background and thin fonts.

    Draws the detection box plus a two-line label ("<text>" / "<conf>%")
    directly onto *image*, which is modified in place.  The label is placed
    above or below the box, trying shifted variants when a candidate would
    overlap a previously placed label, and finally falling back to the
    right side of the box.

    Args:
        image: BGR image to draw on (mutated in place).
        bbox: [x1, y1, x2, y2] box in image coordinates.
        text: Label shown on the first line.
        confidence: Confidence value; callers pass it already scaled to
            percent (formatted with no decimals).
        model_source: Name of the model that produced the detection.
            NOTE(review): currently unused in this function.
        existing_annotations: Mutable list of label rectangles already
            placed; a chosen rectangle is appended so later calls avoid it.
    """
    if existing_annotations is None:
        existing_annotations = []
    x1, y1, x2, y2 = bbox
    bbox_width = x2 - x1
    image_width = image.shape[1]
    image_height = image.shape[0]
    # Calculate adaptive font scale
    font_scale = calculate_font_scale(image_width, bbox_width)
    # Simplify the annotation text
    annotation_text = f'{text}\n{confidence:.0f}%'
    lines = annotation_text.split('\n')
    # Calculate text dimensions for every line to size the label rectangle
    font = cv2.FONT_HERSHEY_SIMPLEX
    max_width = 0
    total_height = 0
    line_heights = []
    for line in lines:
        (width, height), baseline = cv2.getTextSize(
            line, font, font_scale, THICKNESS
        )
        max_width = max(max_width, width)
        line_height = height + baseline + TEXT_PADDING
        line_heights.append(line_height)
        total_height += line_height
    # Calculate initial text position with increased padding
    padding = TEXT_PADDING
    rect_x1 = max(0, x1 - padding)
    rect_x2 = min(image_width, x1 + max_width + padding * 2)
    # Candidate vertical placements, tried in order of preference
    positions = [
        ('top', y1 - total_height - padding),
        ('bottom', y2 + padding),
        ('top_shifted', y1 - total_height - padding * 2),
        ('bottom_shifted', y2 + padding * 2)
    ]
    final_position = None
    for pos_name, y_pos in positions:
        # Skip candidates that would run off the top/bottom of the image
        if y_pos < 0 or y_pos + total_height > image_height:
            continue
        rect = (rect_x1, y_pos, rect_x2, y_pos + total_height)
        overlap = False
        for existing_rect in existing_annotations:
            if check_overlap(rect, existing_rect):
                overlap = True
                break
        if not overlap:
            final_position = (pos_name, y_pos)
            existing_annotations.append(rect)
            break
    # If no non-overlapping position found, use side position.
    # NOTE(review): the side rectangle is never appended to
    # existing_annotations, so later labels may overlap it — confirm intent.
    if final_position is None:
        rect_x1 = max(0, x1 + bbox_width + padding)
        rect_x2 = min(image_width, rect_x1 + max_width + padding * 2)
        y_pos = y1
        final_position = ('side', y_pos)
    rect_y1 = final_position[1]
    # Draw bounding box (no transparency)
    cv2.rectangle(image, (x1, y1), (x2, y2), BOX_COLOR, BOX_THICKNESS)
    # Draw text directly without background; baseline of the first line sits
    # line_heights[0] below the rectangle top, minus the padding folded into
    # each line height above.
    text_y = rect_y1 + line_heights[0] - padding
    for i, line in enumerate(lines):
        # Draw text with thin lines
        cv2.putText(
            image,
            line,
            (rect_x1 + padding, text_y + sum(line_heights[:i])),
            font,
            font_scale,
            TEXT_COLOR,
            THICKNESS,
            cv2.LINE_AA
        )
def _parse_label(label: str):
    """
    Split a raw model label into (category, type, display_label).

    Labels follow a 'Category_Type_Name...' convention:
      - 3+ parts: category, type, remaining parts rejoined with underscores
      - 2 parts:  category, type, and type reused as the display label
      - 1 part:   category only; type becomes "Unknown"
    Returns None if the label cannot be parsed.
    """
    split_label = label.split('_')
    if len(split_label) >= 3:
        return split_label[0], split_label[1], '_'.join(split_label[2:])
    if len(split_label) == 2:
        return split_label[0], split_label[1], split_label[1]
    if len(split_label) == 1:
        return split_label[0], "Unknown", split_label[0]
    return None


def _collect_detections(
    results,
    confidence_threshold: float,
    model_name: str,
    original_shape,
    resized_width: int,
    resized_height: int
) -> List[Dict[str, Any]]:
    """
    Convert raw YOLO results into this module's detection dicts.

    Boxes below *confidence_threshold* are dropped; surviving boxes are
    scaled from the (possibly resized) inference image back to original
    image coordinates and tagged with the producing model's name.
    """
    scale_x = original_shape[1] / resized_width
    scale_y = original_shape[0] / resized_height
    detections_list = []
    for result in results:
        for box in result.boxes:
            confidence = float(box.conf[0])
            if confidence < confidence_threshold:
                continue
            x1, y1, x2, y2 = map(float, box.xyxy[0])
            class_id = int(box.cls[0])
            label = result.names[class_id]
            parsed = _parse_label(label)
            if parsed is None:
                logging.warning(f"Unexpected label format: {label}. Skipping this detection.")
                continue
            category, type_, new_label = parsed
            detections_list.append({
                "symbol_id": str(uuid.uuid4()),
                "class_id": class_id,
                "original_label": label,
                "category": category,
                "type": type_,
                "label": new_label,
                "confidence": confidence,
                "bbox": [int(x1 * scale_x), int(y1 * scale_y),
                         int(x2 * scale_x), int(y2 * scale_y)],
                "model_source": model_name
            })
    return detections_list


def run_detection_with_optimal_threshold(
    image_path: str,
    results_dir: str = "results",
    file_name: str = "",
    apply_preprocessing: bool = False,
    resize_image: bool = True,
    storage: StorageInterface = None
) -> Tuple[str, str, str, List[int]]:
    """
    Run detection with every configured model, merge results, and persist
    an annotated image plus a JSON summary.

    For each model the candidate CONFIDENCE_THRESHOLDS are scored with
    evaluate_detections() and the best-scoring detection set is kept;
    detections from all models are then deduplicated with merge_detections().

    Args:
        image_path: Storage path of the input image.
        results_dir: Directory (within storage) for output artifacts.
        file_name: Original file name; its stem names the output files.
        apply_preprocessing: Run edge-enhancing preprocessing before inference.
        resize_image: Downscale the image to MAX_DIMENSION before inference.
        storage: Storage backend used for all reads/writes (required).

    Returns:
        (annotated_image_path, json_path, summary_message, diagram_bbox),
        or ("Error during detection", None, None, None) on any failure.
    """
    try:
        if storage is None:
            # Fail fast with a clear message instead of an opaque
            # AttributeError on storage.load_file below.
            raise ValueError("A storage backend must be provided")
        image_data = storage.load_file(image_path)
        nparr = np.frombuffer(image_data, np.uint8)
        original_image_cv = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        image_cv = original_image_cv.copy()

        if resize_image:
            logging.info("Resizing image for detection with aspect ratio...")
            image_cv, resized_width, resized_height = resize_image_with_aspect_ratio(image_cv, MAX_DIMENSION)
        else:
            logging.info("Skipping image resizing...")
            resized_height, resized_width = original_image_cv.shape[:2]

        if apply_preprocessing:
            logging.info("Preprocessing image for symbol detection...")
            image_cv = preprocess_image_for_symbol_detection(image_cv)
        else:
            logging.info("Skipping image preprocessing for symbol detection...")

        all_detections = []
        for model_name, model_path in MODEL_PATHS.items():
            logging.info(f"Running detection with model: {model_name}")
            if not model_path:
                logging.warning(f"No model path found for {model_name}")
                continue
            model = YOLO(model_path)
            # The raw predictions do not depend on the confidence threshold
            # (filtering happens afterwards), so run inference ONCE per model
            # instead of once per threshold as before.
            results = model.predict(source=image_cv, imgsz=MAX_DIMENSION)

            best_confidence_threshold = 0.5
            best_detections_list = []
            best_metric = -1
            for confidence_threshold in CONFIDENCE_THRESHOLDS:
                logging.info(f"Running detection with confidence threshold: {confidence_threshold}...")
                detections_list = _collect_detections(
                    results, confidence_threshold, model_name,
                    original_image_cv.shape, resized_width, resized_height
                )
                # NOTE(review): the metric is just the detection count, so the
                # lowest threshold always scores best; kept as-is to preserve
                # existing behavior.
                metric = evaluate_detections(detections_list)
                if metric > best_metric:
                    best_metric = metric
                    best_confidence_threshold = confidence_threshold
                    best_detections_list = detections_list
            all_detections.extend(best_detections_list)

        # Merge detections from all models
        merged_detections = merge_detections(all_detections)
        logging.info(f"Total detections after merging: {len(merged_detections)}")

        # Draw annotations on the original-resolution image
        existing_annotations = []
        for det in merged_detections:
            draw_annotation(
                original_image_cv,
                det["bbox"],
                det["original_label"],
                det["confidence"] * 100,
                det["model_source"],
                existing_annotations
            )

        # Save results
        storage.create_directory(results_dir)
        file_name_without_extension = os.path.splitext(file_name)[0]

        # Per-class counts for the JSON summary
        total_detected_symbols = len(merged_detections)
        class_counts = {}
        for det in merged_detections:
            full_label = det["original_label"]
            class_counts[full_label] = class_counts.get(full_label, 0) + 1
        output_json = {
            "total_detected_symbols": total_detected_symbols,
            "details": class_counts,
            "detections": merged_detections
        }

        detection_json_path = os.path.join(
            results_dir, f'{file_name_without_extension}_detected_symbols.json'
        )
        storage.save_file(
            detection_json_path,
            json.dumps(output_json, indent=4).encode('utf-8')
        )

        # Save the annotated image as an uncompressed PNG for maximum quality
        detection_image_path = os.path.join(
            results_dir, f'{file_name_without_extension}_detected_symbols.png'
        )
        _, img_encoded = cv2.imencode(
            '.png',
            original_image_cv,
            [cv2.IMWRITE_PNG_COMPRESSION, 0]  # No compression for PNG
        )
        storage.save_file(detection_image_path, img_encoded.tobytes())

        # Overall bounding box covering every merged detection
        diagram_bbox = [
            min([det['bbox'][0] for det in merged_detections], default=0),
            min([det['bbox'][1] for det in merged_detections], default=0),
            max([det['bbox'][2] for det in merged_detections], default=0),
            max([det['bbox'][3] for det in merged_detections], default=0)
        ]
        # (A post-save upscale of the local image copy was removed: its result
        # was never used after being computed.)
        return (
            detection_image_path,
            detection_json_path,
            f"Total detections after merging: {total_detected_symbols}",
            diagram_bbox
        )
    except Exception as e:
        logging.error(f"An error occurred: {e}")
        return "Error during detection", None, None, None
if __name__ == "__main__":
    from storage import StorageFactory

    # Demo invocation against a sample processed page.
    sample_path = "processed_pages/10219-1-DG-BC-00011.01-REV_A_page_1_text.png"
    output_dir = "results"

    backend = StorageFactory.get_storage()
    image_out, json_out, log_message, bbox = run_detection_with_optimal_threshold(
        sample_path,
        results_dir=output_dir,
        file_name=os.path.basename(sample_path),
        apply_preprocessing=False,
        resize_image=True,
        storage=backend,
    )
    logging.info("Detection Image Path: %s", image_out)
    logging.info("Detection JSON Path: %s", json_out)
    logging.info("Detection Log Message: %s", log_message)
    logging.info("Diagram BBox: %s", bbox)
    logging.info("Done!")