Spaces:
Sleeping
Sleeping
import gradio as gr | |
import pandas as pd | |
import numpy as np | |
import warnings | |
from datetime import timedelta | |
import matplotlib | |
import matplotlib.pyplot as plt | |
from scipy.stats import zscore | |
from statsmodels.tsa.stattools import adfuller | |
from statsmodels.stats.diagnostic import acorr_ljungbox | |
from statsmodels.tsa.seasonal import STL | |
from pmdarima.arima import nsdiffs | |
from pmdarima import auto_arima | |
from prophet import Prophet | |
from sklearn.metrics import mean_absolute_error, mean_squared_error | |
import seaborn as sns | |
from joblib import Parallel, delayed | |
# 忽略所有警告,但这可能不会抑制所有来自底层库的FutureWarning | |
warnings.filterwarnings('ignore') | |
# 更精确地忽略来自sklearn.utils.deprecation的FutureWarning | |
warnings.filterwarnings("ignore", category=FutureWarning, module='sklearn.utils.deprecation') | |
# --- 在Hugging Face上配置中文字体 --- | |
# 确保你已经上传了 SimHei.ttf 字体文件 | |
# 或者在环境中安装了 WenQuanYi Zen Hei 字体 | |
plt.rcParams['font.sans-serif'] = ['WenQuanYi Zen Hei'] | |
plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题 | |
# ============================================================================= | |
# 将所有分析步骤封装在一个主函数中 | |
# ============================================================================= | |
def run_analysis(file_obj): | |
""" | |
运行完整的时序预测分析流程。 | |
包括数据预处理、平稳性/白噪声/季节性检验、窗口优化、 | |
SARIMA和Prophet模型预测、加权平均模型以及结果可视化。 | |
参数: | |
file_obj (gr.File): Gradio文件对象,包含上传的Excel文件。 | |
返回: | |
tuple: 包含分析摘要文本、四周预测指标DataFrame、四周预测图、 | |
一周预测指标DataFrame、一周预测图和STL分解图。 | |
""" | |
if file_obj is None: | |
raise gr.Error("请先上传一个Excel文件!") | |
print("----- 开始分析 -----") | |
# ----- 1. 数据预处理 ----- | |
print("----- 1. 数据预处理 -----") | |
# 读取Excel文件 | |
df = pd.read_excel(file_obj.name) | |
# 将'Date'列转换为日期时间格式 | |
df['Date'] = pd.to_datetime(df['Date']) | |
# 按日期排序 | |
df.sort_values('Date', inplace=True) | |
# 将'Date'列设置为索引 | |
df.set_index('Date', inplace=True) | |
# 将'Value'列中的0替换为NaN | |
df['Value'] = df['Value'].replace(0, np.nan) | |
# 使用线性插值填充NaN值 | |
df['Value'].interpolate(method='linear', inplace=True) | |
print(f"数据加载成功,共 {len(df)} 条记录。") | |
# ----- 2. 平稳性与白噪声检验 (仅输出结果) ----- | |
print("----- 2. 平稳性与白噪声检验 -----") | |
# 对时间序列进行差分以使其平稳 | |
ts_stationary, d_order = make_stationary_silent(df['Value']) | |
# 进行Ljung-Box白噪声检验 | |
lb_p_value = acorr_ljungbox(ts_stationary, lags=[10], return_df=True)['lb_pvalue'].iloc[0] | |
print(f"最优差分阶数 d = {d_order}, 白噪声检验 P-value = {lb_p_value:.4f}") | |
# ----- 3. 季节性检验 ----- | |
print("----- 3. 季节性检验 -----") | |
# 使用Canova-Hansen检验确定季节差分阶数 | |
D_order = nsdiffs(df['Value'], m=7, test='ch') # 假设周期为7天 | |
print(f"季节差分阶数 D = {D_order}") | |
# STL分解图 | |
# 对时间序列进行STL分解,提取趋势、季节性和残差 | |
stl = STL(df['Value'], period=7, robust=True) | |
res = stl.fit() | |
fig_stl = res.plot() | |
fig_stl.suptitle("STL季节性分解 (周期=7)", y=1.02) | |
print("STL季节性分解图已生成。") | |
# ----- 4. 窗口优化 (简化版,避免超时) ----- | |
print("----- 4. 窗口优化 (简化版) -----") | |
# 在线上环境中,完整的窗口优化太慢,这里使用一个固定的值或一个快速的估计 | |
# 为了演示,我们仍然运行一个简化版的窗口评估 | |
ts_values = df['Value'].values | |
window_sizes = range(100, 181, 20) # 减少评估点,评估100到180天窗口,步长20 | |
print(f"开始评估 {len(window_sizes)} 个滑动窗口...") | |
# 并行评估不同窗口大小下的模型性能 | |
results = Parallel(n_jobs=-1)(delayed(evaluate_window)(ws, ts_values) for ws in window_sizes) | |
results_df = pd.DataFrame(results) | |
# 确定最优窗口大小,如果结果为空则默认为120 | |
OPTIMAL_WINDOW = int(results_df.loc[results_df['mae'].idxmin()]['window_size']) if not results_df.empty else 120 | |
print(f"所有滑动窗口评估完成。计算得到的最优滑动窗口为: {OPTIMAL_WINDOW} 天") | |
# ----- 5. 模型预测 ----- | |
print("----- 5. 模型预测 -----") | |
# 定义训练集和测试集的日期范围 | |
final_train_end_date = df.index.max() - timedelta(days=28) # 留出最后28天作为测试集 | |
final_test_start_date = final_train_end_date + timedelta(days=1) | |
# SARIMA模型训练数据:基于最优窗口 | |
train_sarima = df[final_train_end_date - timedelta(days=OPTIMAL_WINDOW - 1) : final_train_end_date] | |
# Prophet模型训练数据:使用所有可用数据直到训练结束日期 | |
train_full = df[:final_train_end_date] | |
# 测试数据 | |
test_data = df[final_test_start_date:] | |
print(f"训练数据截止日期: {final_train_end_date.strftime('%Y-%m-%d')}, 测试数据开始日期: {final_test_start_date.strftime('%Y-%m-%d')}") | |
print("正在生成 SARIMA 模型预测...") | |
sarima_pred = sarima_model(train_sarima, d_order, D_order, h=28) | |
print("SARIMA 模型预测完成。") | |
print("正在生成 Prophet 模型预测...") | |
prophet_pred = prophet_model(train_full, h=28) | |
print("Prophet 模型预测完成。") | |
print("正在生成加权平均模型预测...") | |
weighted_pred, sw, pw = weighted_average_model(train_full, d_order, D_order, h=28) | |
print(f"加权平均模型预测完成。模型权重: SARIMA={sw:.2f}, Prophet={pw:.2f}") | |
# 将实际值和各模型预测值整合到DataFrame中 | |
predictions = pd.DataFrame({ | |
'Actual': test_data['Value'], | |
'SARIMA': sarima_pred, | |
'Prophet': prophet_pred, | |
'Weighted': weighted_pred | |
}).dropna() # 丢弃包含NaN的行 | |
# ----- 6. 指标计算与可视化 ----- | |
print("----- 6. 指标计算与可视化 -----") | |
# 计算所有模型的性能指标 | |
metrics = {model: calculate_metrics(predictions['Actual'], predictions[model]) for model in ['SARIMA', 'Prophet', 'Weighted']} | |
metrics_df = pd.DataFrame(metrics).T.reset_index().rename(columns={'index': 'Model'}) | |
print("性能指标计算完成。") | |
# 4周预测对比图 | |
fig_forecast_4w = plt.figure(figsize=(12, 6)) | |
plt.plot(predictions.index, predictions['Actual'], label='实际值', color='black', marker='o') | |
plt.plot(predictions.index, predictions['SARIMA'], label='SARIMA预测', color='red', linestyle='--') | |
plt.plot(predictions.index, predictions['Prophet'], label='Prophet预测', color='blue', linestyle='-.') | |
plt.plot(predictions.index, predictions['Weighted'], label=f'加权平均 (S:{sw:.2f}, P:{pw:.2f})', color='green', linestyle=':') | |
plt.title('未来4周预测对比', fontsize=16) | |
plt.legend(); plt.grid(True) | |
print("未来4周预测对比图已生成。") | |
# 1周预测对比图和指标 | |
first_week_preds = predictions.head(7) # 获取第一周的预测数据 | |
first_week_metrics = pd.DataFrame({ | |
model: calculate_metrics(first_week_preds['Actual'], first_week_preds[model]) | |
for model in ['SARIMA', 'Prophet', 'Weighted'] | |
}).T.reset_index().rename(columns={'index': 'Model'}) | |
fig_forecast_1w = plt.figure(figsize=(10, 5)) | |
plt.plot(first_week_preds.index, first_week_preds['Actual'], label='实际值', color='black', marker='o') | |
plt.plot(first_week_preds.index, first_week_preds['SARIMA'], label='SARIMA预测', color='red', linestyle='--') | |
plt.plot(first_week_preds.index, first_week_preds['Prophet'], label='Prophet预测', color='blue', linestyle='-.') | |
plt.plot(first_week_preds.index, first_week_preds['Weighted'], label='加权平均预测', color='green', linestyle=':') | |
plt.title('第一周预测结果对比'); plt.legend(); plt.grid(True) | |
print("第一周预测结果对比图已生成。") | |
# 生成分析摘要文本 | |
summary_text = ( | |
f"数据加载成功,共 {len(df)} 条记录。\n" | |
f"最优差分阶数 d = {d_order}, 季节差分阶数 D = {D_order}\n" | |
f"白噪声检验 P-value = {lb_p_value:.4f} {'(序列为白噪声)' if lb_p_value > 0.05 else '(序列为非白噪声)'}\n" | |
f"计算得到的最优滑动窗口为: {OPTIMAL_WINDOW} 天\n" | |
f"模型权重: SARIMA={sw:.2f}, Prophet={pw:.2f}" | |
) | |
print("----- 分析完成 -----") | |
# 返回所有结果 | |
return summary_text, metrics_df, fig_forecast_4w, first_week_metrics, fig_forecast_1w, fig_stl | |
# ============================================================================= | |
# 辅助函数 (从主脚本中提取) | |
# ============================================================================= | |
def make_stationary_silent(data_series, max_diff=2): | |
""" | |
通过差分使时间序列平稳。 | |
参数: | |
data_series (pd.Series): 输入时间序列。 | |
max_diff (int): 最大差分阶数。 | |
返回: | |
tuple: 平稳后的时间序列和差分阶数。 | |
""" | |
diff_order = 0 | |
current_series = data_series.dropna() | |
for d in range(max_diff + 1): | |
if d > 0: | |
current_series = current_series.diff().dropna() | |
# 使用ADF检验检查平稳性 | |
if adfuller(current_series)[1] < 0.05: # P-value小于0.05则认为平稳 | |
return current_series, d | |
return current_series, max_diff | |
def evaluate_window(window_size, ts_values): | |
""" | |
评估给定窗口大小下模型的性能。 | |
参数: | |
window_size (int): 滑动窗口大小。 | |
ts_values (np.array): 时间序列值数组。 | |
返回: | |
dict: 包含窗口大小、MAE和RMSE的字典。 | |
""" | |
n = len(ts_values) | |
if window_size >= n: | |
return {'mae': float('inf'), 'rmse': float('inf')} | |
print(f" - 正在评估窗口大小: {window_size} 天...") # 打印当前评估的窗口大小 | |
# 在滑动窗口上训练auto_arima模型并进行单步预测 | |
errors = [] | |
for i in range(n - window_size - 1): | |
try: | |
model = auto_arima(ts_values[i:(i + window_size)], d=0, stepwise=True, suppress_warnings=True, error_action='ignore') | |
prediction = model.predict(n_periods=1)[0] | |
errors.append(ts_values[i + window_size] - prediction) | |
except Exception as e: | |
# 捕获auto_arima可能出现的错误,避免中断循环 | |
# print(f"窗口评估中auto_arima出现错误: {e}") # 避免过多打印,只在必要时开启 | |
errors.append(np.nan) # 标记为NaN,后续会处理 | |
errors = np.array(errors) | |
errors = errors[~np.isnan(errors)] # 移除NaN错误 | |
if len(errors) == 0: | |
return {'window_size': window_size, 'mae': float('inf'), 'rmse': float('inf')} | |
return {'window_size': window_size, 'mae': np.mean(np.abs(errors)), 'rmse': np.sqrt(np.mean(np.square(errors)))} | |
def sarima_model(train_data, d, D, h): | |
""" | |
训练SARIMA模型并进行预测。 | |
参数: | |
train_data (pd.DataFrame): 训练数据。 | |
d (int): 非季节差分阶数。 | |
D (int): 季节差分阶数。 | |
h (int): 预测步长。 | |
返回: | |
pd.Series: SARIMA模型预测结果。 | |
""" | |
# 使用auto_arima自动选择最优SARIMA模型参数 | |
model = auto_arima(train_data['Value'], d=d, D=D, m=7, seasonal=True, stepwise=True, suppress_warnings=True, error_action='ignore') | |
return model.predict(n_periods=h) | |
def prophet_model(train_data, h): | |
""" | |
训练Prophet模型并进行预测。 | |
参数: | |
train_data (pd.DataFrame): 训练数据。 | |
h (int): 预测步长。 | |
返回: | |
pd.Series: Prophet模型预测结果。 | |
""" | |
# 准备Prophet模型所需的数据格式 | |
df_prophet = train_data.reset_index().rename(columns={'Date': 'ds', 'Value': 'y'}) | |
# 初始化并训练Prophet模型 | |
model = Prophet(yearly_seasonality=True, weekly_seasonality=True) | |
model.fit(df_prophet) | |
# 创建未来预测的DataFrame | |
future = model.make_future_dataframe(periods=h, freq='D') | |
# 进行预测并返回预测值 | |
return model.predict(future)['yhat'].tail(h) | |
def weighted_average_model(train_data, d, D, h): | |
""" | |
训练加权平均模型(SARIMA和Prophet)并进行预测。 | |
权重基于验证集上的MAE反比。 | |
参数: | |
train_data (pd.DataFrame): 训练数据。 | |
d (int): SARIMA非季节差分阶数。 | |
D (int): SARIMA季节差分阶数。 | |
h (int): 预测步长。 | |
返回: | |
tuple: 加权平均预测结果(pd.Series)、SARIMA权重、Prophet权重。 | |
""" | |
validation_data = train_data.tail(28) # 留出最后28天作为验证集 | |
train_for_val = train_data.iloc[:-28] # 用于训练验证模型的子集 | |
if len(train_for_val) < 56: # 确保训练集足够大,至少是验证集的两倍 | |
# 如果数据太少,直接使用Prophet模型,并设置Prophet权重为1.0 | |
prophet_pred_full = prophet_model(train_data, h) | |
return prophet_pred_full, 0.0, 1.0 | |
# 在验证集上生成SARIMA和Prophet预测 | |
sarima_fc_val = sarima_model(train_for_val, d, D, h=28) | |
prophet_fc_val = prophet_model(train_for_val, h=28) | |
# 计算各自在验证集上的MAE | |
sarima_mae = mean_absolute_error(validation_data['Value'], sarima_fc_val) | |
prophet_mae = mean_absolute_error(validation_data['Value'], prophet_fc_val) | |
# 根据MAE计算权重(MAE越小,权重越大) | |
if sarima_mae + prophet_mae == 0: # 避免除以零 | |
sarima_weight, prophet_weight = 0.5, 0.5 | |
else: | |
inv_sum = (1/sarima_mae) + (1/prophet_mae) | |
sarima_weight = (1/sarima_mae) / inv_sum | |
prophet_weight = (1/prophet_mae) / inv_sum | |
# 使用完整训练数据生成最终预测 | |
sarima_pred = sarima_model(train_data, d, D, h=h) | |
prophet_pred = prophet_model(train_data, h=h) | |
# 计算加权平均预测 | |
weighted_avg = sarima_weight * sarima_pred.values + prophet_weight * prophet_pred.values | |
return pd.Series(weighted_avg, index=sarima_pred.index), sarima_weight, prophet_weight | |
def calculate_metrics(actual, predicted): | |
""" | |
计算预测性能指标:MAE, RMSE, MAPE。 | |
参数: | |
actual (pd.Series): 实际值。 | |
predicted (pd.Series): 预测值。 | |
返回: | |
dict: 包含MAE, RMSE, MAPE的字典。 | |
""" | |
# 确保实际值和预测值长度一致 | |
min_len = min(len(actual), len(predicted)) | |
actual = actual.iloc[:min_len] | |
predicted = predicted.iloc[:min_len] | |
# 避免除以零或无穷大 | |
mape = np.mean(np.abs((actual - predicted) / actual.replace(0, np.nan))) * 100 | |
mape = np.nan_to_num(mape, nan=0.0, posinf=0.0, neginf=0.0) # 处理可能出现的NaN/Inf | |
return { | |
'MAE': mean_absolute_error(actual, predicted), | |
'RMSE': np.sqrt(mean_squared_error(actual, predicted)), | |
'MAPE': mape | |
} | |
# ============================================================================= | |
# 创建Gradio界面 | |
# ============================================================================= | |
with gr.Blocks(theme=gr.themes.Soft(), title="药品销量时序预测") as app: | |
gr.Markdown("# 药品销量时序序列预测分析") | |
gr.Markdown("上传包含 'Date' 和 'Value' 列的Excel文件,系统将自动进行数据处理、模型训练、预测和评估。") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
file_input = gr.File(label="上传Excel文件", file_types=['.xlsx']) | |
run_button = gr.Button("开始分析", variant="primary") | |
summary_output = gr.Textbox(label="分析摘要", lines=7, interactive=False) | |
with gr.Column(scale=2): | |
gr.Markdown("### 四周预测性能指标") | |
metrics_4w_output = gr.DataFrame(headers=["Model", "MAE", "RMSE", "MAPE"], interactive=False) | |
gr.Markdown("### 一周预测性能指标") | |
metrics_1w_output = gr.DataFrame(headers=["Model", "MAE", "RMSE", "MAPE"], interactive=False) | |
with gr.Row(): | |
plot_4w_output = gr.Plot(label="四周预测对比图") | |
plot_1w_output = gr.Plot(label="一周预测对比图") | |
with gr.Row(): | |
plot_stl_output = gr.Plot(label="STL季节性分解图") | |
gr.Markdown("") # 占位符,保持布局 | |
# 设置按钮点击事件 | |
run_button.click( | |
fn=run_analysis, | |
inputs=[file_input], | |
outputs=[summary_output, metrics_4w_output, plot_4w_output, metrics_1w_output, plot_1w_output, plot_stl_output] | |
) | |
# 示例文件,方便用户测试 | |
gr.Examples( | |
[["gmqrkl.xlsx"]], # 请确保 'gmqrkl.xlsx' 文件在Gradio应用运行的环境中可访问 | |
inputs=[file_input], | |
fn=run_analysis, | |
outputs=[summary_output, metrics_4w_output, plot_4w_output, metrics_1w_output, plot_1w_output, plot_stl_output] | |
) | |
# 明确指定 Gradio 监听的地址和端口,以确保在 Hugging Face Spaces 上正确启动 | |
app.launch(server_name="0.0.0.0", server_port=7860) | |