# biomass-prediction-app / feature_engineering.py
# pokkiri - "Update feature_engineering.py" (commit a49479d, verified)
"""
Feature engineering module for biomass prediction.
This module extracts the 99 features needed by the StableResNet model.
Author: najahpokkiri
Date: 2025-05-19
"""
import numpy as np
import logging
from datetime import datetime
# Configure logger
# NOTE: basicConfig is called at import time, so importing this module also
# sets up root logging (INFO level, timestamped format).
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Try to import optional dependencies but don't fail if not available.
# SKLEARN_AVAILABLE records whether StandardScaler and PCA could be imported;
# calculate_pca_features() checks it before using them.
try:
    from sklearn.preprocessing import StandardScaler
    from sklearn.decomposition import PCA
    SKLEARN_AVAILABLE = True
except ImportError:
    SKLEARN_AVAILABLE = False
    logger.warning("scikit-learn not available. PCA features will be approximated.")

# SKIMAGE_AVAILABLE records whether the scikit-image filters/texture helpers
# could be imported; extract_texture_features() checks it before using them.
try:
    from skimage.filters import sobel
    from skimage.feature import local_binary_pattern, graycomatrix, graycoprops
    SKIMAGE_AVAILABLE = True
except ImportError:
    SKIMAGE_AVAILABLE = False
    logger.warning("scikit-image not available. Texture features will be approximated.")
def safe_divide(a, b, fill_value=0.0):
    """
    Divide ``a`` by ``b`` element-wise without ever returning NaN or Inf.

    Both inputs are converted to float32 and may have different shapes as
    long as they broadcast against each other.

    Parameters:
        a (array_like): Numerator. NaN/Inf entries are treated as 0.
        b (array_like): Denominator. +/-Inf entries are clamped to +/-1e10.
            Entries that are NaN or whose magnitude is below 1e-10 are
            considered undefined.
        fill_value (float): Value written wherever the denominator is
            undefined or the quotient itself is not finite.

    Returns:
        result (ndarray): float32 array with the broadcast shape of the
            inputs (a float32 scalar when both inputs are scalars).
    """
    numerator = np.nan_to_num(np.asarray(a, dtype=np.float32),
                              nan=0.0, posinf=0.0, neginf=0.0)
    # NaN denominators are mapped to 0 so the near-zero test below rejects
    # them; mapping them to 1e-10 would slip past the strict threshold and
    # turn the quotient into a / 1e-10.
    denominator = np.nan_to_num(np.asarray(b, dtype=np.float32),
                                nan=0.0, posinf=1e10, neginf=-1e10)

    usable = np.abs(denominator) >= 1e-10

    # Allocate the output with the broadcast shape so scalar/array mixes work.
    out_shape = np.broadcast(numerator, denominator).shape
    result = np.full(out_shape, fill_value, dtype=np.float32)

    # Positions where `usable` is False keep fill_value. Overflowing quotients
    # are handled by the final nan_to_num, so the warnings are silenced.
    with np.errstate(divide='ignore', over='ignore', invalid='ignore'):
        np.divide(numerator, denominator, out=result, where=usable)

    return np.nan_to_num(result, nan=fill_value, posinf=fill_value, neginf=fill_value)
