Spaces:
Runtime error
Runtime error
flow_field = np.ones((128,256), dtype = np.uint8) | |
# Changing the left input side | |
flow_field[:,0] = 3 | |
# Changing the right output side | |
flow_field[:,-1] = 4 | |
# Changing the top layer | |
flow_field[0,:] = 2 | |
# Changing the bottom layer | |
flow_field[-1,:] = 2 | |
def nvs_loss(y_pred, rho=10, nu=0.0001): #arbitary rho and nu(Later use values of air) | |
u,v,p = tf.split(y_pred, 3, axis=3) | |
#First order derivative | |
du_dx, du_dy = tf.image.image_gradients(u) # tf.image.image_gradients returns a tuple containing two tensors: u-grad along the x dir and u-grad along the y dir | |
dv_dx, dv_dy = tf.image.image_gradients(v) | |
dp_dx, dp_dy = tf.image.image_gradients(p) | |
#Second order derivatives | |
du_dx2, du_dydx = tf.image.image_gradients(du_dx) # du_dydx will be unused | |
du_dxdy, du_dy2 = tf.image.image_gradients(du_dy) # du_dxdy will be unused | |
dv_dx2, dv_dydx = tf.image.image_gradients(dv_dx) | |
dv_dxdy, dv_dy2 = tf.image.image_gradients(dv_dy) | |
#Momentum equation | |
er1_tensor = tf.math.multiply(u, du_dx) + tf.math.multiply(v, du_dy) + 1.0*dp_dx/rho - nu*(du_dx2 + du_dy2) | |
er2_tensor = tf.math.multiply(u, dv_dx) + tf.math.multiply(v, dv_dy) + 1.0*dp_dy/rho - nu*(dv_dx2 + dv_dy2) | |
# # #Continuity equation | |
er3_tensor = du_dx + dv_dy | |
er1 = tf.reduce_mean(er1_tensor) | |
er2 = tf.reduce_mean(er2_tensor) | |
er3 = tf.reduce_mean(er3_tensor) | |
return er1*er1 + er2*er2 + er3*er3 | |
# Initiating the Loss Function- | |
def custom_loss(y_true, y_pred): | |
nv_loss = nvs_loss(y_pred) | |
mse_loss = tf.reduce_mean(tf.square(y_true-y_pred)) # Try mse loss function here | |
return mse_loss + nv_loss | |
import torch | |
import matplotlib | |
def colorize(value, vmin=None, vmax=None, cmap='gray_r', invalid_val=-99, invalid_mask=None, background_color=(128, 128, 128, 255), gamma_corrected=False, value_transform=None): | |
"""Converts a depth map to a color image. | |
Args: | |
value (torch.Tensor, numpy.ndarry): Input depth map. Shape: (H, W) or (1, H, W) or (1, 1, H, W). All singular dimensions are squeezed | |
vmin (float, optional): vmin-valued entries are mapped to start color of cmap. If None, value.min() is used. Defaults to None. | |
vmax (float, optional): vmax-valued entries are mapped to end color of cmap. If None, value.max() is used. Defaults to None. | |
cmap (str, optional): matplotlib colormap to use. Defaults to 'magma_r'. | |
invalid_val (int, optional): Specifies value of invalid pixels that should be colored as 'background_color'. Defaults to -99. | |
invalid_mask (numpy.ndarray, optional): Boolean mask for invalid regions. Defaults to None. | |
background_color (tuple[int], optional): 4-tuple RGB color to give to invalid pixels. Defaults to (128, 128, 128, 255). | |
gamma_corrected (bool, optional): Apply gamma correction to colored image. Defaults to False. | |
value_transform (Callable, optional): Apply transform function to valid pixels before coloring. Defaults to None. | |
Returns: | |
numpy.ndarray, dtype - uint8: Colored depth map. Shape: (H, W, 4) | |
""" | |
if isinstance(value, torch.Tensor): | |
value = value.detach().cpu().numpy() | |
value = value.squeeze() | |
if invalid_mask is None: | |
invalid_mask = value == invalid_val | |
mask = np.logical_not(invalid_mask) | |
# normalize | |
# vmin = np.percentile(value[mask],2) if vmin is None else vmin | |
# vmax = np.percentile(value[mask],85) if vmax is None else vmax | |
vmin = np.min(value[mask]) if vmin is None else vmin | |
vmax = np.max(value[mask]) if vmax is None else vmax | |
if vmin != vmax: | |
value = (value - vmin) / (vmax - vmin) # vmin..vmax | |
else: | |
# Avoid 0-division | |
value = value * 0. | |
# squeeze last dim if it exists | |
# grey out the invalid values | |
value[invalid_mask] = np.nan | |
cmapper = matplotlib.cm.get_cmap(cmap) | |
if value_transform: | |
value = value_transform(value) | |
# value = value / value.max() | |
value = cmapper(value, bytes=True) # (nxmx4) | |
# img = value[:, :, :] | |
img = value[...] | |
img[invalid_mask] = background_color | |
# return img.transpose((2, 0, 1)) | |
if gamma_corrected: | |
# gamma correction | |
img = img / 255 | |
img = np.power(img, 2.2) | |
img = img * 255 | |
img = img.astype(np.uint8) | |
return img | |
def img_preprocess(image, h, w): | |
# Convert the drawn image to grayscale | |
img_gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY) | |
# Threshold the grayscale image to create a binary image | |
_, binary_img = cv2.threshold(img_gray, 1, 255, cv2.THRESH_BINARY) | |
# Perform flood fill starting from a point inside the shape. Fill the inside with pixel value 0 | |
seed_point = (int(h/2), int(w/2)) | |
retval, flooded_image, mask, rect = cv2.floodFill(binary_img, None, seed_point, 0) | |
flooded_image = (flooded_image/255).astype(np.uint8) | |
return flooded_image | |
def patch_stiching(flooded_image, h, w, x0, y0): # ((x0, y0) = center of channel, (w1, h1) = height and width of patch) | |
flow_field_updated = np.copy(flow_field) | |
print('flow field updated - ', flow_field_updated[:,-1]) | |
flow_field_updated[int(x0-w/2):int(x0+w/2),int(y0-h/2):int(y0+h/2)] = flooded_image | |
# flow_field_updated is the main thing that we will use to make our predictions on - | |
test_img = np.expand_dims(flow_field_updated, axis = 0) | |
test_img = np.expand_dims(test_img, axis = 3) # Shape of test_img = (1, 128, 256) | |
return test_img | |
# Define grid points | |
x_points = np.linspace(0, 255, 256) | |
y_points = np.linspace(0, 127, 128) | |
X, Y = np.meshgrid(x_points, y_points) | |
def return_quiver_plot(u, v): | |
velocity = np.sqrt(u**2 + v**2) | |
ax = plt.subplot() | |
ax.imshow(velocity, origin = 'lower', extent = (0,256, 0,128), cmap = 'gray') | |
q = ax.quiver(X[5::8,5::8], Y[5::8,5::8], u[5::8,5::8], u[5::8,5::8], pivot = 'middle', color = 'red') | |
# ax.quiverkey(q, X=0.9, Y=1.05, U=2, | |
# label='m/s', labelpos='E') | |
# plt.title("Velocity distribution") | |
# plt.show() | |
return q | |
def squeeze_function(img): | |
img = np.squeeze(img, axis = 0) | |
img = np.squeeze(img, axis = 2) | |
return img | |
# Taking a shape from the user on sketchpad and placing it inside the fluid flow - | |
h, w = 48, 48 # patch_size in which the obstacle will be drawn | |
x0, y0 = 64, 128 # (x0, y0) = center of channel | |
def fill_shape_with_pixels(img): #img is taken by gradio as uint8 | |
if img is None: | |
return np.zeros((h, w), dtype=np.uint8) # "No input sketch" | |
# Calling the the flooded image function to fill inside the obstacle | |
flooded_image = img_preprocess(img, h, w) | |
# Performing patch statching to put the obstacle at the required center position | |
test_img = patch_stiching(flooded_image, h, w, x0, y0) | |
# Loading and Compiling the Model | |
model_path = "/content/drive/MyDrive/Pinns_Loss_file.h5" | |
model = load_model(model_path, compile = False) | |
model.compile(loss=custom_loss, optimizer=tf.keras.optimizers.AdamW(learning_rate = 0.0001), metrics=['mae', 'cosine_proximity']) | |
# Making Model prediction from input sketch shape | |
prediction = model.predict(test_img) # (prediction.shape = (1, 128, 256, 3)) | |
u_pred, v_pred, p_pred = np.split(prediction, 3, axis=3) # shape of u_pred, v_pred, p_pred = (1, 128, 256, 1) | |
# Making test_img in shape required by zero_pixel_location | |
req_img = squeeze_function(test_img) | |
# Storing the location of 0 pixel values | |
#req_img = req_img.astype(int) | |
zero_pixel_locations = np.argwhere(req_img == 0) | |
# Reducing the dimensions- | |
u_profile = u_pred[0][:,:,0] # shape of u profile to compatible shape (H, W) = (128, 256) | |
v_profile = v_pred[0][:,:,0] | |
p_profile = p_pred[0][:,:,0] | |
p_profile[p_profile>1.6] = 1.6 | |
# Creating a copy of the above profiles- | |
u_profile_dash = np.copy(u_profile) | |
v_profile_dash = np.copy(v_profile) | |
# Creating a copy of the above profiles- | |
u_profile_dash_1 = np.copy(u_profile) | |
v_profile_dash_1 = np.copy(v_profile) | |
# Hollowing the obstacle out from the u and v plots. Origin of imae is lop left and origin of plot is top right | |
for y, x in zero_pixel_locations: | |
u_profile_dash[128 - y, x] = 0 | |
v_profile_dash[128 - y, x] = 0 | |
# will be used for image | |
u_profile_dash_1[y, x] = 0 | |
v_profile_dash_1[y, x] = 0 | |
# Quiver Plot | |
quiver_plot = plt.figure(figsize = (14,6), edgecolor = "gray") | |
velocity = np.sqrt(u_profile_dash_1**2 + v_profile_dash_1**2) | |
ax = plt.subplot() | |
ax.imshow(velocity, cmap = 'gray', extent = (0,256, 0,128)) | |
q = ax.quiver(X[5::7,5::7], Y[5::7,5::7], u_profile_dash[5::7,5::7], v_profile_dash[5::7,5::7], pivot = 'middle', color = 'red') | |
ax.quiverkey(q, X=0.9, Y=1.07, U=2, | |
label='m/s', labelpos='E') | |
plt.title("Velocity distribution", fontsize = 11) | |
plt.xlabel("Length of Channel", fontsize = 11) | |
plt.ylabel("Height of Channel", fontsize = 11) | |
# StreamLine Plot | |
streamline_plot = plt.figure(figsize = (14,6), edgecolor = "gray") | |
plt.streamplot(X, Y, u_profile_dash, v_profile_dash, density = 3.5) | |
plt.axis('scaled') | |
plt.title("Streamline Plot", fontsize = 11) | |
plt.xlabel("Length of Channel", fontsize = 11) | |
plt.ylabel("Height of Channel", fontsize = 11) | |
# Colorize taken from ZoeDepth Model | |
u_colored = colorize(u_profile, cmap = 'jet') | |
#cbar_u = plt.colorbar(u_profile,fraction=0.025, pad=0.05) | |
v_colored = colorize(v_profile, cmap = 'jet') | |
#cbar_v = plt.colorbar(v_colored,fraction=0.025, pad=0.05) | |
p_colored = colorize(p_profile, cmap = 'jet') | |
#cbar_p = plt.colorbar(p_colored,fraction=0.025, pad=0.05) | |
return colorize(req_img, cmap = 'jet'), quiver_plot, streamline_plot, u_colored, v_colored, p_colored | |
# Importing gr.Blocks() | |
with gr.Blocks(theme="Taithrah/Minimal") as demo: | |
gr.Markdown( | |
""" | |
# Physics Constrained DNN for Predicting Mean Turbulent Flows | |
The App solves 2-D incompressible steady state NS equations for any given 2-D closed geometry. Geometry needs to be drawn around the center of the patch.\n | |
It predicts the streamlines,horizontal & vertical velocity profiles and the pressure profiles using a hybrid loss function.\n | |
""") | |
with gr.Row(): | |
with gr.Column(): | |
input_sketch = gr.Image(label = "Draw any Obstacle contour around the patch center", | |
tool="sketch", source="canvas", shape=(h, w), brush_radius = 3) | |
Process_button = gr.Button("Process Flow Parameters") | |
with gr.Column(): | |
filled_channel = gr.Image(label = "Drawn object inside a Channel of dimensions 128*256", container = True) | |
with gr.Row(): | |
quiver_plot = gr.Plot(label = "Velocity Distribution Around The Obstacle", scale = 2) | |
with gr.Row(): | |
streamline_plot = gr.Plot(label = "Stream Lines Around The Obstacle", scale = 2) | |
with gr.Row(): | |
u_image = gr.Image(label = "Horizontal Velocity") | |
v_image = gr.Image(label = "Vertical Velocity") | |
p_image = gr.Image(label = "Pressure") | |
Process_button.click(fn=fill_shape_with_pixels, inputs=input_sketch, outputs=[filled_channel, quiver_plot, streamline_plot, u_image, v_image, p_image]) | |
demo.launch(debug=True, server_name = "0.0.0.0", share = True, inline = False) |