Spaces:
Sleeping
Sleeping
| # vulnerability.py | |
| import numpy as np | |
def normalize_component(value, max_value, inverse=False):
    """
    Scale the magnitude of a raw value onto the 0-1 range.

    Args:
        value: Raw measurement; only its absolute value is used. None
            means "no data" and yields the neutral score 0.5.
        max_value: Magnitude that corresponds to a full-scale reading.
        inverse: When False (default) the score falls from 1.0 towards
            0.0 as the magnitude grows. When True the score rises with
            the magnitude and is capped at 1.0.

    Returns:
        The normalized score, or 0.5 when value is None.
    """
    if value is None:
        return 0.5

    ratio = abs(value) / max_value
    return min(1.0, ratio) if inverse else max(0.0, 1.0 - ratio)
def assess_flood_context(elevation, tpi, water_distance):
    """
    Classify the terrain setting of a site and return its flood context.

    The site falls into exactly one elevation band; inside the band the
    label and factor are chosen from the distance to water and the TPI.
    Bands (upper limits inclusive): coastal (<10), low inland (10-100),
    river valley (100-300), mountain (300-600), high plateau (>600).

    Args:
        elevation: Site elevation (metres, per the band limits).
        tpi: Topographic position index; more negative values select
            the higher-risk outcomes.
        water_distance: Distance to the nearest water, or None when it
            is unknown (presumably metres - confirm against the caller).

    Returns:
        Tuple (risk_label, context_factor). risk_label is one of 'low',
        'moderate', 'high' or 'very_high'; context_factor is a float
        between 0.50 and 1.0.

    Raises:
        TypeError: If elevation or tpi is None on a path that compares it.
    """
    # Context 1: Coastal (<10m)
    if elevation < 10:
        if water_distance is not None and water_distance < 500:
            return 'very_high', 1.0
        elif water_distance is not None and water_distance < 2000:
            # Both outcomes carry the same label; only the factor differs.
            # The label must use the underscore spelling so that callers
            # can look it up in their severity ordering.
            return 'very_high', 1.0 if tpi < -3 else 0.98
        elif water_distance is not None and water_distance < 5000:
            return 'high' if tpi < -3 else 'moderate', 0.9 if tpi < -3 else 0.75
        else:
            return 'moderate', 0.7 if tpi < -5 else 0.6
    # Context 2: High plateau (>600m)
    elif elevation > 600:
        if tpi < -15 and water_distance is not None and water_distance < 100:
            return 'moderate', 0.65
        elif tpi < -10:
            return 'low', 0.55
        else:
            return 'low', 0.50
    # Context 3: Mountain (300-600m)
    elif elevation > 300:
        if water_distance is not None and water_distance < 200 and tpi < -10:
            return 'moderate', 0.75
        elif water_distance is not None and water_distance < 500:
            return 'low', 0.65
        else:
            return 'low', 0.55
    # Context 4: River valley (100-300m)
    # Only "> 100" is tested: anything above 300 was handled by the
    # branches above, and an elevation of exactly 300 must land here
    # rather than fall through to the low-inland rules.
    elif elevation > 100:
        if water_distance is not None and water_distance < 300 and tpi < -5:
            return 'high', 1.0
        elif water_distance is not None and water_distance < 500:
            return 'moderate', 0.85
        else:
            return 'moderate', 0.7
    # Context 5: Low inland (10-100m)
    else:
        if water_distance is None:
            return 'moderate', 0.7
        elif water_distance < 200:
            if tpi < -8:
                return 'very_high', 1.0
            elif tpi < -5:
                return 'high', 0.95
            else:
                return 'high', 0.85
        elif water_distance < 500:
            return 'high' if tpi < -5 else 'moderate', 0.85 if tpi < -5 else 0.75
        elif water_distance < 1000:
            return 'moderate', 0.70 if tpi < -5 else 0.65
        else:
            if tpi < -8:
                return 'moderate', 0.65
            elif tpi < -5:
                return 'low', 0.60
            else:
                return 'low', 0.55