def calculate_spectral_indices(satellite_data):
    """
    Calculate spectral indices from satellite bands.

    Band positions 1, 2, 3, 7, 9 and 10 of the first axis are read as blue,
    green, red, NIR, SWIR1 and SWIR2 (assumes the Sentinel-2 style band order
    noted below - adjust the indices if your data is laid out differently).

    Parameters:
        satellite_data (ndarray): Array of shape (bands, height, width).

    Returns:
        indices (dict): Maps 'NDVI', 'EVI', 'SAVI', 'MSAVI2', 'NDWI', 'NDMI'
            and 'NBR' to 2-D arrays. An index whose bands are missing, or
            whose computation failed, is returned as a float32 zero map.
    """
    indices = {}
    n_bands = satellite_data.shape[0]

    def safe_get_band(idx):
        """Return band ``idx`` as floating point, or None if it does not exist."""
        if idx >= n_bands:
            return None
        band = satellite_data[idx]
        # Integer rasters (e.g. unsigned reflectance counts) would wrap around
        # in expressions such as `nir - red`, so promote them before any math.
        if not np.issubdtype(band.dtype, np.floating):
            band = band.astype(np.float32)
        return band

    # Sentinel-2 bands (assuming standard band order)
    # B2(blue), B3(green), B4(red), B8(nir), B11(swir1), B12(swir2)
    try:
        blue = safe_get_band(1)  # Adjust indices based on your data
        green = safe_get_band(2)
        red = safe_get_band(3)
        nir = safe_get_band(7)
        swir1 = safe_get_band(9)
        swir2 = safe_get_band(10)

        if red is not None and nir is not None:
            # NDVI (Normalized Difference Vegetation Index)
            indices['NDVI'] = safe_divide(nir - red, nir + red)

            if blue is not None:
                # EVI (Enhanced Vegetation Index)
                indices['EVI'] = 2.5 * safe_divide(nir - red, nir + 6*red - 7.5*blue + 1)

            # SAVI (Soil Adjusted Vegetation Index)
            indices['SAVI'] = 1.5 * safe_divide(nir - red, nir + red + 0.5)

            # MSAVI2 (Modified Soil Adjusted Vegetation Index)
            # A negative discriminant yields NaN, which is zeroed below; the
            # accompanying RuntimeWarning is just noise.
            with np.errstate(invalid='ignore'):
                indices['MSAVI2'] = 0.5 * (2 * nir + 1 - np.sqrt((2 * nir + 1)**2 - 8 * (nir - red)))

            if green is not None:
                # NDWI (Normalized Difference Water Index)
                indices['NDWI'] = safe_divide(green - nir, green + nir)

        if swir1 is not None and nir is not None:
            # NDMI (Normalized Difference Moisture Index)
            indices['NDMI'] = safe_divide(nir - swir1, nir + swir1)

        if swir2 is not None and nir is not None:
            # NBR (Normalized Burn Ratio)
            indices['NBR'] = safe_divide(nir - swir2, nir + swir2)
    except Exception as e:
        # Best effort: keep whatever was computed before the failure.
        logger.warning(f"Error calculating spectral indices: {e}")

    # Clean up None values and NaNs
    indices = {k: np.nan_to_num(v, nan=0.0) for k, v in indices.items() if v is not None}

    # Ensure we have all required indices by providing zero-filled defaults
    required_indices = ['NDVI', 'EVI', 'SAVI', 'MSAVI2', 'NDWI', 'NDMI', 'NBR']
    map_shape = (satellite_data.shape[1], satellite_data.shape[2])
    for idx in required_indices:
        if idx not in indices:
            indices[idx] = np.zeros(map_shape, dtype=np.float32)

    return indices
def _placeholder_texture_features(height, width):
    """Return float32 zero maps for every texture feature name."""
    texture_names = ['Sobel_B7', 'LBP_B7', 'GLCM_contrast_B7', 'GLCM_dissimilarity_B7',
                     'GLCM_homogeneity_B7', 'GLCM_energy_B7']
    return {name: np.zeros((height, width), dtype=np.float32) for name in texture_names}


