Spaces:
Sleeping
Sleeping
""" | |
Real-time Performance Evaluation API | |
실시간 예측 성능 평가 및 모니터링 API | |
""" | |
from fastapi import FastAPI, HTTPException, Header, Query | |
from datetime import datetime, timedelta | |
import numpy as np | |
import pandas as pd | |
from typing import Optional, List, Dict, Any | |
import logging | |
from config import INTERNAL_API_KEY | |
logger = logging.getLogger(__name__)

# In-memory performance store shared by the functions in this module.
# (The original note says a database would be used in a real deployment.)
performance_cache = {
    # Latest metrics, keyed by station id.
    "realtime_metrics": {},
    # Performance records, appended in arrival order.
    "historical_data": [],
    "station_performance": {},
    # Limits read by check_performance_alerts.
    "alert_thresholds": {
        "rmse_warning": 30.0,
        "rmse_critical": 50.0,
        "accuracy_warning": 80.0,
        "accuracy_critical": 70.0,
    },
}
def verify_internal_api_key(authorization: str = Header(None)):
    """Check whether an ``Authorization`` header carries the internal API key.

    Args:
        authorization: Raw ``Authorization`` header value, expected in the
            form ``"Bearer <key>"``. ``None`` or empty when the header is
            absent.

    Returns:
        bool: ``True`` only when the header equals ``"Bearer "`` followed by
        the configured ``INTERNAL_API_KEY``. ``False`` in every other case,
        including when no key is configured.
    """
    import hmac

    # An unset/empty key must never authenticate: formatting ``None`` into the
    # expected value would otherwise make the literal header "Bearer None" valid.
    if not INTERNAL_API_KEY or not authorization:
        return False
    if not authorization.startswith("Bearer "):
        return False

    expected = f"Bearer {INTERNAL_API_KEY}"
    # Constant-time comparison so response timing does not leak key prefixes.
    # Compared as bytes because compare_digest rejects non-ASCII ``str`` input.
    return hmac.compare_digest(
        authorization.encode("utf-8"), expected.encode("utf-8")
    )
def register_performance_routes(app: FastAPI):
    """Register the performance-evaluation API routes on ``app``.

    NOTE(review): indentation and any decorator lines are not visible in this
    copy of the file, so it cannot be confirmed here which of the handler
    coroutines that follow are nested inside this function or how they are
    bound to ``app`` -- verify against the original source.
    """
