"""
API utility module.

Production-ready tide-prediction API functions.
"""

from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Union

import pytz

from supabase_utils import get_supabase_client
from config import STATION_NAMES


# Standard API response format
def create_api_response(
    success: bool,
    data: Any = None,
    error: Optional[str] = None,
    meta: Optional[Dict] = None,
) -> Dict:
    """Build the standard API response envelope.

    Args:
        success: Whether the request succeeded.
        data: Payload stored under "data"; only used when ``success`` is true.
        error: Message stored under "error"; only used when ``success`` is
            false. Falls back to "Unknown error" when empty or None.
        meta: Optional metadata stored under "meta". An empty dict is omitted.

    Returns:
        A dict with "success", a KST ISO-8601 "timestamp", optionally "meta",
        and either "data" (success) or "error" (failure).
    """
    response = {
        "success": success,
        "timestamp": datetime.now(pytz.timezone('Asia/Seoul')).isoformat(),
    }

    if meta:
        response["meta"] = meta

    if success:
        response["data"] = data
    else:
        response["error"] = error or "Unknown error"

    return response


# Station coordinates (described as actual coordinates by the original author).
# Kept at module level so the table is built once rather than on every call.
# NOTE(review): DT_0043 and DT_0065 share the same latitude — confirm.
_STATION_COORDS = {
    "DT_0001": {"lat": 37.452, "lon": 126.592},
    "DT_0002": {"lat": 36.9669, "lon": 126.823},
    "DT_0003": {"lat": 35.4262, "lon": 126.421},
    "DT_0008": {"lat": 37.1922, "lon": 126.647},
    "DT_0017": {"lat": 37.0075, "lon": 126.353},
    "DT_0018": {"lat": 35.9755, "lon": 126.563},
    "DT_0024": {"lat": 36.0069, "lon": 126.688},
    "DT_0025": {"lat": 36.4064, "lon": 126.486},
    "DT_0037": {"lat": 36.1173, "lon": 125.985},
    "DT_0043": {"lat": 37.2394, "lon": 126.429},
    "DT_0050": {"lat": 36.9131, "lon": 126.239},
    "DT_0051": {"lat": 36.1289, "lon": 126.495},
    "DT_0052": {"lat": 37.3382, "lon": 126.586},
    "DT_0065": {"lat": 37.2394, "lon": 126.155},
    "DT_0066": {"lat": 35.6858, "lon": 126.334},
    "DT_0067": {"lat": 36.6737, "lon": 126.132},
    "DT_0068": {"lat": 35.6181, "lon": 126.302},
}


def get_station_meta(station_id: str) -> Dict:
    """Return the metadata dict for a tide station.

    Args:
        station_id: Station identifier such as "DT_0001".

    Returns:
        A dict with "obs_post_id", "obs_post_name" (looked up in
        STATION_NAMES, "Unknown" when absent), "obs_lat" / "obs_lon" as
        strings ("0" for stations missing from the coordinate table) and
        "data_type" fixed to "prediction".
    """
    coords = _STATION_COORDS.get(station_id, {"lat": 0, "lon": 0})

    return {
        "obs_post_id": station_id,
        "obs_post_name": STATION_NAMES.get(station_id, "Unknown"),
        "obs_lat": str(coords["lat"]),
        "obs_lon": str(coords["lon"]),
        "data_type": "prediction"  # marks the values as predictions
    }


# 1.
# Current/future tide level lookup (with harmonic-prediction fallback).
# The private helpers directly below are shared by the endpoint functions in
# this module; api_get_tide_level itself follows them.

# Timestamp layout used for every `predicted_at` filter value.
_QUERY_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'


def _seoul_tz():
    """Return the Asia/Seoul (KST) timezone object."""
    return pytz.timezone('Asia/Seoul')


def _parse_client_time(value: str) -> datetime:
    """Parse a caller-supplied ISO-8601 string; naive values are taken as KST."""
    parsed = datetime.fromisoformat(value.replace('Z', '+00:00'))
    if parsed.tzinfo is None:
        parsed = _seoul_tz().localize(parsed)
    return parsed


