"""Streamlit front end for tropical-cyclone intensity (Vmax) prediction.

The script collects 8 IR images, 8 PMW images and a CSV with ``Latitude``,
``Longitude`` and ``Vmax`` columns, turns them into the arrays handed to the
selected model's predict function, and displays the returned prediction.
"""

import importlib

import cv2
import numpy as np
import pandas as pd
import streamlit as st
from PIL import Image
from scipy.ndimage import gaussian_filter

# Number of frames / CSV rows that make up one input sequence.
SEQ_LEN = 8
# Side length (pixels) every image is resized to.
IMG_SIZE = 95
IMAGE_TYPES = ["jpg", "jpeg", "png"]

# Model name shown in the UI -> (module, predict function).  The modules are
# imported lazily so that only the chosen model's dependencies get loaded.
# Insertion order is the order of the entries in the select box.
MODEL_REGISTRY = {
    "ConvGRU": ("gru_model", "predict"),
    "ConvLSTM": ("convlstm", "predict_lstm"),
    "Traj-GRU": ("trjgru", "predict_trajgru"),
    "3DCNN": ("cnn3d", "predict_3dcnn"),
    "spatiotemporalLSTM": ("spaio_temp", "predict_stlstm"),
    "Unet_LSTM": ("unetlstm", "predict_unetlstm"),
}


# ------------------ TC CENTERING UTILS ------------------
def find_tc_center(ir_image, smoothing_sigma=3):
    """Return the ``(x, y)`` position of the minimum of the smoothed image.

    Args:
        ir_image: 2-D array.
        smoothing_sigma: Sigma of the Gaussian filter applied before the
            minimum is searched.

    Returns:
        Tuple ``(x, y)``, i.e. (column, row), of the smallest smoothed value.
    """
    smoothed = gaussian_filter(ir_image, sigma=smoothing_sigma)
    row_col = np.unravel_index(np.argmin(smoothed), smoothed.shape)
    return row_col[::-1]  # (row, col) -> (x, y)


def extract_local_region(ir_image, center, region_size=95):
    """Crop a window around ``center`` into a NaN-padded square array.

    Args:
        ir_image: 2-D array.
        center: ``(x, y)`` position of the window centre.
        region_size: Side length of the returned array.

    Returns:
        Array of shape ``(region_size, region_size)``; cells not covered by
        the crop are NaN.
    """
    h, w = ir_image.shape
    half_size = region_size // 2
    x_min = max(center[0] - half_size, 0)
    x_max = min(center[0] + half_size, w)
    y_min = max(center[1] - half_size, 0)
    y_max = min(center[1] + half_size, h)

    region = np.full((region_size, region_size), np.nan)
    extracted = ir_image[y_min:y_max, x_min:x_max]
    # NOTE(review): the crop is at most 2 * half_size wide and is always
    # pasted into the top-left corner, so ``center`` does not necessarily end
    # up in the middle of ``region``.  Left unchanged on purpose: the models
    # were presumably trained on exactly this preprocessing - confirm before
    # changing it.
    region[:extracted.shape[0], :extracted.shape[1]] = extracted
    return region


def generate_hovmoller(X_data):
    """Build the per-frame local regions for every image sequence.

    Args:
        X_data: Iterable of sequences, each of shape
            ``(time_steps, IMG_SIZE, IMG_SIZE)``.

    Returns:
        Array of shape ``(num_sequences, time_steps, IMG_SIZE, IMG_SIZE)``.
    """
    hovmoller_list = []
    for frames in X_data:
        time_steps = frames.shape[0]
        regions = np.zeros((time_steps, IMG_SIZE, IMG_SIZE))
        for t in range(time_steps):
            regions[t] = extract_local_region(
                frames[t], find_tc_center(frames[t]), IMG_SIZE
            )
        hovmoller_list.append(regions)
    return np.array(hovmoller_list)


