#!/usr/bin/env python
# coding: utf-8
"""Gradio demo for the Flexi-Propagator PDE surrogate models.

The app exposes two tabs:

* 1D Burgers' equation: compares a single-shot Flexi-Propagator prediction
  with a recursive AE-LSTM rollout, times both, and shows where the
  propagated state lands in the 2D latent space.
* 2D advection-diffusion: shows the exact initial/evolved fields and the
  Flexi-Propagator prediction next to the exact solution.

Model weights are loaded from checkpoint files in the working directory at
import time, and the Gradio app is launched at the bottom of the file.
"""

import time
import warnings

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import torch

# Import Burgers' equation components
from data_burgers import exact_solution as exact_solution_burgers
from model_io_burgers import load_model
from model_v2 import Encoder, Decoder, Propagator_concat as Propagator, Model
from LSTM_model import AE_Encoder, AE_Decoder, AE_Model, PytorchLSTM

# Import Advection-Diffusion components
from data_adv_dif import exact_solution as exact_solution_adv_dif
from model_io_adv_dif import load_model as load_model_adv_dif
from model_adv_dif import (
    Encoder as Encoder2D,
    Decoder as Decoder2D,
    Propagator_concat as Propagator2D,
    Model as Model2D,
)

warnings.filterwarnings("ignore")

# Time-step size shared by every "tau steps ahead" computation in this file.
DT = 2.0 / 500.0
# Number of consecutive snapshots fed to the LSTM as its input window.
LSTM_WINDOW = 40
# Re is divided by this before being appended to the LSTM latent input.
RE_SCALE = 1000.0


# ========== Burgers' Equation Setup ==========
def get_burgers_model(input_dim, latent_dim):
    """Build an untrained Flexi-Propagator model for the 1D Burgers' problem.

    Args:
        input_dim: Size passed to the encoder input / decoder output.
        latent_dim: Size of the latent space.

    Returns:
        A ``Model`` wrapping a fresh encoder, decoder and propagator.
    """
    encoder = Encoder(input_dim, latent_dim)
    decoder = Decoder(latent_dim, input_dim)
    propagator = Propagator(latent_dim)
    return Model(encoder, decoder, propagator)


flexi_prop_model = get_burgers_model(128, 2)
checkpoint = torch.load(
    "./FlexiPropagator_2025-02-01-10-28-34_3e9656b5_best.pt",
    map_location='cpu',
)
flexi_prop_model.load_state_dict(checkpoint['model_state_dict'])
flexi_prop_model.eval()

# AE LSTM models
ae_encoder = AE_Encoder(128)
ae_decoder = AE_Decoder(2, 128)
ae_model = AE_Model(ae_encoder, ae_decoder)
lstm_model = PytorchLSTM()

ae_encoder.load_state_dict(torch.load("./ae_encoder_weights.pth", map_location='cpu'))
ae_decoder.load_state_dict(torch.load("./ae_decoder_weights.pth", map_location='cpu'))
ae_model.load_state_dict(torch.load("./ae_model.pth", map_location='cpu'))
lstm_model.load_state_dict(torch.load("./lstm_weights.pth", map_location='cpu'))

# These models are only used for inference; switch them to eval mode just like
# flexi_prop_model so any dropout / batch-norm layers behave deterministically.
for _inference_module in (ae_encoder, ae_decoder, ae_model, lstm_model):
    _inference_module.eval()


