|
import streamlit as st |
|
import numpy as np |
|
from PIL import Image |
|
import cv2 |
|
from scipy.ndimage import gaussian_filter |
|
|
|
|
|
|
|
def find_tc_center(ir_image, smoothing_sigma=3):
    """Return the position of the minimum of the Gaussian-smoothed image.

    Args:
        ir_image: Array holding the image to search.
        smoothing_sigma: Standard deviation handed to ``gaussian_filter``
            before the minimum is located.

    Returns:
        The index of the smallest smoothed value with its axes reversed,
        i.e. ``(x, y)`` (column, row) for a 2-D image.
    """
    blurred = gaussian_filter(ir_image, sigma=smoothing_sigma)
    flat_position = np.argmin(blurred)
    row_first = np.unravel_index(flat_position, blurred.shape)
    # Flip (row, col) so that callers receive (x, y).
    return row_first[::-1]
|
|
|
def extract_local_region(ir_image, center, region_size=95):
    """Cut a ``region_size`` x ``region_size`` window centred on *center*.

    The window is anchored so that the pixel at *center* lands on index
    ``region_size // 2`` of both output axes.  Parts of the window that fall
    outside the image are left as NaN.

    Args:
        ir_image: 2-D array of shape (height, width).
        center: ``(x, y)`` position, i.e. (column, row), of the window centre.
        region_size: Side length of the square output window.

    Returns:
        Float array of shape ``(region_size, region_size)``; NaN wherever the
        window extends beyond the image.
    """
    h, w = ir_image.shape
    half_size = region_size // 2
    cx, cy = int(center[0]), int(center[1])

    # Window bounds in image coordinates; they may lie outside the image.
    # Using ``start + region_size`` (not ``centre + half_size``) keeps the
    # full width for odd sizes.
    win_x0 = cx - half_size
    win_y0 = cy - half_size
    win_x1 = win_x0 + region_size
    win_y1 = win_y0 + region_size

    # Part of the window that actually overlaps the image.
    src_x0, src_x1 = max(win_x0, 0), min(win_x1, w)
    src_y0, src_y1 = max(win_y0, 0), min(win_y1, h)

    region = np.full((region_size, region_size), np.nan)
    if src_x0 >= src_x1 or src_y0 >= src_y1:
        # The window does not touch the image at all.
        return region

    # Offset the paste position by the amount clipped on the left/top so the
    # centre pixel stays in the middle of the output.
    dst_x0 = src_x0 - win_x0
    dst_y0 = src_y0 - win_y0
    region[
        dst_y0:dst_y0 + (src_y1 - src_y0),
        dst_x0:dst_x0 + (src_x1 - src_x0),
    ] = ir_image[src_y0:src_y1, src_x0:src_x1]
    return region
|
|
|
def generate_hovmoller(X_data, region_size=95):
    """Build centre-following crops for every frame of every sequence.

    For each frame the centre is located with ``find_tc_center`` and a square
    window around it is cut with ``extract_local_region``.

    Args:
        X_data: Iterable of image sequences; each sequence is an array whose
            first axis is time and whose items are 2-D frames.
        region_size: Side length of the window cut around each centre.
            Defaults to 95, the value previously hard-coded here.

    Returns:
        ``np.array`` of the per-sequence stacks, each of shape
        ``(time_steps, region_size, region_size)``.
    """
    hovmoller_list = []
    for ir_images in X_data:
        time_steps = ir_images.shape[0]
        hovmoller_data = np.zeros((time_steps, region_size, region_size))
        for t in range(time_steps):
            frame = ir_images[t]
            tc_center = find_tc_center(frame)
            hovmoller_data[t] = extract_local_region(frame, tc_center, region_size)
        hovmoller_list.append(hovmoller_data)
    return np.array(hovmoller_list)
