# flood-vulnerability / vulnerability.py
# adema5051's picture
# Update vulnerability.py
# 34c2d84 verified
# vulnerability.py
import numpy as np
def normalize_component(value, max_value, inverse=False):
    """
    Map a raw value onto the 0-1 range relative to ``max_value``.

    The magnitude of ``value`` is divided by ``max_value``. With
    ``inverse=True`` that ratio is returned, capped at 1.0; otherwise the
    result is one minus the ratio, floored at 0.0. A missing value
    (``None``) yields the neutral midpoint 0.5.
    """
    if value is None:
        return 0.5

    ratio = abs(value) / max_value
    return min(1.0, ratio) if inverse else max(0.0, 1.0 - ratio)
def assess_flood_context(elevation, tpi, water_distance):
    """
    Classify a site's terrain setting and return its flood context.

    The site is placed in one of five elevation bands (thresholds at 10,
    100, 300 and 600); within each band the distance to water and the
    topographic position index (TPI) select a risk label and a
    multiplicative context factor.

    Args:
        elevation: Site elevation (the band comments express it in metres).
        tpi: Topographic position index; more negative values select the
            higher-risk branches.
        water_distance: Distance to the nearest water body, or ``None``
            when unknown. ``None`` never satisfies a "closer than" test.

    Returns:
        A ``(risk_label, context_factor)`` tuple. ``risk_label`` is one of
        ``'low'``, ``'moderate'``, ``'high'`` or ``'very_high'``;
        ``context_factor`` lies between 0.50 and 1.0.

    Raises:
        TypeError: If ``elevation`` is ``None``, or ``tpi`` is ``None`` on a
            branch that consults it (comparison against a number fails).
    """

    def within(limit):
        # True only for a known distance strictly below the limit.
        return water_distance is not None and water_distance < limit

    # Context 1: Coastal (<10m)
    if elevation < 10:
        if within(500):
            return 'very_high', 1.0
        if within(2000):
            # Label is 'very_high' on both sides; only the factor varies.
            # (The former 'very high' spelling was not a recognised label.)
            return 'very_high', 1.0 if tpi < -3 else 0.98
        if within(5000):
            return ('high', 0.9) if tpi < -3 else ('moderate', 0.75)
        return 'moderate', 0.7 if tpi < -5 else 0.6

    # Context 2: High plateau (>600m)
    if elevation > 600:
        if tpi < -15 and within(100):
            return 'moderate', 0.65
        if tpi < -10:
            return 'low', 0.55
        return 'low', 0.50

    # Context 3: Mountain (above 300m, up to 600m)
    if elevation > 300:
        if within(200) and tpi < -10:
            return 'moderate', 0.75
        if within(500):
            return 'low', 0.65
        return 'low', 0.55

    # Context 4: River valley (above 100m, up to and including 300m).
    # Elevation is already known to be <= 300 here, so exactly 300 belongs
    # to this band instead of dropping into the low-inland branch.
    if elevation > 100:
        if within(300) and tpi < -5:
            return 'high', 1.0
        if within(500):
            return 'moderate', 0.85
        return 'moderate', 0.7

    # Context 5: Low inland (10–100m)
    if water_distance is None:
        return 'moderate', 0.7
    if water_distance < 200:
        if tpi < -8:
            return 'very_high', 1.0
        if tpi < -5:
            return 'high', 0.95
        return 'high', 0.85
    if water_distance < 500:
        return ('high', 0.85) if tpi < -5 else ('moderate', 0.75)
    if water_distance < 1000:
        return 'moderate', 0.70 if tpi < -5 else 0.65
    if tpi < -8:
        return 'moderate', 0.65
    if tpi < -5:
        return 'low', 0.60
    return 'low', 0.55
