Spaces:
Sleeping
Sleeping
import gradio as gr | |
import pandas as pd | |
import numpy as np | |
import statsmodels.api as sm | |
import plotly.graph_objects as go | |
from statsmodels.tsa.stattools import acf, pacf, q_stat | |
from statsmodels.stats.stattools import durbin_watson | |
from statsmodels.stats.diagnostic import acorr_ljungbox | |
import matplotlib.pyplot as plt | |
import warnings | |
warnings.filterwarnings("ignore") | |
def _is_categorical(series):
    """Return True when *series* holds labels (text/category) rather than numbers."""
    dtype = series.dtype
    return (
        isinstance(dtype, pd.CategoricalDtype)
        or pd.api.types.is_object_dtype(dtype)
        or pd.api.types.is_string_dtype(dtype)
    )


def _build_design_matrix(df, y_col, x_cols):
    """Clean the selected columns and return ``(y, X, dummy_info)``.

    ``y`` is coerced to numeric, numeric predictors are coerced to numeric,
    rows with any missing value are dropped, and label-like predictors are
    expanded into 0/1 dummy columns with the first level left out as baseline.
    ``dummy_info`` is a human-readable description of that expansion (empty
    when no categorical predictor was selected).

    Raises:
        ValueError: if no complete row or no usable predictor remains.
    """
    subset = df[[y_col] + x_cols].copy()
    subset[y_col] = pd.to_numeric(subset[y_col], errors="coerce")

    cat_cols = [col for col in x_cols if _is_categorical(subset[col])]
    numeric_cols = [col for col in x_cols if col not in cat_cols]
    for col in numeric_cols:
        subset[col] = pd.to_numeric(subset[col], errors="coerce")

    clean = subset.dropna()
    if clean.empty:
        raise ValueError(
            "No complete rows remain after removing missing or non-numeric values."
        )

    y = clean[y_col]
    X = clean[numeric_cols]

    dummy_info = ""
    if cat_cols:
        dropped_categories = {}
        new_dummies = []
        frames = [X]
        for cat in cat_cols:
            # Cast to str before stripping: `.str.strip()` alone would turn
            # non-string cells of a mixed column into NaN.
            labels = clean[cat].astype(str).str.strip()
            # get_dummies orders levels by sorting, so the level removed by
            # drop_first=True is the smallest label.
            dropped_categories[cat] = min(labels.unique())
            dummies = pd.get_dummies(labels, prefix=cat, drop_first=True).astype(float)
            new_dummies.extend(dummies.columns.tolist())
            frames.append(dummies)
        X = pd.concat(frames, axis=1)
        dummy_info = (
            f"Converted categorical columns: {cat_cols}\n"
            f"Created dummy columns: {new_dummies}\n"
            f"Dropped baseline categories (due to drop_first=True): {dropped_categories}\n"
        )

    # Safety net: force numeric, discard columns that carry no data, then
    # discard any row that still has a gap.
    X = X.apply(pd.to_numeric, errors="coerce")
    X = X.dropna(axis=1, how="all")
    combined = pd.concat([y, X], axis=1).dropna()
    y = combined[y_col]
    # Casting to float also turns boolean predictors into 0/1 regressors.
    X = combined.drop(columns=[y_col]).astype(float)
    if X.shape[1] == 0:
        raise ValueError(
            "No usable independent variables remain after cleaning "
            "(a categorical variable with a single level carries no information)."
        )
    return y, X, dummy_info


def _correlogram_figure(values, ci, label, bar_color=None):
    """Build a bar chart of *values* for lags 1..len(values) with +/- *ci* bands."""
    n_lags = len(values)
    lags = np.arange(1, n_lags + 1)
    bar_kwargs = {"marker_color": bar_color} if bar_color else {}

    fig = go.Figure()
    fig.add_trace(go.Bar(x=lags, y=values, name=label, **bar_kwargs))
    for bound, name in ((ci, "+95%"), (-ci, "-95%")):
        fig.add_trace(go.Scatter(
            x=lags,
            y=[bound] * n_lags,
            mode='lines',
            name=name,
            line=dict(dash="dash", color="gray"),
        ))
    fig.update_layout(
        title=f"{label} (Lags 1–{n_lags})",
        xaxis_title="Lag",
        yaxis_title=label,
        showlegend=True,
    )
    return fig


