import html

import gradio as gr
import torch
import torch.nn as nn
import pandas as pd
import numpy as np
from gnn_inference import inference as gnn_inference


# ConvLSTM Implementation
class ConvLSTMCell(nn.Module):
    """Single convolutional LSTM step over a 2-D feature map.

    One convolution over the channel-wise concatenation of the input and the
    previous hidden state produces all four gate pre-activations at once.
    """

    def __init__(self, input_channels, hidden_channels, kernel_size):
        super().__init__()
        self.input_channels = input_channels
        self.hidden_channels = hidden_channels
        # The attribute name becomes part of the state-dict keys that
        # ``load_state_dict`` matches against, so it must not be renamed.
        self.Gates = nn.Conv2d(
            in_channels=input_channels + hidden_channels,
            out_channels=4 * hidden_channels,
            kernel_size=kernel_size,
            padding=kernel_size // 2,
        )

    def forward(self, input_, hidden_state):
        """Advance the cell by one time step.

        Args:
            input_: Current input, concatenated with the hidden state on dim 1.
            hidden_state: Tuple ``(h_cur, c_cur)`` from the previous step.

        Returns:
            Tuple ``(h_next, c_next)``.
        """
        h_cur, c_cur = hidden_state
        combined_conv = self.Gates(torch.cat([input_, h_cur], dim=1))
        cc_i, cc_f, cc_o, cc_g = torch.split(
            combined_conv, self.hidden_channels, dim=1
        )
        i = torch.sigmoid(cc_i)
        f = torch.sigmoid(cc_f)
        o = torch.sigmoid(cc_o)
        g = torch.tanh(cc_g)
        c_next = f * c_cur + i * g
        h_next = o * torch.tanh(c_next)
        return h_next, c_next


class ConvLSTM(nn.Module):
    """Unrolls a single ``ConvLSTMCell`` over the time axis of a sequence."""

    def __init__(self, input_channels, hidden_channels, kernel_size):
        super().__init__()
        self.cell = ConvLSTMCell(input_channels, hidden_channels, kernel_size)

    def forward(self, input_):
        """Run the cell over every time step.

        Args:
            input_: 5-D tensor unpacked as (batch, seq_len, channels, height,
                width).

        Returns:
            Tuple of the hidden states stacked along dim 1 and the final
            ``(h_t, c_t)`` state.
        """
        batch_size, seq_len, _, height, width = input_.size()
        h_t, c_t = self.init_hidden(
            batch_size, self.cell.hidden_channels, height, width
        )
        output_inner = []
        for t in range(seq_len):
            h_t, c_t = self.cell(input_[:, t, :, :, :], (h_t, c_t))
            output_inner.append(h_t)
        return torch.stack(output_inner, dim=1), (h_t, c_t)

    def init_hidden(self, batch_size, hidden_channels, height, width):
        """Return zero-initialised ``(h, c)`` on the same device as the weights."""
        device = next(self.parameters()).device
        shape = (batch_size, hidden_channels, height, width)
        # Allocate directly on the target device instead of CPU-then-copy.
        h = torch.zeros(shape, device=device)
        c = torch.zeros(shape, device=device)
        return h, c


class ConvLSTMModel(nn.Module):
    """ConvLSTM encoder with separate speed and direction 1x1-conv heads."""

    def __init__(self, input_channels, hidden_channels, kernel_size):
        super().__init__()
        self.convlstm = ConvLSTM(input_channels, hidden_channels, kernel_size)
        self.speed_head = nn.Conv2d(hidden_channels, 1, kernel_size=1)
        self.direction_head = nn.Conv2d(hidden_channels, 2, kernel_size=1)

    def forward(self, x):
        """Predict a two-channel field from the last hidden state.

        The direction head output is normalised along the channel axis and
        scaled by the speed head output; the two products are concatenated
        on dim 1.
        """
        outputs, _ = self.convlstm(x)
        features = outputs[:, -1, :, :, :]
        speed = self.speed_head(features)
        direction = torch.nn.functional.normalize(
            self.direction_head(features), dim=1
        )
        return torch.cat(
            [speed * direction[:, 0:1], speed * direction[:, 1:2]], dim=1
        )