async def get_realtime_performance(
    station_id: Optional[str] = Query(None, description="특정 관측소 성능 (전체: None)"),
    authorization: str = Header(None)
):
    """Return the current prediction-performance metrics.

    Args:
        station_id: When given, only that station's metrics are returned;
            otherwise the system-wide metrics are returned.
        authorization: ``Authorization`` header; a valid internal key adds the
            detailed sections to the system-wide response.

    Returns:
        dict: Either the station metrics (plus ``timestamp``, ``station_id``
        and ``data_source``) or the system-wide summary.

    Raises:
        HTTPException: 500 with the error text when anything below fails.
    """
    # Internal callers get extra detail; everyone else gets the read-only view.
    internal_caller = verify_internal_api_key(authorization)

    try:
        now_iso = datetime.now().isoformat()

        if station_id:
            # Single-station view.
            per_station = await get_station_performance(station_id)
            payload = {"timestamp": now_iso, "station_id": station_id}
            payload.update(per_station)
            payload["data_source"] = "realtime"
            return payload

        # System-wide view.
        overall = await get_overall_performance()
        payload = {"timestamp": now_iso}
        for field in (
            "rmse",
            "mae",
            "accuracy",
            "prediction_count",
            "active_stations",
            "data_quality_score",
            "status",
        ):
            payload[field] = overall[field]

        # Extra sections are only exposed to internal callers.
        if internal_caller:
            payload["detailed_metrics"] = overall.get("detailed_metrics", {})
            payload["station_breakdown"] = overall.get("station_breakdown", {})
            payload["recent_alerts"] = overall.get("recent_alerts", [])

        return payload

    except Exception as e:
        logger.error(f"Realtime performance query failed: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
async def get_historical_performance(
    hours: int = Query(24, description="조회할 시간 범위 (시간)"),
    station_id: Optional[str] = Query(None, description="특정 관측소"),
    metric: str = Query("rmse", description="성능 지표 (rmse/mae/accuracy)"),
    authorization: str = Header(None)
):
    """Return the performance history for one metric over a time window.

    Args:
        hours: Size of the look-back window in hours; clamped to 1..168.
        station_id: Optional station to restrict the history to.
        metric: One of ``rmse``, ``mae`` or ``accuracy``.
        authorization: ``Authorization`` header; internal callers receive the
            full series, others only the most recent 100 points.

    Returns:
        dict: Query range, the data series and summary statistics
        (``None`` when there are no values).

    Raises:
        HTTPException: 400 for an unsupported ``metric``; 500 with the error
            text when the query itself fails.
    """
    is_internal = verify_internal_api_key(authorization)

    # Reject unknown metrics up front; otherwise the lookup further down fails
    # with a KeyError that would be reported as a server error.
    if metric not in ("rmse", "mae", "accuracy"):
        raise HTTPException(status_code=400, detail=f"Unsupported metric: {metric}")

    try:
        # Clamp the window to between one hour and one week.
        hours = max(1, min(hours, 168))

        end_time = datetime.now()
        start_time = end_time - timedelta(hours=hours)

        historical_data = await get_performance_history(
            start_time, end_time, station_id, metric
        )

        # Summary statistics over the points that actually carry a value.
        values = [
            item[metric] for item in historical_data
            if item.get(metric) is not None
        ]
        if values:
            statistics = {
                "mean": round(float(np.mean(values)), 2),
                "std": round(float(np.std(values)), 2),
                "min": round(min(values), 2),
                "max": round(max(values), 2),
                "trend": calculate_trend(values)
            }
        else:
            statistics = None

        return {
            "timestamp": end_time.isoformat(),
            "query_range": {
                "start_time": start_time.isoformat(),
                "end_time": end_time.isoformat(),
                "hours": hours
            },
            "station_id": station_id,
            "metric": metric,
            "data_points": len(historical_data),
            # External callers only get the 100 most recent points.
            "data": historical_data if is_internal else historical_data[-100:],
            "statistics": statistics
        }

    except Exception as e:
        logger.error(f"Historical performance query failed: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
async def compare_station_performance(
    station_ids: str = Query(..., description="비교할 관측소들 (쉼표 구분)"),
    metric: str = Query("rmse", description="비교할 성능 지표"),
    period: str = Query("24h", description="비교 기간 (1h/6h/24h/7d)"),
    authorization: str = Header(None)
):
    """Compare one performance metric across several stations.

    Args:
        station_ids: Comma-separated station ids. Blank entries and repeats
            are ignored; at most the first 10 distinct ids are compared.
        metric: One of ``rmse``, ``mae`` or ``accuracy``.
        period: ``1h``, ``6h``, ``24h`` or ``7d``; any other value is treated
            as 24 hours.
        authorization: ``Authorization`` header (checked, but the response is
            the same for internal and external callers).

    Returns:
        dict: Per-station summaries, a ranking (best first) and aggregate
        values.

    Raises:
        HTTPException: 400 for an unsupported ``metric`` or when no station id
            is supplied; 500 with the error text when the comparison fails.
    """
    is_internal = verify_internal_api_key(authorization)

    # Anything outside these three is not a numeric metric and cannot be ranked.
    if metric not in ("rmse", "mae", "accuracy"):
        raise HTTPException(status_code=400, detail=f"Unsupported metric: {metric}")

    # Drop blanks ("a,,b", trailing commas) and repeats while keeping the
    # caller's order, then cap at 10 stations.
    stations = list(dict.fromkeys(
        s.strip() for s in station_ids.split(",") if s.strip()
    ))[:10]
    if not stations:
        raise HTTPException(status_code=400, detail="At least one station id is required")

    try:
        period_hours = {
            "1h": 1, "6h": 6, "24h": 24, "7d": 168
        }.get(period, 24)

        comparison_data = {}
        for station_id in stations:
            comparison_data[station_id] = await get_station_performance_summary(
                station_id, period_hours, metric
            )

        # Error metrics rank ascending (lower is better); accuracy descending.
        if metric in ["rmse", "mae"]:
            sorted_stations = sorted(
                comparison_data.items(),
                key=lambda x: x[1].get("current_value", float('inf'))
            )
        else:
            sorted_stations = sorted(
                comparison_data.items(),
                key=lambda x: x[1].get("current_value", 0),
                reverse=True
            )

        current_values = [data["current_value"] for data in comparison_data.values()]

        return {
            "timestamp": datetime.now().isoformat(),
            "metric": metric,
            "period": period,
            "stations_count": len(stations),
            "comparison": comparison_data,
            "ranking": [
                {"rank": i + 1, "station_id": station, "value": data["current_value"]}
                for i, (station, data) in enumerate(sorted_stations)
            ],
            "best_performer": sorted_stations[0][0] if sorted_stations else None,
            "summary": {
                "best_value": sorted_stations[0][1]["current_value"] if sorted_stations else None,
                "worst_value": sorted_stations[-1][1]["current_value"] if sorted_stations else None,
                "average_value": round(float(np.mean(current_values)), 2)
            }
        }

    except Exception as e:
        logger.error(f"Station comparison failed: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
async def get_performance_alerts(
    active_only: bool = Query(True, description="활성 알림만 조회"),
    hours: int = Query(24, description="조회할 시간 범위"),
    authorization: str = Header(None)
):
    """Return the current performance alerts (internal callers only).

    Args:
        active_only: Forwarded to ``get_current_alerts``.
        hours: Look-back window in hours, forwarded to ``get_current_alerts``.
        authorization: ``Authorization`` header; must carry the internal key.

    Returns:
        dict: The alert list with counts per severity and a short summary.

    Raises:
        HTTPException: 401 when the internal key is missing or wrong; 500 with
            the error text when the alert lookup fails.
    """
    # The check's result must be acted on -- merely calling it protects nothing.
    if not verify_internal_api_key(authorization):
        raise HTTPException(
            status_code=401,
            detail="Valid internal API key required",
            headers={"WWW-Authenticate": "Bearer"},
        )

    try:
        alerts = await get_current_alerts(active_only, hours)

        # Split by severity for the counters and the overall status.
        critical_alerts = [a for a in alerts if a["severity"] == "critical"]
        warning_alerts = [a for a in alerts if a["severity"] == "warning"]

        if critical_alerts:
            system_status = "critical"
        elif warning_alerts:
            system_status = "warning"
        else:
            system_status = "normal"

        return {
            "timestamp": datetime.now().isoformat(),
            "query_range_hours": hours,
            "active_only": active_only,
            "total_alerts": len(alerts),
            "critical_count": len(critical_alerts),
            "warning_count": len(warning_alerts),
            "alerts": alerts,
            "summary": {
                "system_status": system_status,
                "requires_attention": len(critical_alerts) > 0,
                "most_recent": alerts[0] if alerts else None
            }
        }

    except Exception as e:
        logger.error(f"Performance alerts query failed: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
async def update_performance_metrics(
    request_data: Dict[str, Any],
    authorization: str = Header(None)
):
    """Record a batch of predictions and refresh the cached metrics (internal use).

    Args:
        request_data: Payload read with the keys ``station_id``,
            ``predictions``, ``actual_values`` and optional ``timestamp``
            (defaults to the current time in ISO format).
        authorization: ``Authorization`` header; must carry the internal key.

    Returns:
        dict: The computed metrics, the number of data points and any alerts
        raised by those metrics.

    Raises:
        HTTPException: 401 when the internal key is missing or wrong; 400 when
            predictions/actual values are missing or differ in length; 500
            with the error text for any other failure.
    """
    # The check's result must be acted on -- merely calling it protects nothing.
    if not verify_internal_api_key(authorization):
        raise HTTPException(
            status_code=401,
            detail="Valid internal API key required",
            headers={"WWW-Authenticate": "Bearer"},
        )

    try:
        station_id = request_data.get("station_id")
        predictions = request_data.get("predictions", [])
        actual_values = request_data.get("actual_values", [])
        timestamp = request_data.get("timestamp", datetime.now().isoformat())

        if not predictions or not actual_values:
            raise HTTPException(status_code=400, detail="Predictions and actual values required")

        if len(predictions) != len(actual_values):
            raise HTTPException(status_code=400, detail="Predictions and actual values must have same length")

        metrics = calculate_performance_metrics(predictions, actual_values)

        performance_record = {
            "station_id": station_id,
            "timestamp": timestamp,
            "predictions": predictions,
            "actual_values": actual_values,
            "metrics": metrics,
            "data_points": len(predictions)
        }

        # save_performance_record already appends the record to
        # performance_cache["historical_data"]; appending here as well would
        # store every record twice.
        await save_performance_record(performance_record)

        performance_cache["realtime_metrics"][station_id] = metrics

        # Memory bound: once past 1000 records keep only the newest 500.
        if len(performance_cache["historical_data"]) > 1000:
            performance_cache["historical_data"] = performance_cache["historical_data"][-500:]

        alerts = check_performance_alerts(station_id, metrics)

        return {
            "success": True,
            "timestamp": datetime.now().isoformat(),
            "station_id": station_id,
            "metrics": metrics,
            "data_points": len(predictions),
            "alerts_triggered": len(alerts),
            "alerts": alerts
        }

    except HTTPException:
        # Let the deliberate 400 responses through instead of turning them
        # into 500s in the generic handler below.
        raise
    except Exception as e:
        logger.error(f"Performance update failed: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# Performance calculation and analysis functions
# ============================================================================
import contextvars

# True while get_overall_performance is gathering its nested summaries, so a
# re-entrant call further down the same call chain can detect it.
_OVERALL_PERFORMANCE_ACTIVE = contextvars.ContextVar(
    "_overall_performance_active", default=False
)


async def get_overall_performance():
    """Return simulated system-wide performance metrics.

    Starts from fixed baseline values, degrades them according to the
    currently active issues in ``internal_api.active_issues``, adds random
    variation, and derives an overall status.

    The station breakdown and alert summary are gathered through
    ``get_all_stations_summary`` and ``get_recent_alerts_summary``. The latter
    reaches ``get_current_alerts``, which can call this function again; such a
    nested call returns the core metrics with empty summaries instead of
    recursing without end.

    Returns:
        dict: ``rmse``, ``mae``, ``accuracy``, ``prediction_count``,
        ``active_stations``, ``data_quality_score``, ``status``,
        ``detailed_metrics``, ``station_breakdown`` and ``recent_alerts``.
    """
    # Demo mode: values are simulated from the demo session state.
    from internal_api import demo_session, active_issues

    base_rmse = 18.5
    base_mae = 14.2
    base_accuracy = 89.2

    # (RMSE multiplier, accuracy penalty) applied per active issue type.
    issue_impact = {
        "extreme_weather": (1.8, 15),
        "sensor_malfunction": (1.3, 8),
        "data_corruption": (1.5, 12),
        "network_failure": (1.2, 5),
    }
    rmse_multiplier = 1.0
    accuracy_penalty = 0
    for issue in active_issues.values():
        factor, penalty = issue_impact.get(issue["type"], (1.0, 0))
        rmse_multiplier *= factor
        accuracy_penalty += penalty

    # Random variation on top of the issue-adjusted baselines.
    rmse_variation = np.random.normal(0, 2)
    accuracy_variation = np.random.normal(0, 3)

    current_rmse = max(10, base_rmse * rmse_multiplier + rmse_variation)
    current_mae = max(8, base_mae * rmse_multiplier * 0.8 + rmse_variation * 0.7)
    current_accuracy = max(50, min(98, base_accuracy - accuracy_penalty + accuracy_variation))

    # Ten points off per active issue, floored at zero so the score cannot
    # go negative.
    data_quality = max(0, 95 - len(active_issues) * 10)

    if current_rmse > 40 or current_accuracy < 75:
        status = "critical"
    elif current_rmse > 25 or current_accuracy < 85:
        status = "warning"
    else:
        status = "good"

    detailed_metrics = {
        "rmse_trend": "increasing" if len(active_issues) > 0 else "stable",
        "prediction_latency_ms": np.random.randint(50, 200),
        "data_freshness_minutes": np.random.randint(1, 10)
    }

    if _OVERALL_PERFORMANCE_ACTIVE.get():
        # Nested call from our own alert summary: skip the parts that would
        # lead straight back here.
        station_breakdown = {}
        recent_alerts = {"total_count": 0, "critical_count": 0, "most_recent": None}
    else:
        token = _OVERALL_PERFORMANCE_ACTIVE.set(True)
        try:
            station_breakdown = await get_all_stations_summary()
            recent_alerts = await get_recent_alerts_summary()
        finally:
            _OVERALL_PERFORMANCE_ACTIVE.reset(token)

    return {
        "rmse": round(current_rmse, 1),
        "mae": round(current_mae, 1),
        "accuracy": round(current_accuracy, 1),
        "prediction_count": demo_session.get("total_processed", 0),
        "active_stations": len(demo_session.get("active_stations", [])),
        "data_quality_score": round(data_quality, 1),
        "status": status,
        "detailed_metrics": detailed_metrics,
        "station_breakdown": station_breakdown,
        "recent_alerts": recent_alerts
    }
async def get_station_performance(station_id: str):
    """Return simulated performance metrics for a single station.

    Baseline values are offset by a station-specific pseudo-random amount and
    then degraded by any active issue that targets this station or has no
    station attached.

    Args:
        station_id: Identifier of the station.

    Returns:
        dict: ``rmse``, ``mae``, ``accuracy``, ``data_points``,
        ``last_prediction`` (ISO timestamp) and ``status`` (``"active"`` when
        the station is listed in the demo session's ``active_stations``,
        otherwise ``"inactive"``).
    """
    import zlib

    # demo_session is needed for the status field below; importing only
    # active_issues left it undefined.
    from internal_api import active_issues, demo_session

    # crc32 gives the same seed in every process, unlike the built-in hash()
    # of a str. A private generator keeps the seeding from resetting the
    # module-wide NumPy random state used by the other simulations.
    station_seed = zlib.crc32(station_id.encode("utf-8")) % 1000
    rng = np.random.RandomState(station_seed)

    base_rmse = 18.5 + rng.normal(0, 3)
    base_mae = 14.2 + rng.normal(0, 2)
    base_accuracy = 89.2 + rng.normal(0, 5)

    for issue in active_issues.values():
        issue_station = issue.get("station_id")
        # Only issues aimed at this station, or at no station in particular.
        if issue_station == station_id or not issue_station:
            if issue["type"] == "sensor_malfunction" and issue_station == station_id:
                base_rmse *= 2.0
                base_accuracy -= 25
            elif issue["type"] == "extreme_weather":
                base_rmse *= 1.6
                base_accuracy -= 12

    minutes_since_prediction = int(rng.randint(1, 15))
    is_active = station_id in demo_session.get("active_stations", [])

    return {
        "rmse": round(max(10, base_rmse), 1),
        "mae": round(max(8, base_mae), 1),
        "accuracy": round(max(50, min(98, base_accuracy)), 1),
        "data_points": int(rng.randint(50, 200)),
        "last_prediction": (datetime.now() - timedelta(minutes=minutes_since_prediction)).isoformat(),
        "status": "active" if is_active else "inactive"
    }
async def get_performance_history(start_time, end_time, station_id, metric):
    """Generate a simulated hourly history for one metric.

    Args:
        start_time: ``datetime`` of the first sample.
        end_time: ``datetime`` bound; samples are produced while the running
            time is ``<= end_time``.
        station_id: Optional station id; when given, a fixed station-specific
            offset is added to every value.
        metric: ``"rmse"``, ``"mae"`` or ``"accuracy"``.

    Returns:
        list[dict]: One entry per hour with ``timestamp``, ``station_id``, the
        metric value (rounded to one decimal) and ``data_points``.

    Raises:
        KeyError: If ``metric`` is not one of the supported names.
    """
    import zlib

    # (baseline, amplitude of the daily sine swing) per metric.
    profiles = {
        "rmse": (18.5, 5),
        "mae": (14.2, 3),
        "accuracy": (89.2, -8),
    }
    baseline, amplitude = profiles[metric]

    # Offset in [-5, 4]. crc32 is used instead of hash() so the same station
    # gets the same offset in every process.
    if station_id:
        station_offset = zlib.crc32(str(station_id).encode("utf-8")) % 10 - 5
    else:
        station_offset = 0

    data_points = []
    current_time = start_time

    # One sample per hour.
    while current_time <= end_time:
        daily_swing = np.sin(2 * np.pi * current_time.hour / 24)
        value = baseline + amplitude * daily_swing + np.random.normal(0, 2)
        value += station_offset

        data_points.append({
            "timestamp": current_time.isoformat(),
            "station_id": station_id,
            metric: round(value, 1),
            "data_points": np.random.randint(10, 50)
        })

        current_time += timedelta(hours=1)

    return data_points
async def get_station_performance_summary(station_id, period_hours, metric):
    """Summarise one metric for one station.

    Args:
        station_id: Station to summarise.
        period_hours: Length of the comparison period in hours. Not used by
            the current simulated implementation.
        metric: Key to read from the station's current performance dict.

    Returns:
        dict: ``station_id``, ``current_value``, ``period_average``, ``trend``,
        ``trend_change``, ``data_points`` and ``last_update``.
    """
    snapshot = await get_station_performance(station_id)
    latest = snapshot[metric]

    # Simulated period average: the current value plus random noise.
    averaged = latest + np.random.normal(0, 5)

    # Simulated trend: a random change classified with a +/-2 dead band.
    delta = np.random.uniform(-10, 10)
    if delta < -2:
        direction = "improving"
    elif delta > 2:
        direction = "degrading"
    else:
        direction = "stable"

    return {
        "station_id": station_id,
        "current_value": latest,
        "period_average": round(averaged, 1),
        "trend": direction,
        "trend_change": round(delta, 1),
        "data_points": np.random.randint(20, 100),
        "last_update": datetime.now().isoformat()
    }
def calculate_performance_metrics(predictions, actual_values):
    """Compute error metrics for a batch of predictions.

    Args:
        predictions: Sequence of predicted numeric values.
        actual_values: Sequence of observed numeric values, same length.

    Returns:
        dict: ``rmse``, ``mae``, ``accuracy``, ``mape``, ``r2_score`` and
        ``data_points``. All numbers are finite; ``mape`` is ``None`` when
        every actual value is zero.

    Raises:
        ValueError: If the inputs are empty or differ in shape.
    """
    predictions = np.asarray(predictions, dtype=float)
    actual_values = np.asarray(actual_values, dtype=float)
    if predictions.size == 0 or predictions.shape != actual_values.shape:
        raise ValueError(
            "predictions and actual_values must be non-empty and have the same shape"
        )

    residuals = predictions - actual_values
    errors = np.abs(residuals)

    rmse = float(np.sqrt(np.mean(residuals ** 2)))
    mae = float(np.mean(errors))

    # Share of predictions whose error is within the 95th percentile of the
    # errors of this same batch.
    # NOTE(review): because the cut-off comes from the batch itself, this
    # score says little about absolute quality -- confirm the intended
    # definition. Behaviour kept as in the original.
    threshold = np.percentile(errors, 95)
    accuracy = float(np.mean(errors <= threshold) * 100)

    # MAPE is undefined where the actual value is zero; average over the
    # remaining points instead of producing inf/NaN.
    nonzero = actual_values != 0
    if nonzero.any():
        mape = round(
            float(np.mean(errors[nonzero] / np.abs(actual_values[nonzero])) * 100), 2
        )
    else:
        mape = None

    # R^2 has a zero denominator when the actual values are constant (for
    # example a single data point). Report 1.0 for a perfect fit and 0.0
    # otherwise so the result stays finite.
    ss_res = float(np.sum(residuals ** 2))
    ss_tot = float(np.sum((actual_values - np.mean(actual_values)) ** 2))
    if ss_tot > 0:
        r2 = 1 - ss_res / ss_tot
    else:
        r2 = 1.0 if ss_res == 0 else 0.0

    return {
        "rmse": round(rmse, 2),
        "mae": round(mae, 2),
        "accuracy": round(accuracy, 1),
        "mape": mape,
        "r2_score": round(r2, 3),
        "data_points": len(predictions)
    }
def calculate_trend(values):
    """Classify the direction of a series from its least-squares slope.

    Args:
        values: Sequence of numeric samples in chronological order.

    Returns:
        str: ``"insufficient_data"`` for fewer than three samples, otherwise
        ``"increasing"`` (slope > 0.5), ``"decreasing"`` (slope < -0.5) or
        ``"stable"``.
    """
    sample_count = len(values)
    if sample_count < 3:
        return "insufficient_data"

    # Degree-1 fit against the sample index; the first coefficient is the slope.
    slope, _intercept = np.polyfit(np.arange(sample_count), values, 1)

    if slope > 0.5:
        return "increasing"
    if slope < -0.5:
        return "decreasing"
    return "stable"
def check_performance_alerts(station_id, metrics, thresholds=None):
    """Compare a station's metrics against the alert thresholds.

    Args:
        station_id: Station the metrics belong to; copied into each alert.
        metrics: Mapping with at least ``"rmse"`` and ``"accuracy"``.
        thresholds: Optional mapping with ``rmse_warning``, ``rmse_critical``,
            ``accuracy_warning`` and ``accuracy_critical``. Defaults to
            ``performance_cache["alert_thresholds"]``.

    Returns:
        list[dict]: At most one RMSE alert and one accuracy alert, in that
        order. The critical level takes precedence over the warning level for
        each metric.
    """
    if thresholds is None:
        thresholds = performance_cache["alert_thresholds"]

    def _alert(kind, severity, message, value):
        # ``kind`` doubles as the alert type and as the key of the threshold
        # that was crossed.
        return {
            "type": kind,
            "severity": severity,
            "message": message,
            "station_id": station_id,
            "threshold": thresholds[kind],
            "current_value": value,
            "timestamp": datetime.now().isoformat()
        }

    alerts = []

    # RMSE: higher is worse.
    rmse = metrics["rmse"]
    if rmse > thresholds["rmse_critical"]:
        alerts.append(_alert(
            "rmse_critical", "critical", f"Critical RMSE level: {rmse} cm", rmse
        ))
    elif rmse > thresholds["rmse_warning"]:
        alerts.append(_alert(
            "rmse_warning", "warning", f"High RMSE level: {rmse} cm", rmse
        ))

    # Accuracy: lower is worse.
    accuracy = metrics["accuracy"]
    if accuracy < thresholds["accuracy_critical"]:
        alerts.append(_alert(
            "accuracy_critical", "critical", f"Critical accuracy drop: {accuracy}%", accuracy
        ))
    elif accuracy < thresholds["accuracy_warning"]:
        alerts.append(_alert(
            "accuracy_warning", "warning", f"Low accuracy: {accuracy}%", accuracy
        ))

    return alerts
async def get_current_alerts(active_only, hours):
    """Build the current alert list (simulated).

    Every entry of ``internal_api.active_issues`` becomes one alert. While the
    demo session is active, a system-wide RMSE above 30 adds one more warning.

    Args:
        active_only: Accepted but not used by this simulated implementation.
        hours: Accepted but not used by this simulated implementation.

    Returns:
        list[dict]: Alerts sorted by ``timestamp`` string, newest first.
    """
    from internal_api import active_issues, demo_session

    critical_types = ("extreme_weather", "sensor_malfunction")

    # One alert per active issue.
    collected = [
        {
            "id": issue_id,
            "type": f"{issue['type']}_alert",
            "severity": "critical" if issue["type"] in critical_types else "warning",
            "message": f"{issue['type'].replace('_', ' ').title()} detected",
            "station_id": issue.get("station_id"),
            "timestamp": issue["start_time"].isoformat(),
            "duration_minutes": (datetime.now() - issue["start_time"]).total_seconds() / 60,
            "active": True
        }
        for issue_id, issue in active_issues.items()
    ]

    # Simulated system-level performance alert.
    # NOTE(review): get_overall_performance reaches this function again via
    # get_recent_alerts_summary, so this call is part of a cycle -- verify
    # that it terminates.
    if demo_session.get("active"):
        overall = await get_overall_performance()
        if overall["rmse"] > 30:
            collected.append({
                "id": "perf_rmse_high",
                "type": "performance_degradation",
                "severity": "warning",
                "message": f"High system RMSE: {overall['rmse']} cm",
                "timestamp": (datetime.now() - timedelta(minutes=5)).isoformat(),
                "duration_minutes": 5,
                "active": True
            })

    # Newest first.
    return sorted(collected, key=lambda alert: alert["timestamp"], reverse=True)
async def get_all_stations_summary():
    """Return a compact performance summary for every station.

    Stations come from the demo session's ``active_stations`` entry, falling
    back to ``DT_0001`` and ``DT_0002`` when that key is absent.

    Returns:
        dict: Station id mapped to its ``rmse``, ``accuracy`` and ``status``.
    """
    from internal_api import demo_session

    reported_fields = ("rmse", "accuracy", "status")
    overview = {}
    for sid in demo_session.get("active_stations", ["DT_0001", "DT_0002"]):
        station_perf = await get_station_performance(sid)
        overview[sid] = {field: station_perf[field] for field in reported_fields}
    return overview
async def get_recent_alerts_summary():
    """Summarise the alerts returned by ``get_current_alerts(True, 1)``.

    Returns:
        dict: ``total_count``, ``critical_count`` and ``most_recent`` (the
        first alert of the list, or ``None`` when there are none).
    """
    # Requested as active-only, last hour.
    recent = await get_current_alerts(True, 1)
    critical = [alert for alert in recent if alert["severity"] == "critical"]
    newest = recent[0] if recent else None

    return {
        "total_count": len(recent),
        "critical_count": len(critical),
        "most_recent": newest
    }
async def save_performance_record(performance_record):
    """Store a performance record.

    The record is only appended to the in-memory
    ``performance_cache["historical_data"]`` list; nothing is written to
    persistent storage here.

    Args:
        performance_record: The record to store.
    """
    history = performance_cache["historical_data"]
    history.append(performance_record)
    # A real implementation would persist to Supabase or another database:
    # await supabase.table("performance_metrics").insert(performance_record)
# Load marker: written to the module logger once execution reaches the end of this file.
logger.info("Performance API module loaded successfully")