# ------------------ SCALAR SERIES UTILS ------------------
def _reshape_into_sequences(values, chunk_size=SEQ_LEN):
    """Drop the trailing remainder and reshape to ``(-1, chunk_size)``."""
    arr = np.asarray(values)
    usable = (len(arr) // chunk_size) * chunk_size
    return arr[:usable].reshape(-1, chunk_size)


def reshape_vmax(vmax_values, chunk_size=8):
    """Reshape a 1-D Vmax series into rows of ``chunk_size`` values.

    Values that do not fill a complete row are discarded.
    """
    return _reshape_into_sequences(vmax_values, chunk_size)


def create_3d_vmax(vmax_2d_array):
    """Place every Vmax row on the diagonal of its own 8 x 8 matrix.

    Args:
        vmax_2d_array: Array of shape ``(N, 8)``.

    Returns:
        Array of shape ``(N, 8, 8, 1)`` that is zero off the diagonals.
    """
    vmax_3d_array = np.zeros((vmax_2d_array.shape[0], SEQ_LEN, SEQ_LEN))
    for matrix, row in zip(vmax_3d_array, vmax_2d_array):
        np.fill_diagonal(matrix, row)
    return vmax_3d_array.reshape(-1, SEQ_LEN, SEQ_LEN, 1)


def process_lat_values(data):
    """Reshape latitude values into rows of 8 (incomplete tail dropped)."""
    return _reshape_into_sequences(data)


def process_lon_values(data):
    """Reshape longitude values into rows of 8 (incomplete tail dropped)."""
    return _reshape_into_sequences(data)


def calculate_intensity_difference(vmax_2d_array):
    """Append ``|first - last|`` of every row as an extra column.

    Args:
        vmax_2d_array: Array of shape ``(N, K)``.

    Returns:
        Array of shape ``(N, K + 1)``.
    """
    arr = np.asarray(vmax_2d_array)
    delta = np.abs(arr[:, 0] - arr[:, -1])
    return np.column_stack((arr, delta))


# ------------------ IMAGE UTILS ------------------
def process_images(images, batch_size=8, img_size=(95, 95, 1)):
    """Group images into sequences of ``batch_size``.

    Images that do not fill a complete sequence are discarded.

    Returns:
        Array of shape ``(num_sequences, batch_size, *img_size)``.
    """
    images = np.asarray(images)
    usable = (images.shape[0] // batch_size) * batch_size
    return images[:usable].reshape(-1, batch_size, *img_size)


def process_cc_mask(cc_data):
    """Group CC mask images into an array of shape ``(x, 8, 95, 95, 1)``."""
    return process_images(
        cc_data, batch_size=SEQ_LEN, img_size=(IMG_SIZE, IMG_SIZE, 1)
    )


def extract_convective_cores(ir_data):
    """Extract Convective Cores (CCs) from a single IR image.

    A non-border pixel is marked when all three conditions hold:
      1. its value is below 253,
      2. none of its 8 neighbours is strictly smaller,
      3. the weighted second differences along both axes exceed the
         exponential threshold computed from the pixel value.

    Args:
        ir_data: Array of shape ``(height, width)`` or
            ``(height, width, channels)``.  Channels are processed
            independently; the application passes a single channel.

    Returns:
        float32 mask with the same shape as ``ir_data`` (1 for CC, 0
        otherwise).  Border pixels are never marked.

    Raises:
        ValueError: If ``ir_data`` is neither 2-D nor 3-D.
    """
    data = np.asarray(ir_data)
    if data.ndim not in (2, 3):
        raise ValueError(
            "ir_data must have shape (height, width) or "
            f"(height, width, channels), got {data.shape}"
        )
    if not np.issubdtype(data.dtype, np.floating):
        # Integer input would wrap around in the differences below.
        data = data.astype(np.float64)

    cc_mask = np.zeros(data.shape, dtype=np.float32)
    height, width = data.shape[:2]
    if height < 3 or width < 3:
        return cc_mask  # no interior pixels to test

    centre = data[1:-1, 1:-1]

    # Condition 1: BT < 253K
    is_core = centre < 253

    # Condition 2: no 8-connected neighbour is strictly colder.
    for di in (-1, 0, 1):
        for dj in (-1, 0, 1):
            if di == 0 and dj == 0:
                continue
            neighbour = data[1 + di:height - 1 + di, 1 + dj:width - 1 + dj]
            is_core &= ~(neighbour < centre)

    # Condition 3: gradient condition (constants kept from the original
    # implementation).
    vertical = (data[:-2, 1:-1] + data[2:, 1:-1] - 2 * centre) / 3.1
    horizontal = (data[1:-1, :-2] + data[1:-1, 2:] - 2 * centre) / 8.0
    threshold = (4 / 5.8) * np.exp(0.0826 * (centre - 217))
    is_core &= (vertical + horizontal) > threshold

    cc_mask[1:-1, 1:-1][is_core] = 1
    return cc_mask


def compute_convective_core_masks(ir_data):
    """Compute the convective-core mask of every image in ``ir_data``.

    Args:
        ir_data: Iterable of single images, each ``(height, width)`` or
            ``(height, width, channels)``.

    Returns:
        Array holding one mask per image.
    """
    return np.array([np.array(extract_convective_cores(img)) for img in ir_data])


def _load_grayscale(uploaded_file):
    """Decode an uploaded image as an 8-bit greyscale float32 array."""
    return np.array(Image.open(uploaded_file).convert("L")).astype(np.float32)


def _resize(arr):
    """Resize to ``IMG_SIZE x IMG_SIZE`` with bicubic interpolation."""
    return cv2.resize(arr, (IMG_SIZE, IMG_SIZE), interpolation=cv2.INTER_CUBIC)


def _load_ir_frame(uploaded_file):
    """Load an IR image, mapping grey levels 0..255 linearly onto 190..310."""
    grey = _load_grayscale(uploaded_file)
    return _resize((grey / 255.0) * (310 - 190) + 190)


def _load_pmw_frame(uploaded_file):
    """Load a PMW image scaled to the range 0..1."""
    return _resize(_load_grayscale(uploaded_file) / 255.0)


def tf_gradient_magnitude(images):
    """Return the Sobel gradient magnitude of a stack of 2-D images.

    Args:
        images: Array of shape ``(num_images, height, width)``.

    Returns:
        NumPy array of the same shape.
    """
    import tensorflow as tf  # heavy dependency, only needed for Unet_LSTM

    sobel_x = tf.constant([[1, 0, -1], [2, 0, -2], [1, 0, -1]], dtype=tf.float32)
    sobel_y = tf.constant([[1, 2, 1], [0, 0, 0], [-1, -2, -1]], dtype=tf.float32)
    sobel_x = tf.reshape(sobel_x, [3, 3, 1, 1])
    sobel_y = tf.reshape(sobel_y, [3, 3, 1, 1])

    images = tf.convert_to_tensor(images, dtype=tf.float32)
    images = tf.expand_dims(images, -1)  # add the channel axis conv2d expects
    gx = tf.nn.conv2d(images, sobel_x, strides=1, padding='SAME')
    gy = tf.nn.conv2d(images, sobel_y, strides=1, padding='SAME')
    # The small constant keeps the square root away from exactly zero.
    grad_mag = tf.sqrt(tf.square(gx) + tf.square(gy) + 1e-6)
    return tf.squeeze(grad_mag, -1).numpy()


def GM_maps_prep(ir):
    """Compute gradient-magnitude maps for every image sequence.

    Args:
        ir: Iterable of sequences, each of shape ``(time, height, width)``.

    Returns:
        Array of shape ``(num_sequences, time, height, width)``.
    """
    return np.array([tf_gradient_magnitude(sequence) for sequence in ir])


# ------------------ Streamlit UI ------------------
st.set_page_config(page_title="TCIR Daily Input", layout="wide")
st.title("Tropical Cyclone U-Net Wind Speed (Intensity) Predictor")

# ``or []`` keeps the length checks below safe if no file list is returned.
ir_images = st.file_uploader(
    "Upload 8 IR images", type=IMAGE_TYPES, accept_multiple_files=True
) or []
pmw_images = st.file_uploader(
    "Upload 8 PMW images", type=IMAGE_TYPES, accept_multiple_files=True
) or []

images_ready = len(ir_images) == SEQ_LEN and len(pmw_images) == SEQ_LEN
if images_ready:
    st.success("Uploaded 8 IR and 8 PMW images successfully.")
else:
    st.warning("Please upload exactly 8 IR and 8 PMW images.")

st.header("Input Latitude, Longitude, Vmax")
lat_values, lon_values, vmax_values = [], [], []

# File uploader
csv_file = st.file_uploader("Upload CSV file", type=["csv"])
if csv_file is None:
    st.warning("Please upload a CSV file.")
else:
    try:
        df = pd.read_csv(csv_file)
        required_columns = {'Latitude', 'Longitude', 'Vmax'}
        if required_columns.issubset(df.columns):
            lat_values = df['Latitude'].to_numpy()
            lon_values = df['Longitude'].to_numpy()
            vmax_values = df['Vmax'].to_numpy()
            st.success("CSV file loaded and processed successfully!")
            st.write(df.head())
        else:
            st.error("CSV file must contain 'Latitude', 'Longitude', and 'Vmax' columns.")
    except Exception as e:  # UI boundary: report any read failure to the user
        st.error(f"Error reading CSV: {e}")

st.header("Select Prediction Model")
model_choice = st.selectbox(
    "Choose a model for prediction",
    tuple(MODEL_REGISTRY),
    index=0,
)

# ------------------ Process Button ------------------
if st.button("Submit for Processing"):
    if not images_ready:
        st.error("Make sure you uploaded exactly 8 IR and 8 PMW images.")
    elif len(vmax_values) < SEQ_LEN:
        # Without this guard the model would receive empty (0, 8) arrays.
        st.error(
            "Upload a CSV with at least 8 rows of Latitude, Longitude and "
            "Vmax before submitting."
        )
    else:
        module_name, function_name = MODEL_REGISTRY[model_choice]
        model_predict_fn = getattr(
            importlib.import_module(module_name), function_name
        )

        train_vmax_2d = reshape_vmax(np.array(vmax_values))
        train_vmax_3d = create_3d_vmax(train_vmax_2d)
        lat_processed = process_lat_values(lat_values)
        lon_processed = process_lon_values(lon_values)
        v_max_diff = calculate_intensity_difference(train_vmax_2d)

        ir = np.array([_load_ir_frame(f) for f in ir_images])      # (8, 95, 95)
        pmw = np.array([_load_pmw_frame(f) for f in pmw_images])   # (8, 95, 95)

        ir_seq = process_images(ir)     # (1, 8, 95, 95, 1)
        pmw_seq = process_images(pmw)   # (1, 8, 95, 95, 1)

        X_train_new = ir_seq.reshape((1, SEQ_LEN, IMG_SIZE, IMG_SIZE))

        # One mask per frame: hand over the eight (95, 95, 1) images rather
        # than the whole (8, 95, 95) sequence, which would be mistaken for a
        # single image whose height is the time axis.
        cc_mask = compute_convective_core_masks(ir_seq[0])

        hov_m_train = generate_hovmoller(X_train_new)
        hov_m_train[np.isnan(hov_m_train)] = 0
        hov_m_train = hov_m_train.transpose(0, 2, 3, 1)

        cc_mask[np.isnan(cc_mask)] = 0
        cc_mask = cc_mask.reshape(1, SEQ_LEN, IMG_SIZE, IMG_SIZE, 1)

        i_images = cc_mask + ir_seq
        if model_choice == "Unet_LSTM":
            # This model additionally gets the Sobel gradient magnitude.  The
            # frames go in as (8, 95, 95) so that the convolution runs over
            # the two spatial axes of every frame.
            GM_maps = GM_maps_prep(X_train_new)
            GM_maps = GM_maps.reshape(1, SEQ_LEN, IMG_SIZE, IMG_SIZE, 1)
            i_images = i_images + GM_maps

        reduced_images = np.concatenate([i_images, pmw_seq], axis=-1)
        reduced_images[np.isnan(reduced_images)] = 0

        y = model_predict_fn(
            reduced_images,
            hov_m_train,
            train_vmax_3d,
            lat_processed,
            lon_processed,
            v_max_diff,
        )
        st.write("Predicted Vmax:", y)