class SingleCSVDataset(torch.utils.data.Dataset):
    """Dataset holding the single gridded sequence contained in one CSV file.

    The CSV must provide ``x``, ``y`` and the eight velocity columns listed in
    ``velocity_columns``. The first three (u, v) pairs form the input sequence
    and ``u_45``/``v_45`` form the target.

    Args:
        csv_file: Path or buffer accepted by ``pandas.read_csv``.
        min_vals: Per-column minima used for min-max normalisation.
        max_vals: Per-column maxima used for min-max normalisation.
            When either is ``None`` both are read from
            ``normalization_parameters.txt`` (columns ``min`` and ``max``).
    """

    def __init__(self, csv_file, min_vals=None, max_vals=None):
        self.data = pd.read_csv(csv_file)
        self.velocity_columns = [
            'u', 'v', 'u_15', 'v_15', 'u_30', 'v_30', 'u_45', 'v_45',
        ]

        if min_vals is None or max_vals is None:
            norm_params = pd.read_csv('normalization_parameters.txt', index_col=0)
            self.min_vals = norm_params['min']
            self.max_vals = norm_params['max']
        else:
            self.min_vals = min_vals
            self.max_vals = max_vals

        # Normalize data
        self.data[self.velocity_columns] = (
            (self.data[self.velocity_columns] - self.min_vals)
            / (self.max_vals - self.min_vals)
        )

        # Setup grid
        self.x_values = np.sort(self.data['x'].unique())
        self.y_values = np.sort(self.data['y'].unique())
        self.height = len(self.y_values)
        self.width = len(self.x_values)
        self.x_to_idx = {x: idx for idx, x in enumerate(self.x_values)}
        self.y_to_idx = {y: idx for idx, y in enumerate(self.y_values)}

        self.process_data()

    def process_data(self):
        """Rasterise the table into ``self.inputs`` and ``self.target_grid``.

        Also records ``row_x_idx``/``row_y_idx``: the grid cell of every CSV
        row, in row order, so grid values can be mapped back to rows later.
        """
        # x_values/y_values are the sorted unique coordinates of these same
        # columns, so searchsorted yields each row's exact grid index.
        self.row_x_idx = np.searchsorted(self.x_values, self.data['x'].to_numpy())
        self.row_y_idx = np.searchsorted(self.y_values, self.data['y'].to_numpy())

        times = [('u', 'v'), ('u_15', 'v_15'), ('u_30', 'v_30')]
        input_seq = np.stack(
            [self._to_grid(u_time, v_time) for u_time, v_time in times], axis=0
        )
        self.inputs = [input_seq]
        self.target_grid = self._to_grid('u_45', 'v_45')

    def _to_grid(self, u_column, v_column):
        """Scatter two columns into a zero-filled (2, height, width) grid."""
        grid = np.zeros((2, self.height, self.width))
        # With repeated (y, x) cells the last row wins, matching a
        # row-by-row fill.
        grid[0, self.row_y_idx, self.row_x_idx] = self.data[u_column].to_numpy()
        grid[1, self.row_y_idx, self.row_x_idx] = self.data[v_column].to_numpy()
        return grid

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, idx):
        input_seq = torch.tensor(self.inputs[idx], dtype=torch.float32)
        target = torch.tensor(self.target_grid, dtype=torch.float32)
        return input_seq, target


def calculate_speed_and_direction(u, v):
    """Return ``(speed, direction_x, direction_y)`` for velocity components.

    The direction components are ``u`` and ``v`` divided by ``speed + 1e-6``;
    the epsilon avoids division by zero where the speed is zero.
    """
    speed = torch.sqrt(u**2 + v**2)
    direction_x = u / (speed + 1e-6)
    direction_y = v / (speed + 1e-6)
    return speed, direction_x, direction_y


