import logging import cv2 import numpy as np from PIL import Image import json from datetime import datetime import os # Try to import AI libraries (graceful fallback if not available) try: from transformers import pipeline TRANSFORMERS_AVAILABLE = True except ImportError: TRANSFORMERS_AVAILABLE = False logging.warning("Transformers not available") try: from ultralytics import YOLO YOLO_AVAILABLE = True except ImportError: YOLO_AVAILABLE = False logging.warning("YOLO not available") try: import tensorflow as tf TF_AVAILABLE = True except ImportError: TF_AVAILABLE = False logging.warning("TensorFlow not available") class WoundAnalyzer: """AI-powered wound analysis system""" def __init__(self, config): """Initialize wound analyzer with configuration""" self.config = config self.models_loaded = False self.load_models() def load_models(self): """Load AI models for wound analysis""" try: # Load models if libraries are available if TRANSFORMERS_AVAILABLE: try: # Load a general image classification model self.image_classifier = pipeline( "image-classification", model="google/vit-base-patch16-224", token=self.config.HF_TOKEN ) logging.info("✅ Image classification model loaded") except Exception as e: logging.warning(f"Could not load image classifier: {e}") self.image_classifier = None if YOLO_AVAILABLE: try: # Try to load YOLO model (will download if not present) self.yolo_model = YOLO('yolov8n.pt') logging.info("✅ YOLO model loaded") except Exception as e: logging.warning(f"Could not load YOLO model: {e}") self.yolo_model = None self.models_loaded = True logging.info("✅ Wound analyzer initialized") except Exception as e: logging.error(f"Error loading models: {e}") self.models_loaded = False def analyze_wound(self, image, questionnaire_id): """Analyze wound image and return comprehensive results""" start_time = datetime.now() try: if not image: return self._create_error_result("No image provided") # Get questionnaire data for context questionnaire_data = self._get_questionnaire_data(questionnaire_id) # Convert image to various formats for analysis cv_image = self._pil_to_cv2(image) np_image = np.array(image) # Perform basic image analysis basic_analysis = self._basic_image_analysis(cv_image, np_image) # Perform AI analysis if models are available ai_analysis = self._ai_image_analysis(image) # Use enhanced AI processor if available try: from .ai_processor import AIProcessor ai_processor = AIProcessor() # Perform visual analysis using AI processor visual_results = ai_processor.perform_visual_analysis(image) # Query clinical guidelines query = f"wound care {questionnaire_data.get('wound_location', '')} {questionnaire_data.get('diabetic_status', '')}" guideline_context = ai_processor.query_guidelines(query) # Generate comprehensive report comprehensive_report = ai_processor.generate_final_report( questionnaire_data, visual_results, guideline_context, image ) # Merge AI processor results with basic analysis ai_analysis.update({ 'visual_analysis': visual_results, 'clinical_guidelines': guideline_context, 'comprehensive_report': comprehensive_report }) logging.info("Enhanced AI analysis completed") except Exception as e: logging.warning(f"Enhanced AI processor not available: {e}") # Combine results analysis_result = self._combine_analysis_results( basic_analysis, ai_analysis, questionnaire_id ) # Calculate processing time processing_time = (datetime.now() - start_time).total_seconds() analysis_result['processing_time'] = processing_time logging.info(f"Wound analysis completed in {processing_time:.2f} seconds") return analysis_result except Exception as e: logging.error(f"Wound analysis error: {e}") return self._create_error_result(f"Analysis failed: {str(e)}") def _get_questionnaire_data(self, questionnaire_id): """Get questionnaire data for analysis context""" try: # This should connect to the database to get questionnaire data # For now, return empty dict as fallback return {} except Exception as e: logging.warning(f"Could not fetch questionnaire data: {e}") return {} def _pil_to_cv2(self, pil_image): """Convert PIL image to OpenCV format""" try: # Convert PIL to RGB if not already if pil_image.mode != 'RGB': pil_image = pil_image.convert('RGB') # Convert to numpy array and then to OpenCV format np_array = np.array(pil_image) cv_image = cv2.cvtColor(np_array, cv2.COLOR_RGB2BGR) return cv_image except Exception as e: logging.error(f"Error converting PIL to CV2: {e}") return None def _basic_image_analysis(self, cv_image, np_image): """Perform basic image analysis using OpenCV""" try: analysis = {} if cv_image is not None: # Image properties height, width = cv_image.shape[:2] analysis['dimensions'] = f"{width}x{height}" analysis['image_quality'] = self._assess_image_quality(cv_image) # Color analysis analysis['color_analysis'] = self._analyze_colors(cv_image) # Texture analysis analysis['texture_analysis'] = self._analyze_texture(cv_image) # Edge detection for wound boundaries analysis['edge_analysis'] = self._analyze_edges(cv_image) return analysis except Exception as e: logging.error(f"Basic image analysis error: {e}") return {} def _assess_image_quality(self, cv_image): """Assess image quality metrics""" try: # Calculate sharpness using Laplacian variance gray = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY) sharpness = cv2.Laplacian(gray, cv2.CV_64F).var() # Calculate brightness brightness = np.mean(cv_image) # Calculate contrast contrast = np.std(cv_image) # Determine quality rating if sharpness > 500 and 50 < brightness < 200 and contrast > 30: quality = "Good" elif sharpness > 100 and 30 < brightness < 230 and contrast > 15: quality = "Fair" else: quality = "Poor" return { 'sharpness': float(sharpness), 'brightness': float(brightness), 'contrast': float(contrast), 'overall_quality': quality } except Exception as e: logging.error(f"Image quality assessment error: {e}") return {'overall_quality': 'Unknown'} def _analyze_colors(self, cv_image): """Analyze color properties of the wound""" try: # Convert to HSV for better color analysis hsv = cv2.cvtColor(cv_image, cv2.COLOR_BGR2HSV) # Calculate color statistics mean_hue = np.mean(hsv[:, :, 0]) mean_saturation = np.mean(hsv[:, :, 1]) mean_value = np.mean(hsv[:, :, 2]) # Detect dominant colors dominant_colors = self._get_dominant_colors(cv_image) return { 'mean_hue': float(mean_hue), 'mean_saturation': float(mean_saturation), 'mean_value': float(mean_value), 'dominant_colors': dominant_colors } except Exception as e: logging.error(f"Color analysis error: {e}") return {} def _get_dominant_colors(self, cv_image, k=3): """Get dominant colors in the image""" try: # Reshape image to be a list of pixels data = cv_image.reshape((-1, 3)) data = np.float32(data) # Apply k-means clustering criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 20, 1.0) _, labels, centers = cv2.kmeans(data, k, None, criteria, 10, cv2.KMEANS_RANDOM_CENTERS) # Convert back to uint8 and get color names centers = np.uint8(centers) dominant_colors = [] for center in centers: color_name = self._classify_color(center) dominant_colors.append({ 'rgb': center.tolist(), 'name': color_name }) return dominant_colors except Exception as e: logging.error(f"Dominant colors error: {e}") return [] def _classify_color(self, rgb_color): """Classify RGB color into medical color categories""" r, g, b = rgb_color # Simple color classification for wound assessment if r > 150 and g < 100 and b < 100: return "Red/Inflammatory" elif r > 150 and g > 150 and b < 100: return "Yellow/Exudate" elif r < 100 and g < 100 and b < 100: return "Dark/Necrotic" elif r > 200 and g > 200 and b > 200: return "White/Pale" elif r > 100 and g > 50 and b < 100: return "Pink/Healthy" else: return "Mixed/Other" def _analyze_texture(self, cv_image): """Analyze texture properties""" try: gray = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY) # Calculate Local Binary Pattern (simplified) texture_variance = np.var(gray) texture_mean = np.mean(gray) # Determine texture category if texture_variance > 1000: texture_type = "Rough/Irregular" elif texture_variance > 500: texture_type = "Moderate" else: texture_type = "Smooth" return { 'variance': float(texture_variance), 'mean': float(texture_mean), 'type': texture_type } except Exception as e: logging.error(f"Texture analysis error: {e}") return {} def _analyze_edges(self, cv_image): """Analyze edges for wound boundary detection""" try: gray = cv2.cvtColor(cv_image, cv2.COLOR_BGR2GRAY) # Apply Canny edge detection edges = cv2.Canny(gray, 50, 150) # Count edge pixels edge_count = np.sum(edges > 0) total_pixels = edges.shape[0] * edges.shape[1] edge_ratio = edge_count / total_pixels # Determine wound boundary clarity if edge_ratio > 0.1: boundary_clarity = "Well-defined" elif edge_ratio > 0.05: boundary_clarity = "Moderately-defined" else: boundary_clarity = "Poorly-defined" return { 'edge_count': int(edge_count), 'edge_ratio': float(edge_ratio), 'boundary_clarity': boundary_clarity } except Exception as e: logging.error(f"Edge analysis error: {e}") return {} def _ai_image_analysis(self, image): """Perform AI-based image analysis""" try: ai_results = {} # Use image classifier if available if hasattr(self, 'image_classifier') and self.image_classifier: try: classification = self.image_classifier(image) ai_results['classification'] = classification[:3] # Top 3 results except Exception as e: logging.warning(f"Image classification failed: {e}") # Use YOLO for object detection if available if hasattr(self, 'yolo_model') and self.yolo_model: try: detection_results = self.yolo_model(image) ai_results['object_detection'] = self._process_yolo_results(detection_results) except Exception as e: logging.warning(f"YOLO detection failed: {e}") return ai_results except Exception as e: logging.error(f"AI image analysis error: {e}") return {} def _process_yolo_results(self, results): """Process YOLO detection results""" try: processed_results = [] for result in results: if hasattr(result, 'boxes') and result.boxes: for box in result.boxes: processed_results.append({ 'confidence': float(box.conf.item()) if hasattr(box, 'conf') else 0.0, 'class_name': result.names.get(int(box.cls.item()), 'unknown') if hasattr(box, 'cls') else 'unknown' }) return processed_results except Exception as e: logging.error(f"YOLO results processing error: {e}") return [] def _combine_analysis_results(self, basic_analysis, ai_analysis, questionnaire_id): """Combine all analysis results into a comprehensive report""" try: # Create comprehensive analysis result result = { 'questionnaire_id': questionnaire_id, 'basic_analysis': basic_analysis, 'ai_analysis': ai_analysis, 'model_version': 'SmartHeal-v1.0' } # Generate summary result['summary'] = self._generate_summary(basic_analysis, ai_analysis) # Generate recommendations result['recommendations'] = self._generate_recommendations(basic_analysis, ai_analysis) # Calculate risk assessment result['risk_assessment'] = self._calculate_risk_assessment(basic_analysis, ai_analysis) result['risk_level'] = result['risk_assessment']['level'] result['risk_score'] = result['risk_assessment']['score'] # Determine wound type result['wound_type'] = self._determine_wound_type(basic_analysis, ai_analysis) # Extract wound dimensions result['wound_dimensions'] = basic_analysis.get('dimensions', 'Unknown') return result except Exception as e: logging.error(f"Results combination error: {e}") return self._create_error_result("Failed to combine analysis results") def _generate_summary(self, basic_analysis, ai_analysis): """Generate analysis summary""" try: summary_parts = [] # Image quality assessment if 'image_quality' in basic_analysis: quality = basic_analysis['image_quality'].get('overall_quality', 'Unknown') summary_parts.append(f"Image quality: {quality}") # Color analysis summary if 'color_analysis' in basic_analysis and 'dominant_colors' in basic_analysis['color_analysis']: colors = basic_analysis['color_analysis']['dominant_colors'] if colors: color_names = [color['name'] for color in colors[:2]] summary_parts.append(f"Dominant colors: {', '.join(color_names)}") # Texture summary if 'texture_analysis' in basic_analysis: texture_type = basic_analysis['texture_analysis'].get('type', 'Unknown') summary_parts.append(f"Texture: {texture_type}") # Boundary clarity if 'edge_analysis' in basic_analysis: boundary = basic_analysis['edge_analysis'].get('boundary_clarity', 'Unknown') summary_parts.append(f"Wound boundaries: {boundary}") # AI classification summary if 'classification' in ai_analysis and ai_analysis['classification']: top_class = ai_analysis['classification'][0] summary_parts.append(f"AI classification: {top_class.get('label', 'Unknown')}") summary = "Wound Analysis Summary: " + "; ".join(summary_parts) if summary_parts else "Basic wound analysis completed." return summary except Exception as e: logging.error(f"Summary generation error: {e}") return "Wound analysis completed with limited information due to processing constraints." def _generate_recommendations(self, basic_analysis, ai_analysis): """Generate treatment recommendations based on analysis""" try: recommendations = [] # Image quality recommendations if 'image_quality' in basic_analysis: quality = basic_analysis['image_quality'].get('overall_quality', 'Unknown') if quality == 'Poor': recommendations.append("Consider retaking the image with better lighting and focus for more accurate analysis.") # Color-based recommendations if 'color_analysis' in basic_analysis and 'dominant_colors' in basic_analysis['color_analysis']: colors = basic_analysis['color_analysis']['dominant_colors'] for color in colors: color_name = color.get('name', '') if 'Red/Inflammatory' in color_name: recommendations.append("Red coloration may indicate inflammation. Monitor for infection signs.") elif 'Yellow/Exudate' in color_name: recommendations.append("Yellow areas suggest possible exudate. Consider wound cleansing.") elif 'Dark/Necrotic' in color_name: recommendations.append("Dark areas may indicate necrotic tissue. Consult for debridement evaluation.") elif 'Pink/Healthy' in color_name: recommendations.append("Pink coloration suggests healthy granulation tissue - positive healing sign.") # Texture-based recommendations if 'texture_analysis' in basic_analysis: texture_type = basic_analysis['texture_analysis'].get('type', '') if 'Rough/Irregular' in texture_type: recommendations.append("Irregular texture may require specialized wound care approach.") # Boundary-based recommendations if 'edge_analysis' in basic_analysis: boundary = basic_analysis['edge_analysis'].get('boundary_clarity', '') if 'Poorly-defined' in boundary: recommendations.append("Poorly defined wound edges may indicate ongoing tissue breakdown.") # General recommendations recommendations.extend([ "Continue regular wound monitoring and documentation.", "Maintain appropriate wound hygiene and dressing protocols.", "Consult healthcare provider for persistent or worsening symptoms.", "Follow established wound care guidelines for optimal healing." ]) return "; ".join(recommendations) if recommendations else "Standard wound care protocols recommended." except Exception as e: logging.error(f"Recommendations generation error: {e}") return "Consult healthcare provider for appropriate wound care recommendations." def _calculate_risk_assessment(self, basic_analysis, ai_analysis): """Calculate risk assessment based on analysis""" try: risk_score = 0 risk_factors = [] # Image quality factor if 'image_quality' in basic_analysis: quality = basic_analysis['image_quality'].get('overall_quality', 'Unknown') if quality == 'Poor': risk_score += 10 risk_factors.append("Poor image quality") # Color-based risk factors if 'color_analysis' in basic_analysis and 'dominant_colors' in basic_analysis['color_analysis']: colors = basic_analysis['color_analysis']['dominant_colors'] for color in colors: color_name = color.get('name', '') if 'Dark/Necrotic' in color_name: risk_score += 30 risk_factors.append("Possible necrotic tissue") elif 'Red/Inflammatory' in color_name: risk_score += 20 risk_factors.append("Signs of inflammation") elif 'Yellow/Exudate' in color_name: risk_score += 15 risk_factors.append("Possible exudate") # Texture risk factors if 'texture_analysis' in basic_analysis: texture_type = basic_analysis['texture_analysis'].get('type', '') if 'Rough/Irregular' in texture_type: risk_score += 10 risk_factors.append("Irregular texture") # Boundary risk factors if 'edge_analysis' in basic_analysis: boundary = basic_analysis['edge_analysis'].get('boundary_clarity', '') if 'Poorly-defined' in boundary: risk_score += 15 risk_factors.append("Poorly defined boundaries") # Determine risk level if risk_score >= 50: risk_level = "High" elif risk_score >= 25: risk_level = "Moderate" elif risk_score >= 10: risk_level = "Low" else: risk_level = "Minimal" return { 'score': min(risk_score, 100), # Cap at 100 'level': risk_level, 'factors': risk_factors } except Exception as e: logging.error(f"Risk assessment error: {e}") return { 'score': 0, 'level': 'Unknown', 'factors': ['Assessment error'] } def _determine_wound_type(self, basic_analysis, ai_analysis): """Determine wound type based on analysis""" try: # This is a simplified wound type determination # In a real system, this would use more sophisticated ML models wound_characteristics = [] # Color-based characteristics if 'color_analysis' in basic_analysis and 'dominant_colors' in basic_analysis['color_analysis']: colors = basic_analysis['color_analysis']['dominant_colors'] for color in colors: color_name = color.get('name', '') if 'Red/Inflammatory' in color_name: wound_characteristics.append("inflammatory") elif 'Pink/Healthy' in color_name: wound_characteristics.append("granulating") elif 'Yellow/Exudate' in color_name: wound_characteristics.append("exudative") elif 'Dark/Necrotic' in color_name: wound_characteristics.append("necrotic") # Determine primary wound type if "necrotic" in wound_characteristics: return "Necrotic wound" elif "inflammatory" in wound_characteristics and "exudative" in wound_characteristics: return "Infected/Inflammatory wound" elif "granulating" in wound_characteristics: return "Healing/Granulating wound" elif "exudative" in wound_characteristics: return "Exudative wound" else: return "Acute wound" except Exception as e: logging.error(f"Wound type determination error: {e}") return "Undetermined wound type" def _create_error_result(self, error_message): """Create error result structure""" return { 'error': True, 'summary': f"Analysis Error: {error_message}", 'recommendations': "Please ensure image quality is adequate and try again. Consult healthcare provider if issues persist.", 'risk_level': 'Unknown', 'risk_score': 0, 'wound_type': 'Unknown', 'wound_dimensions': 'Unknown', 'processing_time': 0.0, 'model_version': 'SmartHeal-v1.0' }