Spaces:
Running
Running
""" | |
Feature engineering module for biomass prediction.

This module extracts the 99 features needed by the StableResNet model:
59 original bands, 7 spectral indices, 6 texture features, 2 spatial
features and 25 PCA components.

Author: najahpokkiri
Date: 2025-05-19
""" | |
import numpy as np | |
import logging | |
from datetime import datetime | |
# Configure logger | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger(__name__) | |
# Try to import optional dependencies but don't fail if not available | |
try: | |
from sklearn.preprocessing import StandardScaler | |
from sklearn.decomposition import PCA | |
SKLEARN_AVAILABLE = True | |
except ImportError: | |
SKLEARN_AVAILABLE = False | |
logger.warning("scikit-learn not available. PCA features will be approximated.") | |
try: | |
from skimage.filters import sobel | |
from skimage.feature import local_binary_pattern, graycomatrix, graycoprops | |
SKIMAGE_AVAILABLE = True | |
except ImportError: | |
SKIMAGE_AVAILABLE = False | |
logger.warning("scikit-image not available. Texture features will be approximated.") | |
def safe_divide(a, b, fill_value=0.0):
    """Divide ``a`` by ``b`` element-wise, using ``fill_value`` where unsafe.

    Both operands are converted to ``float32`` and broadcast against each
    other, so scalars and arrays of compatible shapes may be mixed.

    Args:
        a: Numerator (scalar or array-like). NaN and +/-Inf entries are
            replaced by 0 before dividing.
        b: Denominator (scalar or array-like). NaN entries are replaced by
            1e-10 and +/-Inf entries by +/-1e10 before dividing.
        fill_value: Value used wherever ``|b| < 1e-10`` or the quotient is
            NaN or infinite.

    Returns:
        The ``float32`` quotient with the broadcast shape of ``a`` and ``b``.
    """
    a = np.nan_to_num(np.asarray(a, dtype=np.float32), nan=0.0, posinf=0.0, neginf=0.0)
    b = np.nan_to_num(np.asarray(b, dtype=np.float32), nan=1e-10, posinf=1e10, neginf=-1e10)

    # Broadcast explicitly so the output buffer and the mask always agree in
    # shape, whatever mix of scalars/arrays the caller passed.
    a, b = np.broadcast_arrays(a, b)
    near_zero = np.abs(b) < 1e-10

    # Pre-fill with fill_value; the ufunc only overwrites the safe positions.
    result = np.full(a.shape, fill_value, dtype=np.float32)
    np.divide(a, b, out=result, where=~near_zero)

    return np.nan_to_num(result, nan=fill_value, posinf=fill_value, neginf=fill_value)
