# Spaces: Configuration error
# File size: 10,470 bytes
# Revision hashes listed by the file viewer: 3a12729, 82c899e
# (The viewer's line-number gutter, 1-279, was extraction residue and has been dropped.)
# Re-defining the integrated class first
import os
import re
import zipfile
import numpy as np
import xarray as xr
from typing import List, Tuple
import shutil
import tempfile # Added for safe temp directory usage
class NAMEDataProcessor:
    """Convert NAME model text output files into NetCDF files.

    Two kinds of input are handled:

    * "horizontal" files (path contains ``HorizontalField``): three value
      columns are written to one NetCDF file per input file.
    * 3D files (path matches ``_T<digits>_..._Z<digits>.txt``): all files
      sharing the same ``T`` number are stacked into one NetCDF file.

    Outputs are written below ``output_root`` in the ``3D`` and
    ``horizontal`` sub-directories.
    """

    # Header line after which every remaining line is treated as a data row.
    _FIELDS_MARKER = "Fields:"

    def __init__(self, output_root: str = None):
        """Create the output directory layout.

        Args:
            output_root: Directory that receives the ``3D`` and ``horizontal``
                output folders. Defaults to ``<system temp dir>/name_outputs``.
        """
        if output_root is None:
            output_root = os.path.join(tempfile.gettempdir(), "name_outputs")
        self.output_root = output_root
        self.output_3d = os.path.join(self.output_root, "3D")
        self.output_horizontal = os.path.join(self.output_root, "horizontal")
        os.makedirs(self.output_3d, exist_ok=True)
        os.makedirs(self.output_horizontal, exist_ok=True)

    def _sanitize_key(self, key: str) -> str:
        """Turn a header label into an identifier-like attribute name.

        Runs of non-word characters become a single underscore. Keys that are
        empty or do not start with a letter are prefixed with ``attr_``.
        """
        key = re.sub(r'\W+', '_', key)
        # Guard the empty string explicitly: indexing key[0] would raise.
        if not key or not key[0].isalpha():
            key = f"attr_{key}"
        return key

    def _parse_metadata(self, lines: List[str]) -> dict:
        """Collect ``key: value`` pairs from *lines* and derive the grid spec.

        Every line containing a colon is split at the first colon; the part
        before it (stripped, lower-cased, sanitized) is the key and the rest
        is the value. Lines with nothing before the colon are skipped.

        Returns:
            The raw string pairs plus the typed entries ``x_origin``,
            ``y_origin``, ``x_size``, ``y_size``, ``x_res`` and ``y_res``.

        Raises:
            ValueError: A required grid field is missing or not numeric.
            ZeroDivisionError: A grid resolution is zero.
        """
        metadata = {}
        for line in lines:
            if ":" not in line:
                continue
            key, value = line.split(":", 1)
            key = key.strip().lower()
            if not key:
                # A line such as ": text" carries no usable attribute name.
                continue
            metadata[self._sanitize_key(key)] = value.strip()

        try:
            metadata.update({
                "x_origin": float(metadata["x_grid_origin"]),
                "y_origin": float(metadata["y_grid_origin"]),
                "x_size": int(metadata["x_grid_size"]),
                "y_size": int(metadata["y_grid_size"]),
                "x_res": float(metadata["x_grid_resolution"]),
                "y_res": float(metadata["y_grid_resolution"]),
            })
        except KeyError as e:
            raise ValueError(f"Missing required metadata field: {e}") from e
        except ValueError as e:
            raise ValueError(f"Invalid value in metadata: {e}") from e

        if metadata["x_res"] == 0 or metadata["y_res"] == 0:
            raise ZeroDivisionError("Grid resolution cannot be zero.")
        return metadata

    def _get_data_lines(self, lines: List[str]) -> List[str]:
        """Return the lines that follow the first ``Fields:`` marker line.

        Raises:
            ValueError: No line equals ``Fields:`` after stripping whitespace.
        """
        for idx, line in enumerate(lines):
            if line.strip() == self._FIELDS_MARKER:
                return lines[idx + 1:]
        # Raise a descriptive error instead of leaking a bare StopIteration.
        raise ValueError('No "Fields:" marker line found; cannot locate data rows.')

    def _is_horizontal_file(self, filename: str) -> bool:
        """Return True when *filename* contains the text ``HorizontalField``."""
        return "HorizontalField" in filename

    @staticmethod
    def _grid_axis(origin: float, size: int, resolution: float) -> np.ndarray:
        """Return *size* coordinates starting at *origin*, rounded to 6 decimals.

        The axis is built from an integer index range so its length is always
        exactly *size*. ``np.arange`` with a float step can yield one element
        too many because of rounding in the computed stop value.
        """
        return np.round(origin + np.arange(size) * resolution, 6)

    @staticmethod
    def _fill_grids(data_lines: List[str], x_size: int, y_size: int,
                    value_columns: Tuple[int, ...]) -> List[np.ndarray]:
        """Parse comma-separated data rows into one float32 grid per column.

        A row is used only if it has enough non-empty fields to cover every
        requested column, its first two fields are digit strings (1-based x
        and y indices), all requested values parse as floats and the indices
        fall inside the grid. Other rows are skipped. Cells never written
        stay 0.

        Args:
            data_lines: Raw text rows.
            x_size: Number of grid columns.
            y_size: Number of grid rows.
            value_columns: Zero-based field positions to read, counted after
                empty fields have been dropped.

        Returns:
            One ``(y_size, x_size)`` array per entry of *value_columns*.
        """
        grids = [np.zeros((y_size, x_size), dtype=np.float32) for _ in value_columns]
        min_fields = max(value_columns) + 1
        for line in data_lines:
            parts = [p.strip() for p in line.split(',') if p.strip()]
            if len(parts) < min_fields:
                continue
            if not (parts[0].isdigit() and parts[1].isdigit()):
                continue
            try:
                x = int(parts[0]) - 1
                y = int(parts[1]) - 1
                values = [float(parts[col]) for col in value_columns]
            except ValueError:
                # Non-numeric cell (or a digit-like character int() rejects).
                continue
            if 0 <= x < x_size and 0 <= y < y_size:
                for grid, value in zip(grids, values):
                    grid[y, x] = value
        return grids

    @staticmethod
    def _empty_folder(folder_path: str) -> None:
        """Delete everything inside *folder_path* but keep the folder itself."""
        with os.scandir(folder_path) as it:
            entries = list(it)  # snapshot first; we delete while walking
        for entry in entries:
            # Decide by entry type rather than by catching IsADirectoryError,
            # which os.remove does not raise on every platform.
            if entry.is_dir(follow_symlinks=False):
                shutil.rmtree(entry.path)
            else:
                os.remove(entry.path)

    def _convert_horizontal(self, filepath: str, output_filename: str) -> str:
        """Convert one horizontal-field text file to NetCDF.

        Fields 4, 5 and 6 (zero-based) of each data row are stored as
        ``air_concentration``, ``dry_deposition_rate`` and
        ``wet_deposition_rate``. All parsed metadata is attached as global
        attributes (stringified).

        Args:
            filepath: Path of the text file to read.
            output_filename: File name to create inside ``self.output_horizontal``.

        Returns:
            Path of the written NetCDF file.
        """
        with open(filepath, 'r') as f:
            lines = f.readlines()
        meta = self._parse_metadata(lines)
        data_lines = self._get_data_lines(lines)

        lons = self._grid_axis(meta["x_origin"], meta["x_size"], meta["x_res"])
        lats = self._grid_axis(meta["y_origin"], meta["y_size"], meta["y_res"])
        air_conc, dry_depo, wet_depo = self._fill_grids(
            data_lines, meta["x_size"], meta["y_size"], (4, 5, 6)
        )

        ds = xr.Dataset(
            {
                "air_concentration": (['latitude', 'longitude'], air_conc),
                "dry_deposition_rate": (['latitude', 'longitude'], dry_depo),
                "wet_deposition_rate": (['latitude', 'longitude'], wet_depo)
            },
            coords={
                "latitude": lats,
                "longitude": lons
            },
            attrs={
                "title": "Volcanic Ash Horizontal Output (Multiple Fields)",
                "source": "NAME model output processed to NetCDF (horizontal multi-field)",
                **{k: str(v) for k, v in meta.items()}
            }
        )
        ds["air_concentration"].attrs.update({
            "units": "g/m^3",
            "long_name": "Boundary Layer Average Air Concentration"
        })
        ds["dry_deposition_rate"].attrs.update({
            "units": "g/m^2/s",
            "long_name": "Dry Deposition Rate"
        })
        ds["wet_deposition_rate"].attrs.update({
            "units": "g/m^2/s",
            "long_name": "Wet Deposition Rate"
        })
        ds["latitude"].attrs["units"] = "degrees_north"
        ds["longitude"].attrs["units"] = "degrees_east"

        out_path = os.path.join(self.output_horizontal, output_filename)
        ds.to_netcdf(out_path, engine="netcdf4")
        return out_path

    def _convert_3d_group(self, group: List[Tuple[int, str]], output_filename: str) -> str:
        """Stack several per-level text files into one 3D NetCDF file.

        Grid metadata is read from the first file of *group* only and is used
        for every level. Field 4 (zero-based) of each data row is the value.
        The ``altitude`` coordinate holds the first element of each tuple as
        given by the caller, not a height read from the file.

        Args:
            group: ``(z, filepath)`` pairs, one per level, in stacking order.
            output_filename: File name to create inside ``self.output_3d``.

        Returns:
            Path of the written NetCDF file.

        Raises:
            ValueError: *group* is empty.
        """
        if not group:
            raise ValueError("Cannot build a 3D dataset from an empty file group.")

        with open(group[0][1], 'r') as f:
            meta = self._parse_metadata(f.readlines())

        lons = self._grid_axis(meta["x_origin"], meta["x_size"], meta["x_res"])
        lats = self._grid_axis(meta["y_origin"], meta["y_size"], meta["y_res"])

        z_levels = []
        z_coords = []
        for z_idx, filepath in group:
            with open(filepath, 'r') as f:
                data_lines = self._get_data_lines(f.readlines())
            (grid,) = self._fill_grids(data_lines, meta["x_size"], meta["y_size"], (4,))
            z_levels.append(grid)
            z_coords.append(z_idx)

        z_cube = np.stack(z_levels, axis=0)
        ds = xr.Dataset(
            {
                "ash_concentration": (['altitude', 'latitude', 'longitude'], z_cube)
            },
            coords={
                "altitude": np.array(z_coords, dtype=np.float32),
                "latitude": lats,
                "longitude": lons
            },
            attrs={
                "title": "Volcanic Ash Concentration (3D)",
                "source": "NAME model output processed to NetCDF (3D fields)",
                **{k: str(v) for k, v in meta.items()}
            }
        )
        out_path = os.path.join(self.output_3d, output_filename)
        ds.to_netcdf(out_path, engine="netcdf4")
        return out_path

    def batch_process_zip(self, zip_path: str) -> List[str]:
        """Extract a zip archive and convert every recognised ``.txt`` file.

        The archive is unpacked into ``<system temp dir>/unzipped_name_extract``.
        That directory and ``self.output_3d`` are emptied first; previous
        horizontal outputs are left in place. Horizontal files are converted
        one by one; remaining files matching ``_T<digits>_..._Z<digits>.txt``
        are grouped by T and written as ``T<t>.nc``.

        Args:
            zip_path: Path of the zip archive to process.

        Returns:
            Paths of the written NetCDF files, horizontal outputs first, then
            3D outputs in ascending T order.
        """
        extract_dir = os.path.join(tempfile.gettempdir(), "unzipped_name_extract")
        os.makedirs(extract_dir, exist_ok=True)
        # Drop leftovers from earlier archives; otherwise their .txt files
        # would be found by the walk below and merged into this run's groups.
        self._empty_folder(extract_dir)

        # Empty previous 3D outputs but keep the folder itself.
        os.makedirs(self.output_3d, exist_ok=True)
        self._empty_folder(self.output_3d)
        os.makedirs(self.output_horizontal, exist_ok=True)

        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)

        txt_files = [
            os.path.join(root, file)
            for root, _, files in os.walk(extract_dir)
            for file in files
            if file.endswith(".txt")
        ]

        horizontal_files = []
        grouped_3d = {}
        pattern = re.compile(r"_T(\d+)_.*_Z(\d+)\.txt$")
        for f in txt_files:
            if self._is_horizontal_file(f):
                horizontal_files.append(f)
                continue
            match = pattern.search(f)
            if match:
                t = int(match.group(1))
                z = int(match.group(2))
                grouped_3d.setdefault(t, []).append((z, f))

        nc_files = []
        # Process horizontal
        for f in sorted(horizontal_files):
            base_name = os.path.splitext(os.path.basename(f))[0]
            nc_files.append(self._convert_horizontal(f, f"{base_name}.nc"))
        # Process 3D
        for t_key in sorted(grouped_3d):
            group = sorted(grouped_3d[t_key])  # ascending Z
            nc_files.append(self._convert_3d_group(group, f"T{t_key}.nc"))
        return nc_files