Spaces:
Sleeping
Sleeping
| """ | |
| Post-Disaster Feature Engineering Controller for HazardGuard System | |
| API request coordination and response formatting | |
| """ | |
| import logging | |
| import json | |
| from typing import Dict, List, Optional, Any, Tuple | |
| from datetime import datetime | |
| from services.post_disaster_feature_engineering_service import PostDisasterFeatureEngineeringService | |
| logger = logging.getLogger(__name__) | |
class PostDisasterFeatureEngineeringController:
    """Controller for post-disaster feature engineering API operations.

    Each public method takes plain request data, delegates the work to
    ``self.service`` and wraps the outcome in a standardized response
    dictionary built by ``_create_response``. Exceptions raised while
    handling a request are logged and converted into error responses
    instead of being propagated to the caller.
    """

    # Format hints echoed back to the client when the input is rejected.
    _COORDINATES_FORMAT = '[[lat1, lon1], [lat2, lon2], ...]'
    _WEATHER_DATASETS_FORMAT = 'List of weather data dictionaries'

    def __init__(self):
        """Initialize the post-disaster feature engineering controller"""
        self.service = PostDisasterFeatureEngineeringService()

        # Standard response templates (shallow-copied for every response)
        self.success_template = {
            'success': True,
            'message': 'Operation completed successfully',
            'data': {},
            'timestamp': None,
            'processing_info': {},
        }
        self.error_template = {
            'success': False,
            'error': 'Unknown error',
            'message': 'Operation failed',
            'data': None,
            'timestamp': None,
        }
        logger.info("PostDisasterFeatureEngineeringController initialized")

    def _create_response(self, success: bool = True, message: str = '',
                         data: Any = None, error: str = '',
                         processing_info: Optional[Dict] = None) -> Dict[str, Any]:
        """
        Create standardized API response

        Args:
            success: Whether the operation was successful
            message: Success or error message
            data: Response data
            error: Error message (for failed operations)
            processing_info: Additional processing information. Always
                present on success responses (defaults to ``{}``); on
                failure responses it is included only when supplied.

        Returns:
            Standardized response dictionary
        """
        if success:
            response = self.success_template.copy()
            response['message'] = message or 'Operation completed successfully'
            response['data'] = data
            response['processing_info'] = processing_info or {}
        else:
            response = self.error_template.copy()
            response['error'] = error or 'Unknown error'
            response['message'] = message or 'Operation failed'
            response['data'] = data
            # Failure paths (e.g. a failed batch run) pass timing details
            # too; keep them instead of silently discarding them.
            if processing_info is not None:
                response['processing_info'] = processing_info
        response['timestamp'] = datetime.now().isoformat()
        return response

    def _controller_failure(self, operation: str, message: str,
                            exc: Exception) -> Dict[str, Any]:
        """
        Log an unexpected exception and turn it into an error response

        Must be called from inside an ``except`` block so that
        ``logger.exception`` can attach the active traceback.

        Args:
            operation: Short operation label used in the log line
            message: User-facing message for the response
            exc: The exception that was caught

        Returns:
            Standardized error response
        """
        logger.exception("Controller %s error: %s", operation, exc)
        return self._create_response(
            success=False,
            message=message,
            error=f"Controller error: {exc}",
        )

    @staticmethod
    def _first_series_length(series_by_name: Any) -> int:
        """
        Return the length of the first value of a name -> series mapping

        Returns 0 for an empty/missing mapping or when the first value has
        no length, so that a summary statistic can never turn an otherwise
        successful response into an error.
        """
        if not series_by_name:
            return 0
        try:
            return len(next(iter(series_by_name.values()), []))
        except (AttributeError, TypeError):
            return 0

    def validate_coordinates(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Validate coordinates from request data

        Args:
            request_data: Request dictionary containing 'coordinates' key

        Returns:
            Validation response
        """
        try:
            coordinates = request_data.get('coordinates')
            if not coordinates:
                return self._create_response(
                    success=False,
                    message="Coordinates validation failed",
                    error="No coordinates provided in request",
                    data={'required_format': self._COORDINATES_FORMAT},
                )

            # Use service validation
            is_valid, validation_message, parsed_coordinates = (
                self.service.validate_coordinates(coordinates)
            )
            if not is_valid:
                return self._create_response(
                    success=False,
                    message="Coordinates validation failed",
                    error=validation_message,
                    data={'required_format': self._COORDINATES_FORMAT},
                )

            coordinates_count = len(parsed_coordinates)
            return self._create_response(
                success=True,
                message="Coordinates validation successful",
                data={
                    'coordinates': parsed_coordinates,
                    'count': coordinates_count,
                    'validation_message': validation_message,
                },
                processing_info={'coordinates_count': coordinates_count},
            )
        except Exception as e:
            return self._controller_failure(
                "coordinates validation", "Coordinates validation error", e
            )

    def process_single_coordinate_features(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process post-disaster feature engineering for a single coordinate

        Args:
            request_data: Request dictionary containing 'weather_data' and
                optionally 'coordinate' and 'global_stats'

        Returns:
            Feature engineering response
        """
        try:
            # Extract request data
            weather_data = request_data.get('weather_data')
            coordinate = request_data.get('coordinate')
            global_stats = request_data.get('global_stats')

            if not weather_data:
                return self._create_response(
                    success=False,
                    message="Weather data required",
                    error="No weather_data provided in request",
                    data={'required_variables': self.service.get_input_variables()},
                )

            # Process using service
            result = self.service.process_single_coordinate_features(
                weather_data=weather_data,
                coordinate=coordinate,
                global_stats=global_stats,
            )

            if result.get('success'):
                features = result['features']
                return self._create_response(
                    success=True,
                    message="Feature engineering completed successfully",
                    data={
                        'coordinate': result['coordinate'],
                        'features': features,
                        'metadata': result['metadata'],
                    },
                    processing_info={
                        'processing_time_seconds': result['processing_time_seconds'],
                        'features_count': len(features) if features else 0,
                        'days_processed': self._first_series_length(features),
                    },
                )

            # Read the failure payload defensively: a missing optional key
            # must not replace the service's own error with a KeyError.
            return self._create_response(
                success=False,
                message="Feature engineering failed",
                error=result.get('error', ''),
                data={'coordinate': result.get('coordinate', coordinate)},
                processing_info={
                    'processing_time_seconds': result.get('processing_time_seconds'),
                },
            )
        except Exception as e:
            return self._controller_failure(
                "single coordinate processing",
                "Single coordinate feature engineering error",
                e,
            )

    def process_batch_features(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process post-disaster feature engineering for multiple coordinates

        Args:
            request_data: Request dictionary containing 'weather_datasets'
                and optionally 'coordinates'

        Returns:
            Batch feature engineering response
        """
        try:
            # Extract request data
            weather_datasets = request_data.get('weather_datasets')
            coordinates = request_data.get('coordinates')

            if not weather_datasets:
                return self._create_response(
                    success=False,
                    message="Weather datasets required",
                    error="No weather_datasets provided in request",
                    data={'required_format': self._WEATHER_DATASETS_FORMAT},
                )
            if not isinstance(weather_datasets, list):
                return self._create_response(
                    success=False,
                    message="Invalid weather datasets format",
                    error="weather_datasets must be a list",
                    data={'required_format': self._WEATHER_DATASETS_FORMAT},
                )

            # Process using service
            result = self.service.process_batch_features(
                weather_datasets=weather_datasets,
                coordinates=coordinates,
            )

            if result.get('success'):
                total = result['total_coordinates']
                successful = result['successful_coordinates']
                return self._create_response(
                    success=True,
                    message=f"Batch feature engineering completed: {successful}/{total} coordinates",
                    data={
                        'results': result['results'],
                        'global_statistics': result['global_statistics'],
                        'summary': {
                            'total_coordinates': total,
                            'successful_coordinates': successful,
                            'failed_coordinates': result['failed_coordinates'],
                            'success_rate_percent': result['success_rate_percent'],
                        },
                    },
                    processing_info={
                        'processing_time_seconds': result['processing_time_seconds'],
                        'coordinates_count': total,
                    },
                )

            # Defensive reads so the service's error message always survives.
            return self._create_response(
                success=False,
                message="Batch feature engineering failed",
                error=result.get('error', ''),
                processing_info={
                    'processing_time_seconds': result.get('processing_time_seconds'),
                },
            )
        except Exception as e:
            return self._controller_failure(
                "batch processing", "Batch feature engineering error", e
            )

    def export_to_csv(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Export feature engineering results to CSV format

        The results are converted to a DataFrame by the service and then
        serialized with ``to_csv(index=False)``.

        Args:
            request_data: Request dictionary containing 'results' and
                optionally 'include_metadata' (defaults to True)

        Returns:
            CSV export response
        """
        try:
            results = request_data.get('results')
            include_metadata = request_data.get('include_metadata', True)

            if not results:
                return self._create_response(
                    success=False,
                    message="Results required for CSV export",
                    error="No results provided in request",
                )

            # Export to DataFrame
            df = self.service.export_to_dataframe(results, include_metadata)
            if df is None:
                return self._create_response(
                    success=False,
                    message="CSV export failed",
                    error="Failed to create DataFrame from results",
                )

            # Convert to CSV string
            csv_string = df.to_csv(index=False)
            row_count = len(df)
            column_count = len(df.columns)
            return self._create_response(
                success=True,
                message=f"CSV export completed: {row_count} rows, {column_count} columns",
                data={
                    'csv_data': csv_string,
                    'row_count': row_count,
                    'column_count': column_count,
                    'columns': df.columns.tolist(),
                },
                processing_info={
                    'export_format': 'CSV',
                    'include_metadata': include_metadata,
                },
            )
        except Exception as e:
            return self._controller_failure("CSV export", "CSV export error", e)

    def get_feature_info(self) -> Dict[str, Any]:
        """
        Get information about input variables and output features

        Returns:
            Feature information response
        """
        try:
            feature_descriptions = self.service.get_feature_descriptions()
            input_variables = self.service.get_input_variables()
            output_variables = self.service.get_output_variables()

            return self._create_response(
                success=True,
                message="Feature information retrieved successfully",
                data={
                    'input_variables': {
                        'count': len(input_variables),
                        'variables': input_variables,
                        'description': 'Required weather variables for feature engineering',
                    },
                    'output_features': {
                        'count': len(output_variables),
                        'features': output_variables,
                        'descriptions': feature_descriptions,
                        'description': 'Engineered features created from weather data',
                    },
                    'processing_info': {
                        # NOTE(review): hard-coded here; confirm it matches
                        # the window the service actually processes.
                        'days_per_coordinate': 60,
                        'feature_engineering_type': 'Post-disaster weather analysis',
                    },
                },
            )
        except Exception as e:
            return self._controller_failure(
                "feature info", "Feature information error", e
            )

    def get_service_health(self) -> Dict[str, Any]:
        """
        Get service health and performance statistics

        Returns:
            Service health response; successful only when the service
            reports ``service_status == 'healthy'``
        """
        try:
            health_info = self.service.get_service_health()
            if health_info.get('service_status') == 'healthy':
                return self._create_response(
                    success=True,
                    message="Service is healthy",
                    data=health_info,
                )
            return self._create_response(
                success=False,
                message="Service health check failed",
                error=health_info.get('error', 'Unknown health issue'),
                data=health_info,
            )
        except Exception as e:
            return self._controller_failure("health check", "Health check error", e)

    def reset_statistics(self) -> Dict[str, Any]:
        """
        Reset service and model statistics

        Returns:
            Statistics reset response
        """
        try:
            reset_result = self.service.reset_statistics()
            if reset_result.get('status') == 'success':
                return self._create_response(
                    success=True,
                    message="Statistics reset successfully",
                    data=reset_result,
                )
            # A failure payload without 'message' falls back to the
            # template's 'Unknown error' instead of raising KeyError.
            return self._create_response(
                success=False,
                message="Statistics reset failed",
                error=reset_result.get('message', ''),
            )
        except Exception as e:
            return self._controller_failure(
                "statistics reset", "Statistics reset error", e
            )

    def validate_weather_input(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Validate weather data input format

        Args:
            request_data: Request dictionary containing 'weather_data'

        Returns:
            Weather data validation response
        """
        try:
            weather_data = request_data.get('weather_data')
            if not weather_data:
                return self._create_response(
                    success=False,
                    message="Weather data validation failed",
                    error="No weather_data provided in request",
                    data={'required_variables': self.service.get_input_variables()},
                )

            # Use service validation
            is_valid, validation_message, validated_weather = (
                self.service.validate_weather_data(weather_data)
            )
            if not is_valid:
                return self._create_response(
                    success=False,
                    message="Weather data validation failed",
                    error=validation_message,
                    data={'required_variables': self.service.get_input_variables()},
                )

            return self._create_response(
                success=True,
                message="Weather data validation successful",
                data={
                    'validation_message': validation_message,
                    'variables_count': len(validated_weather),
                    'days_per_variable': self._first_series_length(validated_weather),
                    'detected_variables': list(validated_weather.keys()),
                },
            )
        except Exception as e:
            return self._controller_failure(
                "weather validation", "Weather data validation error", e
            )