| | import os |
| | import requests |
| | import pandas as pd |
| | from prefect import flow, task |
| | from src.ingestion.ingest import fetch_daily_data |
| | from src.processing.features import process_data |
| | from src.processing.split import split_data |
| | from src.models.train import ModelTrainer |
| | from tests.data_validation import validate_data |
| | from dotenv import load_dotenv |
| |
|
| | load_dotenv() |
| |
|
| | from src.orchestration.notifications import notify_discord |
| |
|
@task(retries=3, retry_delay_seconds=60)
def fetch_stock_data(symbol: str):
    """Fetch daily data for one symbol as a retryable Prefect task.

    Exceptions are deliberately not caught here: anything raised by
    ``fetch_daily_data`` propagates unchanged, which is what lets the
    retry policy declared on the decorator (3 retries, 60 seconds apart)
    take effect.

    Args:
        symbol: Ticker symbol, passed straight through to
            ``fetch_daily_data``.

    Returns:
        Whatever ``fetch_daily_data`` returns. The flow in this file
        treats it as the path of the raw data file.
    """
    return fetch_daily_data(symbol)
| |
|
@task
def process_stock_data(file_path: str, symbol: str):
    """Run feature processing for one symbol's raw data file.

    Ensures the ``data/processed`` directory exists, then hands the raw
    file and a per-symbol destination path to ``process_data``.

    Args:
        file_path: Location of the raw input file.
        symbol: Ticker symbol, used to build the output file name.

    Returns:
        The value returned by ``process_data`` (the flow forwards it to
        ``train_and_evaluate`` as the data frame).
    """
    # Create the target directory up front; exist_ok keeps reruns harmless.
    os.makedirs("data/processed", exist_ok=True)
    destination = f"data/processed/{symbol}_processed.csv"
    return process_data(file_path, destination)
| |
|
@task
def train_and_evaluate(df: pd.DataFrame, symbol: str):
    """Split, validate, and train every model for one symbol.

    Args:
        df: Processed data for the symbol.
        symbol: Ticker symbol, used to build the per-symbol ``models/``
            and ``reports/`` directories.

    Returns:
        ``True`` once all training steps and the metrics save have run.
    """
    train_part, test_part = split_data(df)

    # Validation output and the trainer's metrics share one reports directory.
    report_dir = f"reports/{symbol}"
    validate_data(train_part, test_part, output_dir=report_dir)

    model_trainer = ModelTrainer(
        output_dir=f"models/{symbol}",
        metrics_dir=report_dir,
    )

    # Regression and classification are given the train/test split.
    model_trainer.train_regression(train_part, test_part)
    model_trainer.train_classification(train_part, test_part)

    # Clustering and PCA are given the full, unsplit frame.
    model_trainer.train_clustering(df)
    model_trainer.train_pca(df)

    model_trainer.save_metrics()
    return True
| |
|
@flow(name="End-to-End Stock Prediction Pipeline")
def main_pipeline(symbols: list[str] = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA", "NVDA"]):
    """Run fetch -> process -> train/evaluate for each symbol in turn.

    A Discord notification is sent when the flow starts and after each
    symbol succeeds or fails. A failure for one symbol is reported and
    printed, then the loop moves on to the next symbol, so one bad
    ticker does not abort the rest of the run.

    Args:
        symbols: Ticker symbols to process. The default list is only
            iterated, never mutated, so sharing it across calls is safe.
    """
    # NOTE(review): the status emoji were mangled by an encoding round-trip
    # (one stray byte even split a string literal across two lines). They
    # are restored here as named escapes so the source stays ASCII-safe;
    # confirm these are the intended glyphs.
    notify_discord("\N{ROCKET} Starting End-to-End Pipeline...")

    for symbol in symbols:
        try:
            print(f"Processing {symbol}...")
            raw_path = fetch_stock_data(symbol)
            df = process_stock_data(raw_path, symbol)
            train_and_evaluate(df, symbol)
            notify_discord(f"\N{WHITE HEAVY CHECK MARK} Pipeline completed for {symbol}")
        except Exception as e:
            # Per-symbol boundary: report the failure and keep going.
            notify_discord(f"\N{CROSS MARK} Pipeline failed for {symbol}: {e}")
            print(f"Error processing {symbol}: {e}")
| |
|
# Script entry point: when this file is executed directly, run the flow
# with its default symbol list.
if __name__ == "__main__":
    main_pipeline()
| |
|