def calculate_vulnerability_index(lat, lon, height, basement, terrain_metrics, water_distance):
    """
    Calculate flood vulnerability index with basement consideration.

    Four component scores (water proximity, TPI, slope, building height)
    are combined with fixed weights, amplified when a basement is present
    and finally scaled by the terrain context factor returned by
    assess_flood_context.

    Args:
        lat: Latitude; used in the fallback log message and forwarded to
            calculate_uncertainty.
        lon: Longitude; used the same way as lat.
        height: Building height above ground (units not visible here -
            presumably metres, confirm with the caller).
        basement: Basement depth; a negative value means a basement is
            present, and its magnitude drives the multiplier.
        terrain_metrics: Mapping read with .get() for the keys
            'elevation', 'tpi' and 'slope'. Missing or None entries are
            treated as 0.
        water_distance: Distance to the nearest water, or None if unknown.

    Returns:
        Dict with the keys 'vulnerability_index', 'confidence_interval',
        'risk_level', 'distance_to_water_m', 'elevation_m',
        'relative_elevation_m', 'slope_degrees', 'uncertainty_analysis'
        and 'components'.

    Raises:
        TypeError: If height or basement is None.
    """
    # Missing (or None) terrain metrics fall back to 0.
    elevation = terrain_metrics.get('elevation') or 0
    tpi = terrain_metrics.get('tpi') or 0
    slope = terrain_metrics.get('slope') or 0

    # GET FLOOD CONTEXT
    try:
        context_risk_level, context_factor = assess_flood_context(elevation, tpi, water_distance)
    except (TypeError, ValueError) as te:
        print(f"Context failed for {lat},{lon}: {te} - default moderate")
        context_risk_level, context_factor = 'moderate', 0.8

    # Apply elevation penalty for high-altitude locations: the factor
    # drops linearly above 500 and bottoms out at 0.3.
    if elevation > 500:
        elevation_factor = max(0.3, 1.0 - (elevation - 500) / 1000)
    else:
        elevation_factor = 1.0

    # Component 1: Proximity (piecewise-linear decay with distance)
    if water_distance is None:
        proximity_score = 0.5
    elif water_distance < 100:
        proximity_score = 1.0 * elevation_factor
    elif water_distance < 500:
        proximity_score = (0.9 - ((water_distance - 100) / 400) * 0.5) * elevation_factor
    elif water_distance < 2000:
        proximity_score = (0.4 - ((water_distance - 500) / 1500) * 0.3) * elevation_factor
    elif water_distance < 5000:
        proximity_score = max(0.0, 0.1 - ((water_distance - 2000) / 3000) * 0.1) * elevation_factor
    else:
        proximity_score = 0.001

    # Component 2: TPI (Topographic Position Index)
    # tpi is never None here because of the "or 0" fallback above.
    if tpi < -5:
        tpi_score = min(1.0, 0.7 + abs(tpi + 5) / 30)
    elif tpi > 5:
        tpi_score = max(0.0, 0.3 - (tpi - 5) / 50)
    else:
        tpi_score = 0.5 - (tpi / 20)
    tpi_score = max(0.0, min(1.0, tpi_score))
    if elevation > 500:
        tpi_score = tpi_score * elevation_factor

    # Component 3: Slope (flatter ground scores higher)
    if slope < 0.5:
        slope_score = 0.9
    elif slope < 2:
        slope_score = 0.8 - ((slope - 0.5) / 1.5) * 0.3
    elif slope < 6:
        slope_score = 0.5 - ((slope - 2) / 4) * 0.3
    else:
        slope_score = max(0.05, 0.2 - (slope - 6) / 20)

    # Component 4: Building protection factor
    # The basement magnitude is added here; the basement penalty itself
    # is applied later as a multiplier.
    net_protection = height + abs(basement)
    if net_protection <= 0:
        height_score = 0.9
    elif net_protection < 3:
        height_score = 0.8 - (net_protection / 3) * 0.3
    elif net_protection < 8:
        height_score = 0.5 - ((net_protection - 3) / 5) * 0.3
    else:
        height_score = max(0.1, 0.2 - ((net_protection - 8) / 15) * 0.15)
    height_score = max(0.0, min(1.0, height_score))

    # Increase weight for building characteristics when basement present
    if basement < 0:
        weights = {
            'proximity': 0.25,
            'tpi': 0.30,
            'slope': 0.15,
            'height': 0.30,
        }
    else:
        weights = {
            'proximity': 0.30,
            'tpi': 0.35,
            'slope': 0.20,
            'height': 0.15,
        }

    # Base vulnerability
    base_vulnerability = (
        weights['proximity'] * proximity_score +
        weights['tpi'] * tpi_score +
        weights['slope'] * slope_score +
        weights['height'] * height_score
    )

    # Basement as multiplier (capped so the base never exceeds 1.0)
    if basement < 0:
        basement_multiplier = 1.0 + (abs(basement) * 0.15)
        base_vulnerability = min(1.0, base_vulnerability * basement_multiplier)

    # Apply context adjustment
    vulnerability_index = base_vulnerability * context_factor

    # Risk level based on final vulnerability_index with threshold mapping
    if vulnerability_index >= 0.80:
        final_risk = 'very_high'
    elif vulnerability_index >= 0.65:
        final_risk = 'high'
    elif vulnerability_index >= 0.40:
        final_risk = 'moderate'
    elif vulnerability_index >= 0.20:
        final_risk = 'low'
    else:
        final_risk = 'very_low'

    # Keep context-based label if more severe. An unrecognised context
    # label is treated as 'moderate' (index 2).
    risk_levels_order = ['very_low', 'low', 'moderate', 'high', 'very_high']
    if context_risk_level in risk_levels_order:
        context_severity = risk_levels_order.index(context_risk_level)
    else:
        context_severity = 2
    final_severity = risk_levels_order.index(final_risk)
    risk_level = risk_levels_order[max(context_severity, final_severity)]

    # Track component scores for SHAP
    components = {
        'proximity_score': proximity_score,
        'tpi_score': tpi_score,
        'slope_score': slope_score,
        'height_score': height_score,
        'elevation': elevation,
    }

    # Calculate uncertainty
    uncertainty_analysis = calculate_uncertainty(
        terrain_metrics,
        water_distance,
        context_factor,
        lat,
        lon
    )

    # Calculate confidence interval
    confidence_interval = calculate_confidence_interval(
        vulnerability_index,
        uncertainty_analysis['uncertainty']
    )

    return {
        'vulnerability_index': round(vulnerability_index, 3),
        'confidence_interval': confidence_interval,
        'risk_level': risk_level,
        # Test against None explicitly: a distance of 0 (site on the
        # water) is a valid reading and must not be reported as unknown.
        'distance_to_water_m': round(water_distance, 1) if water_distance is not None else None,
        'elevation_m': elevation,
        'relative_elevation_m': round(tpi, 2),
        'slope_degrees': round(slope, 2),
        'uncertainty_analysis': uncertainty_analysis,
        'components': components,
    }