def calculate_vulnerability_index(lat, lon, height, basement, terrain_metrics, water_distance):
    """
    Calculate flood vulnerability index with basement consideration.

    Four component scores (water proximity, TPI, slope, building height) are
    combined with fixed weights, amplified when a basement is present, and
    finally scaled by the terrain context factor.

    Args:
        lat: Latitude of the site; used in the fallback log message and
            forwarded to the uncertainty calculation.
        lon: Longitude of the site; used the same way as ``lat``.
        height: Building height, added to ``abs(basement)`` to form the
            protection figure that drives the height score.
        basement: Basement depth. A negative value means a basement is
            present; it shifts the weights and applies a multiplier.
        terrain_metrics: Mapping read via ``.get`` for ``'elevation'``,
            ``'tpi'`` and ``'slope'``. Missing or falsy entries are treated
            as 0 for scoring and are reported as 0 in the result.
        water_distance: Distance to the nearest water body, or ``None`` when
            unknown.

    Returns:
        dict with ``vulnerability_index``, ``confidence_interval``,
        ``risk_level``, ``distance_to_water_m``, ``elevation_m``,
        ``relative_elevation_m``, ``slope_degrees``,
        ``uncertainty_analysis`` and ``components``.
    """
    elevation = terrain_metrics.get('elevation') or 0
    tpi = terrain_metrics.get('tpi') or 0
    slope = terrain_metrics.get('slope') or 0

    # Terrain context; fall back to a moderate default if classification
    # fails on unexpected input types.
    try:
        context_risk_level, context_factor = assess_flood_context(elevation, tpi, water_distance)
    except (TypeError, ValueError) as te:
        print(f"Context failed for {lat},{lon}: {te} - default moderate")
        context_risk_level, context_factor = 'moderate', 0.8

    elevation_factor = _elevation_penalty(elevation)

    proximity_score = _proximity_score(water_distance, elevation_factor)
    tpi_score = _tpi_score(tpi, elevation_factor)
    slope_score = _slope_score(slope)
    height_score = _height_score(height + abs(basement))

    # Building characteristics weigh more when a basement is present.
    if basement < 0:
        weights = {
            'proximity': 0.25,
            'tpi': 0.30,
            'slope': 0.15,
            'height': 0.30,
        }
    else:
        weights = {
            'proximity': 0.30,
            'tpi': 0.35,
            'slope': 0.20,
            'height': 0.15,
        }

    base_vulnerability = (
        weights['proximity'] * proximity_score +
        weights['tpi'] * tpi_score +
        weights['slope'] * slope_score +
        weights['height'] * height_score
    )

    # A basement acts as a multiplier (+15% per unit of depth), capped at 1.0.
    if basement < 0:
        basement_multiplier = 1.0 + (abs(basement) * 0.15)
        base_vulnerability = min(1.0, base_vulnerability * basement_multiplier)

    vulnerability_index = base_vulnerability * context_factor

    # Report whichever is more severe: the index-derived label or the
    # context label. Unknown context labels count as 'moderate'.
    risk_levels_order = ['very_low', 'low', 'moderate', 'high', 'very_high']
    if context_risk_level in risk_levels_order:
        context_severity = risk_levels_order.index(context_risk_level)
    else:
        context_severity = 2
    final_severity = risk_levels_order.index(_risk_label(vulnerability_index))
    risk_level = risk_levels_order[max(context_severity, final_severity)]

    # Component scores are exposed for downstream explanation (e.g. SHAP).
    components = {
        'proximity_score': proximity_score,
        'tpi_score': tpi_score,
        'slope_score': slope_score,
        'height_score': height_score,
        'elevation': elevation,
    }

    uncertainty_analysis = calculate_uncertainty(
        terrain_metrics,
        water_distance,
        context_factor,
        lat,
        lon,
    )
    confidence_interval = calculate_confidence_interval(
        vulnerability_index,
        uncertainty_analysis['uncertainty'],
    )

    return {
        'vulnerability_index': round(vulnerability_index, 3),
        'confidence_interval': confidence_interval,
        'risk_level': risk_level,
        # Explicit None test: a distance of exactly 0 is a real measurement
        # and must not be reported as missing.
        'distance_to_water_m': round(water_distance, 1) if water_distance is not None else None,
        'elevation_m': elevation,
        'relative_elevation_m': round(tpi, 2),
        'slope_degrees': round(slope, 2),
        'uncertainty_analysis': uncertainty_analysis,
        'components': components,
    }


