Spaces:
Paused
Paused
| # SPDX-FileCopyrightText: Copyright (c) 2023 - 2024 NVIDIA CORPORATION & AFFILIATES. | |
| # SPDX-FileCopyrightText: All rights reserved. | |
| # SPDX-License-Identifier: Apache-2.0 | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| """ | |
| Dedalus script simulating a 2D periodic incompressible MHD flow with a passive | |
| tracer field for visualization. This script demonstrates solving a 2D periodic | |
| initial value problem. This script is meant to be ran in parallel, and uses the | |
| built-in analysis framework to save data snapshots to HDF5 files. | |
| The simulation should take at least 100 gpu-minutes to run. | |
| The initial flow is in the x-direction and depends only on z. The problem is | |
| non-dimensionalized usign the shear-layer spacing and velocity jump, so the | |
| resulting viscosity and tracer diffusivity are related to the Reynolds and | |
| Schmidt numbers as: | |
| nu = 1 / Re | |
| eta = 1 / ReM | |
| D = nu / Schmidt | |
| To run this script: | |
| $ python dedalus_mhd_parallel.py | |
| """ | |
| import os | |
| import glob | |
| import h5py | |
| import numpy as np | |
| import functools | |
| from functools import partial | |
| import matplotlib | |
| import matplotlib.pyplot as plt | |
| import argparse | |
| import multiprocessing as mp | |
| import dedalus | |
| import dedalus.public as d3 | |
| from dedalus.extras import plot_tools | |
| import pathlib | |
| from docopt import docopt | |
| from dedalus.tools import logging | |
| from dedalus.tools import post | |
| from dedalus.tools.parallel import Sync | |
| import logging | |
| import math | |
| from IPython.display import display | |
| import imageio | |
| from importlib import reload | |
| from my_random_fields import GRF_Mattern | |
| import torch | |
| from functorch import vmap | |
| from hydra import compose, initialize | |
| from hydra.utils import get_class | |
| device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") | |
| # display(device) | |
| def check_if_complete(sim_outputs, Nt=101): | |
| try: | |
| files = sorted(glob.glob(sim_outputs)) | |
| file = files[0] | |
| with h5py.File(file, mode="r") as h5file: | |
| data_file = h5file["tasks"] | |
| keys = list(data_file.keys()) | |
| dims = data_file[keys[0]].dims | |
| t = dims[0]["sim_time"][:] | |
| if len(t) == Nt: | |
| return True | |
| else: | |
| return False | |
| except Exception: | |
| return False | |
| if __name__ == "__main__": | |
| import sys | |
| # Parse command line args before Hydra initialization | |
| parser = argparse.ArgumentParser(add_help=False) | |
| parser.add_argument('--Re', type=float, help='Reynolds number') | |
| parser.add_argument('--N', type=int, help='Number of samples') | |
| args, remaining_argv = parser.parse_known_args() | |
| # Initialize Hydra with remaining args | |
| sys.argv = [sys.argv[0]] + remaining_argv | |
| initialize(version_base=None, config_path=".", job_name="generate_mhd_field") | |
| cfg = compose(config_name="mhd_field") | |
| # Parameters - override with command line args if provided | |
| Lx, Ly = cfg.Lx, cfg.Ly | |
| Nx, Ny = cfg.Nx, cfg.Ny | |
| Re = args.Re if args.Re is not None else cfg.Re # Use CLI arg or default to config | |
| Re = int(Re) | |
| ReM = Re | |
| Schmidt = cfg.Schmidt # 1 | |
| rho0 = cfg.rho0 # 1.0 | |
| dealias = cfg.dealias # 3/2 | |
| stop_sim_time = cfg.tend | |
| timestepper = get_class(cfg.timestepper) # d3.RK443 #d3.RK222 | |
| Dt = cfg.Dt # 1e-3 | |
| max_timestep = cfg.max_timestep # 1e-2 | |
| output_dt = cfg.output_dt # 1e-2 # 1e-1 | |
| log_iter = cfg.log_iter # 10 | |
| dtype = get_class(cfg.dtype) # np.float64 | |
| max_writes = cfg.max_writes # None | |
| logger = logging.getLogger(__name__) | |
| output_dir = f"/Datasets/mhd_data/simulation_outputs_Re{Re}" | |
| movie_dir = f"{output_dir}/movie" | |
| use_cfl = cfg.use_cfl # False | |
| skip_exists = cfg.skip_exists # False | |
| ## ID Parameters | |
| L = cfg.L # 1 | |
| dim = 2 | |
| Nsamples = args.N if args.N is not None else cfg.N # Use CLI arg or default to config | |
| l_u = cfg.l_u # 0.1 | |
| l_A = cfg.l_A # 0.1 | |
| Nu = cfg.Nu # None | |
| sigma_u = cfg.sigma_u # 0.1 | |
| sigma_A = cfg.sigma_A # 5e-3 | |
| # Generate Random Initial Data | |
| grf_u = GRF_Mattern( | |
| dim, | |
| Nx, | |
| length=Lx, | |
| nu=Nu, | |
| l=l_u, | |
| sigma=sigma_u, | |
| boundary="periodic", | |
| device=device, | |
| ) | |
| grf_A = GRF_Mattern( | |
| dim, | |
| Nx, | |
| length=Lx, | |
| nu=Nu, | |
| l=l_A, | |
| sigma=sigma_A, | |
| boundary="periodic", | |
| device=device, | |
| ) | |
| u0_pot = grf_u.sample(Nsamples).cpu().numpy().reshape(Nsamples, Nx, Ny) | |
| A0 = grf_A.sample(Nsamples).cpu().numpy().reshape(Nsamples, Nx, Ny) | |
| digits = int(math.log10(Nsamples)) + 1 | |
| # expected number of time steps | |
| Nt = len(np.arange(0, stop_sim_time + Dt, output_dt)) | |
| indices = list(range(Nsamples)) | |
| if skip_exists: | |
| completed_list = [] | |
| for j in range(Nsamples): | |
| # print('hi') | |
| sim_output_dir = os.path.join(output_dir, f"output-{j:0{digits}}") | |
| sim_outputs = os.path.join(sim_output_dir, "*.h5") | |
| # skip if the next output directory exists and if the output is complete | |
| if os.path.exists(sim_output_dir): | |
| completed = check_if_complete(sim_outputs, Nt=Nt) | |
| else: | |
| completed = False | |
| completed_list.append(completed) | |
| indices = [j for j, completed in enumerate(completed_list) if not completed] | |
| print(indices) | |
| def run_simulation( | |
| i, | |
| Lx=Lx, | |
| Ly=Ly, | |
| Nx=Nx, | |
| Ny=Ny, | |
| Re=Re, | |
| ReM=ReM, | |
| Schmidt=Schmidt, | |
| rho0=rho0, | |
| dealias=dealias, | |
| stop_sim_time=stop_sim_time, | |
| timestepper=timestepper, | |
| Dt=Dt, | |
| max_timestep=max_timestep, | |
| output_dt=output_dt, | |
| log_iter=log_iter, | |
| dtype=dtype, | |
| max_writes=max_writes, | |
| logger=logger, | |
| output_dir=output_dir, | |
| use_cfl=use_cfl, | |
| L=L, | |
| dim=dim, | |
| Nsamples=Nsamples, | |
| l_u=l_u, | |
| l_A=l_A, | |
| Nu=Nu, | |
| sigma_u=sigma_u, | |
| sigma_A=sigma_A, | |
| grf_u=grf_u, | |
| grf_A=grf_A, | |
| u0_pot=u0_pot, | |
| A0=A0, | |
| digits=digits, | |
| Nt=Nt, | |
| ): | |
| sim_output_dir = os.path.join(output_dir, f"output-{i:0{digits}}") | |
| sim_outputs = os.path.join(sim_output_dir, "*.h5") | |
| print( | |
| f"Running simulation {i:0{digits}} with outputs in {sim_output_dir}", | |
| flush=True, | |
| ) | |
| # Bases | |
| coords = d3.CartesianCoordinates("x", "y") | |
| dist = d3.Distributor(coords, dtype=dtype) | |
| xbasis = d3.RealFourier(coords["x"], size=Nx, bounds=(0, Lx), dealias=dealias) | |
| ybasis = d3.RealFourier(coords["y"], size=Ny, bounds=(0, Ly), dealias=dealias) | |
| # Fields | |
| p = dist.Field(name="p", bases=(xbasis, ybasis)) | |
| s = dist.Field(name="s", bases=(xbasis, ybasis)) | |
| u = dist.VectorField(coords, name="u", bases=(xbasis, ybasis)) | |
| B = dist.VectorField(coords, name="B", bases=(xbasis, ybasis)) | |
| A = dist.Field(name="A", bases=(xbasis, ybasis)) | |
| B2 = dist.Field(name="B2", bases=(xbasis, ybasis)) | |
| u_pot = dist.Field(name="u_pot", bases=(xbasis, ybasis)) | |
| Ax = dist.Field(name="Ax", bases=(xbasis, ybasis)) | |
| Ay = dist.Field(name="Ay", bases=(xbasis, ybasis)) | |
| Bx = dist.Field(name="Bx", bases=(xbasis, ybasis)) | |
| By = dist.Field(name="By", bases=(xbasis, ybasis)) | |
| u0 = dist.VectorField(coords, name="u0", bases=(xbasis, ybasis)) | |
| ux = dist.Field(name="ux", bases=(xbasis, ybasis)) | |
| uy = dist.Field(name="uy", bases=(xbasis, ybasis)) | |
| tau_p = dist.Field(name="tau_p") | |
| # Substitutions | |
| nu = 1 / Re | |
| D = nu / Schmidt | |
| eta = 1 / ReM | |
| x, y = dist.local_grids(xbasis, ybasis) | |
| X, Y = np.meshgrid(x, y, indexing="ij") | |
| ex, ey = coords.unit_vector_fields(dist) | |
| # ez = d3.CrossProduct(ex, ey) | |
| curl2d_scalar = lambda x: -d3.skew(d3.grad(x)) | |
| curl2d_vector = lambda x: -d3.div(d3.skew(x)) | |
| B = curl2d_scalar(A) | |
| B2 = d3.dot(B, B) | |
| Bx = B @ ex | |
| By = B @ ey | |
| ux = u @ ex | |
| uy = u @ ey | |
| # Problem | |
| problem = d3.IVP([u, p, A, tau_p, s], namespace=locals()) | |
| problem.add_equation( | |
| "dt(u) + grad(p)/rho0 - nu*lap(u) = - 0.5*grad(B2)/rho0 - u@grad(u) + B@grad(B)/rho0" | |
| ) | |
| problem.add_equation("dt(s) - D*lap(s) = - u@grad(s)") | |
| problem.add_equation("dt(A) - eta*lap(A) = - u@grad(A)") | |
| problem.add_equation("div(u) + tau_p = 0") | |
| problem.add_equation("integ(p) = 0") # Pressure gauge | |
| # Solver | |
| solver = problem.build_solver(timestepper) | |
| # solver.stop_sim_time = stop_sim_time | |
| solver.stop_sim_time = ( | |
| stop_sim_time + Dt | |
| ) # Make sure we record the last timestep | |
| # Initial conditions | |
| u_pot["g"] = u0_pot[i] | |
| u0 = curl2d_scalar(u_pot).evaluate() | |
| u0.change_scales(1) | |
| u["g"] = u0["g"] | |
| ux = u @ ex | |
| uy = u @ ey | |
| B2 = d3.dot(B, B) | |
| # s.set_global_data(u0_pot[i]) | |
| s["g"] = u0_pot[i] | |
| # A.set_global_data(A0[i]) | |
| A["g"] = A0[i] | |
| # Analysis (This overwrites existing files) | |
| os.makedirs(sim_output_dir, exist_ok=True) | |
| snapshots = solver.evaluator.add_file_handler( | |
| sim_output_dir, sim_dt=output_dt, max_writes=max_writes | |
| ) | |
| snapshots.add_task(s, name="tracer") | |
| snapshots.add_task(A, name="vector potential") | |
| snapshots.add_task(B, name="magnetic field") | |
| snapshots.add_task(u, name="velocity") | |
| snapshots.add_task(p, name="pressure") | |
| # CFL (Don't actually use this. Use constant timestep instead) | |
| CFL = d3.CFL( | |
| solver, | |
| initial_dt=max_timestep, | |
| cadence=10, | |
| safety=0.2, | |
| threshold=0.1, | |
| max_change=1.5, | |
| min_change=0.5, | |
| max_dt=max_timestep, | |
| ) | |
| CFL.add_velocity(u) | |
| # Flow properties | |
| flow = d3.GlobalFlowProperty(solver, cadence=10) | |
| flow.add_property(d3.dot(u, u), name="w2") | |
| flow.add_property(d3.dot(B, B), name="B2") | |
| flow.add_property(d3.div(B), name="divB") | |
| # Main loop | |
| try: | |
| logger.info("Starting main loop") | |
| while solver.proceed: | |
| if use_cfl: | |
| timestep = CFL.compute_timestep() | |
| else: | |
| timestep = Dt | |
| solver.step(timestep) | |
| if (solver.iteration) % 10 == 0: | |
| max_w = np.sqrt(flow.max("w2")) | |
| max_B = np.sqrt(flow.max("B2")) | |
| max_divB = flow.max("divB") | |
| logger.info( | |
| f"Iteration={solver.iteration}, Time={solver.sim_time:#.3g}, dt={timestep:#.3g}, max(w)={max_w:#.3g}, max(B)={max_B:#.3g}, max(div_B)={max_divB:#.3g}" | |
| ) | |
| print( | |
| f"Finished simulation {i:0{digits}} with outputs in {sim_output_dir}", | |
| flush=True, | |
| ) | |
| except: | |
| logger.error("Exception raised, triggering end of main loop.") | |
| raise | |
| solver.log_stats() | |
| # Run in parallel | |
| with mp.Pool(mp.cpu_count() - 1) as pool: | |
| pool.map(run_simulation, indices, chunksize=10) | |