| """ |
| PULSE-7B Handler Utilities |
| Ubden® Team - Performance monitoring and helper functions |
| """ |
|
|
| import time |
| import torch |
| import psutil |
| import logging |
| import os |
| import json |
| import requests |
| from typing import Dict, Any, Optional |
| from functools import wraps |
|
|
| |
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
| class PerformanceMonitor: |
| """Performance monitoring utilities for PULSE-7B handler""" |
| |
| def __init__(self): |
| self.metrics = { |
| 'total_requests': 0, |
| 'successful_requests': 0, |
| 'failed_requests': 0, |
| 'image_url_requests': 0, |
| 'base64_requests': 0, |
| 'text_only_requests': 0, |
| 'total_generation_time': 0.0, |
| 'total_image_processing_time': 0.0 |
| } |
| |
| def log_request(self, request_type: str, success: bool, |
| generation_time: float = 0.0, |
| image_processing_time: float = 0.0): |
| """Log request metrics""" |
| self.metrics['total_requests'] += 1 |
| |
| if success: |
| self.metrics['successful_requests'] += 1 |
| else: |
| self.metrics['failed_requests'] += 1 |
| |
| if request_type == 'image_url': |
| self.metrics['image_url_requests'] += 1 |
| elif request_type == 'base64': |
| self.metrics['base64_requests'] += 1 |
| else: |
| self.metrics['text_only_requests'] += 1 |
| |
| self.metrics['total_generation_time'] += generation_time |
| self.metrics['total_image_processing_time'] += image_processing_time |
| |
| def get_stats(self) -> Dict[str, Any]: |
| """Get current performance statistics""" |
| total_requests = self.metrics['total_requests'] |
| if total_requests == 0: |
| return self.metrics |
| |
| success_rate = (self.metrics['successful_requests'] / total_requests) * 100 |
| avg_generation_time = self.metrics['total_generation_time'] / total_requests |
| avg_image_processing_time = self.metrics['total_image_processing_time'] / max( |
| self.metrics['image_url_requests'] + self.metrics['base64_requests'], 1 |
| ) |
| |
| return { |
| **self.metrics, |
| 'success_rate_percent': round(success_rate, 2), |
| 'avg_generation_time_seconds': round(avg_generation_time, 3), |
| 'avg_image_processing_time_seconds': round(avg_image_processing_time, 3) |
| } |
| |
| def reset_stats(self): |
| """Reset all metrics""" |
| for key in self.metrics: |
| self.metrics[key] = 0 if isinstance(self.metrics[key], int) else 0.0 |
|
|
| def timing_decorator(func): |
| """Decorator to measure function execution time""" |
| @wraps(func) |
| def wrapper(*args, **kwargs): |
| start_time = time.time() |
| try: |
| result = func(*args, **kwargs) |
| execution_time = time.time() - start_time |
| logger.info(f"{func.__name__} completed in {execution_time:.3f}s") |
| return result, execution_time |
| except Exception as e: |
| execution_time = time.time() - start_time |
| logger.error(f"{func.__name__} failed in {execution_time:.3f}s: {e}") |
| raise e |
| return wrapper |
|
|
| def get_system_info() -> Dict[str, Any]: |
| """Get current system resource information""" |
| cpu_percent = psutil.cpu_percent(interval=1) |
| memory = psutil.virtual_memory() |
| |
| system_info = { |
| 'cpu_usage_percent': cpu_percent, |
| 'memory_total_gb': round(memory.total / (1024**3), 2), |
| 'memory_used_gb': round(memory.used / (1024**3), 2), |
| 'memory_available_gb': round(memory.available / (1024**3), 2), |
| 'memory_usage_percent': memory.percent |
| } |
| |
| |
| if torch.cuda.is_available(): |
| gpu_memory = torch.cuda.memory_stats() |
| system_info.update({ |
| 'gpu_available': True, |
| 'gpu_memory_allocated_gb': round( |
| torch.cuda.memory_allocated() / (1024**3), 2 |
| ), |
| 'gpu_memory_reserved_gb': round( |
| torch.cuda.memory_reserved() / (1024**3), 2 |
| ), |
| 'gpu_device_name': torch.cuda.get_device_name(0) |
| }) |
| else: |
| system_info['gpu_available'] = False |
| |
| return system_info |
|
|
| def validate_image_input(image_input: str) -> Dict[str, Any]: |
| """Validate image input and return metadata""" |
| if not image_input or not isinstance(image_input, str): |
| return {'valid': False, 'type': None, 'error': 'Invalid input type'} |
| |
| |
| if image_input.startswith(('http://', 'https://')): |
| return { |
| 'valid': True, |
| 'type': 'url', |
| 'length': len(image_input), |
| 'domain': image_input.split('/')[2] if '/' in image_input else 'unknown' |
| } |
| |
| |
| elif image_input.startswith('data:image/') or len(image_input) > 100: |
| is_data_url = image_input.startswith('data:') |
| base64_data = image_input |
| |
| if is_data_url: |
| if 'base64,' in image_input: |
| base64_data = image_input.split('base64,')[1] |
| else: |
| return {'valid': False, 'type': 'base64', 'error': 'Invalid data URL format'} |
| |
| |
| estimated_size = len(base64_data) * 3 // 4 |
| |
| return { |
| 'valid': True, |
| 'type': 'base64', |
| 'is_data_url': is_data_url, |
| 'base64_length': len(base64_data), |
| 'estimated_size_bytes': estimated_size, |
| 'estimated_size_kb': round(estimated_size / 1024, 2) |
| } |
| |
| return {'valid': False, 'type': None, 'error': 'Unrecognized format'} |
|
|
| def sanitize_parameters(parameters: Dict[str, Any]) -> Dict[str, Any]: |
| """Sanitize and validate generation parameters""" |
| sanitized = {} |
| |
| |
| max_new_tokens = parameters.get('max_new_tokens', 512) |
| sanitized['max_new_tokens'] = max(1, min(max_new_tokens, 2048)) |
| |
| |
| temperature = parameters.get('temperature', 0.2) |
| sanitized['temperature'] = max(0.01, min(temperature, 2.0)) |
| |
| |
| top_p = parameters.get('top_p', 0.9) |
| sanitized['top_p'] = max(0.01, min(top_p, 1.0)) |
| |
| |
| repetition_penalty = parameters.get('repetition_penalty', 1.05) |
| sanitized['repetition_penalty'] = max(1.0, min(repetition_penalty, 2.0)) |
| |
| |
| stop = parameters.get('stop', ['</s>']) |
| if isinstance(stop, str): |
| stop = [stop] |
| sanitized['stop'] = stop[:5] |
| |
| |
| sanitized['return_full_text'] = bool(parameters.get('return_full_text', False)) |
| |
| |
| sanitized['do_sample'] = bool(parameters.get('do_sample', sanitized['temperature'] > 0.01)) |
| |
| return sanitized |
|
|
| def create_health_check() -> Dict[str, Any]: |
| """Create a health check response""" |
| try: |
| system_info = get_system_info() |
| |
| health_status = { |
| 'status': 'healthy', |
| 'timestamp': time.time(), |
| 'system': system_info, |
| 'model': 'PULSE-7B', |
| 'handler_version': '2.0.0', |
| 'features': [ |
| 'image_url_support', |
| 'base64_image_support', |
| 'stop_sequences', |
| 'parameter_validation', |
| 'performance_monitoring' |
| ] |
| } |
| |
| |
| if system_info['memory_usage_percent'] > 90: |
| health_status['warnings'] = ['High memory usage'] |
| |
| if system_info['cpu_usage_percent'] > 90: |
| health_status.setdefault('warnings', []).append('High CPU usage') |
| |
| return health_status |
| |
| except Exception as e: |
| return { |
| 'status': 'unhealthy', |
| 'timestamp': time.time(), |
| 'error': str(e) |
| } |
|
|
| class DeepSeekClient: |
| """DeepSeek API client for Turkish commentary""" |
| |
| def __init__(self, api_key: Optional[str] = None): |
| self.api_key = api_key or os.getenv('deep_key') or os.getenv('DEEPSEEK_API_KEY') |
| self.base_url = "https://api.deepseek.com/v1/chat/completions" |
| self.headers = { |
| "Content-Type": "application/json", |
| "Authorization": f"Bearer {self.api_key}" if self.api_key else "" |
| } |
| |
| def is_available(self) -> bool: |
| """Check if DeepSeek API is available""" |
| return bool(self.api_key) |
| |
| def get_turkish_commentary(self, english_analysis: str, timeout: int = 30) -> Dict[str, Any]: |
| """ |
| Get Turkish commentary for English medical analysis |
| |
| Args: |
| english_analysis: English medical analysis text |
| timeout: Request timeout in seconds |
| |
| Returns: |
| Dict with success status and commentary |
| """ |
| if not self.is_available(): |
| return { |
| "success": False, |
| "error": "DeepSeek API key not configured", |
| "comment_text": "" |
| } |
| |
| try: |
| |
| prompt = f"""Bu bir EKG sonucu klinik incelemesi. Aşağıdaki İngilizce medikal analizi Türkçe olarak yorumla ve hasta için anlaşılır bir dilde açıkla: |
| |
| "{english_analysis}" |
| |
| Lütfen: |
| 1. Medikal terimleri Türkçe karşılıklarıyla açıkla |
| 2. Hastanın anlayabileceği basit bir dille yaz |
| 3. Gerekirse aciliyet durumu hakkında bilgi ver |
| 4. Kısa ve net ol |
| |
| Türkçe Yorum:""" |
|
|
| payload = { |
| "model": "deepseek-chat", |
| "messages": [ |
| { |
| "role": "system", |
| "content": "Sen deneyimli bir kardiyolog doktorsun. EKG sonuçlarını Türkçe olarak hastalar için anlaşılır şekilde açıklıyorsun." |
| }, |
| { |
| "role": "user", |
| "content": prompt |
| } |
| ], |
| "temperature": 0.3, |
| "max_tokens": 500, |
| "stream": False |
| } |
| |
| logger.info("🔄 DeepSeek API'ye Türkçe yorum için istek gönderiliyor...") |
| |
| response = requests.post( |
| self.base_url, |
| headers=self.headers, |
| json=payload, |
| timeout=timeout |
| ) |
| |
| response.raise_for_status() |
| result = response.json() |
| |
| if 'choices' in result and len(result['choices']) > 0: |
| comment_text = result['choices'][0]['message']['content'].strip() |
| |
| |
| if comment_text.startswith("Türkçe Yorum:"): |
| comment_text = comment_text[13:].strip() |
| |
| logger.info("✅ DeepSeek'ten Türkçe yorum başarıyla alındı") |
| |
| return { |
| "success": True, |
| "comment_text": comment_text, |
| "model": "deepseek-chat", |
| "tokens_used": result.get('usage', {}).get('total_tokens', 0) |
| } |
| else: |
| return { |
| "success": False, |
| "error": "DeepSeek API'den geçersiz yanıt", |
| "comment_text": "" |
| } |
| |
| except requests.exceptions.Timeout: |
| logger.error("❌ DeepSeek API timeout") |
| return { |
| "success": False, |
| "error": "DeepSeek API timeout - istek zaman aşımına uğradı", |
| "comment_text": "" |
| } |
| |
| except requests.exceptions.RequestException as e: |
| logger.error(f"❌ DeepSeek API request error: {e}") |
| return { |
| "success": False, |
| "error": f"DeepSeek API bağlantı hatası: {str(e)}", |
| "comment_text": "" |
| } |
| |
| except Exception as e: |
| logger.error(f"❌ DeepSeek API unexpected error: {e}") |
| return { |
| "success": False, |
| "error": f"DeepSeek API beklenmeyen hata: {str(e)}", |
| "comment_text": "" |
| } |
|
|
| |
| performance_monitor = PerformanceMonitor() |
| deepseek_client = DeepSeekClient() |
|
|