| import argparse |
| import logging |
| from pathlib import Path |
| from typing import List, Tuple |
|
|
| import numpy as np |
| import pandas as pd |
| import xarray as xr |
| from tqdm import tqdm |
|
|
|
|
| class SXRDataProcessor: |
| """Class to process GOES X-ray data, including downloading, combining, and interpolating data from multiple satellites. |
| This class handles the downloading of GOES data, combining data from different satellites, and applying interpolation |
| in log space to the X-ray flux data. |
| It also tracks which files were used in the processing. |
| Parameters |
| ---------- |
| save_dir : str |
| Directory where downloaded GOES data will be saved. |
| concat_dir : str |
| Directory where combined GOES data will be saved. |
| """ |
|
|
| def __init__(self, data_dir: str = '/mnt/data/PAPER/GOES-timespan', output_dir: str = '/mnt/data/PAPER/GOES-timespan/combined'): |
| self.data_dir = Path(data_dir) |
| self.output_dir = Path(output_dir) |
| self.output_dir.mkdir(parents=True, exist_ok=True) |
| self.used_g13_files = [] |
| self.used_g14_files = [] |
| self.used_g15_files = [] |
| self.used_g16_files = [] |
| self.used_g17_files = [] |
| self.used_g18_files = [] |
|
|
| def combine_goes_data(self, columns_to_interp=["xrsb_flux", "xrsa_flux"]): |
| """ |
| Combine GOES-16 and GOES-18 files and track source files used. |
| Parameters |
| """ |
| print("🔍 Scanning for GOES data files...") |
| |
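| # Collect the NetCDF files for each satellite, identified by the g13-g18 tag in the filename. |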
| g13_files = sorted(self.data_dir.glob("*g13*.nc")) |
| g14_files = sorted(self.data_dir.glob("*g14*.nc")) |
| g15_files = sorted(self.data_dir.glob("*g15*.nc")) |
| g16_files = sorted(self.data_dir.glob("*g16*.nc")) |
| g17_files = sorted(self.data_dir.glob("*g17*.nc")) |
| g18_files = sorted(self.data_dir.glob("*g18*.nc")) |
| |
| total_files = len(g13_files) + len(g14_files) + len(g15_files) + len(g16_files) + len(g17_files) + len(g18_files) |
| logging.info( |
| f"Found {len(g13_files)} GOES-13 files, {len(g14_files)} GOES-14 files, {len(g15_files)} GOES-15 files, {len(g16_files)} GOES-16 files, {len(g17_files)} GOES-17 files, and {len(g18_files)} GOES-18 files.") |
| print(f"Total files found: {total_files}") |
| |
| if total_files == 0: |
| print("No GOES data files found in the specified directory.") |
| return |
|
|
| def process_files(files, satellite_name, output_file, used_file_list): |
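| """Open, combine, filter, and interpolate one satellite's files, then save the result as a CSV. |
| Returns True on success and False if nothing could be processed. |
| """ |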
| datasets = [] |
| combined_meta = {} |
| successful_files = 0 |
| failed_files = 0 |
|
|
| print(f"🛰️ Processing {satellite_name} ({len(files)} files)...") |
| |
| |
| with tqdm(files, desc=f"Loading {satellite_name}", unit="file", |
| bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]') as pbar: |
| for file_path in pbar: |
| pbar.set_postfix_str(f"Loading {file_path.name}") |
| try: |
| # Keep the dataset open here: xarray loads lazily, so closing it now would break the later concat. |
| # All successfully opened datasets are closed in the outer finally block after concatenation. |
| ds = xr.open_dataset(str(file_path)) |
| datasets.append(ds) |
| used_file_list.append(file_path) |
| successful_files += 1 |
| logging.info(f"Loaded {file_path.name}") |
| except Exception as e: |
| failed_files += 1 |
| logging.error(f"Could not load {file_path.name}: {e}") |
| continue |
|
|
| if not datasets: |
| print(f"No valid datasets for {satellite_name}") |
| logging.warning(f"No valid datasets for {satellite_name}") |
| return False |
|
|
| print(f"Processing {len(datasets)} datasets for {satellite_name}...") |
| |
| try: |
| print(f"Concatenating datasets...") |
| combined_ds = xr.concat(datasets, dim='time').sortby('time') |
| |
| |
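| # GOES-13/14/15 XRS irradiances are distributed with SWPC scaling factors applied |
| # (0.85 for XRS-A, 0.7 for XRS-B); dividing them out recovers the unscaled fluxes so |
| # they are comparable with the GOES-R series (GOES-16/17/18). |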
| if satellite_name in ['GOES-13', 'GOES-14', 'GOES-15']: |
| print(f"Applying scaling factors for {satellite_name}...") |
| combined_ds['xrsa_flux'] = combined_ds['xrsa_flux'] / 0.85 |
| combined_ds['xrsb_flux'] = combined_ds['xrsb_flux'] / 0.7 |
| |
| print(f"Converting to DataFrame...") |
| df = combined_ds.to_dataframe().reset_index() |
| |
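| # Some GOES XRS files include a 'quad_diode' indicator; keep only rows where it is 0 |
| # so that quad-diode measurements are excluded from the combined series. |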
| if 'quad_diode' in df.columns: |
| print(f"Filtering quad diode data...") |
| df = df[df['quad_diode'] == 0] |
|
|
| |
| print(f"Filtering out data where xrsb_flux has a quality flag of >0...") |
| df = df[df['xrsb_flag'] == 0] |
| |
| df['time'] = pd.to_datetime(df['time']) |
| df.set_index('time', inplace=True) |
| |
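| # Interpolate gaps in log space: the flux spans several orders of magnitude, so linearly |
| # interpolating log10(flux) (with zeros treated as missing) is more sensible than |
| # interpolating the raw values. Time-weighted interpolation uses the DatetimeIndex set above. |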
| print(f"Applying log interpolation...") |
| df_log = np.log10(df[columns_to_interp].replace(0, np.nan)) |
|
|
| |
| df_log_interp = df_log.interpolate(method="time", limit_direction="both") |
|
|
| |
| df[columns_to_interp] = 10 ** df_log_interp |
|
|
| |
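| # Name the output CSV after the date range it covers. |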
| min_date = df.index.min().strftime('%Y%m%d') |
| max_date = df.index.max().strftime('%Y%m%d') |
| filename = f"{str(output_file)}_{min_date}_{max_date}.csv" |
| |
| print(f"Saving to {filename}...") |
| df.to_csv(filename, index=True) |
|
|
| print(f"Successfully processed {satellite_name}: {successful_files} files loaded, {failed_files} failed") |
| logging.info(f"Saved combined file: {output_file}") |
| |
| except Exception as e: |
| print(f"Failed to process {satellite_name}: {e}") |
| logging.error(f"Failed to write {output_file}: {e}") |
| finally: |
| for ds in datasets: |
| ds.close() |
|
|
| |
| satellites_to_process = [] |
| for sat_num, files, used_file_list in ( |
| (13, g13_files, self.used_g13_files), |
| (14, g14_files, self.used_g14_files), |
| (15, g15_files, self.used_g15_files), |
| (16, g16_files, self.used_g16_files), |
| (17, g17_files, self.used_g17_files), |
| (18, g18_files, self.used_g18_files)): |
| if files: |
| satellites_to_process.append((files, f"GOES-{sat_num}", self.output_dir / f"combined_g{sat_num}_avg1m", used_file_list)) |
|
|
| print(f"\nStarting processing of {len(satellites_to_process)} satellites...") |
| |
| |
| successful_satellites = 0 |
| failed_satellites = 0 |
| |
| for i, (files, satellite_name, output_file, used_file_list) in enumerate(satellites_to_process, 1): |
| print(f"\n{'='*60}") |
| print(f"Processing satellite {i}/{len(satellites_to_process)}: {satellite_name}") |
| print(f"{'='*60}") |
| |
| try: |
| if process_files(files, satellite_name, output_file, used_file_list): |
| successful_satellites += 1 |
| else: |
| failed_satellites += 1 |
| except Exception as e: |
| print(f"Failed to process {satellite_name}: {e}") |
| failed_satellites += 1 |
| logging.error(f"Failed to process {satellite_name}: {e}") |
| |
| |
| print(f"\n{'='*60}") |
| print(f"PROCESSING COMPLETE") |
| print(f"{'='*60}") |
| print(f"Successfully processed: {successful_satellites} satellites") |
| print(f"Failed: {failed_satellites} satellites") |
| print(f"Total files processed: {total_files}") |
| print(f"Output directory: {self.output_dir}") |
| |
| |
| total_used_files = (len(self.used_g13_files) + len(self.used_g14_files) + |
| len(self.used_g15_files) + len(self.used_g16_files) + |
| len(self.used_g17_files) + len(self.used_g18_files)) |
| print(f"Files used in processing: {total_used_files}") |
| |
| if successful_satellites > 0: |
| print(f"\nSXR data processing completed successfully!") |
| else: |
| print(f"\n⚠No satellites were processed successfully.") |
|
|
|
|
| if __name__ == '__main__': |
| parser = argparse.ArgumentParser(description='Preprocess GOES X-ray data.') |
| parser.add_argument('--data_dir', type=str, default='/mnt/data/PAPER/GOES-timespan', |
| help='Directory where downloaded GOES data is stored.') |
| parser.add_argument('--output_dir', type=str, default='/mnt/data/PAPER/GOES-timespan/combined', |
| help='Directory where combined GOES data will be saved.') |
| args = parser.parse_args() |
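| # Minimal logging setup so the logging.info and logging.error calls in the processor are actually emitted. |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s') |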
| |
| print("GOES SXR Data Processing Tool") |
| print("=" * 50) |
| print(f"Data directory: {args.data_dir}") |
| print(f"Output directory: {args.output_dir}") |
| print("=" * 50) |
| |
| processor = SXRDataProcessor(data_dir=args.data_dir, output_dir=args.output_dir) |
| processor.combine_goes_data() |
|
|
| print("\nAll processing tasks completed.") |
|
|