def calculate_spectral_indices(satellite_data):
    """Calculate spectral indices from a stack of satellite bands.

    Bands are read from fixed 0-based positions along the first axis:
    1 (blue), 2 (green), 3 (red), 7 (nir), 9 (swir1) and 10 (swir2).
    NOTE(review): this assumes the Sentinel-2 band order described in the
    original comments (B2, B3, B4, B8, B11, B12) - confirm against your data.

    Args:
        satellite_data: Array indexed as (bands, height, width).

    Returns:
        dict mapping 'NDVI', 'EVI', 'SAVI', 'MSAVI2', 'NDWI', 'NDMI' and
        'NBR' to 2-D arrays. An index whose bands are missing, or whose
        calculation failed, is returned as a float32 array of zeros. NaN
        values in computed indices are replaced by 0.
    """
    indices = {}
    n_bands = satellite_data.shape[0]

    def safe_get_band(idx):
        """Return band ``idx`` as floating point, or None if it is absent."""
        if idx >= n_bands:
            return None
        band = np.asarray(satellite_data[idx])
        if not np.issubdtype(band.dtype, np.floating):
            # Integer rasters (e.g. uint16) would silently wrap around on
            # expressions such as ``nir - red``; promote before arithmetic.
            band = band.astype(np.float32)
        return band

    try:
        blue = safe_get_band(1)  # Adjust indices based on your data
        green = safe_get_band(2)
        red = safe_get_band(3)
        nir = safe_get_band(7)
        swir1 = safe_get_band(9)
        swir2 = safe_get_band(10)

        if red is not None and nir is not None:
            # NDVI (Normalized Difference Vegetation Index)
            indices['NDVI'] = safe_divide(nir - red, nir + red)

            if blue is not None and green is not None:
                # EVI (Enhanced Vegetation Index)
                indices['EVI'] = 2.5 * safe_divide(nir - red, nir + 6 * red - 7.5 * blue + 1)

            # SAVI (Soil Adjusted Vegetation Index)
            indices['SAVI'] = 1.5 * safe_divide(nir - red, nir + red + 0.5)

            # MSAVI2 (Modified Soil Adjusted Vegetation Index)
            # The discriminant can go negative for negative reflectances; the
            # resulting NaNs are zeroed in the clean-up step below, so the
            # "invalid value" warning is expected noise.
            with np.errstate(invalid='ignore'):
                discriminant = (2 * nir + 1) ** 2 - 8 * (nir - red)
                indices['MSAVI2'] = 0.5 * (2 * nir + 1 - np.sqrt(discriminant))

        if green is not None and nir is not None:
            # NDWI (Normalized Difference Water Index)
            indices['NDWI'] = safe_divide(green - nir, green + nir)

        if swir1 is not None and nir is not None:
            # NDMI (Normalized Difference Moisture Index)
            indices['NDMI'] = safe_divide(nir - swir1, nir + swir1)

        if swir2 is not None and nir is not None:
            # NBR (Normalized Burn Ratio)
            indices['NBR'] = safe_divide(nir - swir2, nir + swir2)
    except Exception as e:
        # Best effort: keep whatever was computed and fall back to zeros below.
        logger.warning(f"Error calculating spectral indices: {e}")

    # Clean up None values and NaNs
    indices = {k: np.nan_to_num(v, nan=0.0) for k, v in indices.items() if v is not None}

    # Ensure we have all required indices by providing zero-filled defaults
    required_indices = ['NDVI', 'EVI', 'SAVI', 'MSAVI2', 'NDWI', 'NDMI', 'NBR']
    height, width = satellite_data.shape[1], satellite_data.shape[2]
    if height > 0 and width > 0:
        for name in required_indices:
            if name not in indices:
                indices[name] = np.zeros((height, width), dtype=np.float32)

    return indices
def _placeholder_texture_features(height, width):
    """Return all six texture features as zero-filled float32 rasters."""
    names = ['Sobel_B7', 'LBP_B7', 'GLCM_contrast_B7', 'GLCM_dissimilarity_B7',
             'GLCM_homogeneity_B7', 'GLCM_energy_B7']
    return {name: np.zeros((height, width), dtype=np.float32) for name in names}