def _elevation_penalty(elevation):
    """Return the damping factor for sites above 500: 1.0 falling linearly to a floor of 0.3."""
    if elevation > 500:
        return max(0.3, 1.0 - (elevation - 500) / 1000)
    return 1.0


def _proximity_score(water_distance, elevation_factor):
    """Score water proximity: piecewise-linear decay with distance, damped by elevation."""
    if water_distance is None:
        return 0.5
    if water_distance < 100:
        return 1.0 * elevation_factor
    if water_distance < 500:
        return (0.9 - ((water_distance - 100) / 400) * 0.5) * elevation_factor
    if water_distance < 2000:
        return (0.4 - ((water_distance - 500) / 1500) * 0.3) * elevation_factor
    if water_distance < 5000:
        return max(0.0, 0.1 - ((water_distance - 2000) / 3000) * 0.1) * elevation_factor
    # Beyond 5000 a fixed residual is used, not scaled by elevation.
    return 0.001


def _tpi_score(tpi, elevation_factor):
    """Score topographic position: depressions (negative TPI) score high, ridges low."""
    if tpi < -5:
        score = min(1.0, 0.7 + abs(tpi + 5) / 30)
    elif tpi > 5:
        score = max(0.0, 0.3 - (tpi - 5) / 50)
    else:
        score = 0.5 - (tpi / 20)
    score = max(0.0, min(1.0, score))
    # elevation_factor is exactly 1.0 at or below 500, so this only damps
    # high-altitude sites.
    return score * elevation_factor


def _slope_score(slope):
    """Score slope: flat ground scores high, steep ground decays to a floor of 0.05."""
    if slope < 0.5:
        return 0.9
    if slope < 2:
        return 0.8 - ((slope - 0.5) / 1.5) * 0.3
    if slope < 6:
        return 0.5 - ((slope - 2) / 4) * 0.3
    return max(0.05, 0.2 - (slope - 6) / 20)


def _height_score(net_protection):
    """Score building protection: a larger protection figure gives a lower score."""
    if net_protection <= 0:
        score = 0.9
    elif net_protection < 3:
        score = 0.8 - (net_protection / 3) * 0.3
    elif net_protection < 8:
        score = 0.5 - ((net_protection - 3) / 5) * 0.3
    else:
        score = max(0.1, 0.2 - ((net_protection - 8) / 15) * 0.15)
    return max(0.0, min(1.0, score))


def _risk_label(vulnerability_index):
    """Map a vulnerability index onto its threshold-based risk label."""
    if vulnerability_index >= 0.80:
        return 'very_high'
    if vulnerability_index >= 0.65:
        return 'high'
    if vulnerability_index >= 0.40:
        return 'moderate'
    if vulnerability_index >= 0.20:
        return 'low'
    return 'very_low'