# ========== Helper Functions Burgers ==========
def exacts_equals_timewindow(t_0, Re, time_window=40):
    """Encode a window of exact Burgers' solutions for the LSTM.

    Exact solutions are evaluated at ``t_0 + k * DT`` for
    ``k = 0 .. time_window - 1``, stacked, and passed through ``ae_encoder``.

    Args:
        t_0: Time of the first snapshot in the window.
        Re: Reynolds number passed to the exact solution.
        time_window: Number of consecutive snapshots to generate.

    Returns:
        A 3-tuple ``(latents_with_re, latents, solns)``: the encoder output
        with a constant ``Re / RE_SCALE`` channel concatenated on the last
        axis, the raw encoder output, and the stacked exact solutions with a
        leading batch axis.
    """
    times = t_0 + np.arange(0, time_window) * DT
    # Stack into one ndarray first: building a tensor straight from a list of
    # ndarrays is very slow.
    solutions = np.asarray([exact_solution_burgers(Re, t) for t in times])
    solns = torch.tensor(solutions, dtype=torch.float32)[None, :, :]
    with torch.no_grad():  # inference only; no autograd graph needed
        latents = ae_encoder(solns)
    re_repeated = torch.ones(1, time_window, 1) * (Re / RE_SCALE)
    return torch.cat((latents, re_repeated), dim=2), latents, solns


def _burgers_inputs(Re, tau, t_0):
    """Build the ``(xt, tau_tensor, Re_tensor)`` inputs for ``flexi_prop_model``.

    ``tau_tensor`` and ``Re_tensor`` are float32 tensors of shape (1, 1);
    ``xt`` is the exact solution at ``t_0`` with two leading singleton axes
    inserted (assumes the exact solution is a 1D array -- TODO confirm).
    """
    x_t = np.asarray([exact_solution_burgers(Re, t_0)])
    xt = torch.tensor(x_t, dtype=torch.float32)[:, None]
    tau_tensor = torch.tensor([[tau]], dtype=torch.float32)
    Re_tensor = torch.tensor([[Re]], dtype=torch.float32)
    return xt, tau_tensor, Re_tensor


def _rollout_lstm(latent_window, Re, tau):
    """Recursively advance the LSTM from step ``LSTM_WINDOW`` to step ``tau``.

    Each iteration predicts the next latent state, appends the normalized
    Reynolds number to it, and slides the input window forward by one step.

    Args:
        latent_window: Encoded window as returned (first element) by
            ``exacts_equals_timewindow``.
        Re: Reynolds number; ``Re / RE_SCALE`` is appended to each prediction.
        tau: Target step count; truncated to an integer.

    Returns:
        The last latent prediction produced by ``lstm_model``.

    Raises:
        ValueError: If ``tau`` does not exceed ``LSTM_WINDOW`` (no prediction
            would be produced).
    """
    n_steps = int(tau)  # sliders may deliver floats; range() needs an int
    if n_steps <= LSTM_WINDOW:
        raise ValueError(
            f"tau must be greater than {LSTM_WINDOW} for the AE-LSTM rollout, got {tau}"
        )

    re_column = torch.tensor([[Re / RE_SCALE]], dtype=torch.float32)
    pred = None
    with torch.no_grad():
        for _ in range(LSTM_WINDOW, n_steps):
            pred = lstm_model(latent_window)
            pred_with_re = torch.cat((pred, re_column), dim=1)
            # Drop the oldest step and append the newest prediction.
            latent_window = torch.cat(
                (latent_window[:, 1:, :], pred_with_re.unsqueeze(0)), dim=1
            )
    return pred


# Precompute contour plots
z1_vals = np.linspace(-10, 0.5, 200)
z2_vals = np.linspace(5, 32, 200)
Z1, Z2 = np.meshgrid(z1_vals, z2_vals)
latent_grid = np.stack([Z1.ravel(), Z2.ravel()], axis=1)

# Convert to tensor for decoding
latent_tensors = torch.tensor(latent_grid, dtype=torch.float32)

# Decode latent vectors and compute properties
with torch.no_grad():
    decoded_signals = flexi_prop_model.decoder(latent_tensors)

x_vals = np.linspace(0, 2, decoded_signals.shape[1])
dx = x_vals[1] - x_vals[0]

# Vectorized over all grid points: one row per decoded signal.
_decoded_np = decoded_signals.numpy()
# Sharpness = largest absolute spatial gradient of each decoded signal.
sharpness = np.abs(np.gradient(_decoded_np, dx, axis=1)).max(axis=1).reshape(Z1.shape)
# Peak position = x coordinate of each decoded signal's maximum.
peak_positions = x_vals[_decoded_np.argmax(axis=1)].reshape(Z1.shape)