def calculate_speed_direction_error(u_pred, v_pred, u_true, v_true):
    """Return ``(speed_error, direction_error)`` between two vector fields.

    ``speed_error`` is the absolute difference in speed. ``direction_error``
    is the angle in degrees between the two direction vectors, obtained from
    their dot product clamped to [-1, 1] before ``acos``.
    """
    speed_pred, dir_x_pred, dir_y_pred = calculate_speed_and_direction(u_pred, v_pred)
    speed_true, dir_x_true, dir_y_true = calculate_speed_and_direction(u_true, v_true)

    speed_error = torch.abs(speed_pred - speed_true)

    cos_sim = dir_x_pred * dir_x_true + dir_y_pred * dir_y_true
    cos_sim = torch.clamp(cos_sim, -1.0, 1.0)
    direction_error = torch.acos(cos_sim) * 180 / np.pi

    return speed_error, direction_error


def convlstm_inference(input_csv):
    """Run the ConvLSTM checkpoint on one CSV and return per-row results.

    Args:
        input_csv: Path or buffer of the input CSV (see ``SingleCSVDataset``).

    Returns:
        DataFrame with one row per CSV row and columns ``x``, ``y``,
        ``u_pred``, ``v_pred``, ``u_true``, ``v_true``, ``speed_error`` and
        ``direction_error``, all in de-normalised units.

    Raises:
        RuntimeError: If loading the model, reading the data or running
            inference fails; the original exception is chained as the cause.
    """
    try:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # Load model
        model = ConvLSTMModel(input_channels=2, hidden_channels=64, kernel_size=3)
        # NOTE(security): torch.load unpickles the checkpoint; only load
        # '2_ConvLSTM.pth' from a trusted source.
        model.load_state_dict(torch.load('2_ConvLSTM.pth', map_location=device))
        model.to(device)
        model.eval()

        # Process data
        dataset = SingleCSVDataset(input_csv)

        with torch.no_grad():
            inputs, targets = dataset[0]
            inputs = inputs.unsqueeze(0).to(device)
            targets = targets.unsqueeze(0).to(device)
            outputs = model(inputs)

        # Get normalization parameters for u and v components
        u_min = dataset.min_vals['u_45']
        u_max = dataset.max_vals['u_45']
        v_min = dataset.min_vals['v_45']
        v_max = dataset.max_vals['v_45']

        # Denormalize
        outputs_denorm = outputs.clone()
        targets_denorm = targets.clone()

        # Denormalize u component (index 0)
        outputs_denorm[:, 0, :, :] = outputs[:, 0, :, :] * (u_max - u_min) + u_min
        targets_denorm[:, 0, :, :] = targets[:, 0, :, :] * (u_max - u_min) + u_min

        # Denormalize v component (index 1)
        outputs_denorm[:, 1, :, :] = outputs[:, 1, :, :] * (v_max - v_min) + v_min
        targets_denorm[:, 1, :, :] = targets[:, 1, :, :] * (v_max - v_min) + v_min

        # Calculate errors
        speed_error, direction_error = calculate_speed_direction_error(
            outputs_denorm[:, 0, :, :], outputs_denorm[:, 1, :, :],
            targets_denorm[:, 0, :, :], targets_denorm[:, 1, :, :],
        )

        rows_y = dataset.row_y_idx
        rows_x = dataset.row_x_idx

        def at_rows(field):
            """Sample a (1, height, width) tensor at each CSV row's grid cell."""
            return field[0].cpu().numpy()[rows_y, rows_x]

        # Read every grid back at the cell of the row it is reported for.
        # Flattening the grids would only line up with the x/y columns when
        # the CSV happens to be sorted row-major by (y, x) on a full grid.
        results = pd.DataFrame({
            'x': dataset.data['x'],
            'y': dataset.data['y'],
            'u_pred': at_rows(outputs_denorm[:, 0, :, :]),
            'v_pred': at_rows(outputs_denorm[:, 1, :, :]),
            'u_true': at_rows(targets_denorm[:, 0, :, :]),
            'v_true': at_rows(targets_denorm[:, 1, :, :]),
            'speed_error': at_rows(speed_error),
            'direction_error': at_rows(direction_error),
        })

        return results

    except Exception as e:
        raise RuntimeError(f"Error in ConvLSTM inference: {e}") from e