def calculate_uncertainty(terrain_metrics, water_distance, context_factor, lat, lon):
    """
    Estimate the uncertainty attached to a vulnerability assessment.

    Each input contributes a fixed uncertainty value chosen by band
    (a missing input gets the largest value for its category). The values
    are combined as a weighted root-sum-of-squares and damped by 0.7.

    Args:
        terrain_metrics: Mapping read via ``.get`` for ``'elevation'``,
            ``'tpi'`` and ``'slope'``; ``None``/absent means "missing".
        water_distance: Distance to the nearest water body, or ``None``.
        context_factor: Context multiplier from the flood-context
            classification.
        lat: Accepted for interface compatibility; not used in the current
            calculation.
        lon: Accepted for interface compatibility; not used in the current
            calculation.

    Returns:
        dict with ``confidence`` (1 - uncertainty, clamped to 0-1),
        ``uncertainty``, per-source ``components``, a textual
        ``interpretation``, ``data_quality_flags`` and the three largest
        ``dominant_error_sources`` as ``(name, value)`` pairs.
    """
    uncertainties = {}

    # 1. ELEVATION UNCERTAINTY
    elevation = terrain_metrics.get('elevation')
    if elevation is None:
        uncertainties['elevation'] = 0.15
    elif elevation < 10:
        uncertainties['elevation'] = 0.08  # coastal - elevation matters a lot
    elif elevation < 100:
        uncertainties['elevation'] = 0.06  # low inland
    else:
        uncertainties['elevation'] = 0.03  # elevated - less critical

    # 2. TPI UNCERTAINTY
    tpi = terrain_metrics.get('tpi')
    if tpi is None:
        uncertainties['tpi'] = 0.12
    elif abs(tpi) < 2:
        uncertainties['tpi'] = 0.10  # near-flat, hard to classify
    elif abs(tpi) < 5:
        uncertainties['tpi'] = 0.06
    else:
        uncertainties['tpi'] = 0.04  # clear depression/ridge

    # 3. SLOPE UNCERTAINTY
    # Read the raw value (no "or 0" default) so that a missing slope is
    # distinguished from a measured flat slope, as for elevation and TPI.
    slope = terrain_metrics.get('slope')
    if slope is None:
        uncertainties['slope'] = 0.10
    elif slope < 2:
        uncertainties['slope'] = 0.08  # very flat = uncertain
    elif slope < 10:
        uncertainties['slope'] = 0.04
    else:
        uncertainties['slope'] = 0.03  # steep = clear signal

    # 4. WATER DISTANCE UNCERTAINTY
    if water_distance is None:
        uncertainties['water_proximity'] = 0.20
    elif water_distance < 50:
        uncertainties['water_proximity'] = 0.03
    elif water_distance < 500:
        uncertainties['water_proximity'] = 0.06
    elif water_distance < 2000:
        uncertainties['water_proximity'] = 0.10
    else:
        uncertainties['water_proximity'] = 0.15

    # 5. CONTEXT UNCERTAINTY
    if context_factor < 0.7:
        uncertainties['context'] = 0.04
    elif context_factor > 0.95:
        uncertainties['context'] = 0.06
    else:
        uncertainties['context'] = 0.03

    # 6. MODEL STRUCTURAL UNCERTAINTY
    uncertainties['model'] = 0.08

    # Weight by component importance in the vulnerability calculation.
    weights = {
        'elevation': 0.20,
        'tpi': 0.30,
        'slope': 0.15,
        'water_proximity': 0.25,
        'context': 0.05,
        'model': 0.05,
    }

    # Weighted root-sum-of-squares, then an empirical damping factor.
    weighted_variance = sum(weights[k] * (v ** 2) for k, v in uncertainties.items())
    total_uncertainty = np.sqrt(weighted_variance)
    total_uncertainty *= 0.7

    confidence = max(0.0, min(1.0, 1.0 - total_uncertainty))

    # The three largest individual uncertainties (unweighted), largest first.
    dominant_sources = sorted(
        uncertainties.items(), key=lambda item: item[1], reverse=True
    )[:3]

    return {
        'confidence': round(confidence, 3),
        'uncertainty': round(total_uncertainty, 3),
        'components': {k: round(v, 3) for k, v in uncertainties.items()},
        'interpretation': interpret_confidence(confidence),
        'data_quality_flags': get_quality_flags(terrain_metrics, water_distance),
        'dominant_error_sources': dominant_sources,
    }
