"""
Real-time Performance Evaluation API.

Real-time prediction performance evaluation and monitoring endpoints. Most
figures served here are simulated from the demo state exposed by
``internal_api`` (``demo_session`` / ``active_issues``).
"""

import hmac
import logging
import zlib
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
from fastapi import FastAPI, Header, HTTPException, Query

from config import INTERNAL_API_KEY

logger = logging.getLogger(__name__)

# Metrics the history / comparison endpoints can produce.
_SUPPORTED_METRICS = ("rmse", "mae", "accuracy")
# Metrics for which a lower value means better performance (used for ranking).
_LOWER_IS_BETTER = ("rmse", "mae")
# Upper bound of the history window: one week, in hours.
_MAX_HISTORY_HOURS = 168
# Maximum number of stations accepted by the comparison endpoint.
_MAX_COMPARE_STATIONS = 10
# Non-internal callers only receive this many of the most recent history points.
_EXTERNAL_HISTORY_LIMIT = 100
# When the in-memory history grows past _HISTORY_CACHE_MAX records it is
# trimmed down to the newest _HISTORY_CACHE_KEEP records.
_HISTORY_CACHE_MAX = 1000
_HISTORY_CACHE_KEEP = 500

# Simulated impact of each active issue type on system-wide metrics:
# (RMSE multiplier, accuracy penalty in percentage points).
# Issue types not listed here have no effect.
_ISSUE_IMPACT = {
    "extreme_weather": (1.8, 15),
    "sensor_malfunction": (1.3, 8),
    "data_corruption": (1.5, 12),
    "network_failure": (1.2, 5),
}

# Performance data store (a real deployment would use a database).
performance_cache = {
    "realtime_metrics": {},
    "historical_data": [],
    "station_performance": {},
    "alert_thresholds": {
        "rmse_warning": 30.0,
        "rmse_critical": 50.0,
        "accuracy_warning": 80.0,
        "accuracy_critical": 70.0
    }
}


def verify_internal_api_key(authorization: str = Header(None)):
    """Return True if *authorization* is ``"Bearer <INTERNAL_API_KEY>"``.

    The comparison is constant-time so the key cannot be probed through
    response-timing differences. When no key is configured, every request
    is treated as external (fail closed).
    """
    if not INTERNAL_API_KEY:
        return False
    if not authorization or not authorization.startswith("Bearer "):
        return False
    expected = f"Bearer {INTERNAL_API_KEY}"
    return hmac.compare_digest(authorization.encode("utf-8"), expected.encode("utf-8"))


def _require_internal_api_key(authorization: Optional[str]) -> None:
    """Raise HTTP 401 unless *authorization* carries the internal API key."""
    if not verify_internal_api_key(authorization):
        raise HTTPException(
            status_code=401,
            detail="Internal API key required",
            headers={"WWW-Authenticate": "Bearer"},
        )


def _validate_metric(metric: str) -> None:
    """Raise HTTP 400 if *metric* is not one of the supported metric names."""
    if metric not in _SUPPORTED_METRICS:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported metric '{metric}'. Use one of: {', '.join(_SUPPORTED_METRICS)}",
        )


def _stable_hash(text: str) -> int:
    """Return a process-independent, non-negative hash of *text*.

    Built-in ``hash()`` on ``str`` is salted per interpreter process
    (PYTHONHASHSEED), so it would give the same station different simulated
    values across restarts and workers.
    """
    return zlib.crc32(text.encode("utf-8"))