def _to_query_str(moment: datetime) -> str:
    """Format an aware datetime as the UTC string used in `predicted_at` filters."""
    return moment.astimezone(pytz.UTC).strftime(_QUERY_TIME_FORMAT)


def _parse_stored_time(value: str) -> datetime:
    """Parse a `predicted_at` value; values without an offset are taken as UTC.

    Without this, ``astimezone`` would interpret a naive value in the server's
    local zone, and subtracting it from an aware datetime would raise.
    """
    parsed = datetime.fromisoformat(value.replace('Z', '+00:00'))
    if parsed.tzinfo is None:
        parsed = pytz.UTC.localize(parsed)
    return parsed


def _fetch_nearest_prediction(supabase, table: str, station_id: str,
                              rounded_utc: datetime, query_utc: datetime):
    """Return the row of `table` that best matches the requested time, or None.

    An exact match on `rounded_utc` wins. Otherwise every row within +/- 5
    minutes of `rounded_utc` is fetched and the one closest to `query_utc`
    (the caller's un-rounded time) is returned.
    """
    exact = (
        supabase.table(table)
        .select('*')
        .eq('station_id', station_id)
        .eq('predicted_at', rounded_utc.strftime(_QUERY_TIME_FORMAT))
        .execute()
    )
    if exact.data:
        return exact.data[0]

    window_start = (rounded_utc - timedelta(minutes=5)).strftime(_QUERY_TIME_FORMAT)
    window_end = (rounded_utc + timedelta(minutes=5)).strftime(_QUERY_TIME_FORMAT)
    window = (
        supabase.table(table)
        .select('*')
        .eq('station_id', station_id)
        .gte('predicted_at', window_start)
        .lte('predicted_at', window_end)
        .order('predicted_at')
        .execute()
    )
    if not window.data:
        return None

    # order + limit(1) would return the earliest row in the window, not the
    # closest one, so the closest row is selected explicitly here.
    return min(
        window.data,
        key=lambda row: abs(_parse_stored_time(row['predicted_at']) - query_utc),
    )


def api_get_tide_level(
    station_id: str,
    target_time: Optional[str] = None,
    use_harmonic_fallback: bool = True
) -> Dict:
    """
    Look up the tide level at a specific time.

    Args:
        station_id: Station ID.
        target_time: Time to query (ISO format; None means now). Naive values
            are interpreted as KST.
        use_harmonic_fallback: Use the harmonic prediction when no final
            prediction exists.

    Returns:
        Standard API response. The final prediction is preferred; the harmonic
        prediction is used as a fallback. Any exception raised while querying
        is converted into a failure response carrying its message.
    """
    supabase = get_supabase_client()
    if not supabase:
        return create_api_response(False, error="Database connection failed")

    try:
        kst = _seoul_tz()
        query_time = _parse_client_time(target_time) if target_time else datetime.now(kst)

        # Queries are issued in UTC.
        query_time_utc = query_time.astimezone(pytz.UTC)
        query_str = query_time_utc.strftime(_QUERY_TIME_FORMAT)

        # Round to the nearest 5-minute mark; adding the minutes as a timedelta
        # also covers the roll-over into the next hour (rounded value of 60).
        rounded_minutes = round(query_time.minute / 5) * 5
        query_time_rounded = (
            query_time.replace(minute=0, second=0, microsecond=0)
            + timedelta(minutes=rounded_minutes)
        )
        query_time_rounded_utc = query_time_rounded.astimezone(pytz.UTC)

        # First choice: final prediction.
        row = _fetch_nearest_prediction(
            supabase, 'tide_predictions', station_id,
            query_time_rounded_utc, query_time_utc,
        )
        if row:
            time_kst = _parse_stored_time(row['predicted_at']).astimezone(kst)
            return create_api_response(
                success=True,
                data={
                    "record_time": time_kst.isoformat(),
                    "record_time_kst": time_kst.strftime('%Y-%m-%d %H:%M:%S KST'),
                    "tide_level": round(row.get('final_tide_level', 0), 1),
                    "residual_value": round(row.get('predicted_residual', 0), 1),
                    "harmonic_value": round(row.get('harmonic_level', 0), 1),
                    "data_source": "final_prediction",
                    "confidence": "high"
                },
                meta=get_station_meta(station_id)
            )

        # Second choice: harmonic prediction (harmonic_predictions table).
        if use_harmonic_fallback:
            row = _fetch_nearest_prediction(
                supabase, 'harmonic_predictions', station_id,
                query_time_rounded_utc, query_time_utc,
            )
            if row:
                time_utc = _parse_stored_time(row['predicted_at'])
                time_kst = time_utc.astimezone(kst)
                return create_api_response(
                    success=True,
                    data={
                        "record_time": time_kst.isoformat(),
                        "record_time_kst": time_kst.strftime('%Y-%m-%d %H:%M:%S KST'),
                        "tide_level": round(row.get('harmonic_level', 0), 1),
                        "residual_value": None,  # no residual prediction available
                        "harmonic_value": round(row.get('harmonic_level', 0), 1),
                        "data_source": "harmonic_only",
                        "confidence": "medium",
                        "note": "잔차 예측이 없어 조화 예측만 제공됩니다",
                        # Converted to KST first so the "KST" label is true
                        # even when the caller passed another UTC offset.
                        "query_time": query_time.astimezone(kst).strftime('%Y-%m-%d %H:%M:%S KST'),
                        "matched_time_diff_seconds": abs((time_utc - query_time_utc).total_seconds())
                    },
                    meta=get_station_meta(station_id)
                )

        return create_api_response(
            success=False,
            error=f"No data available for {query_str}",
            meta=get_station_meta(station_id)
        )

    except Exception as e:
        return create_api_response(False, error=str(e))