def extract_texture_features(satellite_data):
    """Extract texture features from one band of the satellite stack.

    The band at index ``min(7, n_bands - 1)`` is used. Three groups of
    features are produced:

    * ``Sobel_B7`` - Sobel edge response of the band.
    * ``LBP_B7`` - uniform local binary pattern (8 neighbours, radius 1) of
      the band rescaled to 0-255 between its 1st and 99th percentiles.
    * ``GLCM_<prop>_B7`` - contrast, dissimilarity, homogeneity and energy of
      a grey-level co-occurrence matrix computed on a central patch of at
      most 128 pixels per side; each scalar is broadcast to a full raster.

    Args:
        satellite_data: Array indexed as (bands, height, width).

    Returns:
        dict mapping the six feature names to (height, width) arrays. If
        scikit-image is unavailable, or extraction fails, every feature is a
        float32 array of zeros.
    """
    texture_features = {}
    height, width = satellite_data.shape[1], satellite_data.shape[2]

    # If scikit-image is not available, return placeholders
    if not SKIMAGE_AVAILABLE:
        return _placeholder_texture_features(height, width)

    glcm_props = ['contrast', 'dissimilarity', 'homogeneity', 'energy']

    try:
        # Use NIR band (band 7) for texture features
        b7_idx = min(7, satellite_data.shape[0] - 1)
        band = np.nan_to_num(satellite_data[b7_idx].copy(), nan=0.0)

        # 1. Sobel filter for edge detection
        texture_features['Sobel_B7'] = sobel(band)

        # 2. Local Binary Pattern
        # Normalize band to the 0-255 range. Start from an all-zero uint8
        # raster so a constant (or empty) band still yields an integer image,
        # which graycomatrix requires.
        band_norm = np.zeros(band.shape, dtype=np.uint8)
        if band.size > 0:
            band_min, band_max = np.nanpercentile(band, [1, 99])
            if band_max > band_min:
                scaled = (band - band_min) / (band_max - band_min + 1e-8) * 255
                band_norm = np.clip(scaled, 0, 255).astype(np.uint8)

        texture_features['LBP_B7'] = local_binary_pattern(band_norm, 8, 1, method='uniform')

        # 3. GLCM properties, computed on a central sample patch
        sample_size = min(128, height, width)
        center_y, center_x = height // 2, width // 2
        offset = sample_size // 2
        y_start = max(0, center_y - offset)
        y_end = min(height, center_y + offset)
        x_start = max(0, center_x - offset)
        x_end = min(width, center_x + offset)
        patch = band_norm[y_start:y_end, x_start:x_end]

        if patch.size > 0:
            glcm = graycomatrix(patch, [1], [0], levels=256, symmetric=True, normed=True)
            for prop in glcm_props:
                try:
                    value = float(graycoprops(glcm, prop)[0, 0])
                    texture_features[f'GLCM_{prop}_B7'] = np.full(
                        (height, width), value, dtype=np.float32)
                except Exception as prop_error:
                    # Keep the other properties; only this one falls back to zeros.
                    logger.warning(f"Error calculating GLCM {prop}: {prop_error}")
                    texture_features[f'GLCM_{prop}_B7'] = np.zeros(
                        (height, width), dtype=np.float32)
        else:
            # Create placeholder GLCM features if patch is invalid
            for prop in glcm_props:
                texture_features[f'GLCM_{prop}_B7'] = np.zeros((height, width), dtype=np.float32)
    except Exception as e:
        logger.error(f"Error in texture feature extraction: {e}")
        # Provide placeholder features in case of error
        texture_features = _placeholder_texture_features(height, width)

    return texture_features
def calculate_spatial_features(satellite_data, indices):
    """Compute gradient-magnitude rasters used as spatial context features.

    Args:
        satellite_data: Array indexed as (bands, height, width). The band at
            index ``min(7, n_bands - 1)`` is differentiated.
        indices: Mapping of spectral index names to 2-D arrays; the 'NDVI'
            entry is used when present, otherwise zeros.

    Returns:
        dict with 'Gradient_B7' and 'NDVI_gradient'. A feature whose gradient
        cannot be computed is a float32 array of zeros.
    """
    shape = satellite_data.shape
    n_rows, n_cols = shape[1], shape[2]

    def gradient_magnitude(raster):
        # np.gradient returns the derivative along rows first, then columns.
        d_rows, d_cols = np.gradient(raster)
        return np.sqrt(d_cols**2 + d_rows**2)

    # Source band for the first feature, with NaNs neutralised up front.
    band_index = min(7, shape[0] - 1)
    nir_like = np.nan_to_num(satellite_data[band_index].copy(), nan=0.0)

    # 1. Gradient magnitude of the selected band
    try:
        band_gradient = gradient_magnitude(nir_like)
    except Exception as e:
        logger.warning(f"Error calculating band gradient: {e}")
        band_gradient = np.zeros((n_rows, n_cols), dtype=np.float32)

    # 2. Gradient magnitude of NDVI
    try:
        ndvi_raster = indices.get('NDVI', np.zeros((n_rows, n_cols), dtype=np.float32))
        ndvi_gradient = gradient_magnitude(np.nan_to_num(ndvi_raster, nan=0.0))
    except Exception as e:
        logger.warning(f"Error calculating NDVI gradient: {e}")
        ndvi_gradient = np.zeros((n_rows, n_cols), dtype=np.float32)

    return {'Gradient_B7': band_gradient, 'NDVI_gradient': ndvi_gradient}
def _zero_pca_features(height, width, n_components):
    """Return ``PCA_01`` .. ``PCA_<n>`` as zero-filled float32 rasters."""
    return {f'PCA_{i:02d}': np.zeros((height, width), dtype=np.float32)
            for i in range(1, n_components + 1)}