def register_performance_routes(app: FastAPI):
    """Register the performance-evaluation routes on *app*."""

    @app.get("/api/performance/realtime", tags=["Performance"])
    async def get_realtime_performance(
        station_id: Optional[str] = Query(None, description="특정 관측소 성능 (전체: None)"),
        authorization: str = Header(None)
    ):
        """실시간 예측 성능 지표 조회"""
        # Real-time performance for one station or for the whole system.
        # Anyone may read; callers with the internal key get extra detail.
        is_internal = verify_internal_api_key(authorization)

        try:
            current_time = datetime.now()

            if station_id:
                station_metrics = await get_station_performance(station_id)
                return {
                    "timestamp": current_time.isoformat(),
                    "station_id": station_id,
                    **station_metrics,
                    "data_source": "realtime"
                }

            overall_metrics = await get_overall_performance()
            response_data = {
                "timestamp": current_time.isoformat(),
                "rmse": overall_metrics["rmse"],
                "mae": overall_metrics["mae"],
                "accuracy": overall_metrics["accuracy"],
                "prediction_count": overall_metrics["prediction_count"],
                "active_stations": overall_metrics["active_stations"],
                "data_quality_score": overall_metrics["data_quality_score"],
                "status": overall_metrics["status"]
            }

            # Extra diagnostic detail for internal callers only.
            if is_internal:
                response_data.update({
                    "detailed_metrics": overall_metrics.get("detailed_metrics", {}),
                    "station_breakdown": overall_metrics.get("station_breakdown", {}),
                    "recent_alerts": overall_metrics.get("recent_alerts", [])
                })

            return response_data

        except Exception as e:
            logger.exception(f"Realtime performance query failed: {str(e)}")
            raise HTTPException(status_code=500, detail=str(e))

    @app.get("/api/performance/historical", tags=["Performance"])
    async def get_historical_performance(
        hours: int = Query(24, description="조회할 시간 범위 (시간)"),
        station_id: Optional[str] = Query(None, description="특정 관측소"),
        metric: str = Query("rmse", description="성능 지표 (rmse/mae/accuracy)"),
        authorization: str = Header(None)
    ):
        """성능 히스토리 조회"""
        # Hourly history of one metric, plus summary statistics.
        is_internal = verify_internal_api_key(authorization)
        # Reject unknown metrics with a 400 instead of a KeyError -> 500.
        _validate_metric(metric)

        try:
            # Clamp the window to [1 hour, 1 week].
            hours = max(1, min(hours, _MAX_HISTORY_HOURS))

            end_time = datetime.now()
            start_time = end_time - timedelta(hours=hours)

            historical_data = await get_performance_history(
                start_time, end_time, station_id, metric
            )

            values = [item[metric] for item in historical_data if item.get(metric) is not None]
            statistics = None
            if values:
                statistics = {
                    "mean": round(float(np.mean(values)), 2),
                    "std": round(float(np.std(values)), 2),
                    "min": round(min(values), 2),
                    "max": round(max(values), 2),
                    "trend": calculate_trend(values)
                }

            return {
                "timestamp": end_time.isoformat(),
                "query_range": {
                    "start_time": start_time.isoformat(),
                    "end_time": end_time.isoformat(),
                    "hours": hours
                },
                "station_id": station_id,
                "metric": metric,
                "data_points": len(historical_data),
                # External callers only see the most recent points.
                "data": historical_data if is_internal else historical_data[-_EXTERNAL_HISTORY_LIMIT:],
                "statistics": statistics
            }

        except Exception as e:
            logger.exception(f"Historical performance query failed: {str(e)}")
            raise HTTPException(status_code=500, detail=str(e))

    @app.get("/api/performance/comparison", tags=["Performance"])
    async def compare_station_performance(
        station_ids: str = Query(..., description="비교할 관측소들 (쉼표 구분)"),
        metric: str = Query("rmse", description="비교할 성능 지표"),
        period: str = Query("24h", description="비교 기간 (1h/6h/24h/7d)"),
        authorization: str = Header(None)
    ):
        """관측소별 성능 비교"""
        # Compare and rank up to 10 stations on one metric.
        _validate_metric(metric)

        # Parse the comma-separated list, dropping blanks and duplicates
        # (first occurrence wins), then cap the count.
        stations = list(dict.fromkeys(s.strip() for s in station_ids.split(",") if s.strip()))
        stations = stations[:_MAX_COMPARE_STATIONS]
        if not stations:
            raise HTTPException(status_code=400, detail="At least one station_id is required")

        try:
            # Unknown period strings fall back to 24 hours.
            period_hours = {
                "1h": 1,
                "6h": 6,
                "24h": 24,
                "7d": 168
            }.get(period, 24)

            comparison_data = {}
            for sid in stations:
                comparison_data[sid] = await get_station_performance_summary(
                    sid, period_hours, metric
                )

            # RMSE/MAE: lower is better; accuracy: higher is better.
            ranked = sorted(
                comparison_data.items(),
                key=lambda item: item[1]["current_value"],
                reverse=metric not in _LOWER_IS_BETTER,
            )
            values = [data["current_value"] for data in comparison_data.values()]

            return {
                "timestamp": datetime.now().isoformat(),
                "metric": metric,
                "period": period,
                "stations_count": len(stations),
                "comparison": comparison_data,
                "ranking": [
                    {"rank": rank, "station_id": sid, "value": data["current_value"]}
                    for rank, (sid, data) in enumerate(ranked, start=1)
                ],
                "best_performer": ranked[0][0],
                "summary": {
                    "best_value": ranked[0][1]["current_value"],
                    "worst_value": ranked[-1][1]["current_value"],
                    "average_value": round(float(np.mean(values)), 2)
                }
            }

        except Exception as e:
            logger.exception(f"Station comparison failed: {str(e)}")
            raise HTTPException(status_code=500, detail=str(e))

    @app.get("/api/performance/alerts", tags=["Performance"])
    async def get_performance_alerts(
        active_only: bool = Query(True, description="활성 알림만 조회"),
        hours: int = Query(24, description="조회할 시간 범위"),
        authorization: str = Header(None)
    ):
        """성능 알림 조회"""
        # Internal-only: list current performance alerts grouped by severity.
        _require_internal_api_key(authorization)

        try:
            alerts = await get_current_alerts(active_only, hours)

            critical_alerts = [a for a in alerts if a["severity"] == "critical"]
            warning_alerts = [a for a in alerts if a["severity"] == "warning"]

            if critical_alerts:
                system_status = "critical"
            elif warning_alerts:
                system_status = "warning"
            else:
                system_status = "normal"

            return {
                "timestamp": datetime.now().isoformat(),
                "query_range_hours": hours,
                "active_only": active_only,
                "total_alerts": len(alerts),
                "critical_count": len(critical_alerts),
                "warning_count": len(warning_alerts),
                "alerts": alerts,
                "summary": {
                    "system_status": system_status,
                    "requires_attention": len(critical_alerts) > 0,
                    "most_recent": alerts[0] if alerts else None
                }
            }

        except Exception as e:
            logger.exception(f"Performance alerts query failed: {str(e)}")
            raise HTTPException(status_code=500, detail=str(e))

    @app.post("/api/performance/update", tags=["Performance"])
    async def update_performance_metrics(
        request_data: Dict[str, Any],
        authorization: str = Header(None)
    ):
        """성능 지표 업데이트 (내부 사용)"""
        # Internal-only: score a batch of predictions against actual values,
        # store the record, and report any threshold alerts it triggers.
        _require_internal_api_key(authorization)

        try:
            station_id = request_data.get("station_id")
            predictions = request_data.get("predictions", [])
            actual_values = request_data.get("actual_values", [])
            timestamp = request_data.get("timestamp", datetime.now().isoformat())

            if not predictions or not actual_values:
                raise HTTPException(status_code=400, detail="Predictions and actual values required")

            if not isinstance(predictions, list) or not isinstance(actual_values, list):
                raise HTTPException(status_code=400, detail="Predictions and actual values must be lists")

            if len(predictions) != len(actual_values):
                raise HTTPException(status_code=400, detail="Predictions and actual values must have same length")

            # Non-numeric or non-finite input is the client's fault -> 400.
            try:
                metrics = calculate_performance_metrics(predictions, actual_values)
            except (TypeError, ValueError) as e:
                raise HTTPException(status_code=400, detail=f"Invalid prediction data: {e}") from e

            performance_record = {
                "station_id": station_id,
                "timestamp": timestamp,
                "predictions": predictions,
                "actual_values": actual_values,
                "metrics": metrics,
                "data_points": len(predictions)
            }

            # save_performance_record appends the record to
            # performance_cache["historical_data"]; it is not appended again
            # here, so each record is stored once.
            await save_performance_record(performance_record)
            performance_cache["realtime_metrics"][station_id] = metrics

            # Bound the in-memory history.
            history = performance_cache["historical_data"]
            if len(history) > _HISTORY_CACHE_MAX:
                del history[:-_HISTORY_CACHE_KEEP]

            alerts = check_performance_alerts(station_id, metrics)

            return {
                "success": True,
                "timestamp": datetime.now().isoformat(),
                "station_id": station_id,
                "metrics": metrics,
                "data_points": len(predictions),
                "alerts_triggered": len(alerts),
                "alerts": alerts
            }

        except HTTPException:
            # Preserve deliberate 4xx responses instead of masking them as 500.
            raise
        except Exception as e:
            logger.exception(f"Performance update failed: {str(e)}")
            raise HTTPException(status_code=500, detail=str(e))


