FOXES / data /sxr_data_processing.py
griffingoodwin04's picture
Refactor pipeline configuration and update data processing scripts
ec2b4e7
import argparse
import logging
from pathlib import Path
from typing import List, Tuple
import numpy as np
import pandas as pd
import xarray as xr
from tqdm import tqdm
class SXRDataProcessor:
"""Class to process GOES X-ray data, including downloading, combining, and interpolating data from multiple satellites.
This class handles the downloading of GOES data, combining data from different satellites, and applying interpolation
in log space to the X-ray flux data.
It also tracks which files were used in the processing.
Parameters
----------
save_dir : str
Directory where downloaded GOES data will be saved.
concat_dir : str
Directory where combined GOES data will be saved.
"""
def __init__(self, data_dir: str = '/mnt/data/PAPER/GOES-timespan', output_dir: str = '/mnt/data/PAPER/GOES-timespan/combined'):
self.data_dir = Path(data_dir)
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
self.used_g13_files = []
self.used_g14_files = []
self.used_g15_files = []
self.used_g16_files = []
self.used_g17_files = []
self.used_g18_files = []
def combine_goes_data(self, columns_to_interp=["xrsb_flux", "xrsa_flux"]):
"""
Combine GOES-16 and GOES-18 files and track source files used.
Parameters
"""
print("🔍 Scanning for GOES data files...")
g13_files = sorted(self.data_dir.glob("*g13*.nc"))
g14_files = sorted(self.data_dir.glob("*g14*.nc"))
g15_files = sorted(self.data_dir.glob("*g15*.nc"))
g16_files = sorted(self.data_dir.glob("*g16*.nc"))
g17_files = sorted(self.data_dir.glob("*g17*.nc"))
g18_files = sorted(self.data_dir.glob("*g18*.nc"))
total_files = len(g13_files) + len(g14_files) + len(g15_files) + len(g16_files) + len(g17_files) + len(g18_files)
logging.info(
f"Found {len(g13_files)} GOES-13 files, {len(g14_files)} GOES-14 files, {len(g15_files)} GOES-15 files, {len(g16_files)} GOES-16 files, {len(g17_files)} GOES-17 files, and {len(g18_files)} GOES-18 files.")
print(f"Total files found: {total_files}")
if total_files == 0:
print("No GOES data files found in the specified directory.")
return
def process_files(files, satellite_name, output_file, used_file_list):
datasets = []
combined_meta = {}
successful_files = 0
failed_files = 0
print(f"🛰️ Processing {satellite_name} ({len(files)} files)...")
# Progress bar for file loading
with tqdm(files, desc=f"Loading {satellite_name}", unit="file",
bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]') as pbar:
for file_path in pbar:
pbar.set_postfix_str(f"Loading {file_path.name}")
try:
ds = xr.open_dataset(str(file_path))
datasets.append(ds)
used_file_list.append(file_path) # Track file used
successful_files += 1
logging.info(f"Loaded {file_path.name}")
except Exception as e:
failed_files += 1
logging.error(f"Could not load {file_path.name}: {e}")
continue
finally:
if 'ds' in locals():
ds.close()
if not datasets:
print(f"No valid datasets for {satellite_name}")
logging.warning(f"No valid datasets for {satellite_name}")
return
print(f"Processing {len(datasets)} datasets for {satellite_name}...")
try:
print(f"Concatenating datasets...")
combined_ds = xr.concat(datasets, dim='time').sortby('time')
# Scaling factors for GOES-13, GOES-14, and GOES-15
if satellite_name in ['GOES-13', 'GOES-14', 'GOES-15']:
print(f"Applying scaling factors for {satellite_name}...")
combined_ds['xrsa_flux'] = combined_ds['xrsa_flux'] / .85
combined_ds['xrsb_flux'] = combined_ds['xrsb_flux'] / .7
print(f"Converting to DataFrame...")
df = combined_ds.to_dataframe().reset_index()
if 'quad_diode' in df.columns:
print(f"Filtering quad diode data...")
df = df[df['quad_diode'] == 0] # Filter out quad diode data
#Filter out data where xrsb_flux has a quality flag of >0
print(f"Filtering out data where xrsb_flux has a quality flag of >0...")
df = df[df['xrsb_flag'] == 0]
df['time'] = pd.to_datetime(df['time'])
df.set_index('time', inplace=True)
print(f"Applying log interpolation...")
df_log = np.log10(df[columns_to_interp].replace(0, np.nan))
# Step 3: Interpolate in log space
df_log_interp = df_log.interpolate(method="time", limit_direction="both")
# Step 4: Back-transform to linear space
df[columns_to_interp] = 10 ** df_log_interp
# Add min and max dates to filename
min_date = df.index.min().strftime('%Y%m%d')
max_date = df.index.max().strftime('%Y%m%d')
filename = f"{str(output_file)}_{min_date}_{max_date}.csv"
print(f"Saving to {filename}...")
df.to_csv(filename, index=True)
print(f"Successfully processed {satellite_name}: {successful_files} files loaded, {failed_files} failed")
logging.info(f"Saved combined file: {output_file}")
except Exception as e:
print(f"Failed to process {satellite_name}: {e}")
logging.error(f"Failed to write {output_file}: {e}")
finally:
for ds in datasets:
ds.close()
# Create list of satellites to process
satellites_to_process = []
if len(g13_files) != 0:
satellites_to_process.append((g13_files, "GOES-13", self.output_dir / "combined_g13_avg1m", self.used_g13_files))
if len(g14_files) != 0:
satellites_to_process.append((g14_files, "GOES-14", self.output_dir / "combined_g14_avg1m", self.used_g14_files))
if len(g15_files) != 0:
satellites_to_process.append((g15_files, "GOES-15", self.output_dir / "combined_g15_avg1m", self.used_g15_files))
if len(g16_files) != 0:
satellites_to_process.append((g16_files, "GOES-16", self.output_dir / "combined_g16_avg1m", self.used_g16_files))
if len(g17_files) != 0:
satellites_to_process.append((g17_files, "GOES-17", self.output_dir / "combined_g17_avg1m", self.used_g17_files))
if len(g18_files) != 0:
satellites_to_process.append((g18_files, "GOES-18", self.output_dir / "combined_g18_avg1m", self.used_g18_files))
print(f"\nStarting processing of {len(satellites_to_process)} satellites...")
# Process each satellite with overall progress tracking
successful_satellites = 0
failed_satellites = 0
for i, (files, satellite_name, output_file, used_file_list) in enumerate(satellites_to_process, 1):
print(f"\n{'='*60}")
print(f"Processing satellite {i}/{len(satellites_to_process)}: {satellite_name}")
print(f"{'='*60}")
try:
process_files(files, satellite_name, output_file, used_file_list)
successful_satellites += 1
except Exception as e:
print(f"Failed to process {satellite_name}: {e}")
failed_satellites += 1
logging.error(f"Failed to process {satellite_name}: {e}")
# Print final summary
print(f"\n{'='*60}")
print(f"PROCESSING COMPLETE")
print(f"{'='*60}")
print(f"Successfully processed: {successful_satellites} satellites")
print(f"Failed: {failed_satellites} satellites")
print(f"Total files processed: {total_files}")
print(f"Output directory: {self.output_dir}")
# Print file usage statistics
total_used_files = (len(self.used_g13_files) + len(self.used_g14_files) +
len(self.used_g15_files) + len(self.used_g16_files) +
len(self.used_g17_files) + len(self.used_g18_files))
print(f"Files used in processing: {total_used_files}")
if successful_satellites > 0:
print(f"\nSXR data processing completed successfully!")
else:
print(f"\n⚠No satellites were processed successfully.")
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Preprocess GOES X-ray data.')
parser.add_argument('--data_dir', type=str, default='/mnt/data/PAPER/GOES-timespan',
help='Directory where downloaded GOES data is stored.')
parser.add_argument('--output_dir', type=str, default='/mnt/data/PAPER/GOES-timespan/combined',
help='Directory where combined GOES data will be saved.')
args = parser.parse_args()
print("GOES SXR Data Processing Tool")
print("=" * 50)
print(f"Data directory: {args.data_dir}")
print(f"Output directory: {args.output_dir}")
print("=" * 50)
processor = SXRDataProcessor(data_dir=args.data_dir, output_dir=args.output_dir)
processor.combine_goes_data()
print("\nAll processing tasks completed.")