#!/usr/bin/env python3
"""
Data Processing Pipeline Orchestrator
This script orchestrates the three main data processing steps:
1. EUV data cleaning (euv_data_cleaning.py) - removes bad AIA files
2. ITI data processing (iti_data_processing.py) - processes the good data
3. Data alignment (align_data.py) - concatenates GOES data and checks for missing data
Steps that have already completed are detected automatically and skipped; pass --force to rerun them.
Configuration:
- Use --config to specify a custom configuration file (YAML or Python)
- Use --show-config to display current configuration
- Use --create-template to create a YAML configuration template
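Example invocations (illustrative; my_config.yaml is a placeholder):
    python process_data_pipeline.py --create-template
    python process_data_pipeline.py --config my_config.yaml --validate
    python process_data_pipeline.py --force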
"""
import os
import sys
import subprocess
import time
import logging
from datetime import datetime
from pathlib import Path
from pipeline_config import PipelineConfig
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('data_processing_pipeline.log'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
class DataProcessingPipeline:
def __init__(self, base_dir=None, config=None):
"""
Initialize the data processing pipeline.
Args:
base_dir: Base directory for the project. If None, uses current script's directory.
config: PipelineConfig instance. If None, uses default configuration.
"""
if base_dir is None:
self.base_dir = Path(__file__).parent
else:
self.base_dir = Path(base_dir)
# Load configuration
self.config = config if config is not None else PipelineConfig()
# Define script paths
self.scripts = {
'euv_cleaning': self.base_dir / 'euv_data_cleaning.py',
'iti_processing': self.base_dir / 'iti_data_processing.py',
'align_data': self.base_dir / 'align_data.py'
}
# Define step names and descriptions
self.steps = {
'euv_cleaning': {
'name': 'EUV Data Cleaning',
'description': 'Remove bad AIA files based on timestamp validation',
'output_check': self._check_euv_cleaning_complete
},
'iti_processing': {
'name': 'ITI Data Processing',
'description': 'Process good AIA data using ITI methods',
'output_check': self._check_iti_processing_complete
},
'align_data': {
'name': 'Data Alignment',
'description': 'Concatenate GOES data and check for missing data',
'output_check': self._check_align_data_complete
}
}
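# Each step's 'output_check' callable is consulted by run_pipeline() so that
# steps which already produced output can be skipped (unless --force is given).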
def _check_euv_cleaning_complete(self):
"""
Check if EUV data cleaning is complete by looking for the bad files directory.
"""
bad_files_dir = Path(self.config.get_path('euv', 'bad_files_dir'))
if bad_files_dir.exists():
# Check if any files were moved (indicating cleaning was done)
wavelengths = self.config.get_path('euv', 'wavelengths')
wavelength_dirs = [bad_files_dir / str(wl) for wl in wavelengths]
return any(d.exists() and any(d.iterdir()) for d in wavelength_dirs)
return False
def _check_iti_processing_complete(self):
"""
Check if ITI data processing is complete by looking for processed files.
"""
output_dir = Path(self.config.get_path('iti', 'output_folder'))
if output_dir.exists():
# Check if there are processed .npy files
npy_files = list(output_dir.glob('*.npy'))
return len(npy_files) > 0
return False
def _check_align_data_complete(self):
"""
Check if data alignment is complete by looking for output directories.
"""
output_dir = Path(self.config.get_path('alignment', 'output_sxr_dir'))
return output_dir.exists() and any(output_dir.iterdir())
def run_script(self, script_name, step_info):
"""
Run a single processing script.
Args:
script_name: Name of the script to run
step_info: Dictionary containing step information
Returns:
bool: True if successful, False otherwise
"""
script_path = self.scripts[script_name]
if not script_path.exists():
logger.error(f"Script not found: {script_path}")
return False
logger.info(f"Starting {step_info['name']}...")
logger.info(f"Description: {step_info['description']}")
logger.info(f"Running: {script_path}")
# Create environment variables for configuration
env = os.environ.copy()
env.update({
'PIPELINE_CONFIG': self.config.to_json(),
'BASE_DATA_DIR': self.config.get_path('base_data_dir', 'base_data_dir')
})
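# Note: the child scripts are expected to read this configuration themselves.
# A minimal sketch of how a downstream script might do so (assuming the
# PIPELINE_CONFIG payload is plain JSON, as produced by config.to_json()):
#   import json, os
#   cfg = json.loads(os.environ.get('PIPELINE_CONFIG', '{}'))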
start_time = time.time()
try:
# Run the script
result = subprocess.run(
[sys.executable, str(script_path)],
cwd=self.base_dir,
env=env
)
end_time = time.time()
duration = end_time - start_time
if result.returncode == 0:
logger.info(f"✓ {step_info['name']} completed successfully in {duration:.2f} seconds")
return True
else:
logger.error(f"✗ {step_info['name']} failed with return code {result.returncode}")
return False
except Exception as e:
end_time = time.time()
duration = end_time - start_time
logger.error(f"✗ {step_info['name']} failed with exception: {e}")
logger.error(f"Duration: {duration:.2f} seconds")
return False
def run_pipeline(self, force_rerun=False):
"""
Run the complete data processing pipeline.
Args:
force_rerun: If True, run all steps regardless of completion status
"""
logger.info("=" * 80)
logger.info("Starting Data Processing Pipeline")
logger.info("=" * 80)
logger.info(f"Base directory: {self.base_dir}")
logger.info(f"Force rerun: {force_rerun}")
logger.info("=" * 80)
pipeline_start_time = time.time()
successful_steps = 0
failed_steps = 0
for step_name, step_info in self.steps.items():
logger.info(f"\n--- Step: {step_info['name']} ---")
# Check if step is already complete
if not force_rerun and step_info['output_check']():
logger.info(f"✓ {step_info['name']} already completed - skipping")
successful_steps += 1
continue
# Run the step
if self.run_script(step_name, step_info):
successful_steps += 1
else:
failed_steps += 1
logger.error(f"Pipeline stopped due to failure in {step_info['name']}")
break
pipeline_end_time = time.time()
total_duration = pipeline_end_time - pipeline_start_time
# Summary
logger.info("\n" + "=" * 80)
logger.info("PIPELINE SUMMARY")
logger.info("=" * 80)
logger.info(f"Total duration: {total_duration:.2f} seconds")
logger.info(f"Successful steps: {successful_steps}")
logger.info(f"Failed steps: {failed_steps}")
if failed_steps == 0:
logger.info("✓ All steps completed successfully!")
else:
logger.error("✗ Pipeline completed with errors")
logger.info("=" * 80)
return failed_steps == 0
def main():
"""Main function to run the pipeline."""
import argparse
parser = argparse.ArgumentParser(description='Data Processing Pipeline Orchestrator')
parser.add_argument('--force', action='store_true',
help='Force rerun all steps regardless of completion status')
parser.add_argument('--base-dir', type=str,
help='Base directory for the project (default: script directory)')
parser.add_argument('--config', type=str,
help='Path to custom configuration file (YAML or Python)')
parser.add_argument('--show-config', action='store_true',
help='Display current configuration and exit')
parser.add_argument('--create-template', action='store_true',
help='Create a YAML configuration template file and exit')
parser.add_argument('--validate', action='store_true',
help='Validate configuration paths and exit')
args = parser.parse_args()
# Handle special commands
if args.create_template:
config = PipelineConfig()
config.save_config_template()
return
# Load configuration
config = PipelineConfig(args.config)
if args.show_config:
config.print_config()
return
if args.validate:
is_valid, missing_paths = config.validate_paths()
if is_valid:
print("✓ All required paths exist")
else:
print("✗ Missing required paths:")
for path in missing_paths:
print(f" - {path}")
# Exit nonzero so scripted callers can detect validation failure
sys.exit(0 if is_valid else 1)
# Create pipeline instance
pipeline = DataProcessingPipeline(args.base_dir, config)
# Validate paths before running
is_valid, missing_paths = config.validate_paths()
if not is_valid:
logger.error("Configuration validation failed. Missing required paths:")
for path in missing_paths:
logger.error(f" - {path}")
logger.error("Use --validate to check configuration")
sys.exit(1)
# Create necessary directories
config.create_directories()
# Run the pipeline
success = pipeline.run_pipeline(force_rerun=args.force)
# Exit with appropriate code
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()