def get_quality_flags(terrain_metrics, water_distance):
    """
    Collect string flags describing data-quality problems for a site.

    Flags are emitted in a fixed order: missing terrain metrics
    (elevation, TPI, slope), then the water-distance status, then the
    steep-terrain flag, then the unmodelled coastal-surge flag.
    """
    raw_elevation = terrain_metrics.get('elevation')
    raw_tpi = terrain_metrics.get('tpi')
    raw_slope = terrain_metrics.get('slope')

    presence_checks = (
        (raw_elevation, 'missing_elevation'),
        (raw_tpi, 'missing_tpi'),
        (raw_slope, 'missing_slope'),
    )
    flags = [flag for reading, flag in presence_checks if reading is None]

    distance_known = water_distance is not None
    if not distance_known:
        flags.append('water_distance_unknown')
    elif water_distance > 5000:
        flags.append('far_from_water_search_limited')

    # For the threshold tests, missing/falsy metrics count as zero.
    if (raw_slope or 0) > 20:
        flags.append('steep_terrain_dem_error_high')

    if (raw_elevation or 0) < 1 and distance_known and water_distance < 100:
        flags.append('coastal_surge_risk_not_modeled')

    return flags
def interpret_confidence(confidence):
    """
    Translate a numeric confidence into a human-readable description.

    The first band whose lower bound the confidence meets or exceeds wins;
    anything below 0.50 gets the low-confidence message.
    """
    bands = (
        (0.85, "High confidence - complete terrain data with low uncertainty"),
        (0.75, "Good confidence - reliable data sources available"),
        (0.65, "Moderate confidence - some data limitations present"),
        (0.50, "Fair confidence - significant data gaps or measurement uncertainty"),
    )
    for lower_bound, message in bands:
        if confidence >= lower_bound:
            return message
    return "Low confidence - substantial missing data, use with caution"
def calculate_confidence_interval(vulnerability_index, uncertainty):
    """
    Build a 95% interval around the vulnerability index.

    The half-width is 1.96 times the uncertainty; both bounds are clipped
    to the 0-1 range and every reported number is rounded to three
    decimals.
    """
    half_width = 1.96 * uncertainty
    clipped_low = max(0.0, vulnerability_index - half_width)
    clipped_high = min(1.0, vulnerability_index + half_width)

    interval = {'point_estimate': round(vulnerability_index, 3)}
    interval['lower_bound_95'] = round(clipped_low, 3)
    interval['upper_bound_95'] = round(clipped_high, 3)
    interval['margin_of_error'] = round(half_width, 3)
    return interval
def calculate_multi_hazard_vulnerability(lat, lon, height, basement, terrain_metrics, water_distance):
    """
    Multi-hazard assessment combining fluvial, coastal and pluvial risk.

    The fluvial figure is the ``vulnerability_index`` from
    ``calculate_vulnerability_index``; coastal surge and pluvial drainage
    risks are derived here from coast distance and terrain metrics. The
    three are blended with elevation-dependent weights.

    Args:
        lat: Latitude, forwarded to the base assessment and the coast lookup.
        lon: Longitude, forwarded the same way as ``lat``.
        height: Building height, forwarded to the base assessment.
        basement: Basement depth, forwarded to the base assessment.
        terrain_metrics: Mapping read via ``.get`` for ``'elevation'``,
            ``'tpi'`` and ``'slope'``; missing or falsy values count as 0.
        water_distance: Distance to the nearest water body, or ``None``.

    Returns:
        The base assessment dict extended with ``hazard_breakdown``
        (per-hazard values and ``combined_index``, rounded to three
        decimals) and ``dominant_hazard`` (the key of the largest
        unweighted hazard value).
    """
    base_result = calculate_vulnerability_index(
        lat, lon, height, basement, terrain_metrics, water_distance
    )
    fluvial_risk = base_result['vulnerability_index']

    # Missing elevation counts as 0; negative elevations are clamped to 0.
    elevation = terrain_metrics.get('elevation') or 0
    if elevation < 0:
        elevation = 0.0

    # Imported at call time, as in the original implementation.
    from spatial_queries import check_coastal
    _, coast_distance = check_coastal(lat, lon)
    # NOTE(review): a missing or negative coast distance is treated as 0
    # (i.e. on the coast). Confirm against check_coastal that ``None`` does
    # not actually mean "no coast found nearby".
    if coast_distance is None or coast_distance < 0:
        coast_distance = 0.0

    coastal_risk = _coastal_surge_risk(coast_distance, elevation)
    pluvial_risk = _pluvial_risk(
        terrain_metrics.get('tpi') or 0,
        terrain_metrics.get('slope') or 0,
        elevation,
    )

    # Combined hazard with adaptive weights.
    if elevation < 10:  # Coastal zone
        weights = {'fluvial': 0.3, 'coastal': 0.5, 'pluvial': 0.2}
    elif elevation < 100:  # Low inland
        weights = {'fluvial': 0.5, 'coastal': 0.1, 'pluvial': 0.4}
    else:  # Elevated
        weights = {'fluvial': 0.6, 'coastal': 0.0, 'pluvial': 0.4}

    combined = (fluvial_risk * weights['fluvial'] +
                coastal_risk * weights['coastal'] +
                pluvial_risk * weights['pluvial'])

    # The dominant hazard is chosen on raw (unweighted) values.
    hazards = {
        'fluvial_riverine': fluvial_risk,
        'coastal_surge': coastal_risk,
        'pluvial_drainage': pluvial_risk,
    }
    dominant = max(hazards, key=hazards.get)

    return {
        **base_result,
        'hazard_breakdown': {
            'fluvial_riverine': round(fluvial_risk, 3),
            'coastal_surge': round(coastal_risk, 3),
            'pluvial_drainage': round(pluvial_risk, 3),
            'combined_index': round(combined, 3),
        },
        'dominant_hazard': dominant,
    }