def calculate_uncertainty(terrain_metrics, water_distance, context_factor, lat, lon):
    """
    Estimate the uncertainty of a vulnerability score from data quality.

    Each input is assigned a fixed uncertainty from a lookup on its value
    (or a larger one when the input is missing). The components are then
    combined as a weighted root-sum-of-squares and damped by 0.7.

    Args:
        terrain_metrics: Mapping read with .get() for 'elevation', 'tpi'
            and 'slope'; a missing or None entry counts as missing data.
        water_distance: Distance to the nearest water, or None if unknown.
        context_factor: Context factor of the site (see the thresholds
            0.7 and 0.95 below).
        lat: Accepted for interface compatibility; not used.
        lon: Accepted for interface compatibility; not used.

    Returns:
        Dict with 'confidence', 'uncertainty', 'components',
        'interpretation', 'data_quality_flags' and
        'dominant_error_sources' (the three largest components as
        (name, value) pairs).
    """
    uncertainties = {}

    # 1. ELEVATION UNCERTAINTY
    elevation = terrain_metrics.get('elevation')
    if elevation is None:
        uncertainties['elevation'] = 0.15
    elif elevation < 10:
        uncertainties['elevation'] = 0.08  # coastal - elevation matters a lot
    elif elevation < 100:
        uncertainties['elevation'] = 0.06  # low inland
    else:
        uncertainties['elevation'] = 0.03  # elevated - less critical

    # 2. TPI UNCERTAINTY
    tpi = terrain_metrics.get('tpi')
    if tpi is None:
        uncertainties['tpi'] = 0.12
    elif abs(tpi) < 2:
        uncertainties['tpi'] = 0.10  # near-flat, hard to classify
    elif abs(tpi) < 5:
        uncertainties['tpi'] = 0.06
    else:
        uncertainties['tpi'] = 0.04  # clear depression/ridge

    # 3. SLOPE UNCERTAINTY
    # The raw value is tested (no "or 0" fallback) so that a missing
    # slope actually receives the missing-data uncertainty.
    slope = terrain_metrics.get('slope')
    if slope is None:
        uncertainties['slope'] = 0.10
    elif slope < 2:
        uncertainties['slope'] = 0.08  # very flat = uncertain
    elif slope < 10:
        uncertainties['slope'] = 0.04
    else:
        uncertainties['slope'] = 0.03  # steep = clear signal

    # 4. WATER DISTANCE UNCERTAINTY
    if water_distance is None:
        uncertainties['water_proximity'] = 0.20
    elif water_distance < 50:
        uncertainties['water_proximity'] = 0.03
    elif water_distance < 500:
        uncertainties['water_proximity'] = 0.06
    elif water_distance < 2000:
        uncertainties['water_proximity'] = 0.10
    else:
        uncertainties['water_proximity'] = 0.15

    # 5. CONTEXT UNCERTAINTY
    if context_factor < 0.7:
        uncertainties['context'] = 0.04
    elif context_factor > 0.95:
        uncertainties['context'] = 0.06
    else:
        uncertainties['context'] = 0.03

    # 6. MODEL STRUCTURAL UNCERTAINTY
    uncertainties['model'] = 0.08

    # Weight by component importance in vulnerability calculation
    weights = {
        'elevation': 0.20,
        'tpi': 0.30,
        'slope': 0.15,
        'water_proximity': 0.25,
        'context': 0.05,
        'model': 0.05,
    }

    # Weighted root-sum-of-squares
    weighted_variance = sum(weights[k] * (v ** 2) for k, v in uncertainties.items())
    total_uncertainty = np.sqrt(weighted_variance)

    # Additional damping factor
    total_uncertainty *= 0.7  # empirical adjustment
    confidence = max(0.0, min(1.0, 1.0 - total_uncertainty))

    # Get dominant error sources
    sorted_uncertainties = sorted(uncertainties.items(), key=lambda x: x[1], reverse=True)
    dominant_sources = sorted_uncertainties[:3]

    return {
        'confidence': round(confidence, 3),
        'uncertainty': round(total_uncertainty, 3),
        'components': {k: round(v, 3) for k, v in uncertainties.items()},
        'interpretation': interpret_confidence(confidence),
        'data_quality_flags': get_quality_flags(terrain_metrics, water_distance),
        'dominant_error_sources': dominant_sources,
    }
