Spaces:
Sleeping
Sleeping
File size: 18,361 Bytes
22e5b2e 85178a1 22e5b2e 85178a1 22e5b2e 85178a1 22e5b2e 85178a1 22e5b2e 85178a1 22e5b2e 85178a1 22e5b2e 85178a1 22e5b2e 85178a1 22e5b2e 85178a1 22e5b2e 85178a1 22e5b2e 41a3aaf 22e5b2e 85178a1 22e5b2e 85178a1 41a3aaf 85178a1 41a3aaf 22e5b2e d86885c 85178a1 22e5b2e 85178a1 22e5b2e 85178a1 22e5b2e 85178a1 22e5b2e 85178a1 22e5b2e 85178a1 22e5b2e 85178a1 22e5b2e 85178a1 22e5b2e 85178a1 22e5b2e 85178a1 22e5b2e 85178a1 22e5b2e 85178a1 22e5b2e 85178a1 22e5b2e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 |
# ======================== (A) 导入库和配置环境 ========================
import pandas as pd
import numpy as np
import time
import warnings
import os
import logging
import tempfile
import traceback
# 数据分析与建模
from scipy import stats
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.seasonal import STL
from statsmodels.stats.diagnostic import acorr_ljungbox
from prophet import Prophet
import pmdarima as pm
from sklearn.metrics import mean_absolute_error, mean_squared_error
from joblib import Parallel, delayed
# 可视化
import matplotlib.pyplot as plt
import seaborn as sns
# Web UI
import gradio as gr
# --- 全局设置 ---
warnings.filterwarnings("ignore")
logging.getLogger('prophet').setLevel(logging.ERROR)
logging.getLogger('cmdstanpy').setLevel(logging.ERROR)
# 配置中文字体 (使用 packages.txt 安装的字体)
plt.rcParams['font.sans-serif'] = ['WenQuanYi Zen Hei']
plt.rcParams['axes.unicode_minus'] = False
# --- 输出文件夹设置 ---
OUTPUT_DIR = 'outputs'
if not os.path.exists(OUTPUT_DIR):
os.makedirs(OUTPUT_DIR)
# ======================== (B) 辅助函数 ========================
def calculate_metrics(actual, predicted):
metrics_df = pd.DataFrame({'actual': actual, 'predicted': predicted}).dropna()
if metrics_df.empty:
return {'MAE': np.nan, 'RMSE': np.nan, 'MAPE': np.nan, 'sMAPE': np.nan}
clean_actual, clean_predicted = metrics_df['actual'], metrics_df['predicted']
mae = mean_absolute_error(clean_actual, clean_predicted)
rmse = np.sqrt(mean_squared_error(clean_actual, clean_predicted))
actual_safe = np.where(clean_actual == 0, 1e-6, clean_actual)
mape = np.mean(np.abs((clean_actual - clean_predicted) / actual_safe)) * 100
smape = 200 * np.mean(np.abs(clean_actual - clean_predicted) / (np.abs(clean_actual) + np.abs(clean_predicted)))
return {'MAE': mae, 'RMSE': rmse, 'MAPE': mape, 'sMAPE': smape}
# ======================== (C) 主分析函数 (Gradio核心) ========================
def run_full_analysis(progress=gr.Progress(track_tqdm=True)):
"""
这个主函数封装了所有的分析步骤,并通过 yield 返回结果来实时更新Gradio界面。
"""
# --- 1. 初始化 ---
log_lines = ["## 🚀 数据分析流程已启动..."]
figure_paths = []
final_report_text = ""
report_file_path = None
# 辅助函数,用于将Matplotlib Figure保存为临时图片文件并返回路径
def save_fig_to_path(fig):
# 使用 NamedTemporaryFile 来创建一个不会被立即删除的临时文件
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmpfile:
fig.savefig(tmpfile.name)
figure_paths.append(tmpfile.name)
plt.close(fig) # 操作完成后关闭图形,释放内存
# 辅助函数,用于更新UI状态
def update_ui(new_log_line=None):
if new_log_line:
log_lines.append(new_log_line)
# [log, gallery, final_report, download_button]
return "\n\n".join(log_lines), figure_paths, final_report_text, report_file_path
yield update_ui() # 立即显示启动信息
try:
# --- 2. 数据清洗 ---
log_lines.append("### 1. 数据清洗")
df = pd.read_excel('gmqrkl.xlsx')
df['Date'] = pd.to_datetime(df['Date'])
df = df.drop_duplicates(subset=['Date']).sort_values('Date').reset_index(drop=True)
log_lines.append(f"✅ 数据读取并去重成功,共 {len(df)} 行。")
df['Value'] = df['Value'].replace(0, np.nan)
df['Value'].interpolate(method='linear', limit_direction='both', inplace=True)
log_lines.append("✅ 零值替换与线性插值完成。")
df.set_index('Date', inplace=True)
ts_data = df['Value']
yield update_ui()
# --- 3. 平稳性检验与差分 ---
log_lines.append("### 2. 平稳性检验与差分")
current_data = ts_data.dropna()
adf_result = adfuller(current_data)
if adf_result[1] < 0.05:
log_lines.append(f"✅ 序列在 d=0 阶差分后达到平稳 (p={adf_result[1]:.4f})。")
d_order = 0
else:
current_data_diff = current_data.diff().dropna()
adf_result_diff = adfuller(current_data_diff)
d_order = 1
if adf_result_diff[1] < 0.05:
log_lines.append(f"✅ 序列在 d=1 阶差分后达到平稳 (p={adf_result_diff[1]:.4f})。")
else:
log_lines.append(f"⚠️ 1阶差分后仍未平稳 (p={adf_result_diff[1]:.4f}),将使用 d=1 继续分析。")
current_data = current_data_diff
ts_stationary = current_data
yield update_ui()
# --- 4. 白噪声检验 ---
log_lines.append("### 3. 白噪声检验")
lags = min(10, len(ts_stationary) // 5)
lb_test_result = acorr_ljungbox(ts_stationary, lags=[lags], return_df=True)
if lb_test_result['lb_pvalue'].iloc[0] > 0.05:
log_lines.append(f"⚠️ 序列可能是白噪声(p-value = {lb_test_result['lb_pvalue'].iloc[0]:.4f}),模型可能无效。")
else:
log_lines.append(f"✅ 通过白噪声检验 (p-value = {lb_test_result['lb_pvalue'].iloc[0]:.4f}),可以进行后续建模。")
yield update_ui()
# --- 5. 季节性检验与分解 ---
log_lines.append("\n### 4. 季节性检验与STL分解")
period = 365
seasonal_enabled = len(ts_data) > 2 * 14 # 数据多于两周则开启周季节性
m_period = 7 if seasonal_enabled else 1
log_lines.append(f"✅ 季节性参数设定: m={m_period}, seasonal={seasonal_enabled}")
if len(ts_data) >= 2 * period:
seasonal_period_for_stl = period if period % 2 != 0 else period + 1
log_lines.append(f"✅ 准备进行STL分解,周期(period)={period},季节平滑窗口(seasonal)={seasonal_period_for_stl}。")
yield update_ui()
stl = STL(ts_data, period=period, seasonal=seasonal_period_for_stl)
res = stl.fit()
fig = res.plot()
fig.set_size_inches(12, 8)
fig.suptitle(f'STL 分解图 (周期={period})', fontsize=16, y=0.98)
plt.tight_layout()
save_fig_to_path(fig)
log_lines.append("✅ STL分解图已生成。")
else:
log_lines.append("⚠️ 数据长度不足以进行年度季节性分解。")
yield update_ui()
# --- 6. 混合策略回测优化窗口大小 ---
log_lines.append("\n### 5. 优化训练窗口大小")
log_lines.append("⏳ **此步骤计算量大,可能需要5-15分钟,请耐心等待...**")
yield update_ui()
def evaluate_window_hybrid(window_size, time_series, d, m, seasonal):
errors = []
series_values = time_series.values
backtest_length = 100
if len(series_values) <= window_size + backtest_length: return {'window_size': window_size, 'mae': np.inf}
end_index = len(series_values)
start_index = end_index - backtest_length
for i in range(start_index, end_index):
train_window = pd.Series(series_values[i-window_size:i], index=time_series.index[i-window_size:i])
test_point = series_values[i]
use_seasonal = seasonal and (len(train_window) >= 2 * m)
try:
model = pm.auto_arima(train_window, d=d, seasonal=use_seasonal, m=m, max_p=2, max_q=2,
stepwise=True, trace=False, error_action='ignore', suppress_warnings=True)
forecast = model.predict(n_periods=1)[0]
errors.append(test_point - forecast)
except Exception: continue
if not errors: return {'window_size': window_size, 'mae': np.inf}
return {'window_size': window_size, 'mae': np.mean(np.abs(errors))}
window_sizes_to_test = np.arange(70, 211, 14)
with Parallel(n_jobs=-1) as parallel:
results = parallel(
delayed(evaluate_window_hybrid)(ws, ts_data, d_order, m_period, seasonal_enabled) for ws in window_sizes_to_test
)
window_results_df = pd.DataFrame(results).sort_values('mae').set_index('window_size')
if not window_results_df.empty and np.isfinite(window_results_df['mae'].min()):
best_window_size = window_results_df['mae'].idxmin()
log_lines.append(f"✅ **窗口优化完成!** 基于MAE,最佳训练窗口大小为: **{best_window_size}** 天。")
else:
best_window_size = 90
log_lines.append(f"⚠️ 窗口优化失败,使用默认窗口大小: {best_window_size} 天。")
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(window_results_df.index, window_results_df['mae'], marker='o', label='MAE')
ax.set_title('训练窗口大小对预测误差的影响')
ax.set_xlabel('训练窗口天数'); ax.set_ylabel('误差值'); ax.legend(); ax.grid(True)
save_fig_to_path(fig)
yield update_ui()
# --- 7 & 8. 动态滚动预测与评估 ---
log_lines.append("\n### 6. 动态滚动预测与评估")
log_lines.append("⏳ **此步骤同样耗时,正在进行模型滚动预测...**")
yield update_ui()
split_point_roll = int(len(ts_data) * 0.8)
test_rolling_target = ts_data.iloc[split_point_roll:]
rolling_predictions = {}
# SARIMA 滚动
sarima_rolling_preds = []
for i in progress.tqdm(range(len(test_rolling_target)), desc="SARIMA Rolling Forecast"):
train_window = ts_data.iloc[split_point_roll + i - best_window_size : split_point_roll + i]
try:
model = pm.auto_arima(train_window, d=d_order, m=m_period, seasonal=seasonal_enabled,
stepwise=True, trace=False, error_action='ignore', suppress_warnings=True)
sarima_rolling_preds.append(model.predict(n_periods=1)[0])
except:
sarima_rolling_preds.append(sarima_rolling_preds[-1] if sarima_rolling_preds else np.nan)
rolling_predictions['Auto-SARIMA'] = pd.Series(sarima_rolling_preds, index=test_rolling_target.index).ffill()
log_lines.append("✅ Auto-SARIMA 滚动预测完成。")
yield update_ui()
# Prophet 滚动
prophet_rolling_preds = []
prophet_model = None
for i, (date, value) in enumerate(progress.tqdm(test_rolling_target.items(), desc="Prophet Rolling Forecast")):
if i % 14 == 0 or prophet_model is None:
train_upto_date = ts_data.loc[:date - pd.Timedelta(days=1)]
prophet_train_df = train_upto_date.reset_index().rename(columns={'Date': 'ds', 'Value': 'y'})
prophet_model = Prophet(yearly_seasonality='auto', weekly_seasonality=seasonal_enabled, daily_seasonality=False).fit(prophet_train_df)
future_df = pd.DataFrame({'ds': [date]})
forecast = prophet_model.predict(future_df)
prophet_rolling_preds.append(forecast['yhat'].iloc[0])
rolling_predictions['Prophet'] = pd.Series(prophet_rolling_preds, index=test_rolling_target.index)
log_lines.append("✅ Prophet 滚动预测完成。")
# 评估
rolling_evaluation_results = {name: calculate_metrics(test_rolling_target, preds) for name, preds in rolling_predictions.items()}
rolling_evaluation_df = pd.DataFrame(rolling_evaluation_results).T.sort_values(by='MAE')
best_rolling_model_name = rolling_evaluation_df.index[0]
log_lines.append("\n**滚动预测性能对比:**")
log_lines.append(f"```\n{rolling_evaluation_df.to_markdown()}\n```")
log_lines.append(f"\n==> ✅ 最佳滚动预测模型是: **{best_rolling_model_name}**")
yield update_ui()
# --- 9. 滚动预测可视化 ---
log_lines.append("\n### 7. 生成结果图表")
fig, ax = plt.subplots(figsize=(15, 8))
ax.plot(ts_data, label='历史数据', color='gray', alpha=0.5)
ax.plot(test_rolling_target, label='真实值 (测试集)', color='blue', linewidth=2)
for model_name, preds in rolling_predictions.items():
is_best = ' (最佳)' if model_name == best_rolling_model_name else ''
ax.plot(preds.dropna(), label=f'{model_name} 预测{is_best}', linestyle='--')
ax.set_title('滚动预测结果对比'); ax.legend(); ax.grid(True)
save_fig_to_path(fig)
yield update_ui()
# --- 10. 最终未来预测 ---
forecast_horizon = 90
log_lines.append(f"\n### 8. 使用最佳模型 `{best_rolling_model_name}` 预测未来 {forecast_horizon} 天")
# 训练最终模型
if 'Auto-SARIMA' in best_rolling_model_name:
final_train_data = ts_data.iloc[-best_window_size:]
final_model = pm.auto_arima(final_train_data, d=d_order, m=m_period, seasonal=seasonal_enabled,
stepwise=True, trace=False, error_action='ignore', suppress_warnings=True)
final_forecast_values, conf_int = final_model.predict(n_periods=forecast_horizon, return_conf_int=True)
else: # Prophet
final_train_data = ts_data
final_prophet_train_df = final_train_data.reset_index().rename(columns={'Date': 'ds', 'Value': 'y'})
final_model = Prophet(yearly_seasonality='auto', weekly_seasonality=seasonal_enabled, daily_seasonality=False).fit(final_prophet_train_df)
future_df = final_model.make_future_dataframe(periods=forecast_horizon, freq='D')
forecast_obj = final_model.predict(future_df)
final_forecast_values = forecast_obj['yhat'].iloc[-forecast_horizon:].values
conf_int = np.column_stack((forecast_obj['yhat_lower'].iloc[-forecast_horizon:].values, forecast_obj['yhat_upper'].iloc[-forecast_horizon:].values))
future_dates = pd.date_range(start=ts_data.index[-1] + pd.Timedelta(days=1), periods=forecast_horizon)
final_forecast_series = pd.Series(final_forecast_values, index=future_dates)
# 可视化最终预测
fig, ax = plt.subplots(figsize=(15, 8))
ax.plot(ts_data.tail(365), label='近期历史数据', color='blue')
ax.plot(final_forecast_series, label=f'未来 {forecast_horizon} 天预测', color='red', linestyle='--')
ax.fill_between(future_dates, conf_int[:, 0], conf_int[:, 1], color='red', alpha=0.2, label='95% 置信区间')
ax.set_title(f'最终未来用量预测 (基于 {best_rolling_model_name})'); ax.legend(); ax.grid(True)
save_fig_to_path(fig)
# 生成最终报告
final_report_text = f"""
# 药品用量预测分析报告
## 1. 数据概览
- **数据时间范围**: {ts_data.index.min().strftime('%Y-%m-%d')} to {ts_data.index.max().strftime('%Y-%m-%d')}
- **总数据点**: {len(ts_data)}
- **平均用量**: {ts_data.mean():.2f}
## 2. 分析与建模参数
- **平稳性差分阶数 (d)**: {d_order}
- **季节性周期 (m)**: {m_period}
- **最佳训练窗口**: {best_window_size} 天
## 3. 模型评估 (基于动态滚动预测)
通过在历史数据上进行滚动预测,我们能更真实地评估模型在实际应用中的表现。
{rolling_evaluation_df.to_markdown()}
## 4. 最终结论与未来预测
- **最佳模型**: **{best_rolling_model_name}** 被选为最终预测模型,因为它在滚动预测中表现最佳(MAE最低)。
- **未来预测**: 已使用 `{best_rolling_model_name}` 模型对未来 **{forecast_horizon}** 天的用量进行预测。
- **预测摘要**:
- 未来一周平均日用量: **{final_forecast_series.head(7).mean():.2f}**
- 未来一月平均日用量: **{final_forecast_series.head(30).mean():.2f}**
""".strip()
report_file_path = os.path.join(OUTPUT_DIR, 'final_analysis_report.txt')
with open(report_file_path, 'w', encoding='utf-8') as f:
f.write(final_report_text)
log_lines.append("\n## 🎉 全部分析流程完成!请查看最终报告和图表。")
yield update_ui()
except Exception as e:
log_lines.append(f"\n\n❌ **分析过程中断,出现错误:**\n`{str(e)}`")
log_lines.append(f"\n**Traceback:**\n```{traceback.format_exc()}```")
yield update_ui()
# ======================== (D) Gradio 界面构建 ========================
with gr.Blocks(theme=gr.themes.Soft(), css="footer {display: none !important}") as demo:
gr.Markdown(
"""
# 📈 智能时序预测与分析平台 (动态回测版)
欢迎使用!请确保您的数据文件 `gmqrkl.xlsx` 已上传至本 Space 的文件库中。
然后,点击下方按钮,启动包含 **窗口优化** 和 **动态滚动预测** 的完整分析流程。
**注意:完整流程计算量大,可能需要10-20分钟。请耐心等待,并观察下方日志区的实时进度。**
"""
)
start_button = gr.Button("🚀 点击这里,开始完整分析", variant="primary")
gr.Markdown("---")
with gr.Tabs():
with gr.TabItem("📊 可视化图表", id=0):
gallery_output = gr.Gallery(label="分析图表", elem_id="gallery", columns=[1], height="auto", object_fit="contain")
with gr.TabItem("📝 实时分析日志", id=1):
log_output = gr.Markdown("点击按钮后,分析日志将实时显示在这里...")
with gr.TabItem("📋 最终报告与下载", id=2):
final_report_output = gr.Markdown("分析完成后,最终报告将显示在这里。")
download_output = gr.File(label="下载报告文件")
start_button.click(
fn=run_full_analysis,
inputs=None,
outputs=[log_output, gallery_output, final_report_output, download_output]
)
gr.Markdown("<p style='text-align: center; font-size: 12px; color: grey;'>Powered by Gradio and Hugging Face Spaces.</p>")
if __name__ == "__main__":
demo.launch() |