# ============================================================================
# Performance calculation and analysis helpers
# ============================================================================

async def _compute_overall_metrics():
    """Compute simulated system-wide metrics, without nested summaries.

    Kept separate from get_overall_performance so that get_current_alerts can
    read the system RMSE without recursing back through
    get_recent_alerts_summary -> get_current_alerts -> get_overall_performance.

    Each active issue in ``internal_api.active_issues`` multiplies the base
    RMSE/MAE and subtracts an accuracy penalty per _ISSUE_IMPACT. Gaussian
    noise is then added from the global NumPy RNG.
    """
    from internal_api import demo_session, active_issues

    base_rmse = 18.5
    base_mae = 14.2
    base_accuracy = 89.2

    rmse_multiplier = 1.0
    accuracy_penalty = 0
    for issue in active_issues.values():
        multiplier, penalty = _ISSUE_IMPACT.get(issue["type"], (1.0, 0))
        rmse_multiplier *= multiplier
        accuracy_penalty += penalty

    rmse_variation = np.random.normal(0, 2)
    accuracy_variation = np.random.normal(0, 3)

    current_rmse = max(10, base_rmse * rmse_multiplier + rmse_variation)
    current_mae = max(8, base_mae * rmse_multiplier * 0.8 + rmse_variation * 0.7)
    current_accuracy = max(50, min(98, base_accuracy - accuracy_penalty + accuracy_variation))

    # Each active issue costs 10 quality points; never report below 0.
    data_quality = max(0, 95 - len(active_issues) * 10)

    if current_rmse > 40 or current_accuracy < 75:
        status = "critical"
    elif current_rmse > 25 or current_accuracy < 85:
        status = "warning"
    else:
        status = "good"

    return {
        "rmse": round(float(current_rmse), 1),
        "mae": round(float(current_mae), 1),
        "accuracy": round(float(current_accuracy), 1),
        "prediction_count": demo_session.get("total_processed", 0),
        "active_stations": len(demo_session.get("active_stations", [])),
        "data_quality_score": round(data_quality, 1),
        "status": status,
        "detailed_metrics": {
            "rmse_trend": "increasing" if len(active_issues) > 0 else "stable",
            # int() casts: FastAPI's encoder cannot serialize numpy ints.
            "prediction_latency_ms": int(np.random.randint(50, 200)),
            "data_freshness_minutes": int(np.random.randint(1, 10))
        }
    }


