nanduriprudhvi's picture
Update app.py
22f2401 verified
import streamlit as st
import numpy as np
from PIL import Image
import cv2
from scipy.ndimage import gaussian_filter
# ------------------ TC CENTERING UTILS ------------------
def find_tc_center(ir_image, smoothing_sigma=3):
smoothed_image = gaussian_filter(ir_image, sigma=smoothing_sigma)
min_coords = np.unravel_index(np.argmin(smoothed_image), smoothed_image.shape)
return min_coords[::-1] # Return as (x, y)
def extract_local_region(ir_image, center, region_size=95):
h, w = ir_image.shape
half_size = region_size // 2
x_min = max(center[0] - half_size, 0)
x_max = min(center[0] + half_size, w)
y_min = max(center[1] - half_size, 0)
y_max = min(center[1] + half_size, h)
region = np.full((region_size, region_size), np.nan)
extracted = ir_image[y_min:y_max, x_min:x_max]
region[:extracted.shape[0], :extracted.shape[1]] = extracted
return region
def generate_hovmoller(X_data):
hovmoller_list = []
for ir_images in X_data: # ir_images: shape (8, 95, 95)
time_steps = ir_images.shape[0]
hovmoller_data = np.zeros((time_steps, 95, 95))
for t in range(time_steps):
tc_center = find_tc_center(ir_images[t])
hovmoller_data[t] = extract_local_region(ir_images[t], tc_center, 95)
hovmoller_list.append(hovmoller_data)
return np.array(hovmoller_list)
def reshape_vmax(vmax_values, chunk_size=8):
trimmed_size = (len(vmax_values) // chunk_size) * chunk_size
vmax_values_trimmed = vmax_values[:trimmed_size]
return vmax_values_trimmed.reshape(-1, chunk_size)
def create_3d_vmax(vmax_2d_array):
# Initialize a 3D array of shape (N, 8, 8) filled with zeros
vmax_3d_array = np.zeros((vmax_2d_array.shape[0], 8, 8))
# Fill the diagonal for each row in the 3D array
for i in range(vmax_2d_array.shape[0]):
np.fill_diagonal(vmax_3d_array[i], vmax_2d_array[i])
# Reshape to (N*10, 8, 8, 1) and remove the last element
vmax_3d_array = vmax_3d_array.reshape(-1, 8, 8, 1)
# Trim last element
return vmax_3d_array
def process_lat_values(data):
lat_values = data # Convert to NumPy array
# Trim the array to make its length divisible by 8
trimmed_size = (len(lat_values) // 8) * 8
lat_values_trimmed = lat_values[:trimmed_size]
lat_values_trimmed=np.array(lat_values_trimmed) # Convert to NumPy array
# Reshape into a 2D array (rows of 8 values each) and remove the last row
lat_2d_array = lat_values_trimmed.reshape(-1, 8)
return lat_2d_array
def process_lon_values(data):
lon_values =data # Convert to NumPy array
lon_values = np.array(lon_values) # Convert to NumPy array
# Trim the array to make its length divisible by 8
trimmed_size = (len(lon_values) // 8) * 8
lon_values_trimmed = lon_values[:trimmed_size]
# Reshape into a 2D array (rows of 8 values each) and remove the last row
lon_2d_array = lon_values_trimmed.reshape(-1, 8)
return lon_2d_array
import numpy as np
def calculate_intensity_difference(vmax_2d_array):
"""Calculates intensity difference for each row in Vmax 2D array."""
int_diff = []
for i in vmax_2d_array:
k = abs(i[0] - i[-1]) # Absolute difference between first & last element
i = np.append(i, k) # Append difference as the 9th element
int_diff.append(i)
return np.array(int_diff)
import numpy as np
# Function to process and reshape image data
def process_images(images, batch_size=8, img_size=(95, 95, 1)):
num_images = images.shape[0]
# Trim the dataset to make it divisible by batch_size
trimmed_size = (num_images // batch_size) * batch_size
images_trimmed = images[:trimmed_size]
# Reshape into (x, batch_size, img_size[0], img_size[1], img_size[2])
images_reshaped = images_trimmed.reshape(-1, batch_size, *img_size)
return images_reshaped
import numpy as np
def process_cc_mask(cc_data):
"""Processes CC mask images by trimming and reshaping into (x, 8, 95, 95, 1)."""
num_images = cc_data.shape[0]
batch_size = 8
trimmed_size = (num_images // batch_size) * batch_size # Ensure divisibility by 8
images_trimmed = cc_data[:trimmed_size] # Trim excess images
cc_images = images_trimmed.reshape(-1, batch_size, 95, 95, 1) # Reshape
return cc_images
def extract_convective_cores(ir_data):
"""
Extract Convective Cores (CCs) from IR imagery based on the criteria in the paper.
Args:
ir_data: IR imagery of shape (height, width).
Returns:
cc_mask: Binary mask of CCs (1 for CC, 0 otherwise) of shape (height, width).
"""
height, width,c = ir_data.shape
cc_mask = np.zeros_like(ir_data, dtype=np.float32) # Initialize CC mask
# Define the neighborhood (8-connected)
neighbors = [(-1, -1), (-1, 0), (-1, 1),
(0, -1), (0,0) , (0, 1),
(1, -1), (1, 0), (1, 1)]
for i in range(1, height - 1): # Avoid boundary pixels
for j in range(1, width - 1):
bt_ij = ir_data[i, j]
# Condition 1: BT < 253K
if (bt_ij >= 253).any():
continue
# Condition 2: BT <= BT_n for all neighbors
is_local_min = True
for di, dj in neighbors:
if ir_data[i + di, j + dj] < bt_ij:
is_local_min = False
break
if not is_local_min:
continue
# Condition 3: Gradient condition
numerator1 = (ir_data[i - 1, j] + ir_data[i + 1, j] - 2 * bt_ij) / 3.1
numerator2 = (ir_data[i, j - 1] + ir_data[i, j + 1] - 2 * bt_ij) / 8.0
lhs = numerator1 + numerator2
rhs = (4 / 5.8) * np.exp(0.0826 * (bt_ij - 217))
if lhs > rhs:
cc_mask[i, j] = 1 # Mark as CC
return cc_mask
def compute_convective_core_masks(ir_data):
"""Extracts convective core masks for each IR image."""
cc_mask = []
for i in ir_data:
c = extract_convective_cores(i) # Assuming this function is defined
c = np.array(c)
cc_mask.append(c)
return np.array(cc_mask)
# ------------------ Streamlit UI ------------------
st.set_page_config(page_title="TCIR Daily Input", layout="wide")
st.title("Tropical Cyclone Input Uploader (8 sets/day)")
ir_images = st.file_uploader("Upload 8 IR images", type=["jpg", "jpeg", "png"], accept_multiple_files=True)
pmw_images = st.file_uploader("Upload 8 PMW images", type=["jpg", "jpeg", "png"], accept_multiple_files=True)
if len(ir_images) != 8 or len(pmw_images) != 8:
st.warning("Please upload exactly 8 IR and 8 PMW images.")
else:
st.success("Uploaded 8 IR and 8 PMW images successfully.")
st.header("Input Latitude, Longitude, Vmax")
lat_values, lon_values, vmax_values = [], [], []
# col1, col2, col3 = st.columns(3)
# with col1:
# for i in range(8):
# lat_values.append(st.number_input(f"Latitude {i+1}", key=f"lat{i}"))
# with col2:
# for i in range(8):
# lon_values.append(st.number_input(f"Longitude {i+1}", key=f"lon{i}"))
# with col3:
# for i in range(8):
# vmax_values.append(st.number_input(f"Vmax {i+1}", key=f"vmax{i}"))
import pandas as pd
import numpy as np
# File uploader
csv_file = st.file_uploader("Upload CSV file", type=["csv"])
if csv_file is not None:
try:
df = pd.read_csv(csv_file)
required_columns = {'Latitude', 'Longitude', 'Vmax'}
if required_columns.issubset(df.columns):
lat_values = df['Latitude'].values
lon_values = df['Longitude'].values
vmax_values = df['Vmax'].values
lat_values = np.array(lat_values)
lon_values = np.array(lon_values)
vmax_values = np.array(vmax_values)
st.success("CSV file loaded and processed successfully!")
# Optional: Show dataframe or data shape
st.write(df.head())
else:
st.error("CSV file must contain 'Latitude', 'Longitude', and 'Vmax' columns.")
except Exception as e:
st.error(f"Error reading CSV: {e}")
else:
st.warning("Please upload a CSV file.")
st.header("Select Prediction Model")
model_choice = st.selectbox(
"Choose a model for prediction",
("ConvGRU", "ConvLSTM", "Traj-GRU","3DCNN","spatiotemporalLSTM","Unet_LSTM"),
index=0
)
# ------------------ Process Button ------------------
if st.button("Submit for Processing"):
if len(ir_images) == 8 and len(pmw_images) == 8:
# st.success("Starting preprocessing...")
if model_choice == "Unet_LSTM":
from unetlstm import predict_unetlstm
model_predict_fn = predict_unetlstm
elif model_choice == "ConvGRU":
from gru_model import predict
model_predict_fn = predict
elif model_choice == "ConvLSTM":
from convlstm import predict_lstm
model_predict_fn = predict_lstm
elif model_choice == "3DCNN":
from cnn3d import predict_3dcnn
model_predict_fn = predict_3dcnn
elif model_choice == "Traj-GRU":
from trjgru import predict_trajgru
model_predict_fn = predict_trajgru
elif model_choice == "spatiotemporalLSTM":
from spaio_temp import predict_stlstm
model_predict_fn = predict_stlstm
# from gru_model import predict
# from convlstm import predict_lstm
# from cnn3d import predict_3dcnn
# from trjgru import predict_trajgru
# from spaio_temp import predict_stlstm
# from unetlstm import predict_unetlstm
ir_arrays = []
pmw_arrays = []
train_vmax_2d = reshape_vmax(np.array(vmax_values))
# st.write("Vmax 2D shape:", train_vmax_2d.shape)
train_vmax_3d= create_3d_vmax(train_vmax_2d)
# st.write("Vmax 3D shape:", train_vmax_3d.shape)
lat_processed = process_lat_values(lat_values)
lon_processed = process_lon_values(lon_values)
# st.write("Lat 2D shape:", lat_processed.shape)
# st.write("Lon 2D shape:", lon_processed.shape)
v_max_diff = calculate_intensity_difference(train_vmax_2d)
# st.write("Vmax Intensity Difference shape:", v_max_diff.shape)
for ir in ir_images:
img = Image.open(ir).convert("L")
arr = np.array(img).astype(np.float32)
bt_arr = (arr / 255.0) * (310 - 190) + 190
resized = cv2.resize(bt_arr, (95, 95), interpolation=cv2.INTER_CUBIC)
ir_arrays.append(resized)
for pmw in pmw_images:
img = Image.open(pmw).convert("L")
arr = np.array(img).astype(np.float32) / 255.0
resized = cv2.resize(arr, (95, 95), interpolation=cv2.INTER_CUBIC)
pmw_arrays.append(resized)
ir=np.array(ir_arrays)
pmw=np.array(pmw_arrays)
# Stack into (8, 95, 95)
ir_seq = process_images(ir)
pmw_seq = process_images(pmw)
# st.write(f"IR sequence shape: {ir_seq.shape}")
# st.write(f"PMW sequence shape: {pmw_seq.shape}")
# For demonstration: create batches
X_train_new = ir_seq.reshape((1, 8, 95, 95)) # Shape: (1, 8, 95, 95)
# X_test_new = X_valid = X_train_new.copy() # Dummy copies for now
# st.write(f"X_train_new shape: {X_train_new.shape}")
cc_mask= compute_convective_core_masks(X_train_new)
# st.write("CC Mask Shape:", cc_mask.shape)
hov_m_train = generate_hovmoller(X_train_new)
# hov_m_test = generate_hovmoller(X_test_new)
# hov_m_valid = generate_hovmoller(X_valid)
hov_m_train[np.isnan(hov_m_train)] = 0
hov_m_train = hov_m_train.transpose(0, 2, 3, 1)
# st.success("Hovmöller diagrams generated ✅")
# st.write("Hovmöller Train Shape:", hov_m_train.shape)
# st.write("Hovmöller Test Shape:", hov_m_test.shape)
# st.write("Hovmöller Valid Shape:", hov_m_valid.shape)
# Visualize first sample
# st.subheader("Hovmöller Sample (Train Set)")
# for t in range(8):
# st.image(hov_m_train[0, t], caption=f"Time Step {t+1}", clamp=True, width=150)
# st.write(hov_m_train[0,0])
cc_mask[np.isnan(cc_mask)] = 0
cc_mask=cc_mask.reshape(1, 8, 95, 95, 1)
i_images=cc_mask+ir_seq
reduced_images = np.concatenate([i_images,pmw_seq ], axis=-1)
reduced_images[np.isnan(reduced_images)] = 0
# st.write("Reduced Images Shape:", reduced_images.shape)
# y=np.isnan(reduced_images).sum()
# st.write("Reduced Images NaN Count:", y)
# model_predict_fn = {
# "ConvGRU": predict,
# "ConvLSTM": predict_lstm,
# "3DCNN":predict_3dcnn,
# "Traj-GRU": predict_trajgru,
# "spatiotemporalLSTM": predict_stlstm,
# "Unet_LSTM": predict_unetlstm,
# }[model_choice]
if model_choice == "Unet_LSTM":
import tensorflow as tf
def tf_gradient_magnitude(images):
# Sobel kernels
sobel_x = tf.constant([[1, 0, -1], [2, 0, -2], [1, 0, -1]], dtype=tf.float32)
sobel_y = tf.constant([[1, 2, 1], [0, 0, 0], [-1, -2, -1]], dtype=tf.float32)
sobel_x = tf.reshape(sobel_x, [3, 3, 1, 1])
sobel_y = tf.reshape(sobel_y, [3, 3, 1, 1])
images = tf.convert_to_tensor(images, dtype=tf.float32)
images = tf.expand_dims(images, -1)
gx = tf.nn.conv2d(images, sobel_x, strides=1, padding='SAME')
gy = tf.nn.conv2d(images, sobel_y, strides=1, padding='SAME')
grad_mag = tf.sqrt(tf.square(gx) + tf.square(gy) + 1e-6)
return tf.squeeze(grad_mag, -1).numpy()
def GM_maps_prep(ir):
GM_maps=[]
for i in ir:
GM_map = tf_gradient_magnitude(i)
GM_maps.append(GM_map)
GM_maps=np.array(GM_maps)
return GM_maps
ir_seq=ir_seq.reshape(8, 95, 95, 1)
GM_maps = GM_maps_prep(ir_seq)
print(GM_maps.shape)
GM_maps=GM_maps.reshape(1, 8, 95, 95, 1)
i_images=cc_mask+ir_seq+GM_maps
reduced_images = np.concatenate([i_images,pmw_seq ], axis=-1)
reduced_images[np.isnan(reduced_images)] = 0
print(reduced_images.shape)
y = model_predict_fn(reduced_images, hov_m_train, train_vmax_3d, lat_processed, lon_processed, v_max_diff)
else:
y = model_predict_fn(reduced_images, hov_m_train, train_vmax_3d, lat_processed, lon_processed, v_max_diff)
st.write("Predicted Vmax:", y)
else:
st.error("Make sure you uploaded exactly 8 IR and 8 PMW images.")