CausalBox / utils /time_series_causal.py
ShutterStack's picture
major changes
ab66d4e verified
# utils/time_series_causal.py
import pandas as pd
from statsmodels.tsa.stattools import grangercausalitytests
def perform_granger_causality(data_list, timestamp_col, variables_to_analyze, max_lags=1):
"""
Performs pairwise Granger Causality tests on the given time-series data.
Args:
data_list (list of dict): List of dictionaries representing the dataset.
timestamp_col (str): Name of the timestamp column.
variables_to_analyze (list): List of names of variables to test for causality.
max_lags (int): The maximum number of lags to use for the Granger causality test.
Returns:
list: A list of dictionaries, each describing a causal relationship found.
"""
df = pd.DataFrame(data_list)
if timestamp_col not in df.columns:
raise ValueError(f"Timestamp column '{timestamp_col}' not found in data.")
# Ensure timestamp column is datetime and set as index
try:
df[timestamp_col] = pd.to_datetime(df[timestamp_col])
df = df.set_index(timestamp_col).sort_index()
except Exception as e:
raise ValueError(f"Could not convert timestamp column '{timestamp_col}' to datetime: {e}")
# Ensure all variables to analyze are numeric
for col in variables_to_analyze:
if not pd.api.types.is_numeric_dtype(df[col]):
raise ValueError(f"Variable '{col}' is not numeric. Granger Causality requires numeric variables.")
if df[col].isnull().any():
# Handle NaNs: Granger Causality tests require no NaN values.
# You might choose to drop rows with NaNs or impute.
# For simplicity, here we'll raise an error or drop them.
# print(f"Warning: Variable '{col}' contains NaN values. Rows with NaNs will be dropped.")
df = df.dropna(subset=[col])
# Select only the relevant columns
df_selected = df[variables_to_analyze]
# Granger Causality requires stationarity in theory.
# While statsmodels can run on non-stationary data, results should be interpreted cautiously.
# You might want to add differencing logic here (e.g., df.diff().dropna())
# or a warning for the user.
# For now, we proceed directly.
causal_results = []
# Iterate through all unique pairs of variables
for i in range(len(variables_to_analyze)):
for j in range(len(variables_to_analyze)):
if i == j:
continue # Skip self-causation tests
cause_var = variables_to_analyze[i]
effect_var = variables_to_analyze[j]
# Prepare data for grangercausalitytests: [effect_var, cause_var]
# grangercausalitytests takes a DataFrame where the first column is the dependent variable (effect)
# and the second column is the independent variable (cause)
data_for_test = df_selected[[effect_var, cause_var]]
if data_for_test.empty or len(data_for_test) <= max_lags:
# Not enough data points to perform test with specified lags
# This can happen if NaNs were dropped or dataset is too small
continue
try:
# Perform Granger Causality test
# The output is a dictionary. The key 'ssr_ftest' (or 'params_ftest')
# usually contains the p-value.
test_result = grangercausalitytests(data_for_test, max_lags, verbose=False)
# Extract p-value for the optimal lag or the test that interests you
# Commonly, F-test p-value for the last lag tested is used
# test_result is a dictionary where keys are lag numbers
# Each lag has a tuple of (test_statistics, p_values).
# (F-test, Chi2-test, LR-test, SSR-test) -> [statistic, p-value, df_denom, df_num]
# Let's consider the F-test for the last lag as a general indicator
last_lag_p_value = test_result[max_lags][0]['ssr_ftest'][1] # F-test p-value
causal_results.append({
"cause": cause_var,
"effect": effect_var,
"p_value": last_lag_p_value,
"test_type": "Granger Causality (F-test)",
"max_lags": max_lags
})
except ValueError as ve:
# Handle cases where the test cannot be performed (e.g., singular matrix)
print(f"Could not perform Granger Causality for {cause_var} -> {effect_var} with max_lags={max_lags}: {ve}")
continue # Skip this pair
except Exception as e:
print(f"An unexpected error occurred for {cause_var} -> {effect_var}: {e}")
continue
return causal_results