def _coastal_surge_risk(coast_distance, elevation):
    """Return the coastal surge risk (0-1) from coast distance and non-negative elevation."""
    if coast_distance < 5000:
        # Near coast: elevation governs risk.
        if elevation < 2:
            risk = 0.99
        elif elevation < 10:
            # Linear decline from 0.99 at 2 m towards 0.15 at 10 m.
            risk = max(0.05, 0.99 + ((0.15 - 0.99) / 8.0) * (elevation - 2.0))
        else:
            risk = 0.15  # Residual surge
    elif coast_distance < 20000:
        # Distance decay: 0 at 5000, 1 at 20000.
        decay_factor = (coast_distance - 5000.0) / 15000.0
        decay_factor = min(max(decay_factor, 0.0), 1.0)
        distance_risk = 0.15 * (1.0 - decay_factor)
        # Elevation modifier, limited to the range 0.3-1.0.
        elev_multiplier = 1.0 - (elevation / 10.0)
        elev_multiplier = min(max(elev_multiplier, 0.3), 1.0)
        risk = max(0.01, distance_risk * elev_multiplier)
    else:
        risk = 0.01  # Minimal residual background
    # Safety clamp
    return min(max(risk, 0.0), 1.0)


def _pluvial_risk(tpi, slope, elevation):
    """Return the pluvial (drainage) risk (0-1) from TPI, slope and non-negative elevation."""
    # Clamp inputs to the ranges the factors are defined on.
    tpi_clamped = max(min(tpi, 10), -10)
    slope_clamped = max(min(slope, 10), 0)

    # TPI factor: 1.0 at -10 (deep depression) down to 0.0 at +10, with a
    # mild convexity from the 0.9 exponent.
    topo_linear = 1.0 - (tpi_clamped + 10) / 20.0
    topo_factor = max(0.0, min(1.0, topo_linear**0.9))

    # Slope factor: 1.0 on flat ground, dropping nonlinearly to 0.0 at 10.
    slope_fraction = 1.0 - (slope_clamped / 10.0)
    slope_factor = max(0.0, min(1.0, slope_fraction**1.2))

    # Elevation decay: none up to 200, then linear down to 0.1 at 1000.
    if elevation <= 200:
        elevation_decay = 1.0
    elif elevation <= 1000:
        elevation_decay = 1.0 - ((elevation - 200) / 800.0) * 0.9
    else:
        elevation_decay = 0.1

    # Combine (weights are tunable), then clamp.
    risk = (topo_factor * 0.6 + slope_factor * 0.4) * elevation_decay
    return min(max(risk, 0.0), 1.0)