| """Robustness-focused factor execution engine. |
| |
| Loads daily_pv.h5 into memory, parses factor expressions via expr_parser, |
| executes them using function_lib operators, and computes IC/IR metrics. |
| |
| Supports temporal data splits for proper train/val/test evaluation: |
| - Factor computation uses ALL data (needs lookback for TS operators) |
| - IC/IR metrics are computed only on the specified period's date range |
| |
| This module is intentionally independent from the stable production path in |
| `backtest/factor_executor.py`. Use it for robustness experiments where we want |
| to vary execution or portfolio construction assumptions without touching the |
| current baseline backtest flow. |
| """ |
|
|
| import os |
| import sys |
| import time |
| import traceback |
| from pathlib import Path |
|
|
| import numpy as np |
| import pandas as pd |
|
|
| |
| PROJECT_ROOT = Path(__file__).resolve().parent.parent |
| sys.path.insert(0, str(PROJECT_ROOT)) |
|
|
| from expression_manager.expr_parser import parse_expression, parse_symbol |
| from expression_manager import function_lib |
|
|
| |
| DATA_PATH = Path(__file__).resolve().parent / "data" / "daily_pv.h5" |
|
|
| |
| _PYTHON_KEYWORDS = {'return', 'class', 'import', 'from', 'def', 'if', 'else', |
| 'for', 'while', 'try', 'except', 'with', 'as', 'in', 'is', |
| 'not', 'and', 'or', 'pass', 'break', 'continue', 'yield', |
| 'lambda', 'global', 'nonlocal', 'del', 'raise', 'assert'} |
|
|
| |
| _cached_df: pd.DataFrame | None = None |
| _cached_data_path: Path | None = None |
| _cached_columns: dict[str, pd.Series] = {} |
| _cached_backtest_price_df: pd.DataFrame | None = None |
| _cached_bench_return: pd.Series | None = None |
|
|
| |
| |
| _period_ranges: dict[str, tuple[str, str]] = {} |
|
|
| |
| _period_masks: dict[str, np.ndarray] = {} |
|
|
| _SCORE_TRANSFORM_ALIASES = { |
| "": "identity", |
| "identity": "identity", |
| "none": "identity", |
| "raw": "identity", |
| "rank": "rank", |
| "pct_rank": "rank", |
| "zscore": "zscore", |
| "rank_zscore": "rank_zscore", |
| "signed": "signed", |
| "sign": "signed", |
| "clip_zscore": "clip_zscore", |
| } |
|
|
| _UNIVERSE_FILTER_ALIASES = { |
| "": "none", |
| "none": "none", |
| "all": "none", |
| "top_amount": "top_amount", |
| "amount": "top_amount", |
| "top_volume": "top_volume", |
| "volume": "top_volume", |
| } |
|
|
|
|
| def configure_periods(periods: dict[str, dict[str, str]]) -> None: |
| """Configure temporal date ranges for train/val/test splits. |
| |
| Args: |
| periods: dict like {"train": {"start": "2016-01-01", "end": "2020-12-31"}, |
| "val": {"start": "2021-01-01", "end": "2021-12-31"}, |
| "test": {"start": "2022-01-01", "end": "2026-12-31"}} |
| """ |
| global _period_ranges, _period_masks |
| _period_ranges.clear() |
| _period_masks.clear() |
| for name, cfg in periods.items(): |
| _period_ranges[name] = (cfg["start"], cfg["end"]) |
| print(f"[FactorExecutor] Configured periods: {_period_ranges}") |
|
|
| |
| if _cached_df is not None: |
| _build_period_masks() |
|
|
|
|
| def _build_period_masks() -> None: |
| """Build boolean masks for each configured period.""" |
| global _period_masks |
| _period_masks.clear() |
| if _cached_df is None or not _period_ranges: |
| return |
| dates = _cached_df.index.get_level_values("datetime") |
| for name, (start, end) in _period_ranges.items(): |
| mask = (dates >= pd.Timestamp(start)) & (dates <= pd.Timestamp(end)) |
| mask_arr = mask.values if hasattr(mask, 'values') else np.asarray(mask) |
| _period_masks[name] = mask_arr |
| n_rows = mask_arr.sum() |
| n_days = dates[mask_arr].nunique() |
| print(f"[FactorExecutor] Period '{name}': {start} to {end} — {n_days} days, {n_rows} rows") |
|
|
|
|
| def _normalize_score_transform(mode: str | None) -> str: |
| key = str(mode or "").strip().lower() |
| if key not in _SCORE_TRANSFORM_ALIASES: |
| choices = ", ".join(sorted(set(_SCORE_TRANSFORM_ALIASES.values()))) |
| raise ValueError(f"Unsupported score_transform={mode!r}. Choose one of: {choices}") |
| return _SCORE_TRANSFORM_ALIASES[key] |
|
|
|
|
| def _normalize_universe_filter(mode: str | None) -> str: |
| key = str(mode or "").strip().lower() |
| if key not in _UNIVERSE_FILTER_ALIASES: |
| choices = ", ".join(sorted(set(_UNIVERSE_FILTER_ALIASES.values()))) |
| raise ValueError(f"Unsupported universe_filter={mode!r}. Choose one of: {choices}") |
| return _UNIVERSE_FILTER_ALIASES[key] |
|
|
|
|
| def _cross_sectional_zscore(series: pd.Series) -> pd.Series: |
| grouped = series.groupby(level="datetime") |
| mean = grouped.transform("mean") |
| std = grouped.transform("std").replace(0.0, np.nan) |
| z = (series - mean) / std |
| return z.replace([np.inf, -np.inf], np.nan).fillna(0.0) |
|
|
|
|
| def _apply_score_transform( |
| factor_values: pd.Series, |
| *, |
| score_transform: str, |
| score_clip: float | None = None, |
| ) -> pd.Series: |
| mode = _normalize_score_transform(score_transform) |
| out = pd.to_numeric(factor_values, errors="coerce").astype(float) |
| if mode == "identity": |
| return out |
| if mode == "rank": |
| return out.groupby(level="datetime").rank(method="average", pct=True) |
| if mode == "zscore": |
| return _cross_sectional_zscore(out) |
| if mode == "rank_zscore": |
| ranked = out.groupby(level="datetime").rank(method="average", pct=True) |
| return _cross_sectional_zscore(ranked) |
| if mode == "signed": |
| return np.sign(out).astype(float) |
| if mode == "clip_zscore": |
| z = _cross_sectional_zscore(out) |
| clip = float(score_clip) if score_clip is not None and np.isfinite(score_clip) else 3.0 |
| clip = max(clip, 0.0) |
| return z.clip(-clip, clip) |
| return out |
|
|
|
|
| def _build_liquidity_universe_mask( |
| liquidity_series: pd.Series, |
| *, |
| top_n: int, |
| lookback_days: int, |
| ) -> pd.Series: |
| if top_n <= 0: |
| return pd.Series(True, index=liquidity_series.index) |
| wide = liquidity_series.unstack("instrument").sort_index() |
| lookback = max(int(lookback_days), 1) |
| rolling_metric = wide.rolling(lookback, min_periods=1).mean().shift(1) |
| ranks = rolling_metric.rank(axis=1, method="first", ascending=False) |
| keep = ranks <= int(top_n) |
| mask = keep.stack(dropna=False) |
| mask.index.names = liquidity_series.index.names |
| return mask.reindex(liquidity_series.index).fillna(False).astype(bool) |
|
|
|
|
| def _apply_universe_filter( |
| factor_values: pd.Series, |
| *, |
| universe_filter: str, |
| universe_top_n: int, |
| universe_lookback_days: int, |
| ) -> pd.Series: |
| mode = _normalize_universe_filter(universe_filter) |
| if mode == "none" or universe_top_n <= 0: |
| return factor_values |
|
|
| if mode == "top_amount": |
| liquidity_series = _cached_columns.get("$amount") |
| else: |
| liquidity_series = _cached_columns.get("$volume") |
|
|
| if liquidity_series is None: |
| raise ValueError( |
| f"Universe filter {mode!r} requires {'$amount' if mode == 'top_amount' else '$volume'} in the dataset." |
| ) |
|
|
| mask = _build_liquidity_universe_mask( |
| pd.to_numeric(liquidity_series, errors="coerce").astype(float), |
| top_n=int(universe_top_n), |
| lookback_days=int(universe_lookback_days), |
| ) |
| return factor_values.where(mask.reindex(factor_values.index).fillna(False)) |
|
|
|
|
| def _resolve_data_path(data_path: str | Path | None = None) -> Path: |
| if data_path: |
| return Path(data_path).expanduser().resolve() |
|
|
| for env_name in ("ALPHAEVO_DATA_PATH", "AAE_DATA_PATH", "DAILY_PV_PATH"): |
| env_value = os.environ.get(env_name, "").strip() |
| if env_value: |
| return Path(env_value).expanduser().resolve() |
|
|
| return DATA_PATH.resolve() |
|
|
|
|
| def _reset_cached_data() -> None: |
| global _cached_df, _cached_data_path, _cached_columns, _cached_backtest_price_df, _cached_bench_return |
| _cached_df = None |
| _cached_data_path = None |
| _cached_columns = {} |
| _cached_backtest_price_df = None |
| _cached_bench_return = None |
|
|
|
|
| def load_data(data_path: str | Path | None = None) -> pd.DataFrame: |
| """Load daily_pv.h5 into memory (cached on first call).""" |
| global _cached_df, _cached_data_path, _cached_columns, _cached_backtest_price_df, _cached_bench_return |
| path = _resolve_data_path(data_path) |
|
|
| if _cached_df is not None and _cached_data_path is not None and _cached_data_path != path: |
| print(f"[FactorExecutor] Data path changed: {_cached_data_path} -> {path}; reloading cache") |
| _reset_cached_data() |
|
|
| if _cached_df is not None: |
| if _cached_backtest_price_df is None: |
| _cached_backtest_price_df = _cached_df[[col for col in ('$open', '$close', '$volume', '$amount') if col in _cached_df.columns]] |
| if _cached_bench_return is None and '$bench_return' in _cached_columns: |
| _cached_bench_return = _cached_columns['$bench_return'].groupby('datetime').first() |
| return _cached_df |
|
|
| print(f"[FactorExecutor] Loading data from {path}...") |
| _cached_df = pd.read_hdf(str(path)) |
| _cached_data_path = path |
|
|
| |
| for col in _cached_df.columns: |
| _cached_columns[col] = _cached_df[col] |
| _cached_backtest_price_df = _cached_df[[col for col in ('$open', '$close', '$volume', '$amount') if col in _cached_df.columns]] |
| if '$bench_return' in _cached_columns: |
| _cached_bench_return = _cached_columns['$bench_return'].groupby('datetime').first() |
|
|
| print(f"[FactorExecutor] Loaded {_cached_df.shape[0]} rows, " |
| f"{len(_cached_df.columns)} columns, " |
| f"instruments: {_cached_df.index.get_level_values('instrument').nunique()}") |
|
|
| |
| if _period_ranges: |
| _build_period_masks() |
|
|
| return _cached_df |
|
|
|
|
| def execute_expression( |
| factor_expr: str, |
| data_path: str | Path | None = None, |
| period: str | None = None, |
| start_date: str | None = None, |
| end_date: str | None = None, |
| label_forward_days: int = 5, |
| top_k: int = 10, |
| n_drop: int = 2, |
| start_cash: float = 200_000_000.0, |
| position_size: float = 1.0, |
| max_pos_each_stock: float = 1.0, |
| lot_size: int = 100, |
| max_daily_volume_participation: float = 0.0, |
| max_daily_amount_participation: float = 0.0, |
| rebalance_freq: int = 5, |
| cost_buy: float = 0.0013, |
| cost_sell: float = 0.0013, |
| backtest_engine: str = "custom", |
| capture_details: bool = False, |
| trade_guard_config: dict | bool | None = None, |
| rebalance_mode: str = "dropout", |
| custom_weight_mode: str = "equal", |
| redistribute_unfilled_cash: bool = False, |
| enforce_cash_limit: bool = False, |
| score_transform: str = "identity", |
| score_clip: float = 3.0, |
| universe_filter: str = "none", |
| universe_top_n: int = 0, |
| universe_lookback_days: int = 20, |
| ) -> dict: |
| """Execute a factor expression and compute IC/IR metrics. |
| |
| Args: |
| factor_expr: Factor expression string, e.g. "RANK($close)" |
| data_path: Optional path to daily_pv.h5 |
| period: Optional period name ("train", "val", "test"). If set, |
| IC/IR is computed only on that period's date range. |
| Factor values are still computed on ALL data (TS operators |
| need lookback). If None, uses all data. |
| start_date: Optional explicit evaluation start date override |
| end_date: Optional explicit evaluation end date override |
| label_forward_days: Forward horizon for IC / RankIC computation |
| capture_details: If True, return yearly metrics plus trade/stock detail artifacts |
| trade_guard_config: Optional qlib_original trade guard config. None disables guards. |
| Ignored by custom/spec engines. |
| rebalance_mode: qlib_original rebalance mode ("dropout", "sell_all", or "target_weight"). |
| Ignored by custom/spec engines. |
| custom_weight_mode: custom engine buy sizing mode ("equal" or "alpha_score"). |
| Ignored by qlib_original/spec engines. |
| redistribute_unfilled_cash: When True, custom engine carries an unfilled buy budget |
| down the remaining ranked names within the selected top-k. Ignored by |
| qlib_original/spec engines. |
| enforce_cash_limit: When False, custom engine allows buy-side cash to go negative |
| instead of clipping orders by available cash. Ignored by qlib_original/spec engines. |
| score_transform: Optional cross-sectional transform applied to raw scores. |
| score_clip: Clip threshold used by score_transform="clip_zscore". |
| universe_filter: Optional liquidity-derived universe filter. |
| universe_top_n: Keep top-N names after applying universe_filter. |
| universe_lookback_days: Rolling lookback used by liquidity-derived universes. |
| |
| Returns: |
| dict with keys: success, score (IR), ic_mean, ic_std, ir, error, exec_time |
| """ |
| start_time = time.time() |
|
|
| try: |
| |
| df = load_data(data_path) |
|
|
| |
| parsed_code = parse_expression(factor_expr) |
|
|
| |
| |
| columns = [col for col in df.columns] |
| safe_name_map = {} |
| for col in columns: |
| clean = col.replace('$', '') |
| if clean in _PYTHON_KEYWORDS: |
| safe_name_map[col] = f'col_{clean}' |
| else: |
| safe_name_map[col] = clean |
|
|
| |
| executable_code = parse_symbol(parsed_code, columns) |
|
|
| |
| for col in columns: |
| clean = col.replace('$', '') |
| safe = safe_name_map[col] |
| if clean != safe: |
| |
| |
| import re as _re |
| executable_code = _re.sub( |
| r'\b' + _re.escape(clean) + r'\b', |
| safe, |
| executable_code |
| ) |
|
|
| |
| exec_namespace = {} |
|
|
| |
| for name in dir(function_lib): |
| obj = getattr(function_lib, name) |
| if callable(obj) and not name.startswith('_'): |
| exec_namespace[name] = obj |
|
|
| |
| exec_namespace['np'] = np |
| exec_namespace['pd'] = pd |
|
|
| |
| for col in columns: |
| safe_name = safe_name_map[col] |
| exec_namespace[safe_name] = _cached_columns[col].copy() |
|
|
| |
| factor_values = eval(executable_code, exec_namespace) |
|
|
| |
| if isinstance(factor_values, pd.DataFrame): |
| factor_values = factor_values.iloc[:, 0] |
| elif isinstance(factor_values, np.ndarray): |
| factor_values = pd.Series(factor_values, index=df.index) |
|
|
| |
| from backtest.robust_qlib_backtester import build_signal_selection_log, compute_portfolio_ir |
|
|
| if not isinstance(factor_values, pd.Series): |
| factor_values = pd.Series(factor_values, index=df.index) |
| factor_values = _apply_score_transform( |
| factor_values, |
| score_transform=score_transform, |
| score_clip=score_clip, |
| ) |
| factor_values = _apply_universe_filter( |
| factor_values, |
| universe_filter=universe_filter, |
| universe_top_n=universe_top_n, |
| universe_lookback_days=universe_lookback_days, |
| ) |
|
|
| |
| eval_start_date = start_date |
| eval_end_date = end_date |
| if (eval_start_date is None or eval_end_date is None) and period and period in _period_ranges: |
| period_start, period_end = _period_ranges[period] |
| eval_start_date = eval_start_date or period_start |
| eval_end_date = eval_end_date or period_end |
|
|
| |
| bench_return = _cached_bench_return |
|
|
| result = compute_portfolio_ir( |
| factor_values=factor_values, |
| price_df=_cached_backtest_price_df if _cached_backtest_price_df is not None else df[[col for col in ('$open', '$close', '$volume', '$amount') if col in df.columns]], |
| bench_return=bench_return, |
| top_k=top_k, |
| n_drop=n_drop, |
| rebalance_freq=rebalance_freq, |
| cost_buy=cost_buy, |
| cost_sell=cost_sell, |
| hold_thresh=2, |
| label_forward_days=label_forward_days, |
| ann_scaler=252, |
| start_date=eval_start_date, |
| end_date=eval_end_date, |
| start_cash=start_cash, |
| position_size=position_size, |
| max_pos_each_stock=max_pos_each_stock, |
| lot_size=lot_size, |
| max_daily_volume_participation=max_daily_volume_participation, |
| max_daily_amount_participation=max_daily_amount_participation, |
| capture_details=capture_details, |
| engine=backtest_engine, |
| trade_guard_config=trade_guard_config, |
| rebalance_mode=rebalance_mode, |
| custom_weight_mode=custom_weight_mode, |
| redistribute_unfilled_cash=redistribute_unfilled_cash, |
| enforce_cash_limit=enforce_cash_limit, |
| ) |
|
|
| signal_selection_log = [] |
| if capture_details and result.get('success'): |
| try: |
| signal_selection_log = build_signal_selection_log( |
| factor_values=factor_values, |
| price_df=_cached_backtest_price_df if _cached_backtest_price_df is not None else df[[col for col in ('$open', '$close', '$volume', '$amount') if col in df.columns]], |
| top_k=top_k, |
| start_date=eval_start_date, |
| end_date=eval_end_date, |
| backtest_engine=backtest_engine, |
| trade_guard_config=trade_guard_config, |
| holding_log=result.get('holding_log', []), |
| trade_log=result.get('trade_log', []), |
| portfolio_log=result.get('portfolio_log', []), |
| ) |
| except Exception: |
| signal_selection_log = [] |
|
|
| exec_time = time.time() - start_time |
|
|
| return { |
| 'success': result['success'], |
| 'score': float(result['ir']), |
| 'ic_mean': float(result.get('ic_mean', 0.0)), |
| 'ic_std': float(result.get('ic_std', 0.0)), |
| 'icir': float(result.get('icir', 0.0)), |
| 'ir': float(result['ir']), |
| 'rank_ic_mean': float(result.get('rank_ic_mean', 0.0)), |
| 'rank_ic_std': float(result.get('rank_ic_std', 0.0)), |
| 'rank_icir': float(result.get('rank_icir', 0.0)), |
| 'annualized_return': float(result.get('annualized_return', 0.0)), |
| 'annualized_volatility': float(result.get('annualized_volatility', 0.0)), |
| 'performance_return': float(result.get('performance_return', 0.0)), |
| 'benchmark_performance_return': float(result.get('benchmark_performance_return', 0.0)), |
| 'excess_compounded_return': float(result.get('excess_compounded_return', 0.0)), |
| 'sharpe': float(result.get('sharpe', 0.0)), |
| 'winrate': float(result.get('winrate', 0.0)), |
| 'mdd': float(result.get('mdd', 0.0)), |
| 'excess_mdd': float(result.get('excess_mdd', result.get('mdd', 0.0))), |
| 'portfolio_nav_mdd': float(result.get('portfolio_nav_mdd', 0.0)), |
| 'drawdown_duration_max': int(result.get('drawdown_duration_max', 0) or 0), |
| 'drawdown_duration_mean': float(result.get('drawdown_duration_mean', 0.0)), |
| 'drawdown_duration_median': float(result.get('drawdown_duration_median', 0.0)), |
| 'total_return': float(result.get('total_return', 0.0)), |
| 'final_value': float(result.get('final_value', 0.0)), |
| 'n_days': result.get('n_days', 0), |
| 'n_ic_days': result.get('n_ic_days', 0), |
| 'yearly_metrics': result.get('yearly_metrics', {}), |
| 'trade_log': result.get('trade_log', []), |
| 'stock_contrib': result.get('stock_contrib', []), |
| 'holding_log': result.get('holding_log', []), |
| 'portfolio_log': result.get('portfolio_log', []), |
| 'signal_selection_log': signal_selection_log, |
| 'backtest_engine': backtest_engine, |
| 'label_forward_days': label_forward_days, |
| 'qlib_warnings': result.get('qlib_warnings', []), |
| 'trade_guard_config': result.get('trade_guard_config'), |
| 'rebalance_mode': result.get('rebalance_mode', rebalance_mode), |
| 'rebalance_freq': int(rebalance_freq), |
| 'top_k': int(top_k), |
| 'n_drop': int(n_drop), |
| 'custom_weight_mode': result.get('custom_weight_mode', custom_weight_mode), |
| 'redistribute_unfilled_cash': bool(result.get('redistribute_unfilled_cash', redistribute_unfilled_cash)), |
| 'enforce_cash_limit': bool(result.get('enforce_cash_limit', enforce_cash_limit)), |
| 'cost_buy': float(cost_buy), |
| 'cost_sell': float(cost_sell), |
| 'score_transform': _normalize_score_transform(score_transform), |
| 'score_clip': float(score_clip), |
| 'universe_filter': _normalize_universe_filter(universe_filter), |
| 'universe_top_n': int(universe_top_n), |
| 'universe_lookback_days': int(universe_lookback_days), |
| 'transaction_cost': float(result.get('transaction_cost', 0.0)), |
| 'gross_turnover': float(result.get('gross_turnover', 0.0)), |
| 'turnover_ratio': float(result.get('turnover_ratio', 0.0)), |
| 'error': result.get('error'), |
| 'exec_time': round(exec_time, 3), |
| } |
|
|
| except Exception as e: |
| exec_time = time.time() - start_time |
| return { |
| 'success': False, |
| 'score': 0.0, |
| 'ic_mean': 0.0, |
| 'ic_std': 0.0, |
| 'icir': 0.0, |
| 'ir': 0.0, |
| 'rank_ic_mean': 0.0, |
| 'rank_ic_std': 0.0, |
| 'rank_icir': 0.0, |
| 'annualized_return': 0.0, |
| 'annualized_volatility': 0.0, |
| 'performance_return': 0.0, |
| 'benchmark_performance_return': 0.0, |
| 'excess_compounded_return': 0.0, |
| 'sharpe': 0.0, |
| 'winrate': 0.0, |
| 'mdd': 0.0, |
| 'excess_mdd': 0.0, |
| 'portfolio_nav_mdd': 0.0, |
| 'drawdown_duration_max': 0, |
| 'drawdown_duration_mean': 0.0, |
| 'drawdown_duration_median': 0.0, |
| 'total_return': 0.0, |
| 'final_value': 0.0, |
| 'n_days': 0, |
| 'n_ic_days': 0, |
| 'yearly_metrics': {}, |
| 'trade_log': [], |
| 'stock_contrib': [], |
| 'holding_log': [], |
| 'portfolio_log': [], |
| 'signal_selection_log': [], |
| 'backtest_engine': backtest_engine, |
| 'label_forward_days': label_forward_days, |
| 'qlib_warnings': [], |
| 'trade_guard_config': None, |
| 'rebalance_mode': rebalance_mode, |
| 'rebalance_freq': int(rebalance_freq), |
| 'top_k': int(top_k), |
| 'n_drop': int(n_drop), |
| 'custom_weight_mode': custom_weight_mode, |
| 'redistribute_unfilled_cash': bool(redistribute_unfilled_cash), |
| 'enforce_cash_limit': bool(enforce_cash_limit), |
| 'cost_buy': float(cost_buy), |
| 'cost_sell': float(cost_sell), |
| 'score_transform': _normalize_score_transform(score_transform), |
| 'score_clip': float(score_clip), |
| 'universe_filter': _normalize_universe_filter(universe_filter), |
| 'universe_top_n': int(universe_top_n), |
| 'universe_lookback_days': int(universe_lookback_days), |
| 'transaction_cost': 0.0, |
| 'gross_turnover': 0.0, |
| 'turnover_ratio': 0.0, |
| 'error': f"{type(e).__name__}: {str(e)[:500]}", |
| 'exec_time': round(exec_time, 3), |
| } |
|
|
|
|
| if __name__ == "__main__": |
| |
| test_exprs = [ |
| "RANK($close)", |
| "RANK(TS_MEAN($return, 5) / (TS_STD($return, 20) + 1e-8)) * SIGN(DELTA($close, 5) / ($close + 1e-8))", |
| "TS_CORR(RANK($volume), RANK($close), 10) - TS_CORR(RANK($volume), RANK($close), 40)", |
| ] |
| for expr in test_exprs: |
| print(f"\nExpression: {expr}") |
| result = execute_expression(expr) |
| print(f"Result: {result}") |
|
|