def prepare_model(df, y_col, x_cols, cov_type, n_lags):
    """Fit an OLS model with robust standard errors and build diagnostics.

    Args:
        df: DataFrame holding the uploaded data (``None`` when nothing is loaded).
        y_col: Name of the dependent-variable column.
        x_cols: Names of the independent-variable columns.
        cov_type: ``'HC3'`` for HC3 standard errors; any other value selects HAC.
        n_lags: Number of lags for the ACF/PACF plots and the Ljung-Box test.
            It is reduced when the sample is too short for the PACF.

    Returns:
        A 6-tuple ``(report_text, acf_figure, pacf_figure, summary_csv_path,
        residuals_vs_fitted_figure, residual_histogram_figure)``. On any
        failure the first element is ``"Error: <message>"`` and the other five
        are ``None``; no exception propagates to the caller.
    """
    try:
        if df is None:
            raise ValueError("No data loaded. Please upload a CSV file first.")
        if not y_col:
            raise ValueError("No dependent variable selected. Please select a y variable.")
        x_cols = list(x_cols) if x_cols else []
        if not x_cols:
            raise ValueError("No independent variables selected. Please select at least one X variable.")
        if y_col in x_cols:
            raise ValueError(
                f"Dependent variable '{y_col}' is also selected as an independent variable. "
                "Please remove it from X."
            )

        initial_n = len(df)
        y, X, dummy_info = _build_design_matrix(df, y_col, x_cols)
        # Count rows after *all* cleaning so the report matches the fitted model.
        final_n = len(y)
        dropped_n = initial_n - final_n

        X = sm.add_constant(X)
        if final_n <= X.shape[1]:
            raise ValueError(
                f"Not enough observations ({final_n}) to estimate {X.shape[1]} parameters."
            )
        results = sm.OLS(y, X).fit()

        if cov_type == 'HC3':
            robust_results = results.get_robustcov_results(cov_type='HC3')
        else:
            # Bandwidth rule of thumb: 4 * (n / 100) ** (2 / 9), truncated.
            maxlags = int(4 * (final_n / 100) ** (2 / 9))
            robust_results = results.get_robustcov_results(cov_type='HAC', maxlags=maxlags)

        summary_table = robust_results.summary2().tables[1].reset_index()
        summary_csv_path = f"regression_summary_{cov_type}.csv"
        summary_table.to_csv(summary_csv_path, index=False)

        # The PACF is only defined for lags below half the sample size, so
        # shrink the request instead of failing on short series.
        requested_lags = int(n_lags)
        max_lags = final_n // 2 - 1
        if max_lags < 1:
            raise ValueError(
                f"Too few observations ({final_n}) to compute ACF/PACF diagnostics."
            )
        n_lags = max(1, min(requested_lags, max_lags))

        notes = (
            f"Covariance Type: {cov_type}\n"
            f"{dummy_info}"
            f"Initial rows: {initial_n}\n"
            f"Rows dropped due to missing values: {dropped_n}\n"
            f"Final sample size used in regression: {final_n}\n"
        )
        if n_lags != requested_lags:
            notes += (
                f"Lags reduced from {requested_lags} to {n_lags} "
                "(PACF requires fewer lags than half the sample size)\n"
            )

        residuals = robust_results.resid
        fitted_vals = robust_results.fittedvalues

        # Residuals vs fitted values
        fig_resid_fitted = go.Figure()
        fig_resid_fitted.add_trace(go.Scatter(
            x=fitted_vals,
            y=residuals,
            mode='markers',
            marker=dict(color='blue', opacity=0.6),
            name="Residuals",
        ))
        fig_resid_fitted.update_layout(
            title="Residuals vs fitted values",
            xaxis_title="Fitted values",
            yaxis_title="Residuals",
            showlegend=False,
        )

        # Histogram of residuals
        fig_resid_hist = go.Figure()
        fig_resid_hist.add_trace(go.Histogram(
            x=residuals,
            nbinsx=30,
            marker_color='skyblue',
        ))
        fig_resid_hist.update_layout(
            title="Residuals",
            xaxis_title="Residual",
            yaxis_title="Count",
        )

        # ACF / PACF of the residuals; element 0 is lag 0 and is not plotted.
        acf_vals = acf(residuals, nlags=n_lags)[1:]
        pacf_vals = pacf(residuals, nlags=n_lags, method='ywm')[1:]
        ci = 1.96 / np.sqrt(final_n)
        fig_acf = _correlogram_figure(acf_vals, ci, "ACF")
        fig_pacf = _correlogram_figure(pacf_vals, ci, "PACF", bar_color="orange")

        # Ljung-Box test at the chosen lag: first column is the statistic,
        # second column is its p-value.
        lb_test = acorr_ljungbox(residuals, lags=[n_lags], return_df=True)
        lb_stat = lb_test.iloc[0, 0]
        lb_pval = lb_test.iloc[0, 1]
        ljung_box_msg = f"Ljung-Box Q-stat (lag {n_lags}): {lb_stat:.4f}, p-value: {lb_pval:.4f}"

        dw_stat = durbin_watson(residuals)
        dw_msg = f"Durbin-Watson stat: {dw_stat:.4f}"
        r2_msg = f"R-squared: {results.rsquared:.4f}"
        adj_r2_msg = f"Adjusted R-squared: {results.rsquared_adj:.4f}"

        output_table = summary_table.to_markdown(index=False)
        final_output = "\n".join([
            notes,
            ljung_box_msg,
            dw_msg,
            r2_msg,
            adj_r2_msg,
            "",
            output_table,
        ])
        return final_output, fig_acf, fig_pacf, summary_csv_path, fig_resid_fitted, fig_resid_hist
    except Exception as e:
        # UI boundary: surface the problem as text instead of crashing the app.
        return f"Error: {e}", None, None, None, None, None