|
|
|
def reshape_vmax(vmax_values, chunk_size=8):
    """Group values into rows of ``chunk_size``, dropping the remainder.

    Args:
        vmax_values: Sequence or array of values.  Plain lists are accepted
            and converted to an array, matching the latitude/longitude
            helpers in this file.
        chunk_size: Number of values per output row.

    Returns:
        Array of shape ``(len(vmax_values) // chunk_size, chunk_size)``.
    """
    values = np.asarray(vmax_values)
    # Only whole chunks are kept; trailing values that do not fill a row
    # are discarded.
    trimmed_size = (len(values) // chunk_size) * chunk_size
    return values[:trimmed_size].reshape(-1, chunk_size)
|
def create_3d_vmax(vmax_2d_array):
    """Place every row of a 2-D array on the diagonal of a square matrix.

    The matrix side is taken from the row length, so rows of 8 values give
    the same 8x8 matrices as before, while other row lengths no longer have
    their values silently repeated to fill a fixed 8x8 diagonal.

    Args:
        vmax_2d_array: 2-D array-like of shape ``(n_rows, k)``.

    Returns:
        Float array of shape ``(n_rows, k, k, 1)`` that is zero everywhere
        except ``out[i, j, j, 0] == vmax_2d_array[i, j]``.

    Raises:
        ValueError: If the input is not 2-D.
    """
    values = np.asarray(vmax_2d_array)
    if values.ndim != 2:
        raise ValueError(
            f"vmax_2d_array must be 2-D, got shape {values.shape}"
        )

    n_rows, side = values.shape
    vmax_3d_array = np.zeros((n_rows, side, side))
    diagonal = np.arange(side)
    # One vectorised assignment fills the diagonal of every matrix at once.
    vmax_3d_array[:, diagonal, diagonal] = values
    # Trailing singleton axis acts as the channel dimension.
    return vmax_3d_array[..., np.newaxis]
|
|
|
def process_lat_values(data):
    """Arrange latitude values into rows of eight.

    Values beyond the last complete group of eight are dropped.

    Args:
        data: Sliceable sequence or array of latitude values.

    Returns:
        Array of shape ``(len(data) // 8, 8)``.
    """
    usable = (len(data) // 8) * 8
    return np.array(data[:usable]).reshape(-1, 8)
|
|
|
def process_lon_values(data):
    """Arrange longitude values into rows of eight.

    The input is converted to an array first; values beyond the last
    complete group of eight are dropped.

    Args:
        data: Sequence or array of longitude values.

    Returns:
        Array of shape ``(len(data) // 8, 8)``.
    """
    longitudes = np.array(data)
    leftover = len(longitudes) % 8
    usable = len(longitudes) - leftover
    return longitudes[:usable].reshape(-1, 8)
|
|
|
import numpy as np |
|
|
|
def calculate_intensity_difference(vmax_2d_array):
    """Append ``|first - last|`` to the end of every row.

    Args:
        vmax_2d_array: Iterable of 1-D rows.

    Returns:
        ``np.array`` of the rows, each extended by one trailing element
        holding the absolute difference between its first and last value.
    """
    extended_rows = [
        np.append(row, abs(row[0] - row[-1]))
        for row in vmax_2d_array
    ]
    return np.array(extended_rows)
|
|
|
import numpy as np |
|
|
|
|
|
def process_images(images, batch_size=8, img_size=(95, 95, 1)):
    """Group images into batches of ``batch_size``.

    Images that do not fill a complete batch are dropped.

    Args:
        images: Array whose first axis indexes the images.
        batch_size: Number of images per batch.
        img_size: Shape every image takes in the output.

    Returns:
        Array of shape ``(n_batches, batch_size, *img_size)``.
    """
    image_count = images.shape[0]
    usable = (image_count // batch_size) * batch_size
    target_shape = (-1, batch_size) + tuple(img_size)
    return images[:usable].reshape(target_shape)
|
|
|
import numpy as np |
|
|
|
def process_cc_mask(cc_data):
    """Group CC mask images into sequences of eight.

    Images that do not fill a complete group of eight are dropped and the
    rest are reshaped to ``(x, 8, 95, 95, 1)``.

    Args:
        cc_data: Array whose first axis indexes the mask images.

    Returns:
        Array of shape ``(cc_data.shape[0] // 8, 8, 95, 95, 1)``.
    """
    group = 8
    usable = cc_data.shape[0] - cc_data.shape[0] % group
    return cc_data[:usable].reshape(-1, group, 95, 95, 1)
|
def extract_convective_cores(ir_data):
    """Extract Convective Cores (CCs) from IR imagery.

    An interior pixel is flagged as a CC when all of the following hold:

    * its value is below 253 (for multi-channel input, in every channel);
    * none of its eight neighbours is strictly smaller (local minimum);
    * the weighted second difference
      ``(up + down - 2*v) / 3.1 + (left + right - 2*v) / 8.0`` exceeds
      ``(4 / 5.8) * exp(0.0826 * (v - 217))``.

    The numeric constants are carried over unchanged from the original
    implementation (which refers to "the criteria in the paper").
    Border pixels are never flagged.

    Args:
        ir_data: IR imagery of shape (height, width) or
            (height, width, channels).  Channels are evaluated independently,
            apart from the ``>= 253`` test, which rejects a pixel as soon as
            any channel reaches the threshold.

    Returns:
        cc_mask: float32 mask (1 for CC, 0 otherwise) with the same shape as
        ``ir_data``.

    Raises:
        ValueError: If ``ir_data`` is neither 2-D nor 3-D.
    """
    ir = np.asarray(ir_data)
    if ir.ndim not in (2, 3):
        raise ValueError(
            "ir_data must have shape (height, width) or "
            f"(height, width, channels), got {ir.shape}"
        )

    cc_mask = np.zeros(ir.shape, dtype=np.float32)

    # Work on (H, W, C) views so 2-D and 3-D inputs share one code path;
    # ``mask_view`` aliases ``cc_mask``, so writes land in the returned array.
    if ir.ndim == 2:
        field = ir[..., np.newaxis]
        mask_view = cc_mask[..., np.newaxis]
    else:
        field = ir
        mask_view = cc_mask

    height, width = field.shape[:2]
    if height < 3 or width < 3:
        # No interior pixels to examine.
        return cc_mask

    if field.dtype.kind != "f":
        # Avoid integer wrap-around in the second differences below.
        field = field.astype(np.float64)

    centre = field[1:-1, 1:-1]

    # Reject pixels where any channel is at or above the 253 threshold.
    cold = ~(centre >= 253).any(axis=-1, keepdims=True)

    # Local minimum: no neighbour is strictly smaller.  Written as
    # ``~(neighbour < centre)`` so NaN neighbours behave as in a plain
    # ``<`` comparison (they never disqualify the pixel).
    is_local_min = np.ones(centre.shape, dtype=bool)
    for di in (-1, 0, 1):
        for dj in (-1, 0, 1):
            if di == 0 and dj == 0:
                continue
            neighbour = field[1 + di:height - 1 + di, 1 + dj:width - 1 + dj]
            is_local_min &= ~(neighbour < centre)

    vertical = (field[:-2, 1:-1] + field[2:, 1:-1] - 2 * centre) / 3.1
    horizontal = (field[1:-1, :-2] + field[1:-1, 2:] - 2 * centre) / 8.0
    lhs = vertical + horizontal
    rhs = (4 / 5.8) * np.exp(0.0826 * (centre - 217))

    mask_view[1:-1, 1:-1] = cold & is_local_min & (lhs > rhs)
    return cc_mask
|
|
|
def compute_convective_core_masks(ir_data):
    """Run ``extract_convective_cores`` on every item of *ir_data*.

    Args:
        ir_data: Iterable of IR images; each item is passed unchanged to
            ``extract_convective_cores``.

    Returns:
        ``np.array`` stacking the per-item masks in input order.
    """
    masks = [np.array(extract_convective_cores(image)) for image in ir_data]
    return np.array(masks)
|
|
|
|
|
|
|
# --- Page setup and image uploads -------------------------------------------
st.set_page_config(page_title="TCIR Daily Input", layout="wide")

st.title("Tropical Cyclone U-Net Wind Speed (Intensity) Predictor")

# Two multi-file uploaders, one per image kind, accepting the same formats.
ir_images = st.file_uploader(
    "Upload 8 IR images",
    type=["jpg", "jpeg", "png"],
    accept_multiple_files=True,
)
pmw_images = st.file_uploader(
    "Upload 8 PMW images",
    type=["jpg", "jpeg", "png"],
    accept_multiple_files=True,
)

# Tell the user whether exactly eight files of each kind are present.
if len(ir_images) == 8 and len(pmw_images) == 8:
    st.success("Uploaded 8 IR and 8 PMW images successfully.")
else:
    st.warning("Please upload exactly 8 IR and 8 PMW images.")
|
|
|
# --- Latitude / longitude / Vmax input from CSV -----------------------------
import pandas as pd

import numpy as np

st.header("Input Latitude, Longitude, Vmax")

# Stay empty until a CSV with the required columns has been read.
lat_values, lon_values, vmax_values = [], [], []

csv_file = st.file_uploader("Upload CSV file", type=["csv"])

if csv_file is None:
    st.warning("Please upload a CSV file.")
else:
    try:
        df = pd.read_csv(csv_file)
        missing_columns = {'Latitude', 'Longitude', 'Vmax'} - set(df.columns)

        if missing_columns:
            st.error("CSV file must contain 'Latitude', 'Longitude', and 'Vmax' columns.")
        else:
            # Pull each required column out as a NumPy array.
            lat_values = np.array(df['Latitude'].values)
            lon_values = np.array(df['Longitude'].values)
            vmax_values = np.array(df['Vmax'].values)

            st.success("CSV file loaded and processed successfully!")
            st.write(df.head())
    except Exception as e:
        # Any failure while reading or unpacking the CSV is shown in the UI.
        st.error(f"Error reading CSV: {e}")
|
# --- Model selection ---------------------------------------------------------
st.header("Select Prediction Model")

# Names offered in the drop-down; the first entry is pre-selected.
available_models = (
    "ConvGRU",
    "ConvLSTM",
    "Traj-GRU",
    "3DCNN",
    "spatiotemporalLSTM",
    "Unet_LSTM",
)
model_choice = st.selectbox(
    "Choose a model for prediction",
    available_models,
    index=0,
)
|
|
|
# --- Run the selected model on the uploaded data -----------------------------
if st.button("Submit for Processing"):
    # Number of complete (lat, lon, vmax) rows available from the CSV upload.
    n_track_rows = min(len(lat_values), len(lon_values), len(vmax_values))

    if len(ir_images) != 8 or len(pmw_images) != 8:
        st.error("Make sure you uploaded exactly 8 IR and 8 PMW images.")
    elif n_track_rows < 8:
        # Without this guard the helpers below produce empty arrays and the
        # model is called with no track data at all.
        st.error(
            "Please upload a CSV file with at least 8 rows of "
            "Latitude, Longitude and Vmax values before submitting."
        )
    else:
        # Import only the selected model's module; each lives in its own
        # project-local file.
        if model_choice == "Unet_LSTM":
            from unetlstm import predict_unetlstm
            model_predict_fn = predict_unetlstm
        elif model_choice == "ConvGRU":
            from gru_model import predict
            model_predict_fn = predict
        elif model_choice == "ConvLSTM":
            from convlstm import predict_lstm
            model_predict_fn = predict_lstm
        elif model_choice == "3DCNN":
            from cnn3d import predict_3dcnn
            model_predict_fn = predict_3dcnn
        elif model_choice == "Traj-GRU":
            from trjgru import predict_trajgru
            model_predict_fn = predict_trajgru
        elif model_choice == "spatiotemporalLSTM":
            from spaio_temp import predict_stlstm
            model_predict_fn = predict_stlstm

        # ---- Track data (Vmax / latitude / longitude) ----
        # NOTE(review): rows are grouped in eights, so 16 or more CSV rows
        # yield several groups while only one image sequence is built below;
        # confirm what the model functions expect in that case.
        train_vmax_2d = reshape_vmax(np.array(vmax_values))
        train_vmax_3d = create_3d_vmax(train_vmax_2d)
        lat_processed = process_lat_values(lat_values)
        lon_processed = process_lon_values(lon_values)
        v_max_diff = calculate_intensity_difference(train_vmax_2d)

        # ---- Decode the uploaded images ----
        ir_arrays = []
        for ir_file in ir_images:
            gray = np.array(Image.open(ir_file).convert("L")).astype(np.float32)
            # Map 8-bit grey levels linearly onto the 190-310 range.
            bt_arr = (gray / 255.0) * (310 - 190) + 190
            ir_arrays.append(
                cv2.resize(bt_arr, (95, 95), interpolation=cv2.INTER_CUBIC)
            )

        pmw_arrays = []
        for pmw_file in pmw_images:
            # PMW grey levels are scaled to [0, 1].
            gray = np.array(Image.open(pmw_file).convert("L")).astype(np.float32) / 255.0
            pmw_arrays.append(
                cv2.resize(gray, (95, 95), interpolation=cv2.INTER_CUBIC)
            )

        # Both become (1, 8, 95, 95, 1): one sequence of eight frames.
        ir_seq = process_images(np.array(ir_arrays))
        pmw_seq = process_images(np.array(pmw_arrays))

        # (1, 8, 95, 95): one sequence of eight 2-D frames.
        X_train_new = ir_seq.reshape((1, 8, 95, 95))

        # The CC extractor works on single (height, width, 1) images, so hand
        # it the eight frames individually.  Passing the (8, 95, 95) sequence
        # as one "image" would treat the time axis as image height.
        cc_mask = compute_convective_core_masks(ir_seq.reshape(8, 95, 95, 1))
        cc_mask[np.isnan(cc_mask)] = 0
        cc_mask = cc_mask.reshape(1, 8, 95, 95, 1)

        hov_m_train = generate_hovmoller(X_train_new)
        hov_m_train[np.isnan(hov_m_train)] = 0
        # Move the time axis last: (1, 8, 95, 95) -> (1, 95, 95, 8).
        hov_m_train = hov_m_train.transpose(0, 2, 3, 1)

        i_images = cc_mask + ir_seq

        if model_choice == "Unet_LSTM":
            import tensorflow as tf

            def tf_gradient_magnitude(images):
                """Return the Sobel gradient magnitude of a (batch, H, W) stack."""
                sobel_x = tf.constant([[1, 0, -1], [2, 0, -2], [1, 0, -1]], dtype=tf.float32)
                sobel_y = tf.constant([[1, 2, 1], [0, 0, 0], [-1, -2, -1]], dtype=tf.float32)
                # conv2d filters are laid out as (height, width, in, out).
                sobel_x = tf.reshape(sobel_x, [3, 3, 1, 1])
                sobel_y = tf.reshape(sobel_y, [3, 3, 1, 1])

                images = tf.convert_to_tensor(images, dtype=tf.float32)
                # Add the channel axis conv2d expects: (batch, H, W, 1).
                images = tf.expand_dims(images, -1)

                gx = tf.nn.conv2d(images, sobel_x, strides=1, padding='SAME')
                gy = tf.nn.conv2d(images, sobel_y, strides=1, padding='SAME')
                # Small constant added under the square root, as before.
                grad_mag = tf.sqrt(tf.square(gx) + tf.square(gy) + 1e-6)

                return tf.squeeze(grad_mag, -1).numpy()

            # Feed the eight frames as one (8, 95, 95) batch.  Calling the
            # function on a single (95, 95, 1) frame makes conv2d treat image
            # rows as the batch axis and a width-1 strip as the image.
            # NOTE(review): this changes the Unet_LSTM input values; confirm
            # it matches the preprocessing the model was trained with.
            GM_maps = tf_gradient_magnitude(X_train_new[0])
            GM_maps = GM_maps.reshape(1, 8, 95, 95, 1)
            i_images = i_images + GM_maps

        # Stack the combined IR channel with the PMW channel: (1, 8, 95, 95, 2).
        reduced_images = np.concatenate([i_images, pmw_seq], axis=-1)
        reduced_images[np.isnan(reduced_images)] = 0

        y = model_predict_fn(
            reduced_images,
            hov_m_train,
            train_vmax_3d,
            lat_processed,
            lon_processed,
            v_max_diff,
        )
        st.write("Predicted Vmax:", y)
|
|