krafiq's picture
Update app.py
ce900a1
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import load_model
import cv2
import numpy as np
import matplotlib.pyplot as plt
import gradio as gr
# developing the flowfield space
flow_field = np.ones((128,256), dtype = np.uint8)
# Changing the left input side
flow_field[:,0] = 3
# Changing the right output side
flow_field[:,-1] = 4
# Changing the top layer
flow_field[0,:] = 2
# Changing the bottom layer
flow_field[-1,:] = 2
mean_u = 0.075003795
mean_v = -0.000036
mean_p = 0.004301
std_dev_u = 0.04605
std_dev_v = 0.013812
std_dev_p = 0.007917
def nvs_loss(y_pred, rho=10, nu=0.0001): #arbitary rho and nu(Later use values of air)
u,v,p = tf.split(y_pred, 3, axis=3)
#First order derivative
du_dx, du_dy = tf.image.image_gradients(u) # tf.image.image_gradients returns a tuple containing two tensors: u-grad along the x dir and u-grad along the y dir
dv_dx, dv_dy = tf.image.image_gradients(v)
dp_dx, dp_dy = tf.image.image_gradients(p)
#Second order derivatives
du_dx2, du_dydx = tf.image.image_gradients(du_dx) # du_dydx will be unused
du_dxdy, du_dy2 = tf.image.image_gradients(du_dy) # du_dxdy will be unused
dv_dx2, dv_dydx = tf.image.image_gradients(dv_dx)
dv_dxdy, dv_dy2 = tf.image.image_gradients(dv_dy)
#Momentum equation
er1_tensor = tf.math.multiply(u, du_dx) + tf.math.multiply(v, du_dy) + 1.0*dp_dx/rho - nu*(du_dx2 + du_dy2)
er2_tensor = tf.math.multiply(u, dv_dx) + tf.math.multiply(v, dv_dy) + 1.0*dp_dy/rho - nu*(dv_dx2 + dv_dy2)
# # #Continuity equation
er3_tensor = du_dx + dv_dy
er1 = tf.reduce_mean(er1_tensor)
er2 = tf.reduce_mean(er2_tensor)
er3 = tf.reduce_mean(er3_tensor)
return er1*er1 + er2*er2 + er3*er3
# Initiating the Loss Function-
def custom_loss(y_true, y_pred):
nv_loss = nvs_loss(y_pred)
mse_loss = tf.reduce_mean(tf.square(y_true-y_pred)) # Try mse loss function here
return mse_loss + nv_loss
import torch
import matplotlib
def colorize(value, vmin=None, vmax=None, cmap='gray_r', invalid_val=-99, invalid_mask=None, background_color=(128, 128, 128, 255), gamma_corrected=False, value_transform=None):
"""Converts a depth map to a color image.
Args:
value (torch.Tensor, numpy.ndarry): Input depth map. Shape: (H, W) or (1, H, W) or (1, 1, H, W). All singular dimensions are squeezed
vmin (float, optional): vmin-valued entries are mapped to start color of cmap. If None, value.min() is used. Defaults to None.
vmax (float, optional): vmax-valued entries are mapped to end color of cmap. If None, value.max() is used. Defaults to None.
cmap (str, optional): matplotlib colormap to use. Defaults to 'magma_r'.
invalid_val (int, optional): Specifies value of invalid pixels that should be colored as 'background_color'. Defaults to -99.
invalid_mask (numpy.ndarray, optional): Boolean mask for invalid regions. Defaults to None.
background_color (tuple[int], optional): 4-tuple RGB color to give to invalid pixels. Defaults to (128, 128, 128, 255).
gamma_corrected (bool, optional): Apply gamma correction to colored image. Defaults to False.
value_transform (Callable, optional): Apply transform function to valid pixels before coloring. Defaults to None.
Returns:
numpy.ndarray, dtype - uint8: Colored depth map. Shape: (H, W, 4)
"""
if isinstance(value, torch.Tensor):
value = value.detach().cpu().numpy()
value = value.squeeze()
if invalid_mask is None:
invalid_mask = value == invalid_val
mask = np.logical_not(invalid_mask)
# normalize
# vmin = np.percentile(value[mask],2) if vmin is None else vmin
# vmax = np.percentile(value[mask],85) if vmax is None else vmax
vmin = np.min(value[mask]) if vmin is None else vmin
vmax = np.max(value[mask]) if vmax is None else vmax
if vmin != vmax:
value = (value - vmin) / (vmax - vmin) # vmin..vmax
else:
# Avoid 0-division
value = value * 0.
# squeeze last dim if it exists
# grey out the invalid values
value[invalid_mask] = np.nan
cmapper = matplotlib.cm.get_cmap(cmap)
if value_transform:
value = value_transform(value)
# value = value / value.max()
value = cmapper(value, bytes=True) # (nxmx4)
# img = value[:, :, :]
img = value[...]
img[invalid_mask] = background_color
# return img.transpose((2, 0, 1))
if gamma_corrected:
# gamma correction
img = img / 255
img = np.power(img, 2.2)
img = img * 255
img = img.astype(np.uint8)
return img
def img_preprocess(image, h, w):
# Convert the drawn image to grayscale
img_gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
# Threshold the grayscale image to create a binary image
_, binary_img = cv2.threshold(img_gray, 1, 255, cv2.THRESH_BINARY)
# Perform flood fill starting from a point inside the shape. Fill the inside with pixel value 0
seed_point = (int(h/2), int(w/2))
retval, flooded_image, mask, rect = cv2.floodFill(binary_img, None, seed_point, 0)
flooded_image = (flooded_image/255).astype(np.uint8)
return flooded_image
def patch_stiching(flooded_image, h, w, x0, y0): # ((x0, y0) = center of channel, (w1, h1) = height and width of patch)
flow_field_updated = np.copy(flow_field)
flow_field_updated[int(x0-w/2):int(x0+w/2),int(y0-h/2):int(y0+h/2)] = flooded_image
# flow_field_updated is the main thing that we will use to make our predictions on -
test_img = np.expand_dims(flow_field_updated, axis = 0)
test_img = np.expand_dims(test_img, axis = 3) # Shape of test_img = (1, 128, 256)
return test_img
# Define grid points
x_points = np.linspace(0, 255, 256)
y_points = np.linspace(0, 127, 128)
X, Y = np.meshgrid(x_points, y_points)
def return_quiver_plot(u, v):
velocity = np.sqrt(u**2 + v**2)
ax = plt.subplot()
ax.imshow(velocity, origin = 'lower', extent = (0,256, 0,128), cmap = 'gray')
q = ax.quiver(X[5::8,5::8], Y[5::8,5::8], u[5::8,5::8], u[5::8,5::8], pivot = 'middle', color = 'red')
# ax.quiverkey(q, X=0.9, Y=1.05, U=2,
# label='m/s', labelpos='E')
# plt.title("Velocity distribution")
# plt.show()
return q
def squeeze_function(img):
img = np.squeeze(img, axis = 0)
img = np.squeeze(img, axis = 2)
return img
# Taking a shape from the user on sketchpad and placing it inside the fluid flow -
h, w = 48, 48 # patch_size in which the obstacle will be drawn
x0, y0 = 64, 128 # (x0, y0) = center of channel
def fill_shape_with_pixels(img): #img is taken by gradio as uint8
if img is None:
return np.zeros((h, w), dtype=np.uint8) # "No input sketch"
# Calling the the flooded image function to fill inside the obstacle
flooded_image = img_preprocess(img, h, w)
# Performing patch statching to put the obstacle at the required center position
test_img = patch_stiching(flooded_image, h, w, x0, y0)
# Loading and Compiling the Model
model_path = "Pinns_Loss_file.h5"
model = load_model(model_path, compile = False)
model.compile(loss=custom_loss, optimizer=tf.keras.optimizers.AdamW(learning_rate = 0.0001), metrics=['mae', 'cosine_proximity'])
# Making Model prediction from input sketch shape
prediction = model.predict(test_img) # (prediction.shape = (1, 128, 256, 3))
u_pred, v_pred, p_pred = np.split(prediction, 3, axis=3) # shape of u_pred, v_pred, p_pred = (1, 128, 256, 1)
# De-Normalizing teh Data:
u_pred = ((u_pred*std_dev_u) + mean_u)
v_pred = ((v_pred*std_dev_v) + mean_v)
p_pred = ((p_pred*std_dev_p) + mean_p)
# Making test_img in shape required by zero_pixel_location
req_img = squeeze_function(test_img)
# Storing the location of 0 pixel values
#req_img = req_img.astype(int)
zero_pixel_locations = np.argwhere(req_img == 0)
# Reducing the dimensions-
u_profile = u_pred[0][:,:,0] # shape of u profile to compatible shape (H, W) = (128, 256)
v_profile = v_pred[0][:,:,0]
p_profile = p_pred[0][:,:,0]
p_profile[p_profile>0.02] = 0.02
# Creating a copy of the above profiles-
u_profile_dash = np.copy(u_profile)
v_profile_dash = np.copy(v_profile)
# Creating a copy of the above profiles-
u_profile_dash_1 = np.copy(u_profile)
v_profile_dash_1 = np.copy(v_profile)
# Hollowing the obstacle out from the u and v plots. Origin of imae is lop left and origin of plot is top right
for y, x in zero_pixel_locations:
u_profile_dash[128 - y, x] = 0
v_profile_dash[128 - y, x] = 0
# will be used for image
u_profile_dash_1[y, x] = 0
v_profile_dash_1[y, x] = 0
# Quiver Plot
quiver_plot = plt.figure(figsize = (14,6), edgecolor = "gray")
velocity = np.sqrt(u_profile_dash_1**2 + v_profile_dash_1**2)
ax = plt.subplot()
ax.imshow(velocity, cmap = 'gray', extent = (0,256, 0,128))
q = ax.quiver(X[5::7,5::7], Y[5::7,5::7], u_profile_dash[5::7,5::7], v_profile_dash[5::7,5::7], pivot = 'middle', color = 'red')
ax.quiverkey(q, X=0.9, Y=1.07, U=2,
label='m/s', labelpos='E')
plt.title("Velocity distribution", fontsize = 11)
plt.xlabel("Length of Channel", fontsize = 11)
plt.ylabel("Height of Channel", fontsize = 11)
# StreamLine Plot
streamline_plot = plt.figure(figsize = (14,6), edgecolor = "gray")
plt.streamplot(X, Y, u_profile_dash, v_profile_dash, density = 4)
plt.axis('scaled')
plt.title("Streamline Plot", fontsize = 11)
plt.xlabel("Length of Channel", fontsize = 11)
plt.ylabel("Height of Channel", fontsize = 11)
# Colorize taken from ZoeDepth Model
u_colored = colorize(u_profile, cmap = 'jet')
#cbar_u = plt.colorbar(u_profile,fraction=0.025, pad=0.05)
v_colored = colorize(v_profile, cmap = 'jet')
#cbar_v = plt.colorbar(v_colored,fraction=0.025, pad=0.05)
p_colored = colorize(p_profile, cmap = 'jet')
#cbar_p = plt.colorbar(p_colored,fraction=0.025, pad=0.05)
return colorize(req_img, cmap = 'jet'), quiver_plot, streamline_plot, u_colored, v_colored, p_colored
# Importing gr.Blocks()
with gr.Blocks(theme="Taithrah/Minimal") as demo:
gr.Markdown(
"""
# Channel Flow - Physics Constrained DNN for Predicting Mean Turbulent Flows
The App solves 2-D incompressible steady state NS equations for any given 2-D closed geometry. Geometry needs to be drawn around the center of the patch.\n
It predicts the streamlines,horizontal & vertical velocity profiles and the pressure profiles using a hybrid loss function.\n
Model Parameters (In SI Units) - Kinematic Viscosity = 0.0001, Input horizontal velocity = 0.075, Input vertical velocity = 0
""")
with gr.Row():
with gr.Column():
input_sketch = gr.Image(label = "Draw any Obstacle contour around the patch center",
tool="sketch", source="canvas", shape=(h, w), brush_radius = 3)
Process_button = gr.Button("Process Flow Parameters")
with gr.Column():
filled_channel = gr.Image(label = "Drawn object within fluid domain of dimensions 128*256", container = True)
with gr.Row():
quiver_plot = gr.Plot(label = "Velocity Distribution Around The Obstacle", scale = 2)
with gr.Row():
streamline_plot = gr.Plot(label = "Stream Lines Around The Obstacle", scale = 2)
with gr.Row():
u_image = gr.Image(label = "Horizontal Velocity")
v_image = gr.Image(label = "Vertical Velocity")
p_image = gr.Image(label = "Pressure")
Process_button.click(fn=fill_shape_with_pixels, inputs=input_sketch, outputs=[filled_channel, quiver_plot, streamline_plot, u_image, v_image, p_image])
demo.launch(debug=True, inline = False)