def plot_burgers_comparison(Re, tau, t_0):
    """Plot initial state, Flexi-Prop, AE-LSTM and exact solution together.

    Args:
        Re: Reynolds number.
        tau: Number of time steps to advance from ``t_0``.
        t_0: Initial time.

    Returns:
        The matplotlib figure (already closed in pyplot's registry).

    Raises:
        ValueError: If ``tau`` does not exceed ``LSTM_WINDOW``.
    """
    t_final = t_0 + tau * DT
    x_exact = exact_solution_burgers(Re, t_final)

    # Single-shot Flexi-Propagator prediction.
    xt, tau_tensor, Re_tensor = _burgers_inputs(Re, tau, t_0)
    with torch.no_grad():
        _, x_hat_tau, *_ = flexi_prop_model(xt, tau_tensor, Re_tensor)

    # Recursive AE-LSTM prediction, decoded back to physical space.
    latent_for_lstm, *_ = exacts_equals_timewindow(t_0, Re)
    pred = _rollout_lstm(latent_for_lstm, Re, tau)
    with torch.no_grad():
        final_pred_high_dim = ae_decoder(pred.unsqueeze(0))

    fig, ax = plt.subplots(figsize=(9, 5))
    ax.plot(xt.squeeze().numpy(), '--', linewidth=3, alpha=0.5, color="C0")
    ax.plot(x_hat_tau.squeeze().numpy(), 'D', markersize=5, color="C2")
    ax.plot(final_pred_high_dim.squeeze().detach().numpy(), '^', markersize=5, color="C1")
    ax.plot(x_exact.squeeze(), linewidth=2, alpha=0.5, color="Black")
    ax.set_title(f"Comparison ($t_0$={t_0:.2f} → $t_f$={t_final:.2f}), τ={tau}", fontsize=14)
    ax.legend(["Initial", "Flexi-Prop", "AE LSTM", "True"])
    # Release the figure from pyplot so repeated callbacks do not leak memory;
    # the Figure object itself stays usable by the caller.
    plt.close(fig)
    return fig


def burgers_update(Re, tau, t0):
    """Gradio callback for the Burgers' tab.

    Args:
        Re: Reynolds number.
        tau: Number of time steps to advance.
        t0: Initial time.

    Returns:
        ``(comparison_fig, timing_fig, time_ratio, latent_fig)`` where
        ``time_ratio`` is total AE-LSTM time divided by Flexi-Prop time.

    Raises:
        ValueError: If ``tau`` does not exceed ``LSTM_WINDOW``.
    """
    fig1 = plot_burgers_comparison(Re, tau, t0)

    # Timing calculations. perf_counter is monotonic and high-resolution, so
    # flexi_time cannot collapse to zero the way a coarse wall clock can.
    start = time.perf_counter()
    with torch.no_grad():
        flexi_prop_model(
            torch.randn(1, 1, 128),
            torch.tensor([[tau]], dtype=torch.float32),
            torch.tensor([[Re]], dtype=torch.float32),
        )
    flexi_time = time.perf_counter() - start

    start = time.perf_counter()
    latent_for_lstm, _, _ = exacts_equals_timewindow(t0, Re, LSTM_WINDOW)
    encode_time = time.perf_counter() - start

    start = time.perf_counter()
    pred = _rollout_lstm(latent_for_lstm, Re, tau)
    recursion_time = time.perf_counter() - start

    start = time.perf_counter()
    with torch.no_grad():
        ae_decoder(pred.unsqueeze(0))
    decode_time = time.perf_counter() - start

    ae_lstm_total_time = encode_time + recursion_time + decode_time
    time_ratio = ae_lstm_total_time / flexi_time

    # Time plot
    bar_labels = [
        "Flexi-Prop",
        "AE LSTM (Encode)",
        "AE LSTM (Recursion)",
        "AE LSTM (Decode)",
        "AE LSTM (Total)",
    ]
    bar_values = [flexi_time, encode_time, recursion_time, decode_time, ae_lstm_total_time]
    fig, ax = plt.subplots(figsize=(11, 6))
    ax.bar(bar_labels, bar_values, color=["C0", "C1", "C2", "C3", "C4"])
    ax.set_ylabel("Time (s)", fontsize=14)
    ax.set_title("Computation Time Comparison", fontsize=14)
    ax.grid(alpha=0.3)
    plt.close(fig)

    # Latent space visualization
    latent_fig = plot_latent_interpretation(Re, tau, t0)

    return fig1, fig, time_ratio, latent_fig