def _error_html(message):
    """Build the error panel, HTML-escaping ``message`` before embedding it."""
    return f"\n\nError\n\n{html.escape(str(message))}\n\n"


def _summary_html(results):
    """Build the summary panel with mean/min/max of both error columns."""
    speed = results['speed_error']
    direction = results['direction_error']
    return f"""
<div>
<h3>Summary Statistics</h3>
<h4>Speed Error (m/s)</h4>
<p>Average: {speed.mean():.4f} | Min: {speed.min():.4f} | Max: {speed.max():.4f}</p>
<h4>Direction Error (degrees)</h4>
<p>Average: {direction.mean():.4f} | Min: {direction.min():.4f} | Max: {direction.max():.4f}</p>
</div>
"""


def _plot_error_histograms(results):
    """Return a figure with side-by-side histograms of both error columns."""
    # A standalone Figure is not registered with pyplot's global figure
    # manager, so it is garbage-collected normally between requests.
    from matplotlib.figure import Figure

    fig = Figure(figsize=(12, 4))
    ax1, ax2 = fig.subplots(1, 2)

    ax1.hist(results['speed_error'], bins=30, color='#3498db', alpha=0.7)
    ax1.set_title('Speed Error Distribution')
    ax1.set_xlabel('Error (m/s)')
    ax1.set_ylabel('Frequency')

    ax2.hist(results['direction_error'], bins=30, color='#e74c3c', alpha=0.7)
    ax2.set_title('Direction Error Distribution')
    ax2.set_xlabel('Error (degrees)')
    ax2.set_ylabel('Frequency')

    fig.tight_layout()
    return fig


def predict(input_csv, model_type="ConvLSTM"):
    """Gradio callback: run the selected model on the uploaded CSV.

    Args:
        input_csv: Gradio file object (its ``name`` attribute is used as the
            path) or a plain path.
        model_type: ``"ConvLSTM"`` or ``"GNN"``.

    Returns:
        For ConvLSTM, a tuple ``(figure, html, results_dataframe)``. For GNN,
        whatever ``gnn_inference`` returns. On any failure or an unknown model
        type, ``(None, error_html, None)``.
    """
    try:
        if model_type == "ConvLSTM":
            if input_csv is None:
                return None, _error_html("No input CSV file was provided."), None

            # Get the file path from the Gradio file object
            csv_path = input_csv.name if hasattr(input_csv, 'name') else input_csv

            results = convlstm_inference(csv_path)
            return _plot_error_histograms(results), _summary_html(results), results

        if model_type == "GNN":
            return gnn_inference(input_csv)

        return None, _error_html(model_type), None

    except Exception as e:
        # UI boundary: report the failure in the page instead of crashing.
        return None, _error_html(e), None


# Create Gradio interface
iface = gr.Interface(
    fn=predict,
    inputs=[
        gr.File(label="Input CSV"),
        gr.Radio(["ConvLSTM", "GNN"], label="Model Type", value="ConvLSTM"),
    ],
    outputs=[
        gr.Plot(label="Error Distribution"),
        gr.HTML(label="Error Metrics"),
        gr.DataFrame(label="Detailed Results"),
    ],
    title="Wind Prediction Models",
    description="Predict wind patterns using either ConvLSTM or GNN models. Select a file from examples or upload your own, choose a model type, then click Submit.",
    examples=[
        ["sequence_05-45_to_06-30.csv"],
        ["sequence_07-00_to_07-45.csv"],
        ["sequence_09-15_to_10-00.csv"],
        ["sequence_12-45_to_13-30.csv"],
    ],
    examples_per_page=10,
    allow_flagging="never",
    cache_examples=False,
    theme="dark",
)

if __name__ == "__main__":
    iface.launch()