def get_quality_flags(terrain_metrics, water_distance):
    """
    List the data quality problems that apply to a site.

    Args:
        terrain_metrics: Mapping read with .get() for 'elevation', 'tpi'
            and 'slope'.
        water_distance: Distance to the nearest water, or None if unknown.

    Returns:
        List of flag strings: missing-input flags first, then the water
        distance flag, then the steep-terrain and coastal-surge flags.
    """
    raw_elevation = terrain_metrics.get('elevation')
    raw_tpi = terrain_metrics.get('tpi')
    raw_slope = terrain_metrics.get('slope')

    # One flag per terrain metric that is absent or None.
    presence_checks = (
        ('missing_elevation', raw_elevation),
        ('missing_tpi', raw_tpi),
        ('missing_slope', raw_slope),
    )
    found = [label for label, reading in presence_checks if reading is None]

    if water_distance is None:
        found.append('water_distance_unknown')
    elif water_distance > 5000:
        found.append('far_from_water_search_limited')

    # Missing metrics count as 0 for the threshold checks below.
    if (raw_slope or 0) > 20:
        found.append('steep_terrain_dem_error_high')
    if (raw_elevation or 0) < 1 and water_distance is not None and water_distance < 100:
        found.append('coastal_surge_risk_not_modeled')

    return found