def extract_texture_features(satellite_data):
    """
    Extract texture features from satellite data.

    All features are computed from the band at index 7 (or the last band when
    fewer than 8 bands are present), with NaNs replaced by 0.

    Parameters:
        satellite_data (ndarray): Array of shape (bands, height, width).

    Returns:
        texture_features (dict): Maps 'Sobel_B7', 'LBP_B7' and
            'GLCM_{contrast,dissimilarity,homogeneity,energy}_B7' to arrays of
            shape (height, width). Each GLCM entry is one scalar, measured on
            a central patch of at most 128x128 pixels, broadcast over the
            whole map. If scikit-image is unavailable or anything fails, every
            feature is a zero map.
    """
    height, width = satellite_data.shape[1], satellite_data.shape[2]

    # If scikit-image is not available, return placeholders
    if not SKIMAGE_AVAILABLE:
        return _placeholder_texture_features(height, width)

    texture_features = {}
    try:
        # Use NIR band (band 7) for texture features
        b7_idx = min(7, satellite_data.shape[0] - 1)
        band = np.nan_to_num(satellite_data[b7_idx], nan=0.0)  # returns a copy

        # 1. Sobel filter for edge detection
        texture_features['Sobel_B7'] = sobel(band)

        # 2. Local Binary Pattern
        # Rescale the 1st..99th percentile range to 0-255. A constant (or
        # empty) band has no usable range and stays all-zero; either way
        # band_norm is uint8, which graycomatrix(levels=256) needs below.
        band_norm = np.zeros(band.shape, dtype=np.uint8)
        if band.size > 0:
            band_min, band_max = np.nanpercentile(band, [1, 99])
            if band_max > band_min:
                band_norm = np.clip((band - band_min) / (band_max - band_min + 1e-8) * 255, 0, 255).astype(np.uint8)

        texture_features['LBP_B7'] = local_binary_pattern(band_norm, 8, 1, method='uniform')

        # 3. GLCM properties, measured on a central sample patch
        glcm_props = ['contrast', 'dissimilarity', 'homogeneity', 'energy']
        sample_size = min(128, height, width)
        center_y, center_x = height // 2, width // 2
        offset = sample_size // 2
        y_start = max(0, center_y - offset)
        y_end = min(height, center_y + offset)
        x_start = max(0, center_x - offset)
        x_end = min(width, center_x + offset)
        patch = band_norm[y_start:y_end, x_start:x_end]

        if patch.size > 0:
            glcm = graycomatrix(patch, [1], [0], levels=256, symmetric=True, normed=True)
            for prop in glcm_props:
                try:
                    value = float(graycoprops(glcm, prop)[0, 0])
                    texture_features[f'GLCM_{prop}_B7'] = np.full((height, width), value)
                except Exception as e:
                    # One failing property must not discard the others.
                    logger.warning(f"Error calculating GLCM {prop}: {e}")
                    texture_features[f'GLCM_{prop}_B7'] = np.zeros((height, width), dtype=np.float32)
        else:
            # Create placeholder GLCM features if patch is invalid
            for prop in glcm_props:
                texture_features[f'GLCM_{prop}_B7'] = np.zeros((height, width), dtype=np.float32)
    except Exception as e:
        logger.error(f"Error in texture feature extraction: {e}")
        # Provide placeholder features in case of error (replaces all six)
        texture_features = _placeholder_texture_features(height, width)

    return texture_features
def calculate_spatial_features(satellite_data, indices):
    """
    Calculate spatial context features (gradient magnitudes).

    Parameters:
        satellite_data (ndarray): Array of shape (bands, height, width).
        indices (dict): Spectral index maps; 'NDVI' is used when present,
            otherwise a zero map stands in for it.

    Returns:
        spatial_features (dict): 'Gradient_B7' (gradient magnitude of the band
            at index 7, or of the last band when fewer exist) and
            'NDVI_gradient'. Either one falls back to a float32 zero map if
            its computation raises.
    """
    n_rows, n_cols = satellite_data.shape[1], satellite_data.shape[2]

    def gradient_magnitude(image):
        """Return sqrt(dx^2 + dy^2) of a 2-D image."""
        d_rows, d_cols = np.gradient(image)
        return np.sqrt(d_cols**2 + d_rows**2)

    features = {}

    # 1. Gradient of Band 7 (NIR), NaNs treated as 0
    nir_index = min(7, satellite_data.shape[0] - 1)
    nir = np.nan_to_num(satellite_data[nir_index], nan=0.0)
    try:
        features['Gradient_B7'] = gradient_magnitude(nir)
    except Exception as e:
        logger.warning(f"Error calculating band gradient: {e}")
        features['Gradient_B7'] = np.zeros((n_rows, n_cols), dtype=np.float32)

    # 2. NDVI gradient, NaNs treated as 0
    try:
        if 'NDVI' in indices:
            ndvi_map = indices['NDVI']
        else:
            ndvi_map = np.zeros((n_rows, n_cols), dtype=np.float32)
        features['NDVI_gradient'] = gradient_magnitude(np.nan_to_num(ndvi_map, nan=0.0))
    except Exception as e:
        logger.warning(f"Error calculating NDVI gradient: {e}")
        features['NDVI_gradient'] = np.zeros((n_rows, n_cols), dtype=np.float32)

    return features
