import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import load_model
import cv2
import numpy as np
import torch
import matplotlib
import matplotlib.pyplot as plt
import gradio as gr


# Template flow field for the 128 x 256 channel: interior cells are 1,
# the left and right edges carry markers 3 and 4, and the top and bottom
# walls carry marker 2.
flow_field = np.ones((128, 256), dtype=np.uint8)

flow_field[:, 0] = 3
flow_field[:, -1] = 4
flow_field[0, :] = 2
flow_field[-1, :] = 2

# Mean and standard deviation used to de-normalize the predicted u, v and p fields
mean_u = 0.075003795
mean_v = -0.000036
mean_p = 0.004301

std_dev_u = 0.04605
std_dev_v = 0.013812
std_dev_p = 0.007917
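
# The physics-informed loss below penalizes the residuals of the steady,
# incompressible 2-D Navier-Stokes equations, with spatial derivatives
# approximated by tf.image.image_gradients (finite differences):
#   R1 = u*du/dx + v*du/dy + (1/rho)*dp/dx - nu*(d2u/dx2 + d2u/dy2)   (x-momentum)
#   R2 = u*dv/dx + v*dv/dy + (1/rho)*dp/dy - nu*(d2v/dx2 + d2v/dy2)   (y-momentum)
#   R3 = du/dx + dv/dy                                                (continuity)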


def nvs_loss(y_pred, rho=10, nu=0.0001):
    # Split the 3-channel prediction into u, v and p fields
    u, v, p = tf.split(y_pred, 3, axis=3)

    # First-order derivatives
    du_dx, du_dy = tf.image.image_gradients(u)
    dv_dx, dv_dy = tf.image.image_gradients(v)
    dp_dx, dp_dy = tf.image.image_gradients(p)

    # Second-order derivatives
    du_dx2, du_dydx = tf.image.image_gradients(du_dx)
    du_dxdy, du_dy2 = tf.image.image_gradients(du_dy)

    dv_dx2, dv_dydx = tf.image.image_gradients(dv_dx)
    dv_dxdy, dv_dy2 = tf.image.image_gradients(dv_dy)

    # Momentum residuals (x and y) and continuity residual
    er1_tensor = tf.math.multiply(u, du_dx) + tf.math.multiply(v, du_dy) + 1.0 * dp_dx / rho - nu * (du_dx2 + du_dy2)
    er2_tensor = tf.math.multiply(u, dv_dx) + tf.math.multiply(v, dv_dy) + 1.0 * dp_dy / rho - nu * (dv_dx2 + dv_dy2)
    er3_tensor = du_dx + dv_dy

    er1 = tf.reduce_mean(er1_tensor)
    er2 = tf.reduce_mean(er2_tensor)
    er3 = tf.reduce_mean(er3_tensor)

    return er1 * er1 + er2 * er2 + er3 * er3


def custom_loss(y_true, y_pred):
    # Hybrid loss: data-fit MSE plus the Navier-Stokes residual penalty
    nv_loss = nvs_loss(y_pred)
    mse_loss = tf.reduce_mean(tf.square(y_true - y_pred))
    return mse_loss + nv_loss
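
# Note: only y_pred enters nvs_loss, so the physics term acts as an
# unsupervised regularizer on top of the supervised MSE term. This same
# custom_loss is what the saved model is re-compiled with further below.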


def colorize(value, vmin=None, vmax=None, cmap='gray_r', invalid_val=-99, invalid_mask=None, background_color=(128, 128, 128, 255), gamma_corrected=False, value_transform=None):
    """Converts a depth map (or any 2-D scalar field) to a color image.

    Args:
        value (torch.Tensor, numpy.ndarray): Input map. Shape: (H, W), (1, H, W) or (1, 1, H, W). All singleton dimensions are squeezed.
        vmin (float, optional): vmin-valued entries are mapped to the start color of cmap. If None, value.min() is used. Defaults to None.
        vmax (float, optional): vmax-valued entries are mapped to the end color of cmap. If None, value.max() is used. Defaults to None.
        cmap (str, optional): matplotlib colormap to use. Defaults to 'gray_r'.
        invalid_val (int, optional): Value of invalid pixels that should be colored as 'background_color'. Defaults to -99.
        invalid_mask (numpy.ndarray, optional): Boolean mask for invalid regions. Defaults to None.
        background_color (tuple[int], optional): 4-tuple RGBA color to give to invalid pixels. Defaults to (128, 128, 128, 255).
        gamma_corrected (bool, optional): Apply gamma correction to the colored image. Defaults to False.
        value_transform (Callable, optional): Apply a transform function to valid pixels before coloring. Defaults to None.

    Returns:
        numpy.ndarray, dtype uint8: Colored map. Shape: (H, W, 4)
    """
    if isinstance(value, torch.Tensor):
        value = value.detach().cpu().numpy()

    value = value.squeeze()
    if invalid_mask is None:
        invalid_mask = value == invalid_val
    mask = np.logical_not(invalid_mask)

    # Normalize valid pixels to [0, 1]
    vmin = np.min(value[mask]) if vmin is None else vmin
    vmax = np.max(value[mask]) if vmax is None else vmax
    if vmin != vmax:
        value = (value - vmin) / (vmax - vmin)
    else:
        # Avoid division by zero when the field is constant
        value = value * 0.

    value[invalid_mask] = np.nan
    cmapper = matplotlib.cm.get_cmap(cmap)
    if value_transform:
        value = value_transform(value)

    # Map normalized values to RGBA (uint8)
    value = cmapper(value, bytes=True)

    img = value[...]
    img[invalid_mask] = background_color

    if gamma_corrected:
        img = img / 255
        img = np.power(img, 2.2)
        img = img * 255
        img = img.astype(np.uint8)
    return img
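
# colorize() returns an (H, W, 4) uint8 RGBA image; it is reused below to
# render the predicted u, v and p fields (and the obstacle mask) with the
# 'jet' colormap so Gradio can display them directly as images.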


def img_preprocess(image, h, w):
    # Binarize the grayscale sketch: pixels brighter than 1 become 255, the rest 0
    img_gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
    _, binary_img = cv2.threshold(img_gray, 1, 255, cv2.THRESH_BINARY)

    # Flood-fill with 0 from the patch center so the region enclosed by the
    # sketched contour ends up as 0; after rescaling, cells are 0 (obstacle) or 1 (fluid)
    seed_point = (int(h / 2), int(w / 2))
    retval, flooded_image, mask, rect = cv2.floodFill(binary_img, None, seed_point, 0)
    flooded_image = (flooded_image / 255).astype(np.uint8)
    return flooded_image
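
# Note: this assumes the user draws a closed contour around the patch center,
# as the UI instructs. If the contour is not closed, the flood fill can leak
# out of the shape and mark most of the 48 x 48 patch as solid.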


def patch_stiching(flooded_image, h, w, x0, y0):
    # Stitch the h x w obstacle patch into the channel template, centered at
    # (x0, y0), then add batch and channel axes -> shape (1, 128, 256, 1)
    flow_field_updated = np.copy(flow_field)
    flow_field_updated[int(x0 - w / 2):int(x0 + w / 2), int(y0 - h / 2):int(y0 + h / 2)] = flooded_image

    test_img = np.expand_dims(flow_field_updated, axis=0)
    test_img = np.expand_dims(test_img, axis=3)
    return test_img
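
# The model consumes this (1, 128, 256, 1) tensor of cell markers
# (0 = obstacle, 1 = fluid, 2/3/4 = boundaries) and predicts a
# (1, 128, 256, 3) tensor holding the normalized u, v and p fields.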
# Pixel-center coordinates of the 256 x 128 domain, used for the quiver and streamline plots
x_points = np.linspace(0, 255, 256)
y_points = np.linspace(0, 127, 128)
X, Y = np.meshgrid(x_points, y_points)


def return_quiver_plot(u, v):
    # Standalone helper: velocity-magnitude background with quiver arrows
    velocity = np.sqrt(u**2 + v**2)
    ax = plt.subplot()
    ax.imshow(velocity, origin='lower', extent=(0, 256, 0, 128), cmap='gray')
    q = ax.quiver(X[5::8, 5::8], Y[5::8, 5::8], u[5::8, 5::8], v[5::8, 5::8], pivot='middle', color='red')
    return q


def squeeze_function(img):
    # Drop the batch and channel axes: (1, 128, 256, 1) -> (128, 256)
    img = np.squeeze(img, axis=0)
    img = np.squeeze(img, axis=2)
    return img


# Sketch patch size (h x w) and the cell (row x0, column y0) at which it is centered
h, w = 48, 48
x0, y0 = 64, 128


def fill_shape_with_pixels(img):
    if img is None:
        # Nothing drawn yet: return a blank patch and leave the other outputs empty
        return np.zeros((h, w), dtype=np.uint8), None, None, None, None, None

    # Turn the sketch into a 0/1 obstacle patch and stitch it into the channel
    flooded_image = img_preprocess(img, h, w)
    test_img = patch_stiching(flooded_image, h, w, x0, y0)

    # Load the trained physics-constrained model and recompile it with the hybrid loss
    model_path = "Pinns_Loss_file.h5"
    model = load_model(model_path, compile=False)
    model.compile(loss=custom_loss, optimizer=tf.keras.optimizers.AdamW(learning_rate=0.0001), metrics=['mae', 'cosine_similarity'])

    prediction = model.predict(test_img)
    u_pred, v_pred, p_pred = np.split(prediction, 3, axis=3)

    # De-normalize the predictions back to physical units
    u_pred = (u_pred * std_dev_u) + mean_u
    v_pred = (v_pred * std_dev_v) + mean_v
    p_pred = (p_pred * std_dev_p) + mean_p

    req_img = squeeze_function(test_img)

    # Cells belonging to the obstacle (marker 0) get zero velocity
    zero_pixel_locations = np.argwhere(req_img == 0)

    u_profile = u_pred[0][:, :, 0]
    v_profile = v_pred[0][:, :, 0]
    p_profile = p_pred[0][:, :, 0]
    p_profile[p_profile > 0.02] = 0.02   # clip pressure for a readable colormap

    u_profile_dash = np.copy(u_profile)
    v_profile_dash = np.copy(v_profile)

    u_profile_dash_1 = np.copy(u_profile)
    v_profile_dash_1 = np.copy(v_profile)

    for y, x in zero_pixel_locations:
        # Vertically flipped copy for the y-up quiver/streamline coordinates
        u_profile_dash[127 - y, x] = 0
        v_profile_dash[127 - y, x] = 0

        # Unflipped copy for image-convention (row 0 at top) rendering
        u_profile_dash_1[y, x] = 0
        v_profile_dash_1[y, x] = 0

    # Quiver plot: velocity magnitude as background, arrows for direction
    quiver_plot = plt.figure(figsize=(14, 6), edgecolor="gray")
    velocity = np.sqrt(u_profile_dash_1**2 + v_profile_dash_1**2)
    ax = plt.subplot()
    ax.imshow(velocity, cmap='gray', extent=(0, 256, 0, 128))
    q = ax.quiver(X[5::7, 5::7], Y[5::7, 5::7], u_profile_dash[5::7, 5::7], v_profile_dash[5::7, 5::7], pivot='middle', color='red')
    ax.quiverkey(q, X=0.9, Y=1.07, U=2, label='m/s', labelpos='E')
    plt.title("Velocity distribution", fontsize=11)
    plt.xlabel("Length of Channel", fontsize=11)
    plt.ylabel("Height of Channel", fontsize=11)

    # Streamline plot
    streamline_plot = plt.figure(figsize=(14, 6), edgecolor="gray")
    plt.streamplot(X, Y, u_profile_dash, v_profile_dash, density=4)
    plt.axis('scaled')
    plt.title("Streamline Plot", fontsize=11)
    plt.xlabel("Length of Channel", fontsize=11)
    plt.ylabel("Height of Channel", fontsize=11)

    # Color-mapped u, v and p fields for display
    u_colored = colorize(u_profile, cmap='jet')
    v_colored = colorize(v_profile, cmap='jet')
    p_colored = colorize(p_profile, cmap='jet')

    return colorize(req_img, cmap='jet'), quiver_plot, streamline_plot, u_colored, v_colored, p_colored
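
# The six return values map one-to-one onto the Gradio outputs wired up below:
# the filled-channel image, the quiver figure, the streamline figure, and the
# u, v and p color maps.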


with gr.Blocks(theme="Taithrah/Minimal") as demo:
    gr.Markdown(
        """
# Channel Flow - Physics Constrained DNN for Predicting Mean Turbulent Flows
The app solves the 2-D incompressible, steady-state Navier-Stokes equations for any closed 2-D geometry. The geometry needs to be drawn around the center of the patch.\n
It predicts the streamlines, the horizontal and vertical velocity profiles, and the pressure profile using a hybrid (data + physics) loss function.\n
Model Parameters (in SI units) - Kinematic Viscosity = 0.0001, Input horizontal velocity = 0.075, Input vertical velocity = 0
""")
    with gr.Row():
        with gr.Column():
            input_sketch = gr.Image(label="Draw any obstacle contour around the patch center",
                                    tool="sketch", source="canvas", shape=(h, w), brush_radius=3)
            Process_button = gr.Button("Process Flow Parameters")

        with gr.Column():
            filled_channel = gr.Image(label="Drawn object within the fluid domain of dimensions 128 x 256", container=True)

    with gr.Row():
        quiver_plot = gr.Plot(label="Velocity Distribution Around The Obstacle", scale=2)

    with gr.Row():
        streamline_plot = gr.Plot(label="Stream Lines Around The Obstacle", scale=2)

    with gr.Row():
        u_image = gr.Image(label="Horizontal Velocity")
        v_image = gr.Image(label="Vertical Velocity")
        p_image = gr.Image(label="Pressure")

    Process_button.click(fn=fill_shape_with_pixels, inputs=input_sketch,
                         outputs=[filled_channel, quiver_plot, streamline_plot, u_image, v_image, p_image])

demo.launch(debug=True, inline=False)
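
# debug=True blocks the main thread and surfaces errors raised inside the
# callback; inline=False stops Gradio from embedding the UI in a notebook
# output cell, so the app is used via its local URL instead.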