def plot_latent_interpretation(Re, tau, t_0):
    """Show the propagated latent state on the precomputed latent-space maps.

    The left panel colors the latent grid by ``sharpness`` and the right panel
    by ``peak_positions``; the red marker is the latent state returned by
    ``flexi_prop_model`` for the given inputs.

    Args:
        Re: Reynolds number.
        tau: Number of time steps to advance.
        t_0: Initial time.

    Returns:
        The matplotlib figure (already closed in pyplot's registry).
    """
    xt, tau_tensor, Re_tensor = _burgers_inputs(Re, tau, t_0)
    with torch.no_grad():
        _, _, _, _, z_tau = flexi_prop_model(xt, tau_tensor, Re_tensor)
    z_tau = z_tau.squeeze().numpy()

    fig, axes = plt.subplots(1, 2, figsize=(9, 3))

    # Sharpness Plot
    c1 = axes[0].pcolormesh(Z1, Z2, sharpness, cmap='plasma', shading='gouraud')
    axes[0].scatter(z_tau[0], z_tau[1], color='red', marker='o', s=50, label="Current State")
    axes[0].set_ylabel("$Z_2$", fontsize=14)
    axes[0].set_title("Sharpness Encoding", fontsize=14)
    fig.colorbar(c1, ax=axes[0])
    axes[0].legend()

    # Peak Position Plot
    c2 = axes[1].pcolormesh(Z1, Z2, peak_positions, cmap='viridis', shading='gouraud')
    axes[1].scatter(z_tau[0], z_tau[1], color='red', marker='o', s=50, label="Current State")
    axes[1].set_title("Peak position Encoding", fontsize=14)
    fig.colorbar(c2, ax=axes[1], label="Peak Position")

    # Remove redundant y-axis labels on the second plot for better aesthetics
    axes[1].set_yticklabels([])

    # Set a single x-axis label centered below both plots
    fig.supxlabel("$Z_1$", fontsize=14)

    plt.close(fig)
    return fig


# ========== Advection-Diffusion Setup ==========
def get_adv_dif_model(latent_dim, output_dim):
    """Build an untrained 2D Flexi-Propagator model.

    Args:
        latent_dim: Size of the latent space.
        output_dim: Accepted for interface compatibility; not used here.

    Returns:
        A ``Model2D`` wrapping a fresh encoder, decoder and propagator.
    """
    encoder = Encoder2D(latent_dim)
    decoder = Decoder2D(latent_dim)
    propagator = Propagator2D(latent_dim)
    return Model2D(encoder, decoder, propagator)


adv_dif_model = get_adv_dif_model(3, 128)
adv_dif_model, _, _, _ = load_model_adv_dif(
    "./FlexiPropagator_2D_2025-01-30-12-11-01_0aee8fb0_best.pt",
    adv_dif_model,
)
# Inference only, same as the Burgers' models above.
adv_dif_model.eval()


