| |
|
|
| import gradio as gr |
| import pandas as pd |
| import numpy as np |
| from sklearn.linear_model import LinearRegression |
| import json |
| import tempfile |
| import os |
|
|
| |
def detect_trends_with_forecast(process_name, datetime_str, window_minutes, forecast_minutes, csv_file, excel_file):
    """Detect the trend of each important item of a process and extrapolate it.

    Rows of the measurement CSV whose timestamp lies in
    ``[datetime_str - window_minutes, datetime_str]`` are selected. For every
    threshold-table row that belongs to ``process_name`` and is flagged
    important, a straight line is fitted to the item's non-missing values
    (x = sample position within the window) and evaluated
    ``forecast_minutes`` past the last sample, using the median sampling
    interval of the window to convert minutes into sample steps.

    Args:
        process_name: Value compared with the ``ProcessNo_ProcessName`` column
            of the threshold table; also the third level of the CSV column key.
        datetime_str: End of the reference window, parsed by ``pd.to_datetime``.
        window_minutes: Length of the reference window in minutes (> 0).
        forecast_minutes: Forecast horizon in minutes (> 0).
        csv_file: Path of the measurement CSV. It is read with three header
            rows and its first column is taken as the timestamp.
        excel_file: Path of the threshold workbook. The columns
            ``ProcessNo_ProcessName``, ``Important``, ``ColumnID`` and
            ``ItemName`` are read; ``LL``/``L``/``H``/``HH`` are optional.

    Returns:
        A ``(result_df, summary, json_path)`` tuple. On success ``result_df``
        is a DataFrame with one row per analysed item, ``summary`` a status
        message and ``json_path`` the path of a temporary JSON file holding
        the same rows. On any failure or early exit the tuple is
        ``(None, message, None)``; exceptions are reported through the
        message rather than raised.
    """
    try:
        # Fail early with a readable message instead of an opaque reader error.
        if not csv_file or not excel_file:
            return None, "⚠ CSVファイルと閾値テーブル (Excel) をアップロードしてください", None
        if (
            window_minutes is None
            or forecast_minutes is None
            or window_minutes <= 0
            or forecast_minutes <= 0
        ):
            return None, "⚠ 参照時間幅と未来予測時間幅には正の数値を指定してください", None

        # Measurements: three header rows form the column key; the first
        # column is the timestamp and is kept apart from the data columns.
        df = pd.read_csv(csv_file, header=[0, 1, 2])
        timestamps = pd.to_datetime(df.iloc[:, 0], errors="coerce")
        data = df.iloc[:, 1:]

        # Thresholds: normalise the "Important" flag and the numeric limits.
        thresholds_df = pd.read_excel(excel_file)
        thresholds_df["Important"] = (
            thresholds_df["Important"]
            .astype(str)
            .str.strip()
            .str.upper()
            .map({"TRUE": True, "FALSE": False})
        )
        for col in ("LL", "L", "H", "HH"):
            if col in thresholds_df.columns:
                thresholds_df[col] = pd.to_numeric(thresholds_df[col], errors="coerce")

        # Reference window. Unparseable timestamps (NaT) never satisfy the
        # comparisons, so they drop out here.
        target_time = pd.to_datetime(datetime_str)
        start_time = target_time - pd.Timedelta(minutes=window_minutes)
        end_time = target_time
        in_window = (timestamps >= start_time) & (timestamps <= end_time)
        # Sort chronologically: the interval estimate, the regression order
        # and "last value" below all rely on time order.
        window_ts = timestamps[in_window].sort_values(kind="mergesort")
        if window_ts.empty:
            return None, "⚠ 指定時間幅にデータなし", None
        df_window = data.loc[window_ts.index]

        # Sampling interval = median gap between consecutive timestamps.
        interval = window_ts.diff().median()
        if pd.isna(interval) or interval <= pd.Timedelta(0):
            return None, "⚠ サンプリング間隔を検出できません", None
        interval_minutes = interval.total_seconds() / 60

        is_target = thresholds_df["ProcessNo_ProcessName"] == process_name
        is_important = thresholds_df["Important"].eq(True)
        proc_thresholds = thresholds_df[is_target & is_important]
        if proc_thresholds.empty:
            return None, f"⚠ プロセス {process_name} の重要項目なし", None

        # Identical for every item, so computed once outside the loop.
        forecast_steps = max(1, int(round(forecast_minutes / interval_minutes)))
        forecast_time = target_time + pd.Timedelta(minutes=forecast_minutes)

        results = []
        for _, thr in proc_thresholds.iterrows():
            col_tuple = (thr["ColumnID"], thr["ItemName"], thr["ProcessNo_ProcessName"])
            if col_tuple not in data.columns:
                continue

            series = df_window[col_tuple].dropna()
            n = len(series)
            if n < 3:
                # Too few points for a meaningful line fit.
                continue

            # Least-squares line over the sample positions 0..n-1.
            x = np.arange(n).reshape(-1, 1)
            y = series.values.reshape(-1, 1)
            model = LinearRegression().fit(x, y)
            slope = model.coef_[0][0]
            last_val = series.iloc[-1]

            # The last sample sits at position n-1, so a point
            # ``forecast_steps`` intervals later is at (n-1)+forecast_steps.
            forecast_index = (n - 1) + forecast_steps
            forecast_val = model.predict([[forecast_index]])[0][0]

            l, ll, h, hh = thr.get("L"), thr.get("LL"), thr.get("H"), thr.get("HH")

            # Current trend: direction of the slope combined with whether the
            # latest value is already beyond the matching outer limit.
            status = "安定"
            if slope < 0 and pd.notna(ll):
                if last_val > ll:
                    status = "LL接近下降傾向"
                elif last_val <= ll:
                    status = "LL逸脱下降傾向"
            if slope > 0 and pd.notna(hh):
                if last_val < hh:
                    status = "HH接近上昇傾向"
                elif last_val >= hh:
                    status = "HH逸脱上昇傾向"

            # Forecast verdict: does the extrapolated value cross LL or HH?
            forecast_status = "安定"
            if pd.notna(ll) and forecast_val <= ll:
                forecast_status = "LL逸脱予測"
            elif pd.notna(hh) and forecast_val >= hh:
                forecast_status = "HH逸脱予測"

            # Plain Python floats / None keep the rows JSON-serialisable.
            results.append({
                "ItemName": thr["ItemName"],
                "傾向": status,
                "傾き": float(round(slope, 4)),
                "最終値": float(round(float(last_val), 3)) if pd.notna(last_val) else None,
                "予測値": float(round(float(forecast_val), 3)),
                "予測時刻": str(forecast_time),
                "予測傾向": forecast_status,
                "サンプリング間隔(分)": float(interval_minutes),
                "LL": _optional_float(ll),
                "L": _optional_float(l),
                "H": _optional_float(h),
                "HH": _optional_float(hh),
            })

        result_df = pd.DataFrame(results)

        # Write through the descriptor mkstemp already opened instead of
        # closing it and reopening the path. The file is intentionally left
        # on disk because its path is part of the return value.
        fd, tmp_path = tempfile.mkstemp(suffix=".json", prefix="trend_result_")
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(results, f, ensure_ascii=False, indent=2)

        summary = f"✅ {process_name} の傾向検出+未来予測完了({start_time}~{end_time}, 予測+{forecast_minutes}分)"
        return result_df, summary, tmp_path

    except Exception as e:
        # UI boundary: report the failure as a message instead of raising.
        return None, f"❌ エラー: {e}", None


