Spaces:
Sleeping
Sleeping
| """ | |
| This example solves a 2D Navier-Stokes flow problem | |
| Du/dt -nu D(u) + grad p = 0 | |
| Div u = 0 | |
| with (hard) velocity-Dirichlet boundary conditions | |
| and using semi-Lagrangian advection | |
| """ | |
| import argparse | |
| import warp as wp | |
| import warp.fem as fem | |
| import numpy as np | |
| from warp.fem.utils import array_axpy | |
| from warp.sparse import bsr_mm, bsr_mv, bsr_copy | |
| try: | |
| from .bsr_utils import bsr_to_scipy | |
| from .plot_utils import Plot | |
| from .mesh_utils import gen_trimesh | |
| except ImportError: | |
| from bsr_utils import bsr_to_scipy | |
| from plot_utils import Plot | |
| from mesh_utils import gen_trimesh | |
| # need to solve a saddle-point system, use scopy for simplicity | |
| from scipy.sparse import bmat | |
| from scipy.sparse.linalg import factorized | |
| import matplotlib.pyplot as plt | |
| import matplotlib.animation as animation | |
| def u_boundary_value(s: fem.Sample, domain: fem.Domain, v: fem.Field, top_vel: float): | |
| # Horizontal velocity on top of domain, zero elsewhere | |
| if domain(s)[1] == 1.0: | |
| return wp.dot(wp.vec2f(top_vel, 0.0), v(s)) | |
| return wp.dot(wp.vec2f(0.0, 0.0), v(s)) | |
| def mass_form( | |
| s: fem.Sample, | |
| u: fem.Field, | |
| v: fem.Field, | |
| ): | |
| return wp.dot(u(s), v(s)) | |
| def inertia_form(s: fem.Sample, u: fem.Field, v: fem.Field, dt: float): | |
| return mass_form(s, u, v) / dt | |
| def viscosity_form(s: fem.Sample, u: fem.Field, v: fem.Field, nu: float): | |
| return 2.0 * nu * wp.ddot(fem.D(u, s), fem.D(v, s)) | |
| def viscosity_and_inertia_form(s: fem.Sample, u: fem.Field, v: fem.Field, dt: float, nu: float): | |
| return inertia_form(s, u, v, dt) + viscosity_form(s, u, v, nu) | |
| def transported_inertia_form(s: fem.Sample, domain: fem.Domain, u: fem.Field, v: fem.Field, dt: float): | |
| pos = domain(s) | |
| vel = u(s) | |
| conv_pos = pos - 0.5 * vel * dt | |
| conv_s = fem.lookup(domain, conv_pos, s) | |
| conv_vel = u(conv_s) | |
| conv_pos = conv_pos - 0.5 * conv_vel * dt | |
| conv_vel = u(fem.lookup(domain, conv_pos, conv_s)) | |
| return wp.dot(conv_vel, v(s)) / dt | |
| def div_form( | |
| s: fem.Sample, | |
| u: fem.Field, | |
| q: fem.Field, | |
| ): | |
| return -q(s) * fem.div(u, s) | |
| class Example: | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument("--resolution", type=int, default=25) | |
| parser.add_argument("--degree", type=int, default=2) | |
| parser.add_argument("--num_frames", type=int, default=1000) | |
| parser.add_argument("--top_velocity", type=float, default=1.0) | |
| parser.add_argument("--Re", type=float, default=1000.0) | |
| parser.add_argument("--tri_mesh", action="store_true", help="Use a triangular mesh") | |
| def __init__(self, stage=None, quiet=False, args=None, **kwargs): | |
| if args is None: | |
| # Read args from kwargs, add default arg values from parser | |
| args = argparse.Namespace(**kwargs) | |
| args = Example.parser.parse_args(args=[], namespace=args) | |
| self._args = args | |
| self._quiet = quiet | |
| res = args.resolution | |
| self.sim_dt = 1.0 / args.resolution | |
| self.current_frame = 0 | |
| viscosity = args.top_velocity / args.Re | |
| if args.tri_mesh: | |
| positions, tri_vidx = gen_trimesh(res=wp.vec2i(res)) | |
| geo = fem.Trimesh2D(tri_vertex_indices=tri_vidx, positions=positions) | |
| else: | |
| geo = fem.Grid2D(res=wp.vec2i(res)) | |
| domain = fem.Cells(geometry=geo) | |
| boundary = fem.BoundarySides(geo) | |
| # Functions spaces: Q(d)-Q(d-1) | |
| u_degree = args.degree | |
| u_space = fem.make_polynomial_space(geo, degree=u_degree, dtype=wp.vec2) | |
| p_space = fem.make_polynomial_space(geo, degree=u_degree - 1) | |
| # Viscosity and inertia | |
| u_test = fem.make_test(space=u_space, domain=domain) | |
| u_trial = fem.make_trial(space=u_space, domain=domain) | |
| u_matrix = fem.integrate( | |
| viscosity_and_inertia_form, | |
| fields={"u": u_trial, "v": u_test}, | |
| values={"nu": viscosity, "dt": self.sim_dt}, | |
| ) | |
| # Pressure-velocity coupling | |
| p_test = fem.make_test(space=p_space, domain=domain) | |
| div_matrix = fem.integrate(div_form, fields={"u": u_trial, "q": p_test}) | |
| # Enforcing the Dirichlet boundary condition the hard way; | |
| # build projector for velocity left- and right-hand-sides | |
| u_bd_test = fem.make_test(space=u_space, domain=boundary) | |
| u_bd_trial = fem.make_trial(space=u_space, domain=boundary) | |
| u_bd_projector = fem.integrate(mass_form, fields={"u": u_bd_trial, "v": u_bd_test}, nodal=True) | |
| u_bd_value = fem.integrate( | |
| u_boundary_value, | |
| fields={"v": u_bd_test}, | |
| values={"top_vel": args.top_velocity}, | |
| nodal=True, | |
| output_dtype=wp.vec2d, | |
| ) | |
| fem.normalize_dirichlet_projector(u_bd_projector, u_bd_value) | |
| u_bd_rhs = wp.zeros_like(u_bd_value) | |
| fem.project_linear_system(u_matrix, u_bd_rhs, u_bd_projector, u_bd_value, normalize_projector=False) | |
| # div_bd_rhs = div_matrix * u_bd_rhs | |
| div_bd_rhs = wp.zeros(shape=(div_matrix.nrow,), dtype=div_matrix.scalar_type) | |
| bsr_mv(div_matrix, u_bd_rhs, y=div_bd_rhs) | |
| # div_matrix = div_matrix - div_matrix * bd_projector | |
| bsr_mm(x=bsr_copy(div_matrix), y=u_bd_projector, z=div_matrix, alpha=-1.0, beta=1.0) | |
| # Assemble saddle system with Scipy | |
| div_matrix = bsr_to_scipy(div_matrix) | |
| u_matrix = bsr_to_scipy(u_matrix) | |
| div_bd_rhs = div_bd_rhs.numpy() | |
| ones = np.ones(shape=(p_space.node_count(), 1), dtype=float) | |
| saddle_system = bmat( | |
| [ | |
| [u_matrix, div_matrix.transpose(), None], | |
| [div_matrix, None, ones], | |
| [None, ones.transpose(), None], | |
| ], | |
| ) | |
| with wp.ScopedTimer("LU factorization"): | |
| self._solve_saddle = factorized(saddle_system) | |
| # Save data for computing time steps rhs | |
| self._u_bd_projector = u_bd_projector | |
| self._u_bd_rhs = u_bd_rhs | |
| self._u_test = u_test | |
| self._div_bd_rhs = div_bd_rhs | |
| # Velocitiy field | |
| self._u_field = u_space.make_field() | |
| self.renderer = Plot(stage) | |
| self.renderer.add_surface_vector("velocity", self._u_field) | |
| def update(self): | |
| self.current_frame += 1 | |
| u_rhs = fem.integrate( | |
| transported_inertia_form, | |
| fields={"u": self._u_field, "v": self._u_test}, | |
| values={"dt": self.sim_dt}, | |
| output_dtype=wp.vec2d, | |
| ) | |
| # Apply boundary conditions | |
| # u_rhs = (I - P) * u_rhs + u_bd_rhs | |
| bsr_mv(self._u_bd_projector, x=u_rhs, y=u_rhs, alpha=-1.0, beta=1.0) | |
| array_axpy(x=self._u_bd_rhs, y=u_rhs, alpha=1.0, beta=1.0) | |
| # Assemble scipy saddle system rhs | |
| u_dof_count = self._u_bd_projector.shape[0] | |
| p_dof_count = self._div_bd_rhs.shape[0] | |
| tot_dof_count = u_dof_count + p_dof_count + 1 | |
| u_slice = slice(0, u_dof_count) | |
| p_slice = slice(u_dof_count, tot_dof_count - 1) | |
| saddle_rhs = np.zeros(tot_dof_count) | |
| saddle_rhs[u_slice] = u_rhs.numpy().flatten() | |
| saddle_rhs[p_slice] = self._div_bd_rhs | |
| x = self._solve_saddle(saddle_rhs) | |
| # Extract result | |
| self._u_field.dof_values = x[u_slice].reshape((-1, 2)) | |
| def render(self): | |
| self.renderer.begin_frame(time = self.current_frame * self.sim_dt) | |
| self.renderer.add_surface_vector("velocity", self._u_field) | |
| self.renderer.end_frame() | |
| if __name__ == "__main__": | |
| wp.init() | |
| wp.set_module_options({"enable_backward": False}) | |
| args = Example.parser.parse_args() | |
| example = Example(args=args) | |
| for k in range(args.num_frames): | |
| print(f"Frame {k}:") | |
| example.update() | |
| example.render() | |
| example.renderer.add_surface_vector("velocity_final", example._u_field) | |
| example.renderer.plot(streamlines=set(["velocity_final"])) | |