Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import numpy as np | |
| from PIL import Image | |
| import cv2 | |
| from scipy.ndimage import gaussian_filter | |
| # ------------------ TC CENTERING UTILS ------------------ | |
| def find_tc_center(ir_image, smoothing_sigma=3): | |
| smoothed_image = gaussian_filter(ir_image, sigma=smoothing_sigma) | |
| min_coords = np.unravel_index(np.argmin(smoothed_image), smoothed_image.shape) | |
| return min_coords[::-1] # Return as (x, y) | |
def extract_local_region(ir_image, center, region_size=95):
    """Crop a ``region_size`` x ``region_size`` window centred on ``center``.

    Pixels of the window that fall outside ``ir_image`` are left as NaN, and
    the in-bounds pixels keep their position relative to ``center`` so that
    ``center`` always lands at index ``region_size // 2`` of the output.

    Args:
        ir_image: 2-D array indexed as ``[y, x]``.
        center: ``(x, y)`` pixel coordinates of the window centre.
        region_size: Side length of the square output window.

    Returns:
        Float array of shape ``(region_size, region_size)``.

    NOTE(review): the previous implementation anchored the crop at the
    top-left corner of the output and dropped the last row/column for odd
    ``region_size``. Models fitted on crops produced that way may need
    re-validation.
    """
    h, w = ir_image.shape
    half_size = region_size // 2
    # Window origin in image coordinates; negative when the window hangs
    # over the left/top edge of the image.
    x_origin = int(center[0]) - half_size
    y_origin = int(center[1]) - half_size

    x_min, x_max = max(x_origin, 0), min(x_origin + region_size, w)
    y_min, y_max = max(y_origin, 0), min(y_origin + region_size, h)

    region = np.full((region_size, region_size), np.nan)
    if x_min < x_max and y_min < y_max:
        # Shift by the origin so clipped windows stay centred on `center`.
        region[
            y_min - y_origin:y_max - y_origin,
            x_min - x_origin:x_max - x_origin,
        ] = ir_image[y_min:y_max, x_min:x_max]
    return region
def generate_hovmoller(X_data):
    """Build centred 95x95 crops for every frame of every sequence.

    For each frame the centre is found with ``find_tc_center`` and a 95x95
    window around it is taken with ``extract_local_region``.

    Args:
        X_data: Iterable of frame stacks; each stack is indexed as
            ``[time, y, x]``.

    Returns:
        Array stacking, per sequence, a ``(time_steps, 95, 95)`` block of
        crops.
    """
    sequences = []
    for frames in X_data:
        n_steps = frames.shape[0]
        centred = np.zeros((n_steps, 95, 95))
        for step, frame in enumerate(frames):
            centred[step] = extract_local_region(frame, find_tc_center(frame), 95)
        sequences.append(centred)
    return np.array(sequences)