def _zero_pca_features(height, width, n_components):
    """Return float32 zero maps named PCA_01 .. PCA_<n_components>."""
    return {f'PCA_{i:02d}': np.zeros((height, width), dtype=np.float32)
            for i in range(1, n_components + 1)}


def calculate_pca_features(satellite_data, n_components=25):
    """
    Calculate PCA features from satellite bands.

    Pixels are standardized per band and projected onto principal components.
    Only pixels whose bands are all finite take part in the fit; the others
    get 0 for every component. When fewer than ``n_components`` components
    can be computed, the remaining ones are zero maps.

    Parameters:
        satellite_data (ndarray): Array of shape (bands, height, width).
        n_components (int): Number of PCA maps to return.

    Returns:
        pca_features (dict): Maps 'PCA_01' .. 'PCA_<n_components>' to arrays
            of shape (height, width). Without scikit-learn the first
            ``bands`` entries are the raw bands and the rest are zeros. If
            there is no valid pixel or the computation fails, all entries are
            zero maps.
    """
    height, width = satellite_data.shape[1], satellite_data.shape[2]
    n_bands = satellite_data.shape[0]

    # If scikit-learn is not available, return placeholders
    if not SKLEARN_AVAILABLE:
        pca_features = {}
        for i in range(1, n_components + 1):
            if i <= n_bands:
                # Use band values directly for first components
                pca_features[f'PCA_{i:02d}'] = satellite_data[i-1]
            else:
                # No band left: zero map for the remaining components
                pca_features[f'PCA_{i:02d}'] = np.zeros((height, width), dtype=np.float32)
        return pca_features

    try:
        # Reshape for PCA (pixels x bands)
        bands_reshaped = satellite_data.reshape(n_bands, -1).T

        # Drop pixels with NaN *or* Inf in any band. Checking only for NaN
        # would let a single Inf pixel reach StandardScaler, which rejects
        # it and would send every PCA feature to the zero fallback.
        valid_mask = np.all(np.isfinite(bands_reshaped), axis=1)
        bands_clean = bands_reshaped[valid_mask]

        if len(bands_clean) == 0:
            logger.warning("No valid data for PCA calculation")
            return _zero_pca_features(height, width, n_components)

        # Standardize valid data
        bands_scaled = StandardScaler().fit_transform(bands_clean)

        # PCA cannot yield more components than there are bands or samples
        pca = PCA(n_components=min(n_components, bands_scaled.shape[1], bands_scaled.shape[0]))
        pca_result = pca.fit_transform(bands_scaled)

        # Extend to the requested number of components if needed
        actual_components = pca_result.shape[1]
        if actual_components < n_components:
            logger.warning(f"Only {actual_components} PCA components calculated, padding to {n_components}")
            padding = np.zeros((pca_result.shape[0], n_components - actual_components))
            pca_result = np.hstack([pca_result, padding])

        # Map back to original pixels (invalid pixels stay 0)
        pca_all = np.zeros((bands_reshaped.shape[0], n_components))
        pca_all[valid_mask] = pca_result

        # Reshape to spatial dimensions and store each component by name
        pca_spatial = pca_all.reshape(height, width, n_components)
        pca_features = {f'PCA_{i:02d}': pca_spatial[:, :, i-1]
                        for i in range(1, n_components + 1)}

        # Log PCA explained variance
        if hasattr(pca, 'explained_variance_ratio_'):
            logger.info(f"PCA explained variance: {pca.explained_variance_ratio_.sum():.3f}")
    except Exception as e:
        logger.error(f"Error calculating PCA features: {e}")
        # Create placeholder PCA features
        pca_features = _zero_pca_features(height, width, n_components)

    return pca_features