# 2. Tide time series (public-API style format)
def api_get_tide_series(
    station_id: str,
    start_time: Optional[str] = None,
    end_time: Optional[str] = None,
    interval_minutes: int = 60
) -> Dict:
    """
    Return a tide time series (shaped like the public API format).

    Args:
        station_id: Station ID.
        start_time: Start time (ISO format; None means now). Naive values are
            interpreted as KST.
        end_time: End time (ISO format; None means 24 hours after the start).
        interval_minutes: Spacing of returned points (default 60 minutes).
            Values below 5 return every stored row.

    Returns:
        On success ``{"result": {"meta": ..., "data": [...]}}`` where each
        point holds string-valued "record_time" (KST), "real_value",
        "pre_value" and "residual". Connection problems and exceptions are
        reported through the standard failure response instead.
    """
    supabase = get_supabase_client()
    if not supabase:
        return create_api_response(False, error="Database connection failed")

    try:
        kst = _seoul_tz()
        start_dt = _parse_client_time(start_time) if start_time else datetime.now(kst)
        end_dt = _parse_client_time(end_time) if end_time else start_dt + timedelta(hours=24)

        # Filters must be expressed in UTC, matching api_get_tide_level;
        # formatting the KST value directly shifts the window by nine hours.
        start_str = _to_query_str(start_dt)
        end_str = _to_query_str(end_dt)

        # Assumes rows are stored at 5-minute spacing (per the original
        # comment). max() keeps the step positive so interval_minutes < 5 no
        # longer causes a modulo-by-zero.
        step = max(1, interval_minutes // 5)

        # Final predictions first.
        result = (
            supabase.table('tide_predictions')
            .select('predicted_at, final_tide_level, predicted_residual, harmonic_level')
            .eq('station_id', station_id)
            .gte('predicted_at', start_str)
            .lte('predicted_at', end_str)
            .order('predicted_at')
            .execute()
        )

        data_points = []
        data_source = "final_prediction"

        if result.data:
            for item in result.data[::step]:
                time_kst = _parse_stored_time(item['predicted_at']).astimezone(kst)
                # NOTE(review): round(x, 0) on a float yields e.g. "123.0";
                # kept as-is because callers may rely on the current strings.
                data_points.append({
                    "record_time": time_kst.strftime('%Y-%m-%d %H:%M:%S'),  # KST
                    "real_value": str(round(item['final_tide_level'], 0)),
                    "pre_value": str(round(item['harmonic_level'], 0)),
                    "residual": str(round(item['predicted_residual'], 0))
                })
        else:
            # Harmonic-prediction fallback.
            result = (
                supabase.table('harmonic_predictions')
                .select('predicted_at, harmonic_level')
                .eq('station_id', station_id)
                .gte('predicted_at', start_str)
                .lte('predicted_at', end_str)
                .order('predicted_at')
                .execute()
            )

            if result.data:
                data_source = "harmonic_only"
                for item in result.data[::step]:
                    time_kst = _parse_stored_time(item['predicted_at']).astimezone(kst)
                    harmonic_text = str(round(item['harmonic_level'], 0))
                    data_points.append({
                        "record_time": time_kst.strftime('%Y-%m-%d %H:%M:%S'),  # KST
                        "real_value": harmonic_text,
                        "pre_value": harmonic_text,
                        "residual": "0"
                    })

        meta = get_station_meta(station_id)
        meta["data_source"] = data_source
        meta["data_count"] = len(data_points)
        meta["interval_minutes"] = interval_minutes

        return {
            "result": {
                "meta": meta,
                "data": data_points
            }
        }

    except Exception as e:
        return create_api_response(False, error=str(e))


# 3. High/low tide information
def api_get_extremes_info(
    station_id: str,
    date: Optional[str] = None,
    include_secondary: bool = False
) -> Dict:
    """
    Return the high and low tides for one calendar day.

    Args:
        station_id: Station ID.
        date: Day to inspect (YYYY-MM-DD, KST calendar day; None means today).
        include_secondary: Keep secondary highs/lows. When false and more
            than four extremes are found, only the two highest highs and the
            two lowest lows are returned.

    Returns:
        Standard API response with "extremes" (type, KST time, level) and a
        "summary". Fails with "Insufficient data for extremes" when fewer than
        three samples exist for the day.
    """
    supabase = get_supabase_client()
    if not supabase:
        return create_api_response(False, error="Database connection failed")

    try:
        kst = _seoul_tz()
        if date:
            target_date = kst.localize(datetime.strptime(date, '%Y-%m-%d'))
        else:
            target_date = datetime.now(kst)

        # Day boundaries are KST wall-clock times, converted to UTC for the
        # query so the window is not shifted by nine hours.
        start_str = _to_query_str(
            target_date.replace(hour=0, minute=0, second=0, microsecond=0))
        end_str = _to_query_str(
            target_date.replace(hour=23, minute=59, second=59, microsecond=0))

        # Final predictions are preferred.
        level_key = 'final_tide_level'
        data_source = "final_prediction"
        result = (
            supabase.table('tide_predictions')
            .select('predicted_at, final_tide_level')
            .eq('station_id', station_id)
            .gte('predicted_at', start_str)
            .lte('predicted_at', end_str)
            .order('predicted_at')
            .execute()
        )

        # Fall back to harmonic predictions when there are none.
        if not result.data:
            level_key = 'harmonic_level'
            data_source = "harmonic_only"
            result = (
                supabase.table('harmonic_predictions')
                .select('predicted_at, harmonic_level')
                .eq('station_id', station_id)
                .gte('predicted_at', start_str)
                .lte('predicted_at', end_str)
                .order('predicted_at')
                .execute()
            )

        rows = result.data or []
        if len(rows) < 3:
            return create_api_response(False, error="Insufficient data for extremes")

        # Locate strict local maxima (high tide) and minima (low tide); the
        # first and last samples have only one neighbour and are skipped.
        levels = [row[level_key] for row in rows]
        extremes = []
        for idx in range(1, len(rows) - 1):
            prev_level, curr_level, next_level = levels[idx - 1:idx + 2]
            if curr_level > prev_level and curr_level > next_level:
                kind = 'high_tide'
            elif curr_level < prev_level and curr_level < next_level:
                kind = 'low_tide'
            else:
                continue

            time_kst = _parse_stored_time(rows[idx]['predicted_at']).astimezone(kst)
            extremes.append({
                'type': kind,
                'time': time_kst.isoformat(),  # KST ISO format
                'time_kst': time_kst.strftime('%Y-%m-%d %H:%M:%S KST'),
                'level': round(curr_level, 2)
            })

        # Keep only the primary highs/lows (drop secondary ones).
        if not include_secondary and len(extremes) > 4:
            high_tides = sorted(
                (e for e in extremes if e['type'] == 'high_tide'),
                key=lambda e: e['level'], reverse=True)[:2]
            low_tides = sorted(
                (e for e in extremes if e['type'] == 'low_tide'),
                key=lambda e: e['level'])[:2]
            extremes = sorted(high_tides + low_tides, key=lambda e: e['time'])

        meta = get_station_meta(station_id)
        meta["date"] = target_date.strftime('%Y-%m-%d')
        meta["data_source"] = data_source

        extreme_levels = [e['level'] for e in extremes]
        return create_api_response(
            success=True,
            data={
                "extremes": extremes,
                "summary": {
                    "high_tide_count": sum(1 for e in extremes if e['type'] == 'high_tide'),
                    "low_tide_count": sum(1 for e in extremes if e['type'] == 'low_tide'),
                    "max_level": max(extreme_levels) if extreme_levels else None,
                    "min_level": min(extreme_levels) if extreme_levels else None
                }
            },
            meta=meta
        )

    except Exception as e:
        return create_api_response(False, error=str(e))


# 4. Dangerous water level alerts
def api_check_tide_alert(
    station_id: str,
    hours_ahead: int = 24,
    warning_level: float = 700.0,
    danger_level: float = 750.0
) -> Dict:
    """
    Check upcoming final predictions against warning/danger thresholds.

    Args:
        station_id: Station ID.
        hours_ahead: How far ahead to look (default 24 hours).
        warning_level: Caution threshold (cm).
        danger_level: Danger threshold (cm).

    Returns:
        Standard API response with the overall "alert_level" ("safe",
        "warning" or "danger"), the total "alert_count", the first alert time
        and hours until it, at most ten "alerts" and the thresholds used.
    """
    supabase = get_supabase_client()
    if not supabase:
        return create_api_response(False, error="Database connection failed")

    try:
        kst = _seoul_tz()
        now = datetime.now(kst)
        end_time = now + timedelta(hours=hours_ahead)

        # Only rows at or above the warning threshold are fetched. The time
        # bounds are sent in UTC, consistent with api_get_tide_level.
        result = (
            supabase.table('tide_predictions')
            .select('predicted_at, final_tide_level')
            .eq('station_id', station_id)
            .gte('predicted_at', _to_query_str(now))
            .lte('predicted_at', _to_query_str(end_time))
            .gte('final_tide_level', warning_level)
            .order('predicted_at')
            .execute()
        )

        alerts = []
        alert_level = "safe"
        first_alert_dt = None

        for item in result.data or []:
            level = item['final_tide_level']

            if level >= danger_level:
                severity = "danger"
                alert_level = "danger"
            elif level >= warning_level:
                severity = "warning"
                if alert_level != "danger":
                    alert_level = "warning"
            else:
                continue

            time_kst = _parse_stored_time(item['predicted_at']).astimezone(kst)
            if first_alert_dt is None:
                first_alert_dt = time_kst

            alerts.append({
                "time": time_kst.isoformat(),  # KST ISO format
                "time_kst": time_kst.strftime('%Y-%m-%d %H:%M:%S KST'),
                "level": round(level, 2),
                "severity": severity
            })

        # Time until the first alert, in hours.
        first_alert_time = alerts[0]['time'] if alerts else None
        time_until = None
        if first_alert_dt is not None:
            time_until = (first_alert_dt - now).total_seconds() / 3600

        meta = get_station_meta(station_id)
        meta["check_time"] = now.isoformat()
        meta["hours_ahead"] = hours_ahead

        return create_api_response(
            success=True,
            data={
                "alert_level": alert_level,
                "alert_count": len(alerts),
                "first_alert_time": first_alert_time,
                # `is not None` so an alert exactly at "now" reports 0.0
                # instead of None.
                "hours_until_first": round(time_until, 1) if time_until is not None else None,
                "alerts": alerts[:10],  # at most 10 entries
                "thresholds": {
                    "warning": warning_level,
                    "danger": danger_level
                }
            },
            meta=meta
        )

    except Exception as e:
        return create_api_response(False, error=str(e))


# 5. Multi-station comparison
def api_compare_stations(
    station_ids: List[str],
    target_time: Optional[str] = None
) -> Dict:
    """
    Compare tide levels across several stations at one time.

    Args:
        station_ids: Station IDs; only the first ten are queried.
        target_time: Time to compare (None means now).

    Returns:
        Standard API response with a per-station "comparison" list (sorted by
        tide level, highest first, stations without data last) and
        "statistics" computed over the stations that returned a level.
    """
    if not station_ids:
        return create_api_response(False, error="No station IDs provided")

    try:
        comparison_data = []

        for station_id in station_ids[:10]:  # at most 10 stations
            result = api_get_tide_level(station_id, target_time)
            entry = {
                "station_id": station_id,
                "station_name": STATION_NAMES.get(station_id, "Unknown"),
                "tide_level": None,
                "data_source": "no_data",
                "time": None
            }

            if result.get("success") and result.get("data"):
                data = result["data"]
                entry["tide_level"] = data.get("tide_level")
                entry["data_source"] = data.get("data_source")
                entry["time"] = data.get("record_time")

            comparison_data.append(entry)

        # Sort by level, highest first. The leading flag keeps missing levels
        # at the end instead of ranking them as 0 (above negative levels).
        comparison_data.sort(
            key=lambda entry: (entry['tide_level'] is not None, entry['tide_level'] or 0),
            reverse=True
        )

        # Statistics; `is not None` so a genuine level of 0.0 is counted.
        valid_levels = [
            entry['tide_level'] for entry in comparison_data
            if entry['tide_level'] is not None
        ]
        stats = {
            "max_level": max(valid_levels) if valid_levels else None,
            "min_level": min(valid_levels) if valid_levels else None,
            "avg_level": round(sum(valid_levels) / len(valid_levels), 1) if valid_levels else None,
            "station_count": len(comparison_data),
            "valid_count": len(valid_levels)
        }

        return create_api_response(
            success=True,
            data={
                "comparison": comparison_data,
                "statistics": stats
            },
            meta={
                "query_time": target_time or datetime.now(pytz.timezone('Asia/Seoul')).isoformat(),
                "station_count": len(station_ids)
            }
        )

    except Exception as e:
        return create_api_response(False, error=str(e))


# 6. Health check / status
def api_health_check() -> Dict:
    """
    Report API and database status.

    Returns:
        Standard API response with "status" ("healthy" when a client is
        available, otherwise "degraded"), the database connection state, row
        counts for both prediction tables, the API version and the endpoint
        list. Exceptions produce a failure response.
    """
    try:
        supabase = get_supabase_client()
        db_status = "connected" if supabase else "disconnected"

        # Data availability check.
        data_availability = {}
        if supabase:
            for table_name in ('tide_predictions', 'harmonic_predictions'):
                result = (
                    supabase.table(table_name)
                    .select('station_id', count='exact')
                    .limit(1)
                    .execute()
                )
                # A missing or None count is reported as 0.
                data_availability[table_name] = getattr(result, 'count', None) or 0

        return create_api_response(
            success=True,
            data={
                "status": "healthy" if db_status == "connected" else "degraded",
                "database": db_status,
                "data_availability": data_availability,
                "api_version": "1.0.0",
                "endpoints": [
                    "/api/tide_level",
                    "/api/tide_series",
                    "/api/extremes",
                    "/api/alert",
                    "/api/compare",
                    "/api/health"
                ]
            }
        )

    except Exception as e:
        # `data` is passed exactly as in the original call.
        return create_api_response(
            success=False,
            error=str(e),
            data={"status": "error"}
        )