async def get_overall_performance():
    """Return simulated system-wide metrics plus per-station and alert summaries."""
    metrics = await _compute_overall_metrics()
    metrics["station_breakdown"] = await get_all_stations_summary()
    metrics["recent_alerts"] = await get_recent_alerts_summary()
    return metrics


async def get_station_performance(station_id: str):
    """Return simulated metrics for a single station.

    The station's baseline is derived from a local RNG seeded by a stable
    hash of *station_id*, so the same station gets the same baseline in every
    process. The global NumPy RNG is left untouched. Active issues that target
    this station, or that have no station_id (system-wide), degrade the
    baseline.
    """
    from internal_api import active_issues, demo_session

    rng = np.random.default_rng(_stable_hash(station_id) % 1000)

    base_rmse = 18.5 + rng.normal(0, 3)
    base_mae = 14.2 + rng.normal(0, 2)
    base_accuracy = 89.2 + rng.normal(0, 5)

    for issue in active_issues.values():
        issue_station = issue.get("station_id")
        if issue_station and issue_station != station_id:
            continue  # Issue is pinned to a different station.
        if issue["type"] == "sensor_malfunction" and issue_station == station_id:
            base_rmse *= 2.0
            base_accuracy -= 25
        elif issue["type"] == "extreme_weather":
            base_rmse *= 1.6
            base_accuracy -= 12

    minutes_ago = int(rng.integers(1, 15))
    return {
        "rmse": round(float(max(10, base_rmse)), 1),
        "mae": round(float(max(8, base_mae)), 1),
        "accuracy": round(float(max(50, min(98, base_accuracy))), 1),
        "data_points": int(rng.integers(50, 200)),
        "last_prediction": (datetime.now() - timedelta(minutes=minutes_ago)).isoformat(),
        "status": "active" if station_id in demo_session.get("active_stations", []) else "inactive"
    }


async def get_performance_history(start_time, end_time, station_id, metric):
    """Generate a simulated hourly history of *metric* between the two datetimes.

    Values follow a 24-hour sine cycle plus Gaussian noise. A stable
    per-station offset in [-5, 4] is added when *station_id* is given. Raises
    KeyError for a metric other than rmse/mae/accuracy.
    """
    station_offset = (_stable_hash(station_id) % 10 - 5) if station_id else 0

    data_points = []
    current_time = start_time
    while current_time <= end_time:
        phase = np.sin(2 * np.pi * current_time.hour / 24)
        base_value = {
            "rmse": 18.5 + 5 * phase,
            "mae": 14.2 + 3 * phase,
            "accuracy": 89.2 - 8 * phase
        }
        value = base_value[metric] + np.random.normal(0, 2) + station_offset

        data_points.append({
            "timestamp": current_time.isoformat(),
            "station_id": station_id,
            metric: round(float(value), 1),
            "data_points": int(np.random.randint(10, 50))
        })
        current_time += timedelta(hours=1)

    return data_points