def reshape_vmax(vmax_values, chunk_size=8):
    """Group Vmax values into rows of ``chunk_size``.

    Trailing values that do not fill a complete row are dropped.

    Args:
        vmax_values: Sequence or array of Vmax values. Plain lists are
            accepted and converted to an array.
        chunk_size: Number of values per output row.

    Returns:
        Array of shape ``(len(vmax_values) // chunk_size, chunk_size)``.
    """
    values = np.asarray(vmax_values)
    usable = (len(values) // chunk_size) * chunk_size
    return values[:usable].reshape(-1, chunk_size)
def create_3d_vmax(vmax_2d_array):
    """Place each row of Vmax values on the diagonal of an 8x8 matrix.

    Args:
        vmax_2d_array: Array whose first axis indexes rows of Vmax values.

    Returns:
        Array of shape ``(N, 8, 8, 1)``: one zero-filled 8x8 matrix per input
        row with that row written along its diagonal, plus a trailing
        singleton axis. No rows are trimmed.
    """
    n_rows = vmax_2d_array.shape[0]
    stacked = np.zeros((n_rows, 8, 8))
    # Iterating over `stacked` yields views, so fill_diagonal writes in place.
    for plane, row_values in zip(stacked, vmax_2d_array):
        np.fill_diagonal(plane, row_values)
    return stacked.reshape(-1, 8, 8, 1)
def process_lat_values(data):
    """Reshape latitude values into rows of 8.

    Values beyond the last complete group of 8 are dropped before reshaping.

    Args:
        data: Sliceable sequence of latitude values.

    Returns:
        Array with 8 columns and ``len(data) // 8`` rows.
    """
    usable = len(data) - len(data) % 8
    # Slice first, then convert, so the input is never copied in full.
    trimmed = np.array(data[:usable])
    return trimmed.reshape(-1, 8)
def process_lon_values(data):
    """Reshape longitude values into rows of 8.

    Values beyond the last complete group of 8 are dropped before reshaping.

    Args:
        data: Sequence or array of longitude values.

    Returns:
        Array with 8 columns and ``len(data) // 8`` rows.
    """
    values = np.array(data)
    usable = len(values) - len(values) % 8
    return values[:usable].reshape(-1, 8)
| import numpy as np | |
def calculate_intensity_difference(vmax_2d_array):
    """Append the absolute first-to-last difference to every Vmax row.

    Args:
        vmax_2d_array: Iterable of 1-D rows of Vmax values.

    Returns:
        Array in which each row is the input row followed by
        ``abs(row[0] - row[-1])`` as one extra trailing element.
    """
    extended_rows = [
        np.append(row, abs(row[0] - row[-1]))
        for row in vmax_2d_array
    ]
    return np.array(extended_rows)
| import numpy as np | |
| # Function to process and reshape image data | |
def process_images(images, batch_size=8, img_size=(95, 95, 1)):
    """Group images into fixed-length batches.

    Images beyond the last complete batch are dropped.

    Args:
        images: Array whose first axis indexes individual images.
        batch_size: Number of images per batch.
        img_size: Shape each image takes in the output.

    Returns:
        Array of shape ``(num_batches, batch_size, *img_size)``.
    """
    total = images.shape[0]
    usable = total - total % batch_size
    target_shape = (-1, batch_size) + tuple(img_size)
    return images[:usable].reshape(target_shape)
| import numpy as np | |
def process_cc_mask(cc_data):
    """Group CC mask images into batches of 8 shaped ``(x, 8, 95, 95, 1)``.

    Masks beyond the last complete group of 8 are dropped.

    Args:
        cc_data: Array whose first axis indexes individual mask images.

    Returns:
        Array of shape ``(cc_data.shape[0] // 8, 8, 95, 95, 1)``.
    """
    group = 8
    usable = cc_data.shape[0] - cc_data.shape[0] % group
    return cc_data[:usable].reshape(-1, group, 95, 95, 1)
def extract_convective_cores(ir_data):
    """Flag convective-core (CC) pixels in an IR brightness-temperature image.

    An interior pixel is flagged when all three conditions hold:

    1. its value is below 253;
    2. none of its 8 neighbours has a strictly smaller value;
    3. ``(up + down - 2*v) / 3.1 + (left + right - 2*v) / 8.0`` exceeds
       ``(4 / 5.8) * exp(0.0826 * (v - 217))``.

    Border pixels are never flagged. The thresholds and scale factors are
    kept from the original implementation, whose docstring attributes them to
    a paper that is not cited in this file.

    Args:
        ir_data: Array of shape ``(height, width)`` or
            ``(height, width, channels)``. Each channel is evaluated
            independently.

    Returns:
        ``float32`` mask with the same shape as ``ir_data``: 1 for CC pixels
        and 0 elsewhere.

    Raises:
        ValueError: If ``ir_data`` is neither 2-D nor 3-D.
    """
    bt = np.asarray(ir_data)
    if bt.ndim not in (2, 3):
        raise ValueError(
            f"ir_data must be 2-D or 3-D, got array with shape {bt.shape}"
        )

    cc_mask = np.zeros_like(bt, dtype=np.float32)
    height, width = bt.shape[:2]
    if height < 3 or width < 3:
        # No interior pixels to evaluate.
        return cc_mask

    center = bt[1:-1, 1:-1]

    # Condition 1. Written as "not >=" so NaN pixels are treated exactly as
    # the scalar comparison treated them (they fall through to condition 3).
    is_core = ~(center >= 253)

    # Condition 2: no neighbour is strictly colder than the pixel itself.
    neighbour_offsets = [(-1, -1), (-1, 0), (-1, 1),
                         (0, -1), (0, 1),
                         (1, -1), (1, 0), (1, 1)]
    for di, dj in neighbour_offsets:
        neighbour = bt[1 + di:height - 1 + di, 1 + dj:width - 1 + dj]
        is_core &= ~(neighbour < center)

    # Condition 3: weighted second differences along each image axis.
    vertical = (bt[:-2, 1:-1] + bt[2:, 1:-1] - 2 * center) / 3.1
    horizontal = (bt[1:-1, :-2] + bt[1:-1, 2:] - 2 * center) / 8.0
    threshold = (4 / 5.8) * np.exp(0.0826 * (center - 217))
    is_core &= (vertical + horizontal) > threshold

    # The slice is a view, so this writes into cc_mask itself.
    cc_mask[1:-1, 1:-1][is_core] = 1
    return cc_mask
def compute_convective_core_masks(ir_data):
    """Run ``extract_convective_cores`` on every image and stack the masks.

    Args:
        ir_data: Iterable of IR images, each in the layout accepted by
            ``extract_convective_cores``.

    Returns:
        Array holding one mask per input image along the first axis.
    """
    masks = [np.array(extract_convective_cores(image)) for image in ir_data]
    return np.array(masks)
| # ------------------ Streamlit UI ------------------ | |
st.set_page_config(page_title="TCIR Daily Input", layout="wide")
st.title("Tropical Cyclone Input Uploader (8 sets/day)")

# Each uploader accepts several files at once; exactly 8 of each kind are
# expected before processing is allowed.
ir_images = st.file_uploader(
    "Upload 8 IR images",
    type=["jpg", "jpeg", "png"],
    accept_multiple_files=True,
)
pmw_images = st.file_uploader(
    "Upload 8 PMW images",
    type=["jpg", "jpeg", "png"],
    accept_multiple_files=True,
)

have_all_images = len(ir_images) == 8 and len(pmw_images) == 8
if have_all_images:
    st.success("Uploaded 8 IR and 8 PMW images successfully.")
else:
    st.warning("Please upload exactly 8 IR and 8 PMW images.")

st.header("Input Latitude, Longitude, Vmax")
# Defaults used until a CSV with these columns has been loaded.
lat_values, lon_values, vmax_values = [], [], []
| # col1, col2, col3 = st.columns(3) | |
| # with col1: | |
| # for i in range(8): | |
| # lat_values.append(st.number_input(f"Latitude {i+1}", key=f"lat{i}")) | |
| # with col2: | |
| # for i in range(8): | |
| # lon_values.append(st.number_input(f"Longitude {i+1}", key=f"lon{i}")) | |
| # with col3: | |
| # for i in range(8): | |
| # vmax_values.append(st.number_input(f"Vmax {i+1}", key=f"vmax{i}")) | |
| import pandas as pd | |
| import numpy as np | |
# CSV upload: fills lat_values, lon_values and vmax_values from the
# 'Latitude', 'Longitude' and 'Vmax' columns of the uploaded file.
csv_file = st.file_uploader("Upload CSV file", type=["csv"])
if csv_file is None:
    st.warning("Please upload a CSV file.")
else:
    try:
        df = pd.read_csv(csv_file)
        required_columns = {'Latitude', 'Longitude', 'Vmax'}
        missing_columns = required_columns - set(df.columns)
        if missing_columns:
            st.error("CSV file must contain 'Latitude', 'Longitude', and 'Vmax' columns.")
        else:
            lat_values = np.array(df['Latitude'].values)
            lon_values = np.array(df['Longitude'].values)
            vmax_values = np.array(df['Vmax'].values)
            st.success("CSV file loaded and processed successfully!")
            # Preview of the first rows so the user can check the upload.
            st.write(df.head())
    except Exception as e:
        # UI boundary: report any read/parse failure to the user.
        st.error(f"Error reading CSV: {e}")
st.header("Select Prediction Model")
# Labels shown in the selector; the submit handler dispatches on these
# exact strings.
model_options = (
    "ConvGRU",
    "ConvLSTM",
    "Traj-GRU",
    "3DCNN",
    "spatiotemporalLSTM",
    "Unet_LSTM",
)
model_choice = st.selectbox("Choose a model for prediction", model_options, index=0)
# ------------------ Process Button ------------------
# Selector label -> (module name, function name) of the matching predictor.
# Modules are imported only when selected, so choosing one model does not
# load the dependencies of the others.
_MODEL_ENTRY_POINTS = {
    "Unet_LSTM": ("unetlstm", "predict_unetlstm"),
    "ConvGRU": ("gru_model", "predict"),
    "ConvLSTM": ("convlstm", "predict_lstm"),
    "3DCNN": ("cnn3d", "predict_3dcnn"),
    "Traj-GRU": ("trjgru", "predict_trajgru"),
    "spatiotemporalLSTM": ("spaio_temp", "predict_stlstm"),
}


def _load_predict_fn(choice):
    """Import and return the prediction function for a selector label."""
    import importlib

    module_name, function_name = _MODEL_ENTRY_POINTS[choice]
    return getattr(importlib.import_module(module_name), function_name)


def _load_frames(uploaded_files, rescale):
    """Decode uploads to grayscale, rescale pixel values, resize to 95x95.

    Returns a float32 array of shape (len(uploaded_files), 95, 95).
    """
    frames = []
    for uploaded in uploaded_files:
        pixels = np.array(Image.open(uploaded).convert("L")).astype(np.float32)
        frames.append(
            cv2.resize(rescale(pixels), (95, 95), interpolation=cv2.INTER_CUBIC)
        )
    return np.array(frames)


def tf_gradient_magnitude(images):
    """Sobel gradient magnitude of a (batch, height, width) image stack.

    Returns a NumPy array with the same (batch, height, width) shape.
    """
    import tensorflow as tf  # Imported lazily: only Unet_LSTM needs it.

    sobel_x = tf.constant([[1, 0, -1], [2, 0, -2], [1, 0, -1]], dtype=tf.float32)
    sobel_y = tf.constant([[1, 2, 1], [0, 0, 0], [-1, -2, -1]], dtype=tf.float32)
    sobel_x = tf.reshape(sobel_x, [3, 3, 1, 1])
    sobel_y = tf.reshape(sobel_y, [3, 3, 1, 1])
    # conv2d expects (batch, height, width, channels); add the channel axis.
    images = tf.expand_dims(tf.convert_to_tensor(images, dtype=tf.float32), -1)
    gx = tf.nn.conv2d(images, sobel_x, strides=1, padding='SAME')
    gy = tf.nn.conv2d(images, sobel_y, strides=1, padding='SAME')
    # The small constant keeps the square root well defined at zero gradient.
    grad_mag = tf.sqrt(tf.square(gx) + tf.square(gy) + 1e-6)
    return tf.squeeze(grad_mag, -1).numpy()


if st.button("Submit for Processing"):
    if len(ir_images) != 8 or len(pmw_images) != 8:
        st.error("Make sure you uploaded exactly 8 IR and 8 PMW images.")
    elif len(vmax_values) < 8 or len(lat_values) < 8 or len(lon_values) < 8:
        # Without this guard the reshape helpers return empty arrays and the
        # model is called with no track data at all.
        st.error(
            "Upload a CSV file with at least 8 rows of Latitude, Longitude "
            "and Vmax before submitting."
        )
    else:
        model_predict_fn = _load_predict_fn(model_choice)

        # NOTE(review): CSVs with 16+ rows yield more than one Vmax/lat/lon
        # group while the images always form a single batch of 8 -- confirm
        # what the predictors expect in that case.
        train_vmax_2d = reshape_vmax(np.array(vmax_values))
        train_vmax_3d = create_3d_vmax(train_vmax_2d)
        lat_processed = process_lat_values(lat_values)
        lon_processed = process_lon_values(lon_values)
        v_max_diff = calculate_intensity_difference(train_vmax_2d)

        # IR pixels are mapped linearly from 0-255 onto 190-310; PMW pixels
        # are scaled to 0-1.
        ir_stack = _load_frames(
            ir_images, lambda px: (px / 255.0) * (310 - 190) + 190
        )
        pmw_stack = _load_frames(pmw_images, lambda px: px / 255.0)

        ir_seq = process_images(ir_stack)    # (1, 8, 95, 95, 1)
        pmw_seq = process_images(pmw_stack)  # (1, 8, 95, 95, 1)
        X_train_new = ir_seq.reshape((1, 8, 95, 95))

        # Hand over the 8 frames as (95, 95, 1) images. Passing the
        # (1, 8, 95, 95) batch would make the time axis act as image height.
        cc_mask = compute_convective_core_masks(ir_seq[0])
        cc_mask[np.isnan(cc_mask)] = 0
        cc_mask = cc_mask.reshape(1, 8, 95, 95, 1)

        hov_m_train = generate_hovmoller(X_train_new)
        hov_m_train[np.isnan(hov_m_train)] = 0
        hov_m_train = hov_m_train.transpose(0, 2, 3, 1)  # -> (1, 95, 95, 8)

        i_images = cc_mask + ir_seq
        if model_choice == "Unet_LSTM":
            # Pass the (8, 95, 95) stack so each frame is one batch entry of
            # the convolution rather than each image row.
            GM_maps = tf_gradient_magnitude(X_train_new[0])
            GM_maps = GM_maps.reshape(1, 8, 95, 95, 1)
            i_images = i_images + GM_maps

        reduced_images = np.concatenate([i_images, pmw_seq], axis=-1)
        reduced_images[np.isnan(reduced_images)] = 0

        y = model_predict_fn(
            reduced_images,
            hov_m_train,
            train_vmax_3d,
            lat_processed,
            lon_processed,
            v_max_diff,
        )
        st.write("Predicted Vmax:", y)