def generate_3d_visualization(Re, t_0, tau):
    """Render the exact initial and evolved 2D fields as overlaid 3D surfaces.

    Args:
        Re: Reynolds number.
        t_0: Initial time.
        tau: Number of time steps to advance.

    Returns:
        The matplotlib figure, or ``None`` if either exact field contains NaN.
    """
    t = t_0 + tau * DT
    U_initial = exact_solution_adv_dif(Re, t_0)
    U_evolved = exact_solution_adv_dif(Re, t)

    if np.isnan(U_initial).any() or np.isnan(U_evolved).any():
        return None

    fig3d = plt.figure(figsize=(12, 5))
    ax3d = fig3d.add_subplot(111, projection='3d')

    grid_x = np.linspace(-2, 2, U_initial.shape[1])
    grid_y = np.linspace(-2, 2, U_initial.shape[0])
    X, Y = np.meshgrid(grid_x, grid_y)

    surf1 = ax3d.plot_surface(X, Y, U_initial, cmap="viridis", alpha=0.6, label="Initial")
    surf2 = ax3d.plot_surface(X, Y, U_evolved, cmap="plasma", alpha=0.8, label="Evolved")

    ax3d.set_xlim(-3, 3)
    ax3d.set_xlabel("x")
    ax3d.set_ylabel("y")
    ax3d.set_zlabel("u(x,y,t)")
    ax3d.view_init(elev=25, azim=-45)
    ax3d.set_box_aspect((2,1,1))

    fig3d.colorbar(surf1, ax=ax3d, shrink=0.5, label="Initial")
    fig3d.colorbar(surf2, ax=ax3d, shrink=0.5, label="Evolved")
    ax3d.set_title(f"Solution Evolution\nInitial ($t_0$={t_0:.2f}) vs Evolved ($t_f$={t:.2f})")

    fig3d.tight_layout()
    plt.close(fig3d)
    return fig3d


def adv_dif_comparison(Re, t_0, tau):
    """Plot the model prediction, exact solution and squared error side by side.

    Args:
        Re: Reynolds number.
        t_0: Initial time.
        tau: Number of time steps to advance.

    Returns:
        The matplotlib figure, or ``None`` if an exact field contains NaN or
        the prediction's shape differs from the exact solution's.
    """
    exact_initial = exact_solution_adv_dif(Re, t_0)
    exact_final = exact_solution_adv_dif(Re, t_0 + tau * DT)

    if np.isnan(exact_initial).any() or np.isnan(exact_final).any():
        return None

    x_in = torch.tensor(exact_initial, dtype=torch.float32)[None, None, :, :]
    Re_in = torch.tensor([[Re]], dtype=torch.float32)
    tau_in = torch.tensor([[tau]], dtype=torch.float32)

    with torch.no_grad():
        x_hat, x_hat_tau, *_ = adv_dif_model(x_in, tau_in, Re_in)

    pred = x_hat_tau.squeeze().numpy()
    if pred.shape != exact_final.shape:
        return None

    # Element-wise squared error (shown under the "MSE Error" title).
    mse = np.square(pred - exact_final)

    panels = [
        (pred, "Model Prediction"),
        (exact_final, "Exact Solution"),
        (mse, "MSE Error"),
    ]
    fig, axs = plt.subplots(1, 3, figsize=(15, 4))
    for ax, (data, title) in zip(axs, panels):
        if title == "MSE Error":
            # Fixed color range so error maps are comparable across inputs.
            im = ax.imshow(data, cmap="viridis", vmin=0, vmax=1e-2)
            fig.colorbar(im, ax=ax, fraction=0.075)
        else:
            im = ax.imshow(data, cmap="jet")
        ax.set_title(title)
        ax.axis("off")

    fig.tight_layout()
    plt.close(fig)
    return fig


def update_initial_plot(Re, t_0):
    """Plot the exact advection-diffusion field at ``t_0`` as an image.

    Args:
        Re: Reynolds number.
        t_0: Time at which the exact solution is evaluated.

    Returns:
        The matplotlib figure (already closed in pyplot's registry).
    """
    exact_initial = exact_solution_adv_dif(Re, t_0)

    fig, ax = plt.subplots(figsize=(5, 5))
    im = ax.imshow(exact_initial, cmap='jet')
    fig.colorbar(im, ax=ax)
    ax.set_title('Initial State')
    plt.close(fig)
    return fig


def adv_update(Re, t0, tau):
    """Gradio callback for the advection-diffusion tab.

    Returns:
        ``(3d_figure, comparison_figure, initial_state_figure)``; the first
        two may be ``None`` when the exact solution contains NaN.
    """
    return (
        generate_3d_visualization(Re, t0, tau),
        adv_dif_comparison(Re, t0, tau),
        update_initial_plot(Re, t0),
    )