############# | |
# Helper # | |
############# | |
def get_column_options(file):
    """Read the uploaded CSV and offer its columns as y/x choices.

    Args:
        file: The value delivered by the upload component. Either an object
            exposing the file path as ``.name``, a plain path string, or
            ``None`` when the upload was cleared.

    Returns:
        A 3-tuple ``(y_update, x_update, dataframe)``. When nothing was
        uploaded or the file cannot be parsed as CSV, both updates carry an
        empty choice list and the dataframe is ``None``.
    """
    no_columns = (gr.update(choices=[]), gr.update(choices=[]), None)
    if file is None:
        return no_columns
    try:
        # Fall back to the value itself when it has no `.name` attribute.
        df = pd.read_csv(getattr(file, "name", file))
    except Exception:
        # Best effort: an unreadable upload simply clears the selectors.
        return no_columns
    columns = df.columns.tolist()
    return gr.update(choices=columns), gr.update(choices=columns), df
############# | |
# ris file # | |
############# | |
# Citation record as (tag, value) pairs. RIS lines have the fixed layout
# "TAG" + two spaces + "-" + one space + value, and a record ends with ER.
_RIS_FIELDS = (
    ("TY", "COMP"),
    ("T1", "Robust OLS regression with HAC/HC3 and autocorrelation diagnostics"),
    ("AU", "Mat Roni, S."),
    ("PY", "2025"),
    ("VL", "1.0"),
    ("PB", "Hugging Face"),
    ("UR", "https://huggingface.co/spaces/pvaluedotone/robust_ols"),
    ("ER", ""),
)
ris_content = "".join(f"{tag}  - {value}\n" for tag, value in _RIS_FIELDS)

# Written at import time so the UI can offer the file for download.
with open("citation.ris", "w", encoding="utf-8") as f:
    f.write(ris_content)
############# | |
# Gradio UI # | |
############# | |
# Interface layout: heading and citation, input controls, then the regression
# report, diagnostic plots and the CSV download.
# NOTE(review): the source lost its indentation, so which components sit
# inside each gr.Row() is reconstructed from statement order - confirm.
with gr.Blocks() as app:
    gr.Markdown("## Robust OLS regression with HAC/HC3 and autocorrelation diagnostics")
    gr.Markdown("**Citation:** Mat Roni, S. (2025). *Robust OLS regression with HAC/HC3 and autocorrelation diagnostics* (version 1.0) [software]. [https://huggingface.co/spaces/pvaluedotone/robust_ols](https://huggingface.co/spaces/pvaluedotone/robust_ols)")
    gr.File(value="citation.ris", label="Download citation (.ris)", interactive=False)

    # Data source and estimation settings.
    with gr.Row():
        file_input = gr.File(label="Upload CSV")
        cov_type = gr.Radio(
            label="Covariance type",
            choices=["HC3", "HAC"],
            value="HC3",
        )
        lag_input = gr.Slider(
            label="Lags for ACF/PACF & Ljung-Box",
            minimum=5,
            maximum=40,
            step=1,
            value=20,
        )

    # Variable selectors; their choices are filled in once a CSV is uploaded.
    with gr.Row():
        y_dropdown = gr.Dropdown(label="Select y (dependent variable)", choices=[])
        x_checkboxes = gr.CheckboxGroup(label="Select x (independent variables)", choices=[])

    run_button = gr.Button("Run Regression")

    # Result widgets, in display order.
    output_text = gr.Code(label="Regression output", language="python")
    acf_plot = gr.Plot(label="ACF plot (interactive)")
    pacf_plot = gr.Plot(label="PACF plot (interactive)")
    resid_plot = gr.Plot(label="Residuals vs Fitted values")
    resid_hist = gr.Plot(label="Residuals")
    download_summary = gr.File(label="Download Regression Summary (CSV)")

    # Holds the parsed DataFrame between the upload and the regression run.
    file_state = gr.State()

    # Uploading a file refreshes both selectors and stores the data.
    column_targets = [y_dropdown, x_checkboxes, file_state]
    file_input.change(get_column_options, inputs=file_input, outputs=column_targets)

    # The order below must match the tuple returned by prepare_model.
    regression_inputs = [file_state, y_dropdown, x_checkboxes, cov_type, lag_input]
    regression_outputs = [
        output_text,
        acf_plot,
        pacf_plot,
        download_summary,
        resid_plot,
        resid_hist,
    ]
    run_button.click(prepare_model, inputs=regression_inputs, outputs=regression_outputs)

app.launch()