leonsimon23's picture
Update app.py
3a3505d verified
import gradio as gr
import pandas as pd
import numpy as np
import warnings
from datetime import timedelta
import matplotlib
import matplotlib.pyplot as plt
from scipy.stats import zscore
from statsmodels.tsa.stattools import adfuller
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.tsa.seasonal import STL
from pmdarima.arima import nsdiffs
from pmdarima import auto_arima
from prophet import Prophet
from sklearn.metrics import mean_absolute_error, mean_squared_error
import seaborn as sns
from joblib import Parallel, delayed
# 忽略所有警告,但这可能不会抑制所有来自底层库的FutureWarning
warnings.filterwarnings('ignore')
# 更精确地忽略来自sklearn.utils.deprecation的FutureWarning
warnings.filterwarnings("ignore", category=FutureWarning, module='sklearn.utils.deprecation')
# --- 在Hugging Face上配置中文字体 ---
# 确保你已经上传了 SimHei.ttf 字体文件
# 或者在环境中安装了 WenQuanYi Zen Hei 字体
plt.rcParams['font.sans-serif'] = ['WenQuanYi Zen Hei']
plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题
# =============================================================================
# 将所有分析步骤封装在一个主函数中
# =============================================================================
def run_analysis(file_obj):
"""
运行完整的时序预测分析流程。
包括数据预处理、平稳性/白噪声/季节性检验、窗口优化、
SARIMA和Prophet模型预测、加权平均模型以及结果可视化。
参数:
file_obj (gr.File): Gradio文件对象,包含上传的Excel文件。
返回:
tuple: 包含分析摘要文本、四周预测指标DataFrame、四周预测图、
一周预测指标DataFrame、一周预测图和STL分解图。
"""
if file_obj is None:
raise gr.Error("请先上传一个Excel文件!")
print("----- 开始分析 -----")
# ----- 1. 数据预处理 -----
print("----- 1. 数据预处理 -----")
# 读取Excel文件
df = pd.read_excel(file_obj.name)
# 将'Date'列转换为日期时间格式
df['Date'] = pd.to_datetime(df['Date'])
# 按日期排序
df.sort_values('Date', inplace=True)
# 将'Date'列设置为索引
df.set_index('Date', inplace=True)
# 将'Value'列中的0替换为NaN
df['Value'] = df['Value'].replace(0, np.nan)
# 使用线性插值填充NaN值
df['Value'].interpolate(method='linear', inplace=True)
print(f"数据加载成功,共 {len(df)} 条记录。")
# ----- 2. 平稳性与白噪声检验 (仅输出结果) -----
print("----- 2. 平稳性与白噪声检验 -----")
# 对时间序列进行差分以使其平稳
ts_stationary, d_order = make_stationary_silent(df['Value'])
# 进行Ljung-Box白噪声检验
lb_p_value = acorr_ljungbox(ts_stationary, lags=[10], return_df=True)['lb_pvalue'].iloc[0]
print(f"最优差分阶数 d = {d_order}, 白噪声检验 P-value = {lb_p_value:.4f}")
# ----- 3. 季节性检验 -----
print("----- 3. 季节性检验 -----")
# 使用Canova-Hansen检验确定季节差分阶数
D_order = nsdiffs(df['Value'], m=7, test='ch') # 假设周期为7天
print(f"季节差分阶数 D = {D_order}")
# STL分解图
# 对时间序列进行STL分解,提取趋势、季节性和残差
stl = STL(df['Value'], period=7, robust=True)
res = stl.fit()
fig_stl = res.plot()
fig_stl.suptitle("STL季节性分解 (周期=7)", y=1.02)
print("STL季节性分解图已生成。")
# ----- 4. 窗口优化 (简化版,避免超时) -----
print("----- 4. 窗口优化 (简化版) -----")
# 在线上环境中,完整的窗口优化太慢,这里使用一个固定的值或一个快速的估计
# 为了演示,我们仍然运行一个简化版的窗口评估
ts_values = df['Value'].values
window_sizes = range(100, 181, 20) # 减少评估点,评估100到180天窗口,步长20
print(f"开始评估 {len(window_sizes)} 个滑动窗口...")
# 并行评估不同窗口大小下的模型性能
results = Parallel(n_jobs=-1)(delayed(evaluate_window)(ws, ts_values) for ws in window_sizes)
results_df = pd.DataFrame(results)
# 确定最优窗口大小,如果结果为空则默认为120
OPTIMAL_WINDOW = int(results_df.loc[results_df['mae'].idxmin()]['window_size']) if not results_df.empty else 120
print(f"所有滑动窗口评估完成。计算得到的最优滑动窗口为: {OPTIMAL_WINDOW} 天")
# ----- 5. 模型预测 -----
print("----- 5. 模型预测 -----")
# 定义训练集和测试集的日期范围
final_train_end_date = df.index.max() - timedelta(days=28) # 留出最后28天作为测试集
final_test_start_date = final_train_end_date + timedelta(days=1)
# SARIMA模型训练数据:基于最优窗口
train_sarima = df[final_train_end_date - timedelta(days=OPTIMAL_WINDOW - 1) : final_train_end_date]
# Prophet模型训练数据:使用所有可用数据直到训练结束日期
train_full = df[:final_train_end_date]
# 测试数据
test_data = df[final_test_start_date:]
print(f"训练数据截止日期: {final_train_end_date.strftime('%Y-%m-%d')}, 测试数据开始日期: {final_test_start_date.strftime('%Y-%m-%d')}")
print("正在生成 SARIMA 模型预测...")
sarima_pred = sarima_model(train_sarima, d_order, D_order, h=28)
print("SARIMA 模型预测完成。")
print("正在生成 Prophet 模型预测...")
prophet_pred = prophet_model(train_full, h=28)
print("Prophet 模型预测完成。")
print("正在生成加权平均模型预测...")
weighted_pred, sw, pw = weighted_average_model(train_full, d_order, D_order, h=28)
print(f"加权平均模型预测完成。模型权重: SARIMA={sw:.2f}, Prophet={pw:.2f}")
# 将实际值和各模型预测值整合到DataFrame中
predictions = pd.DataFrame({
'Actual': test_data['Value'],
'SARIMA': sarima_pred,
'Prophet': prophet_pred,
'Weighted': weighted_pred
}).dropna() # 丢弃包含NaN的行
# ----- 6. 指标计算与可视化 -----
print("----- 6. 指标计算与可视化 -----")
# 计算所有模型的性能指标
metrics = {model: calculate_metrics(predictions['Actual'], predictions[model]) for model in ['SARIMA', 'Prophet', 'Weighted']}
metrics_df = pd.DataFrame(metrics).T.reset_index().rename(columns={'index': 'Model'})
print("性能指标计算完成。")
# 4周预测对比图
fig_forecast_4w = plt.figure(figsize=(12, 6))
plt.plot(predictions.index, predictions['Actual'], label='实际值', color='black', marker='o')
plt.plot(predictions.index, predictions['SARIMA'], label='SARIMA预测', color='red', linestyle='--')
plt.plot(predictions.index, predictions['Prophet'], label='Prophet预测', color='blue', linestyle='-.')
plt.plot(predictions.index, predictions['Weighted'], label=f'加权平均 (S:{sw:.2f}, P:{pw:.2f})', color='green', linestyle=':')
plt.title('未来4周预测对比', fontsize=16)
plt.legend(); plt.grid(True)
print("未来4周预测对比图已生成。")
# 1周预测对比图和指标
first_week_preds = predictions.head(7) # 获取第一周的预测数据
first_week_metrics = pd.DataFrame({
model: calculate_metrics(first_week_preds['Actual'], first_week_preds[model])
for model in ['SARIMA', 'Prophet', 'Weighted']
}).T.reset_index().rename(columns={'index': 'Model'})
fig_forecast_1w = plt.figure(figsize=(10, 5))
plt.plot(first_week_preds.index, first_week_preds['Actual'], label='实际值', color='black', marker='o')
plt.plot(first_week_preds.index, first_week_preds['SARIMA'], label='SARIMA预测', color='red', linestyle='--')
plt.plot(first_week_preds.index, first_week_preds['Prophet'], label='Prophet预测', color='blue', linestyle='-.')
plt.plot(first_week_preds.index, first_week_preds['Weighted'], label='加权平均预测', color='green', linestyle=':')
plt.title('第一周预测结果对比'); plt.legend(); plt.grid(True)
print("第一周预测结果对比图已生成。")
# 生成分析摘要文本
summary_text = (
f"数据加载成功,共 {len(df)} 条记录。\n"
f"最优差分阶数 d = {d_order}, 季节差分阶数 D = {D_order}\n"
f"白噪声检验 P-value = {lb_p_value:.4f} {'(序列为白噪声)' if lb_p_value > 0.05 else '(序列为非白噪声)'}\n"
f"计算得到的最优滑动窗口为: {OPTIMAL_WINDOW} 天\n"
f"模型权重: SARIMA={sw:.2f}, Prophet={pw:.2f}"
)
print("----- 分析完成 -----")
# 返回所有结果
return summary_text, metrics_df, fig_forecast_4w, first_week_metrics, fig_forecast_1w, fig_stl
# =============================================================================
# 辅助函数 (从主脚本中提取)
# =============================================================================
def make_stationary_silent(data_series, max_diff=2):
"""
通过差分使时间序列平稳。
参数:
data_series (pd.Series): 输入时间序列。
max_diff (int): 最大差分阶数。
返回:
tuple: 平稳后的时间序列和差分阶数。
"""
diff_order = 0
current_series = data_series.dropna()
for d in range(max_diff + 1):
if d > 0:
current_series = current_series.diff().dropna()
# 使用ADF检验检查平稳性
if adfuller(current_series)[1] < 0.05: # P-value小于0.05则认为平稳
return current_series, d
return current_series, max_diff
def evaluate_window(window_size, ts_values):
"""
评估给定窗口大小下模型的性能。
参数:
window_size (int): 滑动窗口大小。
ts_values (np.array): 时间序列值数组。
返回:
dict: 包含窗口大小、MAE和RMSE的字典。
"""
n = len(ts_values)
if window_size >= n:
return {'mae': float('inf'), 'rmse': float('inf')}
print(f" - 正在评估窗口大小: {window_size} 天...") # 打印当前评估的窗口大小
# 在滑动窗口上训练auto_arima模型并进行单步预测
errors = []
for i in range(n - window_size - 1):
try:
model = auto_arima(ts_values[i:(i + window_size)], d=0, stepwise=True, suppress_warnings=True, error_action='ignore')
prediction = model.predict(n_periods=1)[0]
errors.append(ts_values[i + window_size] - prediction)
except Exception as e:
# 捕获auto_arima可能出现的错误,避免中断循环
# print(f"窗口评估中auto_arima出现错误: {e}") # 避免过多打印,只在必要时开启
errors.append(np.nan) # 标记为NaN,后续会处理
errors = np.array(errors)
errors = errors[~np.isnan(errors)] # 移除NaN错误
if len(errors) == 0:
return {'window_size': window_size, 'mae': float('inf'), 'rmse': float('inf')}
return {'window_size': window_size, 'mae': np.mean(np.abs(errors)), 'rmse': np.sqrt(np.mean(np.square(errors)))}
def sarima_model(train_data, d, D, h):
"""
训练SARIMA模型并进行预测。
参数:
train_data (pd.DataFrame): 训练数据。
d (int): 非季节差分阶数。
D (int): 季节差分阶数。
h (int): 预测步长。
返回:
pd.Series: SARIMA模型预测结果。
"""
# 使用auto_arima自动选择最优SARIMA模型参数
model = auto_arima(train_data['Value'], d=d, D=D, m=7, seasonal=True, stepwise=True, suppress_warnings=True, error_action='ignore')
return model.predict(n_periods=h)
def prophet_model(train_data, h):
"""
训练Prophet模型并进行预测。
参数:
train_data (pd.DataFrame): 训练数据。
h (int): 预测步长。
返回:
pd.Series: Prophet模型预测结果。
"""
# 准备Prophet模型所需的数据格式
df_prophet = train_data.reset_index().rename(columns={'Date': 'ds', 'Value': 'y'})
# 初始化并训练Prophet模型
model = Prophet(yearly_seasonality=True, weekly_seasonality=True)
model.fit(df_prophet)
# 创建未来预测的DataFrame
future = model.make_future_dataframe(periods=h, freq='D')
# 进行预测并返回预测值
return model.predict(future)['yhat'].tail(h)
def weighted_average_model(train_data, d, D, h):
"""
训练加权平均模型(SARIMA和Prophet)并进行预测。
权重基于验证集上的MAE反比。
参数:
train_data (pd.DataFrame): 训练数据。
d (int): SARIMA非季节差分阶数。
D (int): SARIMA季节差分阶数。
h (int): 预测步长。
返回:
tuple: 加权平均预测结果(pd.Series)、SARIMA权重、Prophet权重。
"""
validation_data = train_data.tail(28) # 留出最后28天作为验证集
train_for_val = train_data.iloc[:-28] # 用于训练验证模型的子集
if len(train_for_val) < 56: # 确保训练集足够大,至少是验证集的两倍
# 如果数据太少,直接使用Prophet模型,并设置Prophet权重为1.0
prophet_pred_full = prophet_model(train_data, h)
return prophet_pred_full, 0.0, 1.0
# 在验证集上生成SARIMA和Prophet预测
sarima_fc_val = sarima_model(train_for_val, d, D, h=28)
prophet_fc_val = prophet_model(train_for_val, h=28)
# 计算各自在验证集上的MAE
sarima_mae = mean_absolute_error(validation_data['Value'], sarima_fc_val)
prophet_mae = mean_absolute_error(validation_data['Value'], prophet_fc_val)
# 根据MAE计算权重(MAE越小,权重越大)
if sarima_mae + prophet_mae == 0: # 避免除以零
sarima_weight, prophet_weight = 0.5, 0.5
else:
inv_sum = (1/sarima_mae) + (1/prophet_mae)
sarima_weight = (1/sarima_mae) / inv_sum
prophet_weight = (1/prophet_mae) / inv_sum
# 使用完整训练数据生成最终预测
sarima_pred = sarima_model(train_data, d, D, h=h)
prophet_pred = prophet_model(train_data, h=h)
# 计算加权平均预测
weighted_avg = sarima_weight * sarima_pred.values + prophet_weight * prophet_pred.values
return pd.Series(weighted_avg, index=sarima_pred.index), sarima_weight, prophet_weight
def calculate_metrics(actual, predicted):
"""
计算预测性能指标:MAE, RMSE, MAPE。
参数:
actual (pd.Series): 实际值。
predicted (pd.Series): 预测值。
返回:
dict: 包含MAE, RMSE, MAPE的字典。
"""
# 确保实际值和预测值长度一致
min_len = min(len(actual), len(predicted))
actual = actual.iloc[:min_len]
predicted = predicted.iloc[:min_len]
# 避免除以零或无穷大
mape = np.mean(np.abs((actual - predicted) / actual.replace(0, np.nan))) * 100
mape = np.nan_to_num(mape, nan=0.0, posinf=0.0, neginf=0.0) # 处理可能出现的NaN/Inf
return {
'MAE': mean_absolute_error(actual, predicted),
'RMSE': np.sqrt(mean_squared_error(actual, predicted)),
'MAPE': mape
}
# =============================================================================
# 创建Gradio界面
# =============================================================================
with gr.Blocks(theme=gr.themes.Soft(), title="药品销量时序预测") as app:
gr.Markdown("# 药品销量时序序列预测分析")
gr.Markdown("上传包含 'Date' 和 'Value' 列的Excel文件,系统将自动进行数据处理、模型训练、预测和评估。")
with gr.Row():
with gr.Column(scale=1):
file_input = gr.File(label="上传Excel文件", file_types=['.xlsx'])
run_button = gr.Button("开始分析", variant="primary")
summary_output = gr.Textbox(label="分析摘要", lines=7, interactive=False)
with gr.Column(scale=2):
gr.Markdown("### 四周预测性能指标")
metrics_4w_output = gr.DataFrame(headers=["Model", "MAE", "RMSE", "MAPE"], interactive=False)
gr.Markdown("### 一周预测性能指标")
metrics_1w_output = gr.DataFrame(headers=["Model", "MAE", "RMSE", "MAPE"], interactive=False)
with gr.Row():
plot_4w_output = gr.Plot(label="四周预测对比图")
plot_1w_output = gr.Plot(label="一周预测对比图")
with gr.Row():
plot_stl_output = gr.Plot(label="STL季节性分解图")
gr.Markdown("") # 占位符,保持布局
# 设置按钮点击事件
run_button.click(
fn=run_analysis,
inputs=[file_input],
outputs=[summary_output, metrics_4w_output, plot_4w_output, metrics_1w_output, plot_1w_output, plot_stl_output]
)
# 示例文件,方便用户测试
gr.Examples(
[["gmqrkl.xlsx"]], # 请确保 'gmqrkl.xlsx' 文件在Gradio应用运行的环境中可访问
inputs=[file_input],
fn=run_analysis,
outputs=[summary_output, metrics_4w_output, plot_4w_output, metrics_1w_output, plot_1w_output, plot_stl_output]
)
# 明确指定 Gradio 监听的地址和端口,以确保在 Hugging Face Spaces 上正确启动
app.launch(server_name="0.0.0.0", server_port=7860)