def calculate_pca_features(satellite_data, n_components=25):
    """Calculate PCA component rasters from the satellite bands.

    Pixels are treated as samples and bands as variables. Only pixels whose
    bands are all finite are standardized and projected; the remaining
    pixels are zero in every component. If fewer than ``n_components``
    components can be fitted, the extra components are zero-padded.

    Args:
        satellite_data: ndarray indexed as (bands, height, width).
        n_components: Number of ``PCA_xx`` rasters to return.

    Returns:
        dict mapping 'PCA_01' .. 'PCA_<n_components>' to (height, width)
        arrays. Without scikit-learn the first components are the raw bands
        and the rest are zeros. If no pixel is valid, or PCA fails, every
        component is a float32 array of zeros.
    """
    height, width = satellite_data.shape[1], satellite_data.shape[2]
    n_bands = satellite_data.shape[0]

    # If scikit-learn is not available, return placeholders
    if not SKLEARN_AVAILABLE:
        pca_features = _zero_pca_features(height, width, n_components)
        # Use band values directly for the first components
        for band_idx in range(min(n_bands, n_components)):
            pca_features[f'PCA_{band_idx + 1:02d}'] = satellite_data[band_idx]
        return pca_features

    pca_features = {}
    try:
        # Reshape for PCA (pixels x bands)
        bands_reshaped = satellite_data.reshape(n_bands, -1).T

        # Drop pixels with NaN *or* Inf in any band: StandardScaler rejects
        # infinities, which would otherwise zero out every component.
        valid_mask = np.all(np.isfinite(bands_reshaped), axis=1)
        bands_clean = bands_reshaped[valid_mask]

        if len(bands_clean) == 0:
            logger.warning("No valid data for PCA calculation")
            return _zero_pca_features(height, width, n_components)

        # Standardize valid data
        bands_scaled = StandardScaler().fit_transform(bands_clean)

        # PCA cannot produce more components than samples or variables.
        n_fit = min(n_components, bands_scaled.shape[1], bands_scaled.shape[0])
        pca = PCA(n_components=n_fit)
        pca_result = pca.fit_transform(bands_scaled)

        # Extend to the full number of components if needed
        actual_components = pca_result.shape[1]
        if actual_components < n_components:
            logger.warning(f"Only {actual_components} PCA components calculated, padding to {n_components}")
            padding = np.zeros((pca_result.shape[0], n_components - actual_components))
            pca_result = np.hstack([pca_result, padding])

        # Map back to original pixels (invalid pixels stay at zero)
        pca_all = np.zeros((bands_reshaped.shape[0], n_components))
        pca_all[valid_mask] = pca_result

        # Reshape to spatial dimensions
        pca_spatial = pca_all.reshape(height, width, n_components)

        # Store each component with the correct naming
        for i in range(1, n_components + 1):
            pca_features[f'PCA_{i:02d}'] = pca_spatial[:, :, i - 1]

        # Log PCA explained variance
        if hasattr(pca, 'explained_variance_ratio_'):
            logger.info(f"PCA explained variance: {pca.explained_variance_ratio_.sum():.3f}")
    except Exception as e:
        logger.error(f"Error calculating PCA features: {e}")
        # Create placeholder PCA features
        pca_features.update(_zero_pca_features(height, width, n_components))

    return pca_features