def extract_all_features(satellite_data):
    """
    Build the 99-column feature matrix expected by the model.

    Column order:
    - Band_01 .. Band_59 (missing bands are zero-filled, extra bands ignored)
    - 7 spectral indices
    - 6 texture features
    - 2 spatial features
    - PCA_01 .. PCA_25

    Parameters:
        satellite_data (ndarray): Array of shape (bands, height, width)
    Returns:
        features_array (ndarray): float32 array of shape (valid_pixels, 99)
        valid_mask (ndarray): Boolean (height, width) mask of pixels that are
            finite in every band
        feature_names (list): List of 99 feature names, in column order
    """
    started_at = datetime.now()
    logger.info("Extracting features for biomass prediction...")

    height, width = satellite_data.shape[1], satellite_data.shape[2]

    # A pixel is valid only if no band holds NaN or Inf there
    valid_mask = np.all(np.isfinite(satellite_data), axis=0)
    valid_y, valid_x = np.where(valid_mask)
    n_valid = len(valid_y)
    logger.info(f"Found {n_valid} valid pixels out of {height*width}")

    # Derived feature groups, each a dict of name -> 2-D map
    logger.info("Calculating spectral indices...")
    indices = calculate_spectral_indices(satellite_data)
    logger.info("Extracting texture features...")
    texture_features = extract_texture_features(satellite_data)
    logger.info("Calculating spatial features...")
    spatial_features = calculate_spatial_features(satellite_data, indices)
    logger.info("Computing PCA components...")
    pca_features = calculate_pca_features(satellite_data)

    # Ordered column names: bands, indices, texture, spatial, PCA
    band_names = [f'Band_{i:02d}' for i in range(1, 60)]
    feature_names = (
        band_names
        + ['NDVI', 'EVI', 'SAVI', 'MSAVI2', 'NDWI', 'NDMI', 'NBR']
        + ['Sobel_B7', 'LBP_B7', 'GLCM_contrast_B7', 'GLCM_dissimilarity_B7',
           'GLCM_homogeneity_B7', 'GLCM_energy_B7']
        + ['Gradient_B7', 'NDVI_gradient']
        + [f'PCA_{i:02d}' for i in range(1, 26)]
    )

    # Name -> map lookup; bands beyond the input stack become zero maps
    n_input_bands = satellite_data.shape[0]
    all_features = {}
    for band_pos, band_name in enumerate(band_names):
        if band_pos < n_input_bands:
            all_features[band_name] = satellite_data[band_pos]
        else:
            all_features[band_name] = np.zeros((height, width), dtype=np.float32)
    for group in (indices, texture_features, spatial_features, pca_features):
        all_features.update(group)

    # Verify we have exactly 99 features
    assert len(feature_names) == 99, f"Expected 99 features, but got {len(feature_names)}"

    # Sample every map at the valid pixels, one column per feature
    feature_matrix = np.zeros((n_valid, len(feature_names)), dtype=np.float32)
    for column, name in enumerate(feature_names):
        if name not in all_features:
            logger.warning(f"Feature '{name}' not found, using zeros")
            feature_matrix[:, column] = 0.0
            continue
        feature_data = all_features[name]
        if feature_data.ndim == 2:
            feature_values = feature_data[valid_y, valid_x]
        else:
            # Non-2-D entry: broadcast it over all valid pixels
            feature_values = np.full(n_valid, feature_data)
        feature_matrix[:, column] = np.nan_to_num(feature_values, nan=0.0)

    processing_time = (datetime.now() - started_at).total_seconds()
    logger.info(f"Successfully extracted {len(feature_names)} features for {n_valid} pixels in {processing_time:.2f} seconds")

    return feature_matrix, valid_mask, feature_names
# Simple smoke test
def test_feature_extraction():
    """
    Run the feature extraction pipeline on random sample data.

    Prints a short summary of the resulting shapes. Returns True when the
    pipeline completes, or False (after printing the traceback) when it raises.
    """
    try:
        # Random stack of 5 bands, 100x100 pixels
        sample = np.random.random((5, 100, 100)).astype(np.float32)

        matrix, mask, names = extract_all_features(sample)

        # Print summary
        print(
            f"Sample data shape: {sample.shape}",
            f"Feature matrix shape: {matrix.shape}",
            f"Number of feature names: {len(names)}",
            f"Valid pixels: {np.sum(mask)}",
            sep="\n",
        )
        return True
    except Exception as e:
        print(f"Feature extraction test failed: {e}")
        import traceback
        traceback.print_exc()
        return False
if __name__ == "__main__":
    # Run a simple test if this script is executed directly.
    # The True/False result of the test is not used here.
    test_feature_extraction()