async def get_station_performance_summary(station_id, period_hours, metric):
    """Summarise one station's *metric* for the comparison endpoint (simulated).

    ``period_hours`` is accepted but not yet used by the simulation. Raises
    KeyError if *metric* is not a key of get_station_performance's result.
    """
    current_perf = await get_station_performance(station_id)
    current_value = current_perf[metric]

    period_average = current_value + np.random.normal(0, 5)

    # A negative change is labelled "improving", a positive one "degrading".
    trend_change = np.random.uniform(-10, 10)
    if trend_change < -2:
        trend = "improving"
    elif trend_change > 2:
        trend = "degrading"
    else:
        trend = "stable"

    return {
        "station_id": station_id,
        "current_value": current_value,
        "period_average": round(float(period_average), 1),
        "trend": trend,
        "trend_change": round(float(trend_change), 1),
        "data_points": int(np.random.randint(20, 100)),
        "last_update": datetime.now().isoformat()
    }


def calculate_performance_metrics(predictions, actual_values):
    """Compute error metrics for paired predictions and actual values.

    Returns rmse, mae, accuracy, mape, r2_score and data_points. ``mape`` is
    computed over non-zero actual values only and is None if all actuals are
    zero. ``r2_score`` is None when the actual values have zero variance.
    Both would otherwise be inf/NaN, which cannot be rendered as JSON.

    Raises:
        ValueError: inputs are non-numeric, empty, mismatched in shape, or
            contain non-finite values.
        TypeError: inputs cannot be converted to a float array.
    """
    predicted = np.asarray(predictions, dtype=float)
    actual = np.asarray(actual_values, dtype=float)

    if predicted.size == 0 or predicted.shape != actual.shape:
        raise ValueError("predictions and actual_values must be non-empty and the same length")
    if not (np.all(np.isfinite(predicted)) and np.all(np.isfinite(actual))):
        raise ValueError("predictions and actual_values must be finite numbers")

    residuals = predicted - actual
    errors = np.abs(residuals)

    rmse = float(np.sqrt(np.mean(residuals ** 2)))
    mae = float(np.mean(errors))

    # Share of predictions whose error is within the 95th percentile of errors.
    # NOTE(review): the threshold comes from these same errors, so this value
    # depends only on sample size and ties, not on how large the errors are.
    # It approaches 95% as n grows, so the accuracy alert thresholds (80/70)
    # can fire only for small batches (or heavily tied errors), never because
    # errors grew. Confirm the intended definition.
    threshold = np.percentile(errors, 95)
    accuracy = float(np.mean(errors <= threshold) * 100)

    nonzero = actual != 0
    mape = (
        float(np.mean(np.abs(residuals[nonzero] / actual[nonzero])) * 100)
        if nonzero.any() else None
    )

    ss_res = float(np.sum(residuals ** 2))
    ss_tot = float(np.sum((actual - np.mean(actual)) ** 2))
    r2 = 1 - ss_res / ss_tot if ss_tot > 0 else None

    return {
        "rmse": round(rmse, 2),
        "mae": round(mae, 2),
        "accuracy": round(accuracy, 1),
        "mape": round(mape, 2) if mape is not None else None,
        "r2_score": round(r2, 3) if r2 is not None else None,
        "data_points": len(predictions)
    }


def calculate_trend(values):
    """Classify the linear trend of *values*.

    Fits a straight line over the sample index with ``np.polyfit``. A slope
    above 0.5 per sample is "increasing", below -0.5 is "decreasing",
    otherwise "stable". Fewer than 3 values yields "insufficient_data".
    """
    if len(values) < 3:
        return "insufficient_data"

    slope = np.polyfit(np.arange(len(values)), values, 1)[0]

    if slope > 0.5:
        return "increasing"
    if slope < -0.5:
        return "decreasing"
    return "stable"


def _make_alert(alert_type, severity, message, station_id, threshold, current_value):
    """Build one threshold-alert dict in the shape check_performance_alerts returns."""
    return {
        "type": alert_type,
        "severity": severity,
        "message": message,
        "station_id": station_id,
        "threshold": threshold,
        "current_value": current_value,
        "timestamp": datetime.now().isoformat()
    }