def _optional_float(value):
    """Return ``value`` as a plain float, or None when it is missing (None/NaN)."""
    return None if pd.isna(value) else float(value)
|
|
|
|
| |
with gr.Blocks(css=".gradio-container {overflow-y: auto !important; height: 100vh;}") as demo:
    # The CSS above makes the container scroll vertically at full viewport height.
    gr.Markdown("## 傾向検出アプリ(重要項目ごとの詳細+閾値予測)")

    # Row 1: the two uploaded files, handed to the callback as file paths.
    with gr.Row():
        csv_input = gr.File(type="filepath", label="CSVファイルをアップロード")
        excel_input = gr.File(type="filepath", label="閾値テーブル (Excel)")

    # Row 2: which process to analyse and the end of the reference window.
    with gr.Row():
        process_name = gr.Textbox(placeholder="例: E018-A012_除害RO", label="プロセス名")
        datetime_str = gr.Textbox(label="日時 (例: 2025/8/1 1:05)")

    # Row 3: window length and forecast horizon, both defaulting to 60.
    with gr.Row():
        window_minutes = gr.Number(value=60, label="参照時間幅(分)")
        forecast_minutes = gr.Number(value=60, label="未来予測時間幅(分)")

    run_btn = gr.Button("実行")

    # Output area: result table, status message and the JSON download.
    with gr.Row():
        result_table = gr.Dataframe(wrap=True, label="傾向+未来予測結果")
        summary_output = gr.Textbox(label="サマリー")
        json_download = gr.File(label="JSON結果ダウンロード")

    # Component order matches the parameter order of the callback and the
    # order of the three values it returns.
    callback_inputs = [
        process_name,
        datetime_str,
        window_minutes,
        forecast_minutes,
        csv_input,
        excel_input,
    ]
    callback_outputs = [result_table, summary_output, json_download]
    run_btn.click(
        fn=detect_trends_with_forecast,
        inputs=callback_inputs,
        outputs=callback_outputs,
    )
|
|
if __name__ == "__main__":
    # Launch settings: listen on 0.0.0.0, enable the MCP server, and keep
    # share links and SSR mode switched off.
    launch_options = {
        "server_name": "0.0.0.0",
        "mcp_server": True,
        "share": False,
        "ssr_mode": False,
    }
    demo.launch(**launch_options)
|
|