| | import pandas as pd
|
| | import numpy as np
|
| | import scipy.stats as stats
|
| |
|
| | def calculate_rolling_metrics(input_csv, output_csv):
|
| | print(f"Reading data from: {input_csv}")
|
| |
|
| | try:
|
| | df = pd.read_csv(input_csv)
|
| | except FileNotFoundError:
|
| | print(f"Error: The file '{input_csv}' was not found.")
|
| | return
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | df['Time_deal'] = pd.to_datetime(df['Time_deal'], errors='coerce')
|
| |
|
| |
|
| | df = df.sort_values(by='Time_deal').reset_index(drop=True)
|
| |
|
| |
|
| | df['Profit'] = pd.to_numeric(df['Profit'], errors='coerce').fillna(0.0)
|
| | df['Balance'] = pd.to_numeric(df['Balance'], errors='coerce').ffill()
|
| |
|
| |
|
| | equity = df['Balance']
|
| | profits = df['Profit']
|
| |
|
| |
|
| |
|
| | initial_balance = equity.iloc[0]
|
| |
|
| |
|
| | start_time = df['Time_deal'].iloc[0]
|
| |
|
| | years_elapsed = (df['Time_deal'] - start_time).dt.total_seconds() / (365.25 * 24 * 3600)
|
| | years_elapsed = years_elapsed.replace(0, 0.000001)
|
| |
|
| |
|
| |
|
| |
|
| | print("Calculating rolling metrics...")
|
| |
|
| |
|
| |
|
| | wins = profits.clip(lower=0)
|
| | losses = profits.clip(upper=0).abs()
|
| |
|
| | expand_wins_sum = wins.expanding().sum()
|
| | expand_losses_sum = losses.expanding().sum()
|
| | expand_max_win = wins.expanding().max()
|
| | expand_max_loss = losses.expanding().max()
|
| |
|
| |
|
| | hwm = equity.expanding().max()
|
| | dd_dollar = hwm - equity
|
| | dd_pct = dd_dollar / hwm
|
| |
|
| | dd_sq_mean = (dd_pct ** 2).expanding().mean()
|
| | dd_sq_sum = (dd_pct ** 2).expanding().sum()
|
| | max_dd_pct = dd_pct.expanding().max()
|
| |
|
| |
|
| | pct_ret = equity.pct_change().fillna(0)
|
| | roll_mean_ret = pct_ret.expanding().mean()
|
| | roll_var_ret = pct_ret.expanding().var()
|
| | roll_std_ret = pct_ret.expanding().std()
|
| | roll_skew = pct_ret.expanding().skew().fillna(0)
|
| | roll_kurt = pct_ret.expanding().kurt().fillna(0)
|
| | n_trades = profits.expanding().count()
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | df['rolling_Expectancy_Expectancy'] = profits.expanding().mean()
|
| |
|
| |
|
| | df['rolling_GPR_GainToPainRatio'] = expand_wins_sum / expand_losses_sum.replace(0, np.nan)
|
| |
|
| |
|
| |
|
| |
|
| | df['rolling_CAGR_CompoundAnnualGrowthRate'] = (
|
| | (equity / initial_balance).abs().pow(1 / years_elapsed) - 1
|
| | )
|
| |
|
| |
|
| | ulcer_index = np.sqrt(dd_sq_mean)
|
| | df['rolling_Martin_MartinRatio'] = df['rolling_CAGR_CompoundAnnualGrowthRate'] / ulcer_index.replace(0, np.nan)
|
| |
|
| |
|
| | df['rolling_Sterling_SterlingRatio'] = df['rolling_CAGR_CompoundAnnualGrowthRate'] / max_dd_pct.replace(0, np.nan)
|
| |
|
| |
|
| | burke_denom = np.sqrt(dd_sq_sum)
|
| | df['rolling_Burke_BurkeRatio'] = df['rolling_CAGR_CompoundAnnualGrowthRate'] / burke_denom.replace(0, np.nan)
|
| |
|
| |
|
| |
|
| | ror = np.exp(-2 * roll_mean_ret / roll_var_ret)
|
| | ror = np.where(roll_mean_ret < 0, 1.0, ror)
|
| | df['rolling_RoR_RiskOfRuin'] = ror.clip(0, 1)
|
| |
|
| |
|
| |
|
| | sr = roll_mean_ret / roll_std_ret
|
| |
|
| | dsr_denom_term = 1 - roll_skew * sr + ((roll_kurt + 2) / 4) * (sr**2)
|
| | dsr_denom = np.sqrt(dsr_denom_term.abs())
|
| |
|
| | dsr_stat = (sr * np.sqrt(n_trades - 1)) / dsr_denom.replace(0, np.nan)
|
| | df['rolling_DSR_DeflatedSharpeRatio'] = stats.norm.cdf(dsr_stat)
|
| |
|
| |
|
| | df['rolling_PainIndex_PainIndex'] = dd_pct.expanding().mean()
|
| |
|
| |
|
| | df['rolling_PainRatio_PainRatio'] = df['rolling_CAGR_CompoundAnnualGrowthRate'] / df['rolling_PainIndex_PainIndex'].replace(0, np.nan)
|
| |
|
| |
|
| |
|
| | df['rolling_Lake_LakeRatio'] = dd_dollar.expanding().sum() / profits.cumsum().replace(0, np.nan)
|
| |
|
| |
|
| | df['rolling_OWLR_OutlierWinLossRatio'] = expand_max_win / expand_max_loss.replace(0, np.nan)
|
| |
|
| |
|
| | df['rolling_PI_ProfitabilityIndex'] = expand_wins_sum / expand_losses_sum.replace(0, np.nan)
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | metric_cols = [c for c in df.columns if 'rolling_' in c]
|
| | df[metric_cols] = df[metric_cols].replace([np.inf, -np.inf], np.nan)
|
| |
|
| | df.to_csv(output_csv, index=False)
|
| | print(f"Success! Processed data saved to: {output_csv}")
|
| |
|
| |
|
| | if __name__ == "__main__":
|
| | input_filename = 'merged_extracted_orders_and_deals.csv'
|
| | output_filename = '5_layer_output.csv'
|
| |
|
| | calculate_rolling_metrics(input_filename, output_filename) |