# ========== Gradio Interface ==========
# Markdown is kept flush-left at module level so its rendering does not depend
# on the indentation of the surrounding layout code.
BURGERS_INTRO_MD = r"""
## 🚀 Flexi-Propagator: Single-Shot Prediction for Nonlinear PDEs
**Governing Equation (1D Burgers' Equation):**
$$
\frac{\partial u}{\partial t} + u \frac{\partial u}{\partial x} = \nu \frac{\partial^2 u}{\partial x^2}
$$
**Key Advantages:**
✔️ **60-150× faster** than AE-LSTM baselines
✔️ **Parametric control**: Embeds system parameters in latent space

**Physically Interpretable Latent Space - Disentanglement:**
$$
Z_1 \text{ Encodes Peak Location, } Z_2 \text{ Predominantly Encodes Re (Sharpness)}
$$
"""

ADV_DIF_INTRO_MD = r"""
## 🌪️ 2D Advection-Diffusion Visualization
**Governing Equation:**
$$
\frac{\partial u}{\partial t} + c \frac{\partial u}{\partial x} = \nu \left( \frac{\partial^2 u}{\partial x^2} + \frac{\partial^2 u}{\partial y^2} \right)
$$
"""

with gr.Blocks(title="Flexi-Propagator: PDE Prediction Suite") as app:
    gr.Markdown("# Flexi-Propagator: Unified PDE Prediction Interface")

    with gr.Tabs():
        # 1D Burgers' Equation Tab
        with gr.Tab("1D Burgers' Equation"):
            gr.Markdown(BURGERS_INTRO_MD)

            with gr.Row():
                with gr.Column():
                    re_burgers = gr.Slider(minimum=425, maximum=2350, value=1040, label="Reynolds Number")
                    tau_burgers = gr.Slider(minimum=150, maximum=450, value=315, label="Time Steps (τ)")
                    t0_burgers = gr.Number(0.4, label="Initial Time")
                    latent_plot = gr.Plot(label="Latent Space Dynamics")
                with gr.Column():
                    burgers_plot = gr.Plot()
                    time_plot = gr.Plot()
                    # burgers_update returns AE-LSTM time / Flexi-Prop time,
                    # so the label states the ratio in that order.
                    ratio_out = gr.Number(label="Time Ratio (AE LSTM / Flexi Prop)")

            burgers_inputs = [re_burgers, tau_burgers, t0_burgers]
            burgers_outputs = [burgers_plot, time_plot, ratio_out, latent_plot]
            for component in burgers_inputs:
                component.change(burgers_update, burgers_inputs, burgers_outputs)

        # 2D Advection-Diffusion Tab
        with gr.Tab("2D Advection-Diffusion"):
            gr.Markdown(ADV_DIF_INTRO_MD)

            with gr.Row():
                with gr.Column(scale=1):
                    re_adv = gr.Slider(minimum=1, maximum=10, value=9, label="Reynolds Number (Re)")
                    t0_adv = gr.Number(0.45, label="Initial Time")
                    tau_adv = gr.Slider(minimum=150, maximum=425, value=225, label="Tau (τ)")
                    initial_plot_adv = gr.Plot(label="Initial State")

                with gr.Column(scale=3):
                    with gr.Row():
                        three_d_plot_adv = gr.Plot(label="3D Evolution")
                    with gr.Row():
                        comparison_plots_adv = gr.Plot(label="Model Comparison")

            adv_inputs = [re_adv, t0_adv, tau_adv]
            adv_outputs = [three_d_plot_adv, comparison_plots_adv, initial_plot_adv]
            for component in adv_inputs:
                component.change(adv_update, adv_inputs, adv_outputs)

    # NOTE(review): the plots rendered on page load use Re=8, t0=0.35, which
    # differ from the slider defaults above (Re=9, t0=0.45) -- confirm whether
    # that mismatch is intentional.
    app.load(lambda: adv_update(8, 0.35, 225), outputs=adv_outputs)

app.launch()