Spaces:
Sleeping
Sleeping
# utils/time_series_causal.py | |
import pandas as pd | |
from statsmodels.tsa.stattools import grangercausalitytests | |
def perform_granger_causality(data_list, timestamp_col, variables_to_analyze, max_lags=1): | |
""" | |
Performs pairwise Granger Causality tests on the given time-series data. | |
Args: | |
data_list (list of dict): List of dictionaries representing the dataset. | |
timestamp_col (str): Name of the timestamp column. | |
variables_to_analyze (list): List of names of variables to test for causality. | |
max_lags (int): The maximum number of lags to use for the Granger causality test. | |
Returns: | |
list: A list of dictionaries, each describing a causal relationship found. | |
""" | |
df = pd.DataFrame(data_list) | |
if timestamp_col not in df.columns: | |
raise ValueError(f"Timestamp column '{timestamp_col}' not found in data.") | |
# Ensure timestamp column is datetime and set as index | |
try: | |
df[timestamp_col] = pd.to_datetime(df[timestamp_col]) | |
df = df.set_index(timestamp_col).sort_index() | |
except Exception as e: | |
raise ValueError(f"Could not convert timestamp column '{timestamp_col}' to datetime: {e}") | |
# Ensure all variables to analyze are numeric | |
for col in variables_to_analyze: | |
if not pd.api.types.is_numeric_dtype(df[col]): | |
raise ValueError(f"Variable '{col}' is not numeric. Granger Causality requires numeric variables.") | |
if df[col].isnull().any(): | |
# Handle NaNs: Granger Causality tests require no NaN values. | |
# You might choose to drop rows with NaNs or impute. | |
# For simplicity, here we'll raise an error or drop them. | |
# print(f"Warning: Variable '{col}' contains NaN values. Rows with NaNs will be dropped.") | |
df = df.dropna(subset=[col]) | |
# Select only the relevant columns | |
df_selected = df[variables_to_analyze] | |
# Granger Causality requires stationarity in theory. | |
# While statsmodels can run on non-stationary data, results should be interpreted cautiously. | |
# You might want to add differencing logic here (e.g., df.diff().dropna()) | |
# or a warning for the user. | |
# For now, we proceed directly. | |
causal_results = [] | |
# Iterate through all unique pairs of variables | |
for i in range(len(variables_to_analyze)): | |
for j in range(len(variables_to_analyze)): | |
if i == j: | |
continue # Skip self-causation tests | |
cause_var = variables_to_analyze[i] | |
effect_var = variables_to_analyze[j] | |
# Prepare data for grangercausalitytests: [effect_var, cause_var] | |
# grangercausalitytests takes a DataFrame where the first column is the dependent variable (effect) | |
# and the second column is the independent variable (cause) | |
data_for_test = df_selected[[effect_var, cause_var]] | |
if data_for_test.empty or len(data_for_test) <= max_lags: | |
# Not enough data points to perform test with specified lags | |
# This can happen if NaNs were dropped or dataset is too small | |
continue | |
try: | |
# Perform Granger Causality test | |
# The output is a dictionary. The key 'ssr_ftest' (or 'params_ftest') | |
# usually contains the p-value. | |
test_result = grangercausalitytests(data_for_test, max_lags, verbose=False) | |
# Extract p-value for the optimal lag or the test that interests you | |
# Commonly, F-test p-value for the last lag tested is used | |
# test_result is a dictionary where keys are lag numbers | |
# Each lag has a tuple of (test_statistics, p_values). | |
# (F-test, Chi2-test, LR-test, SSR-test) -> [statistic, p-value, df_denom, df_num] | |
# Let's consider the F-test for the last lag as a general indicator | |
last_lag_p_value = test_result[max_lags][0]['ssr_ftest'][1] # F-test p-value | |
causal_results.append({ | |
"cause": cause_var, | |
"effect": effect_var, | |
"p_value": last_lag_p_value, | |
"test_type": "Granger Causality (F-test)", | |
"max_lags": max_lags | |
}) | |
except ValueError as ve: | |
# Handle cases where the test cannot be performed (e.g., singular matrix) | |
print(f"Could not perform Granger Causality for {cause_var} -> {effect_var} with max_lags={max_lags}: {ve}") | |
continue # Skip this pair | |
except Exception as e: | |
print(f"An unexpected error occurred for {cause_var} -> {effect_var}: {e}") | |
continue | |
return causal_results |