def check_performance_alerts(station_id, metrics):
    """Return alert dicts for RMSE/accuracy values that cross the configured thresholds.

    Uses ``performance_cache["alert_thresholds"]``. For each metric, at most
    one alert is emitted: critical takes precedence over warning.
    """
    thresholds = performance_cache["alert_thresholds"]
    alerts = []

    rmse = metrics["rmse"]
    if rmse > thresholds["rmse_critical"]:
        alerts.append(_make_alert(
            "rmse_critical", "critical", f"Critical RMSE level: {rmse} cm",
            station_id, thresholds["rmse_critical"], rmse,
        ))
    elif rmse > thresholds["rmse_warning"]:
        alerts.append(_make_alert(
            "rmse_warning", "warning", f"High RMSE level: {rmse} cm",
            station_id, thresholds["rmse_warning"], rmse,
        ))

    accuracy = metrics["accuracy"]
    if accuracy < thresholds["accuracy_critical"]:
        alerts.append(_make_alert(
            "accuracy_critical", "critical", f"Critical accuracy drop: {accuracy}%",
            station_id, thresholds["accuracy_critical"], accuracy,
        ))
    elif accuracy < thresholds["accuracy_warning"]:
        alerts.append(_make_alert(
            "accuracy_warning", "warning", f"Low accuracy: {accuracy}%",
            station_id, thresholds["accuracy_warning"], accuracy,
        ))

    return alerts


async def get_current_alerts(active_only, hours):
    """Return simulated alerts, newest first.

    Every entry in ``internal_api.active_issues`` becomes an active alert.
    While the demo session is active, a system-RMSE alert is added when RMSE
    exceeds 30.

    NOTE(review): ``active_only`` and ``hours`` are accepted but not applied
    as filters yet. Every generated alert is active and is returned
    regardless of age.
    """
    from internal_api import active_issues, demo_session

    now = datetime.now()
    alerts = [
        {
            "id": issue_id,
            "type": f"{issue['type']}_alert",
            "severity": "critical" if issue["type"] in ["extreme_weather", "sensor_malfunction"] else "warning",
            "message": f"{issue['type'].replace('_', ' ').title()} detected",
            "station_id": issue.get("station_id"),
            "timestamp": issue["start_time"].isoformat(),
            "duration_minutes": (now - issue["start_time"]).total_seconds() / 60,
            "active": True
        }
        for issue_id, issue in active_issues.items()
    ]

    if demo_session.get("active"):
        # Use the core metrics only: get_overall_performance would call back
        # into this function via get_recent_alerts_summary.
        current_perf = await _compute_overall_metrics()
        if current_perf["rmse"] > 30:
            alerts.append({
                "id": "perf_rmse_high",
                "type": "performance_degradation",
                "severity": "warning",
                "message": f"High system RMSE: {current_perf['rmse']} cm",
                "timestamp": (now - timedelta(minutes=5)).isoformat(),
                "duration_minutes": 5,
                "active": True
            })

    # ISO-8601 strings of the same format sort chronologically.
    alerts.sort(key=lambda alert: alert["timestamp"], reverse=True)
    return alerts


async def get_all_stations_summary():
    """Return {station_id: {rmse, accuracy, status}} for the demo's active stations.

    Falls back to DT_0001 and DT_0002 when the demo session lists no stations.
    """
    from internal_api import demo_session

    stations = demo_session.get("active_stations", ["DT_0001", "DT_0002"])

    summary = {}
    for station_id in stations:
        perf = await get_station_performance(station_id)
        summary[station_id] = {
            "rmse": perf["rmse"],
            "accuracy": perf["accuracy"],
            "status": perf["status"]
        }
    return summary


async def get_recent_alerts_summary():
    """Return total/critical counts and the newest alert (requested window: 1 hour)."""
    alerts = await get_current_alerts(True, 1)
    return {
        "total_count": len(alerts),
        "critical_count": sum(1 for a in alerts if a["severity"] == "critical"),
        "most_recent": alerts[0] if alerts else None
    }


async def save_performance_record(performance_record):
    """Persist one performance record.

    Currently in-memory only: appends to ``performance_cache["historical_data"]``.
    A real implementation would write to a database, e.g.
    ``await supabase.table("performance_metrics").insert(performance_record)``.
    """
    performance_cache["historical_data"].append(performance_record)


logger.info("Performance API module loaded successfully")