eb6d865 | """ | |
| Data processing and feature engineering utilities. | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| from scipy import stats | |

def clean_timeseries(df: pd.DataFrame, value_col: str = None) -> pd.DataFrame:
    """
    Clean a time series DataFrame:
    - Ensure a datetime index
    - Sort by date
    - Remove duplicate index entries
    - Handle missing values with interpolation
    (value_col is currently unused.)
    """
    df = df.copy()
    # Ensure datetime index
    if not isinstance(df.index, pd.DatetimeIndex):
        if "Date" in df.columns:
            df["Date"] = pd.to_datetime(df["Date"])
            df = df.set_index("Date")
        elif "date" in df.columns:
            df["date"] = pd.to_datetime(df["date"])
            df = df.set_index("date")
        else:
            # Fall back to parsing the existing index as dates
            try:
                df.index = pd.to_datetime(df.index)
            except Exception:
                pass
    # Sort by index
    df = df.sort_index()
    # Remove duplicate indices
    df = df[~df.index.duplicated(keep="first")]
    # Interpolate missing values; "time" interpolation requires a DatetimeIndex,
    # so fall back to linear interpolation if date parsing failed above
    method = "time" if isinstance(df.index, pd.DatetimeIndex) else "linear"
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    for col in numeric_cols:
        if df[col].isna().sum() > 0:
            df[col] = df[col].interpolate(method=method, limit_direction="both")
            df[col] = df[col].ffill().bfill()
    return df
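
# A minimal usage sketch for clean_timeseries (illustrative only; the helper
# and its inline data are hypothetical, not part of the original module):
def _demo_clean_timeseries() -> pd.DataFrame:
    raw = pd.DataFrame({
        "Date": ["2024-01-03", "2024-01-01", "2024-01-03", "2024-01-02"],
        "Close": [102.0, 100.0, 102.0, np.nan],
    })
    # Result is sorted by date, the duplicate 2024-01-03 row is dropped, and
    # the missing 2024-01-02 value is interpolated to 101.0
    return clean_timeseries(raw)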

def compute_returns(df: pd.DataFrame, col: str = "Close", periods: int = 1) -> pd.Series:
    """Compute percentage returns."""
    return df[col].pct_change(periods) * 100
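
# A quick worked example (hypothetical values): with closes [100, 102, 101],
# compute_returns yields [NaN, 2.0, ~-0.98], i.e. percent change per period.
def _demo_compute_returns() -> pd.Series:
    prices = pd.DataFrame({"Close": [100.0, 102.0, 101.0]})
    return compute_returns(prices)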

def compute_rolling_stats(df: pd.DataFrame, col: str, windows: list = None) -> pd.DataFrame:
    """
    Compute rolling statistics (mean, std) for the given windows.
    """
    if windows is None:
        windows = [7, 14, 30, 90]
    result = df[[col]].copy()
    for w in windows:
        if len(df) >= w:
            result[f"MA_{w}"] = df[col].rolling(window=w).mean()
            result[f"STD_{w}"] = df[col].rolling(window=w).std()
    return result
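
# Sketch of the rolling-stat output (hypothetical helper; the columns follow
# the MA_<window>/STD_<window> naming used above):
def _demo_rolling_stats() -> pd.DataFrame:
    idx = pd.date_range("2024-01-01", periods=30, freq="D")
    prices = pd.DataFrame({"Close": np.linspace(100.0, 129.0, 30)}, index=idx)
    # With only 30 rows, just the 7- and 14-day windows fit
    return compute_rolling_stats(prices, "Close", windows=[7, 14])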

def compute_technical_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """
    Compute common technical indicators for stock/crypto data.
    Expects columns: Open, High, Low, Close, Volume
    """
    result = df.copy()
    # Moving averages (simple and exponential)
    for period in [7, 20, 50, 200]:
        if len(df) >= period:
            result[f"SMA_{period}"] = df["Close"].rolling(window=period).mean()
            result[f"EMA_{period}"] = df["Close"].ewm(span=period, adjust=False).mean()
    # RSI (Relative Strength Index); this is the simple rolling-mean variant
    # rather than Wilder's smoothed version
    if len(df) >= 14:
        delta = df["Close"].diff()
        gain = delta.where(delta > 0, 0).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / loss
        result["RSI"] = 100 - (100 / (1 + rs))
    # MACD (12/26 EMA difference with a 9-period signal line)
    if len(df) >= 26:
        ema12 = df["Close"].ewm(span=12, adjust=False).mean()
        ema26 = df["Close"].ewm(span=26, adjust=False).mean()
        result["MACD"] = ema12 - ema26
        result["MACD_Signal"] = result["MACD"].ewm(span=9, adjust=False).mean()
        result["MACD_Hist"] = result["MACD"] - result["MACD_Signal"]
    # Bollinger Bands (20-period SMA +/- 2 standard deviations)
    if len(df) >= 20:
        sma20 = df["Close"].rolling(window=20).mean()
        std20 = df["Close"].rolling(window=20).std()
        result["BB_Upper"] = sma20 + (std20 * 2)
        result["BB_Lower"] = sma20 - (std20 * 2)
        result["BB_Middle"] = sma20
    # ATR (Average True Range)
    if len(df) >= 14:
        high_low = df["High"] - df["Low"]
        high_close = (df["High"] - df["Close"].shift()).abs()
        low_close = (df["Low"] - df["Close"].shift()).abs()
        true_range = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
        result["ATR"] = true_range.rolling(window=14).mean()
    # Daily returns (percent)
    result["Returns"] = df["Close"].pct_change() * 100
    # Volume moving average
    if "Volume" in df.columns and len(df) >= 20:
        result["Volume_MA20"] = df["Volume"].rolling(window=20).mean()
    return result
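
# Illustrative call on synthetic OHLCV data (hypothetical helper and values;
# 250 rows are enough to populate every indicator, including SMA_200):
def _demo_technical_indicators() -> pd.DataFrame:
    idx = pd.date_range("2023-01-01", periods=250, freq="D")
    close = 100 + np.cumsum(np.random.default_rng(0).normal(0, 1, 250))
    ohlcv = pd.DataFrame({
        "Open": close,
        "High": close + 1.0,
        "Low": close - 1.0,
        "Close": close,
        "Volume": np.full(250, 1_000.0),
    }, index=idx)
    return compute_technical_indicators(ohlcv)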

def decompose_timeseries(series: pd.Series, period: int = None) -> dict:
    """
    Decompose a time series into trend, seasonal, and residual components.
    """
    from statsmodels.tsa.seasonal import seasonal_decompose
    if period is None:
        # Auto-detect a plausible period from the series length
        n = len(series)
        if n >= 730:
            period = 365
        elif n >= 60:
            period = 30
        elif n >= 14:
            period = 7
        else:
            period = max(2, n // 3)
    # seasonal_decompose cannot handle missing values
    series = series.dropna()
    # It also needs at least two full cycles, so shrink the period if necessary
    if len(series) < 2 * period:
        period = max(2, len(series) // 3)
    result = seasonal_decompose(series, model="additive", period=period)
    return {
        "observed": result.observed,
        "trend": result.trend,
        "seasonal": result.seasonal,
        "residual": result.resid,
    }
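
# Decomposition sketch on a synthetic series with a 30-day cycle (hypothetical
# data; with 90 daily points the auto-detected period is 30):
def _demo_decompose() -> dict:
    idx = pd.date_range("2024-01-01", periods=90, freq="D")
    values = np.arange(90) + 5 * np.sin(2 * np.pi * np.arange(90) / 30)
    return decompose_timeseries(pd.Series(values, index=idx))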

def detect_anomalies(series: pd.Series, method: str = "zscore", threshold: float = 3.0) -> pd.Series:
    """
    Detect anomalies in a time series.
    Returns a boolean Series (True = anomaly).
    """
    if method == "zscore":
        clean = series.dropna()
        z_scores = np.abs(stats.zscore(clean))
        anomalies = pd.Series(False, index=series.index)
        anomalies.loc[clean.index] = z_scores > threshold
    elif method == "iqr":
        Q1 = series.quantile(0.25)
        Q3 = series.quantile(0.75)
        IQR = Q3 - Q1
        anomalies = (series < (Q1 - 1.5 * IQR)) | (series > (Q3 + 1.5 * IQR))
    else:
        anomalies = pd.Series(False, index=series.index)
    return anomalies
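
# Anomaly-detection sketch (hypothetical series with one obvious outlier;
# with the IQR method only the spike at position 20 is flagged):
def _demo_detect_anomalies() -> pd.Series:
    values = pd.Series([10.0] * 20 + [100.0] + [10.0] * 20)
    return detect_anomalies(values, method="iqr")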

def compute_stationarity_test(series: pd.Series) -> dict:
    """
    Perform the Augmented Dickey-Fuller test for stationarity.
    """
    from statsmodels.tsa.stattools import adfuller
    series = series.dropna()
    if len(series) < 20:
        return {"error": "Not enough data points for stationarity test"}
    result = adfuller(series, autolag="AIC")
    return {
        "test_statistic": round(result[0], 4),
        "p_value": round(result[1], 6),
        "lags_used": result[2],
        "observations": result[3],
        "critical_values": {k: round(v, 4) for k, v in result[4].items()},
        "is_stationary": result[1] < 0.05,
    }
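
# ADF sketch (hypothetical data): white noise should test as stationary,
# while a random walk generally should not.
def _demo_stationarity() -> dict:
    rng = np.random.default_rng(42)
    noise = pd.Series(rng.normal(0, 1, 200))
    return compute_stationarity_test(noise)  # expect is_stationary == True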

def prepare_forecast_data(df: pd.DataFrame, target_col: str, train_ratio: float = 0.8) -> tuple:
    """
    Split a time series into train and test sets without shuffling, so the
    temporal order is preserved. The split is purely positional; target_col
    is currently unused.
    Returns (train, test).
    """
    n = len(df)
    split_idx = int(n * train_ratio)
    train = df.iloc[:split_idx]
    test = df.iloc[split_idx:]
    return train, test
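
# Split sketch (hypothetical frame): with 100 rows and the default 0.8 ratio,
# train receives rows 0-79 and test rows 80-99.
def _demo_split() -> tuple:
    df = pd.DataFrame({"y": np.arange(100)})
    return prepare_forecast_data(df, target_col="y")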

def inject_stochastic_volatility(forecast: pd.Series, actual: pd.Series, fitted: pd.Series = None) -> pd.Series:
    """
    Inject noise calibrated to historical volatility into a smooth prediction
    line, so the forecast path fluctuates the way past values did.
    """
    if fitted is None or len(fitted) < 10:
        # Fall back to the volatility of the differenced actuals
        diffs = actual.diff().dropna()
        std_resid = diffs.std()
    else:
        # Estimate volatility from the model residuals
        common_idx = fitted.index.intersection(actual.index)
        if len(common_idx) < 10:
            diffs = actual.diff().dropna()
            std_resid = diffs.std()
        else:
            residuals = actual[common_idx] - fitted[common_idx]
            std_resid = np.nanstd(residuals)
            if np.isnan(std_resid) or std_resid == 0:
                std_resid = actual.diff().dropna().std()
    # Use recent market volatility (last 30 observations) to keep the noise
    # proportional to current conditions
    recent_actual = actual.tail(30)
    std_recent = recent_actual.std() if len(recent_actual) > 5 else std_resid
    if np.isnan(std_recent) or std_recent == 0:
        std_recent = std_resid
    # Keep the baseline volatility noticeable (at least 15% of the recent std)
    base_volatility = max(std_resid, std_recent * 0.15)
    if np.isnan(base_volatility) or base_volatility == 0:
        return forecast
    # Generate mean-reverting AR(1) noise with high momentum: short drifts
    # that eventually revert to the prediction mean, loosely mimicking how
    # asset prices swing around a trend
    n_steps = len(forecast)
    noise = np.zeros(n_steps)
    # Amplify the scale slightly so the fluctuation is visually apparent
    noise_scale = base_volatility * 1.2
    rng = np.random.default_rng()  # fresh, unseeded generator on every run
    # The 0.85 coefficient makes the noise drift like a market swing before
    # returning toward the forecast line
    for i in range(1, n_steps):
        noise[i] = 0.85 * noise[i - 1] + rng.normal(0, noise_scale * 0.5)
    # Apply a linear taper over the first (up to) 10 steps so the noise starts
    # at zero and the transition from history into the forecast is smooth
    taper = np.linspace(0, 1, min(n_steps, 10))
    noise[:len(taper)] *= taper
    return forecast + noise
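
# End-to-end sketch (hypothetical data): add calibrated AR(1) noise to a flat
# naive forecast so the projected path fluctuates like the history did.
def _demo_inject_volatility() -> pd.Series:
    hist_idx = pd.date_range("2024-01-01", periods=120, freq="D")
    rng = np.random.default_rng(1)
    actual = pd.Series(100 + np.cumsum(rng.normal(0, 2, 120)), index=hist_idx)
    fc_idx = pd.date_range(hist_idx[-1] + pd.Timedelta(days=1), periods=30, freq="D")
    forecast = pd.Series(actual.iloc[-1], index=fc_idx)  # flat naive forecast
    return inject_stochastic_volatility(forecast, actual)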