import pandas as pd import tensorflow as tf from tensorflow.keras.models import load_model import cv2 import numpy as np import matplotlib.pyplot as plt import gradio as gr # developing the flowfield space flow_field = np.ones((128,256), dtype = np.uint8) # Changing the left input side flow_field[:,0] = 3 # Changing the right output side flow_field[:,-1] = 4 # Changing the top layer flow_field[0,:] = 2 # Changing the bottom layer flow_field[-1,:] = 2 mean_u = 0.075003795 mean_v = -0.000036 mean_p = 0.004301 std_dev_u = 0.04605 std_dev_v = 0.013812 std_dev_p = 0.007917 def nvs_loss(y_pred, rho=10, nu=0.0001): #arbitary rho and nu(Later use values of air) u,v,p = tf.split(y_pred, 3, axis=3) #First order derivative du_dx, du_dy = tf.image.image_gradients(u) # tf.image.image_gradients returns a tuple containing two tensors: u-grad along the x dir and u-grad along the y dir dv_dx, dv_dy = tf.image.image_gradients(v) dp_dx, dp_dy = tf.image.image_gradients(p) #Second order derivatives du_dx2, du_dydx = tf.image.image_gradients(du_dx) # du_dydx will be unused du_dxdy, du_dy2 = tf.image.image_gradients(du_dy) # du_dxdy will be unused dv_dx2, dv_dydx = tf.image.image_gradients(dv_dx) dv_dxdy, dv_dy2 = tf.image.image_gradients(dv_dy) #Momentum equation er1_tensor = tf.math.multiply(u, du_dx) + tf.math.multiply(v, du_dy) + 1.0*dp_dx/rho - nu*(du_dx2 + du_dy2) er2_tensor = tf.math.multiply(u, dv_dx) + tf.math.multiply(v, dv_dy) + 1.0*dp_dy/rho - nu*(dv_dx2 + dv_dy2) # # #Continuity equation er3_tensor = du_dx + dv_dy er1 = tf.reduce_mean(er1_tensor) er2 = tf.reduce_mean(er2_tensor) er3 = tf.reduce_mean(er3_tensor) return er1*er1 + er2*er2 + er3*er3 # Initiating the Loss Function- def custom_loss(y_true, y_pred): nv_loss = nvs_loss(y_pred) mse_loss = tf.reduce_mean(tf.square(y_true-y_pred)) # Try mse loss function here return mse_loss + nv_loss import torch import matplotlib def colorize(value, vmin=None, vmax=None, cmap='gray_r', invalid_val=-99, invalid_mask=None, background_color=(128, 128, 128, 255), gamma_corrected=False, value_transform=None): """Converts a depth map to a color image. Args: value (torch.Tensor, numpy.ndarry): Input depth map. Shape: (H, W) or (1, H, W) or (1, 1, H, W). All singular dimensions are squeezed vmin (float, optional): vmin-valued entries are mapped to start color of cmap. If None, value.min() is used. Defaults to None. vmax (float, optional): vmax-valued entries are mapped to end color of cmap. If None, value.max() is used. Defaults to None. cmap (str, optional): matplotlib colormap to use. Defaults to 'magma_r'. invalid_val (int, optional): Specifies value of invalid pixels that should be colored as 'background_color'. Defaults to -99. invalid_mask (numpy.ndarray, optional): Boolean mask for invalid regions. Defaults to None. background_color (tuple[int], optional): 4-tuple RGB color to give to invalid pixels. Defaults to (128, 128, 128, 255). gamma_corrected (bool, optional): Apply gamma correction to colored image. Defaults to False. value_transform (Callable, optional): Apply transform function to valid pixels before coloring. Defaults to None. Returns: numpy.ndarray, dtype - uint8: Colored depth map. Shape: (H, W, 4) """ if isinstance(value, torch.Tensor): value = value.detach().cpu().numpy() value = value.squeeze() if invalid_mask is None: invalid_mask = value == invalid_val mask = np.logical_not(invalid_mask) # normalize # vmin = np.percentile(value[mask],2) if vmin is None else vmin # vmax = np.percentile(value[mask],85) if vmax is None else vmax vmin = np.min(value[mask]) if vmin is None else vmin vmax = np.max(value[mask]) if vmax is None else vmax if vmin != vmax: value = (value - vmin) / (vmax - vmin) # vmin..vmax else: # Avoid 0-division value = value * 0. # squeeze last dim if it exists # grey out the invalid values value[invalid_mask] = np.nan cmapper = matplotlib.cm.get_cmap(cmap) if value_transform: value = value_transform(value) # value = value / value.max() value = cmapper(value, bytes=True) # (nxmx4) # img = value[:, :, :] img = value[...] img[invalid_mask] = background_color # return img.transpose((2, 0, 1)) if gamma_corrected: # gamma correction img = img / 255 img = np.power(img, 2.2) img = img * 255 img = img.astype(np.uint8) return img def img_preprocess(image, h, w): # Convert the drawn image to grayscale img_gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY) # Threshold the grayscale image to create a binary image _, binary_img = cv2.threshold(img_gray, 1, 255, cv2.THRESH_BINARY) # Perform flood fill starting from a point inside the shape. Fill the inside with pixel value 0 seed_point = (int(h/2), int(w/2)) retval, flooded_image, mask, rect = cv2.floodFill(binary_img, None, seed_point, 0) flooded_image = (flooded_image/255).astype(np.uint8) return flooded_image def patch_stiching(flooded_image, h, w, x0, y0): # ((x0, y0) = center of channel, (w1, h1) = height and width of patch) flow_field_updated = np.copy(flow_field) flow_field_updated[int(x0-w/2):int(x0+w/2),int(y0-h/2):int(y0+h/2)] = flooded_image # flow_field_updated is the main thing that we will use to make our predictions on - test_img = np.expand_dims(flow_field_updated, axis = 0) test_img = np.expand_dims(test_img, axis = 3) # Shape of test_img = (1, 128, 256) return test_img # Define grid points x_points = np.linspace(0, 255, 256) y_points = np.linspace(0, 127, 128) X, Y = np.meshgrid(x_points, y_points) def return_quiver_plot(u, v): velocity = np.sqrt(u**2 + v**2) ax = plt.subplot() ax.imshow(velocity, origin = 'lower', extent = (0,256, 0,128), cmap = 'gray') q = ax.quiver(X[5::8,5::8], Y[5::8,5::8], u[5::8,5::8], u[5::8,5::8], pivot = 'middle', color = 'red') # ax.quiverkey(q, X=0.9, Y=1.05, U=2, # label='m/s', labelpos='E') # plt.title("Velocity distribution") # plt.show() return q def squeeze_function(img): img = np.squeeze(img, axis = 0) img = np.squeeze(img, axis = 2) return img # Taking a shape from the user on sketchpad and placing it inside the fluid flow - h, w = 48, 48 # patch_size in which the obstacle will be drawn x0, y0 = 64, 128 # (x0, y0) = center of channel def fill_shape_with_pixels(img): #img is taken by gradio as uint8 if img is None: return np.zeros((h, w), dtype=np.uint8) # "No input sketch" # Calling the the flooded image function to fill inside the obstacle flooded_image = img_preprocess(img, h, w) # Performing patch statching to put the obstacle at the required center position test_img = patch_stiching(flooded_image, h, w, x0, y0) # Loading and Compiling the Model model_path = "Pinns_Loss_file.h5" model = load_model(model_path, compile = False) model.compile(loss=custom_loss, optimizer=tf.keras.optimizers.AdamW(learning_rate = 0.0001), metrics=['mae', 'cosine_proximity']) # Making Model prediction from input sketch shape prediction = model.predict(test_img) # (prediction.shape = (1, 128, 256, 3)) u_pred, v_pred, p_pred = np.split(prediction, 3, axis=3) # shape of u_pred, v_pred, p_pred = (1, 128, 256, 1) # De-Normalizing teh Data: u_pred = ((u_pred*std_dev_u) + mean_u) v_pred = ((v_pred*std_dev_v) + mean_v) p_pred = ((p_pred*std_dev_p) + mean_p) # Making test_img in shape required by zero_pixel_location req_img = squeeze_function(test_img) # Storing the location of 0 pixel values #req_img = req_img.astype(int) zero_pixel_locations = np.argwhere(req_img == 0) # Reducing the dimensions- u_profile = u_pred[0][:,:,0] # shape of u profile to compatible shape (H, W) = (128, 256) v_profile = v_pred[0][:,:,0] p_profile = p_pred[0][:,:,0] p_profile[p_profile>0.02] = 0.02 # Creating a copy of the above profiles- u_profile_dash = np.copy(u_profile) v_profile_dash = np.copy(v_profile) # Creating a copy of the above profiles- u_profile_dash_1 = np.copy(u_profile) v_profile_dash_1 = np.copy(v_profile) # Hollowing the obstacle out from the u and v plots. Origin of imae is lop left and origin of plot is top right for y, x in zero_pixel_locations: u_profile_dash[128 - y, x] = 0 v_profile_dash[128 - y, x] = 0 # will be used for image u_profile_dash_1[y, x] = 0 v_profile_dash_1[y, x] = 0 # Quiver Plot quiver_plot = plt.figure(figsize = (14,6), edgecolor = "gray") velocity = np.sqrt(u_profile_dash_1**2 + v_profile_dash_1**2) ax = plt.subplot() ax.imshow(velocity, cmap = 'gray', extent = (0,256, 0,128)) q = ax.quiver(X[5::7,5::7], Y[5::7,5::7], u_profile_dash[5::7,5::7], v_profile_dash[5::7,5::7], pivot = 'middle', color = 'red') ax.quiverkey(q, X=0.9, Y=1.07, U=2, label='m/s', labelpos='E') plt.title("Velocity distribution", fontsize = 11) plt.xlabel("Length of Channel", fontsize = 11) plt.ylabel("Height of Channel", fontsize = 11) # StreamLine Plot streamline_plot = plt.figure(figsize = (14,6), edgecolor = "gray") plt.streamplot(X, Y, u_profile_dash, v_profile_dash, density = 4) plt.axis('scaled') plt.title("Streamline Plot", fontsize = 11) plt.xlabel("Length of Channel", fontsize = 11) plt.ylabel("Height of Channel", fontsize = 11) # Colorize taken from ZoeDepth Model u_colored = colorize(u_profile, cmap = 'jet') #cbar_u = plt.colorbar(u_profile,fraction=0.025, pad=0.05) v_colored = colorize(v_profile, cmap = 'jet') #cbar_v = plt.colorbar(v_colored,fraction=0.025, pad=0.05) p_colored = colorize(p_profile, cmap = 'jet') #cbar_p = plt.colorbar(p_colored,fraction=0.025, pad=0.05) return colorize(req_img, cmap = 'jet'), quiver_plot, streamline_plot, u_colored, v_colored, p_colored # Importing gr.Blocks() with gr.Blocks(theme="Taithrah/Minimal") as demo: gr.Markdown( """ # Channel Flow - Physics Constrained DNN for Predicting Mean Turbulent Flows The App solves 2-D incompressible steady state NS equations for any given 2-D closed geometry. Geometry needs to be drawn around the center of the patch.\n It predicts the streamlines,horizontal & vertical velocity profiles and the pressure profiles using a hybrid loss function.\n Model Parameters (In SI Units) - Kinematic Viscosity = 0.0001, Input horizontal velocity = 0.075, Input vertical velocity = 0 """) with gr.Row(): with gr.Column(): input_sketch = gr.Image(label = "Draw any Obstacle contour around the patch center", tool="sketch", source="canvas", shape=(h, w), brush_radius = 3) Process_button = gr.Button("Process Flow Parameters") with gr.Column(): filled_channel = gr.Image(label = "Drawn object within fluid domain of dimensions 128*256", container = True) with gr.Row(): quiver_plot = gr.Plot(label = "Velocity Distribution Around The Obstacle", scale = 2) with gr.Row(): streamline_plot = gr.Plot(label = "Stream Lines Around The Obstacle", scale = 2) with gr.Row(): u_image = gr.Image(label = "Horizontal Velocity") v_image = gr.Image(label = "Vertical Velocity") p_image = gr.Image(label = "Pressure") Process_button.click(fn=fill_shape_with_pixels, inputs=input_sketch, outputs=[filled_channel, quiver_plot, streamline_plot, u_image, v_image, p_image]) demo.launch(debug=True, inline = False)