def extract_all_features(satellite_data):
    """
    Build the 99-column feature matrix expected by the model.

    Column order:
    - 59 original bands (zero-padded when fewer bands are supplied)
    - 7 spectral indices
    - 6 texture features
    - 2 spatial features
    - 25 PCA components

    Parameters:
        satellite_data (ndarray): Array of shape (bands, height, width)

    Returns:
        features_array (ndarray): float32 array of shape (valid_pixels, 99)
        valid_mask (ndarray): Boolean (height, width) mask of pixels that are
            finite in every band
        feature_names (list): List of 99 feature names, in column order
    """
    started = datetime.now()
    logger.info("Extracting features for biomass prediction...")

    n_bands = satellite_data.shape[0]
    height, width = satellite_data.shape[1], satellite_data.shape[2]

    # A pixel is valid only when every band is finite (no NaN or Inf).
    valid_mask = np.all(np.isfinite(satellite_data), axis=0)
    valid_y, valid_x = np.where(valid_mask)
    n_valid = len(valid_y)
    logger.info(f"Found {n_valid} valid pixels out of {height*width}")

    # Derived feature groups
    logger.info("Calculating spectral indices...")
    indices = calculate_spectral_indices(satellite_data)

    logger.info("Extracting texture features...")
    texture_features = extract_texture_features(satellite_data)

    logger.info("Calculating spatial features...")
    spatial_features = calculate_spatial_features(satellite_data, indices)

    logger.info("Computing PCA components...")
    pca_features = calculate_pca_features(satellite_data)

    # Ordered column names: bands, indices, texture, spatial, PCA
    band_names = [f'Band_{number:02d}' for number in range(1, 60)]
    feature_names = list(band_names)
    feature_names += ['NDVI', 'EVI', 'SAVI', 'MSAVI2', 'NDWI', 'NDMI', 'NBR']
    feature_names += ['Sobel_B7', 'LBP_B7', 'GLCM_contrast_B7', 'GLCM_dissimilarity_B7',
                      'GLCM_homogeneity_B7', 'GLCM_energy_B7']
    feature_names += ['Gradient_B7', 'NDVI_gradient']
    feature_names += [f'PCA_{number:02d}' for number in range(1, 26)]

    # Name -> raster lookup; bands beyond those supplied are zero rasters.
    all_features = {}
    for position, band_name in enumerate(band_names):
        if position < n_bands:
            all_features[band_name] = satellite_data[position]
        else:
            all_features[band_name] = np.zeros((height, width), dtype=np.float32)
    for group in (indices, texture_features, spatial_features, pca_features):
        all_features.update(group)

    # Verify we have exactly 99 features
    assert len(feature_names) == 99, f"Expected 99 features, but got {len(feature_names)}"

    # Sample every feature at the valid pixel locations, one column each.
    feature_matrix = np.zeros((n_valid, len(feature_names)), dtype=np.float32)
    for column, name in enumerate(feature_names):
        if name not in all_features:
            logger.warning(f"Feature '{name}' not found, using zeros")
            feature_matrix[:, column] = 0.0
            continue

        feature_data = all_features[name]
        if feature_data.ndim == 2:
            feature_values = feature_data[valid_y, valid_x]
        else:
            # Non-raster value: repeat it for every valid pixel.
            feature_values = np.full(n_valid, feature_data)
        feature_matrix[:, column] = np.nan_to_num(feature_values, nan=0.0)

    processing_time = (datetime.now() - started).total_seconds()
    logger.info(f"Successfully extracted {len(feature_names)} features for {n_valid} pixels in {processing_time:.2f} seconds")

    return feature_matrix, valid_mask, feature_names
# Simple test function
def test_feature_extraction():
    """Smoke-test the feature extraction pipeline on synthetic data.

    Runs ``extract_all_features`` on a seeded random 5-band, 100x100 raster,
    prints a summary and checks that the result is self-consistent.

    Returns:
        bool: True when extraction succeeded and the feature matrix has one
        row per valid pixel, 99 columns and only finite values; False
        otherwise (the reason is printed).
    """
    try:
        # Create reproducible sample data (5 bands, 100x100 pixels)
        rng = np.random.default_rng(0)
        satellite_data = rng.random((5, 100, 100), dtype=np.float32)

        # Extract features
        feature_matrix, valid_mask, feature_names = extract_all_features(satellite_data)

        # Print summary
        print(f"Sample data shape: {satellite_data.shape}")
        print(f"Feature matrix shape: {feature_matrix.shape}")
        print(f"Number of feature names: {len(feature_names)}")
        print(f"Valid pixels: {np.sum(valid_mask)}")

        # Verify the outputs agree with each other instead of assuming success.
        expected_shape = (int(np.sum(valid_mask)), len(feature_names))
        if len(feature_names) != 99:
            print(f"Feature extraction test failed: expected 99 feature names, got {len(feature_names)}")
            return False
        if feature_matrix.shape != expected_shape:
            print(f"Feature extraction test failed: feature matrix shape "
                  f"{feature_matrix.shape} != expected {expected_shape}")
            return False
        if not np.all(np.isfinite(feature_matrix)):
            print("Feature extraction test failed: feature matrix contains non-finite values")
            return False

        return True
    except Exception as e:
        print(f"Feature extraction test failed: {e}")
        import traceback
        traceback.print_exc()
        return False
if __name__ == "__main__":
    # Run a simple test if this script is executed directly, and report the
    # outcome through the process exit status (0 = passed, 1 = failed).
    raise SystemExit(0 if test_feature_extraction() else 1)