def interpret_confidence(confidence):
    """
    Translate a confidence value into a human-readable statement.

    Args:
        confidence: Confidence score, compared against the thresholds
            0.85, 0.75, 0.65 and 0.50.

    Returns:
        The message of the first tier whose threshold the score reaches,
        or the low-confidence message when it reaches none.
    """
    # Ordered from the strictest threshold downwards; first match wins.
    tiers = (
        (0.85, "High confidence - complete terrain data with low uncertainty"),
        (0.75, "Good confidence - reliable data sources available"),
        (0.65, "Moderate confidence - some data limitations present"),
        (0.50, "Fair confidence - significant data gaps or measurement uncertainty"),
    )
    for threshold, message in tiers:
        if confidence >= threshold:
            return message
    return "Low confidence - substantial missing data, use with caution"
def calculate_confidence_interval(vulnerability_index, uncertainty):
    """
    Build a 95% interval around a vulnerability score.

    Args:
        vulnerability_index: Point estimate of the vulnerability.
        uncertainty: Uncertainty of the estimate; the interval
            half-width is 1.96 times this value.

    Returns:
        Dict with 'point_estimate', 'lower_bound_95', 'upper_bound_95'
        and 'margin_of_error', each rounded to three decimals. The
        bounds are clipped to the 0-1 range; the margin is not.
    """
    half_width = 1.96 * uncertainty
    low_edge = vulnerability_index - half_width
    high_edge = vulnerability_index + half_width

    return {
        'point_estimate': round(vulnerability_index, 3),
        'lower_bound_95': round(max(0.0, low_edge), 3),
        'upper_bound_95': round(min(1.0, high_edge), 3),
        'margin_of_error': round(half_width, 3),
    }
def calculate_multi_hazard_vulnerability(lat, lon, height, basement, terrain_metrics, water_distance):
    """
    Multi-hazard assessment: fluvial, coastal surge and pluvial flooding.

    The fluvial score is the index from calculate_vulnerability_index.
    A coastal surge score (from coast distance and elevation) and a
    pluvial score (from TPI, slope and elevation) are added, and the
    three are blended with weights that depend on the elevation band.

    Args:
        lat: Latitude, forwarded to the base assessment and check_coastal.
        lon: Longitude, forwarded the same way as lat.
        height: Building height, forwarded to the base assessment.
        basement: Basement depth, forwarded to the base assessment.
        terrain_metrics: Mapping read with .get() for 'elevation', 'tpi'
            and 'slope'. Missing or None entries are treated as 0, the
            same fallback calculate_vulnerability_index applies.
        water_distance: Distance to the nearest water, or None if unknown.

    Returns:
        The base assessment dict extended with 'hazard_breakdown'
        (rounded fluvial, coastal, pluvial and combined scores) and
        'dominant_hazard' (name of the highest individual score).
    """
    # Base assessment
    base_result = calculate_vulnerability_index(
        lat, lon, height, basement, terrain_metrics, water_distance
    )
    fluvial_risk = base_result['vulnerability_index']

    # Missing elevation counts as 0; negative elevations are clamped to 0.
    elevation = terrain_metrics.get('elevation') or 0
    if elevation < 0:
        elevation = 0.0

    # Coastal surge risk
    from spatial_queries import check_coastal
    _is_coastal, coast_distance = check_coastal(lat, lon)

    # Guards against odd inputs
    # NOTE(review): an unknown coast distance (None) is treated as 0,
    # i.e. "at the coast", which yields the highest surge score for low
    # sites. Confirm what check_coastal returns for inland locations.
    if coast_distance is None or coast_distance < 0:
        coast_distance = 0.0

    if coast_distance < 5000:
        # Near coast - elevation governs risk
        if elevation < 2:
            coastal_risk = 0.99
        elif elevation < 10:
            # Linear decline from 0.99 at 2 m towards 0.15 at 10 m
            coastal_risk = max(0.05, 0.99 + ((0.15 - 0.99) / 8.0) * (elevation - 2.0))
        else:
            coastal_risk = 0.15  # Residual surge
    elif coast_distance < 20000:
        # Distance decay factor: 0 at 5 km, 1 at 20 km
        decay_factor = (coast_distance - 5000.0) / 15000.0
        decay_factor = min(max(decay_factor, 0.0), 1.0)
        # Base residual
        distance_risk = 0.15 * (1.0 - decay_factor)
        # Elevation modifier, never below 0.3
        elev_multiplier = 1.0 - (elevation / 10.0)
        elev_multiplier = min(max(elev_multiplier, 0.3), 1.0)
        coastal_risk = max(0.01, distance_risk * elev_multiplier)
    else:
        coastal_risk = 0.01  # Minimal residual background
    # Safety clamp
    coastal_risk = min(max(coastal_risk, 0.0), 1.0)

    # Pluvial risk - global-friendly (refined)
    tpi = terrain_metrics.get('tpi') or 0
    slope = terrain_metrics.get('slope') or 0
    # Clamp inputs
    tpi_clamped = max(min(tpi, 10), -10)
    slope_clamped = max(min(slope, 10), 0)
    # TPI factor: 1.0 at tpi = -10 (deep depression), 0.0 at tpi = +10,
    # with a mild convexity from the 0.9 exponent.
    topo_linear = 1.0 - (tpi_clamped + 10) / 20.0
    topo_factor = max(0.0, min(1.0, topo_linear ** 0.9))
    # Slope factor: nonlinear drop from 1.0 (flat) to 0.0 (slope >= 10)
    slope_fraction = 1.0 - (slope_clamped / 10.0)
    slope_factor = max(0.0, min(1.0, slope_fraction ** 1.2))
    # Elevation decay
    if elevation <= 200:
        elevation_decay = 1.0
    elif elevation <= 1000:
        # linear to 0.1 across 800 m
        elevation_decay = 1.0 - ((elevation - 200) / 800.0) * 0.9
    else:
        elevation_decay = 0.1
    # Combine (weights are tunable)
    pluvial_risk = (topo_factor * 0.6 + slope_factor * 0.4) * elevation_decay
    # Clamp final risk
    pluvial_risk = min(max(pluvial_risk, 0.0), 1.0)

    # Combined hazard with adaptive weights
    if elevation < 10:  # Coastal zone
        weights = {'fluvial': 0.3, 'coastal': 0.5, 'pluvial': 0.2}
    elif elevation < 100:  # Low inland
        weights = {'fluvial': 0.5, 'coastal': 0.1, 'pluvial': 0.4}
    else:  # Elevated
        weights = {'fluvial': 0.6, 'coastal': 0.0, 'pluvial': 0.4}
    combined = (fluvial_risk * weights['fluvial'] +
                coastal_risk * weights['coastal'] +
                pluvial_risk * weights['pluvial'])

    # Identify dominant hazard (ties go to the first entry listed)
    hazards = {
        'fluvial_riverine': fluvial_risk,
        'coastal_surge': coastal_risk,
        'pluvial_drainage': pluvial_risk,
    }
    dominant = max(hazards, key=hazards.get)

    return {
        **base_result,
        'hazard_breakdown': {
            'fluvial_riverine': round(fluvial_risk, 3),
            'coastal_surge': round(coastal_risk, 3),
            'pluvial_drainage': round(pluvial_risk, 3),
            'combined_index': round(combined, 3),
        },
        'dominant_hazard': dominant,
    }