Spaces:
Sleeping
Sleeping
# noise_scenarios.py - 노이즈 시나리오 생성 엔진 | |
import numpy as np | |
import plotly.graph_objects as go | |
from plotly.subplots import make_subplots | |
import random | |
def generate_typhoon_scenario(df, intensity=1.0): | |
""" | |
태풍 시나리오: 급격한 기압 하강 + 강풍 + 조위 상승 | |
intensity: 0.5(약함) ~ 2.0(매우 강함) | |
df는 이미 144개로 슬라이싱된 상태 | |
""" | |
print(f"🌀 태풍 시나리오 생성 (강도: {intensity}, 데이터: {len(df)}개)") | |
df_noisy = df.copy() | |
n_points = len(df) # 144개 | |
# 태풍 시나리오 설정 (144개 기준) | |
if n_points >= 72: | |
# 충분한 데이터가 있으면 중간-후반에 태풍 배치 | |
typhoon_center = int(n_points * 0.7) # 70% 지점 (약 100번째) | |
typhoon_duration = min(int(24 * intensity), n_points // 3) # 최대 2시간 | |
else: | |
# 적은 데이터면 중간에 배치 | |
typhoon_center = n_points // 2 | |
typhoon_duration = min(int(12 * intensity), n_points // 2) # 최대 1시간 | |
start_idx = max(0, typhoon_center - typhoon_duration // 2) | |
end_idx = min(n_points, typhoon_center + typhoon_duration // 2) | |
print(f" 🌪️ 태풍 구간: {start_idx}-{end_idx} ({end_idx-start_idx}개 포인트)") | |
for i in range(start_idx, end_idx): | |
# 안전한 인덱스 체크 | |
if i >= len(df_noisy): | |
break | |
# 태풍 중심으로부터의 거리 (0~1) | |
if typhoon_duration <= 2: | |
distance_from_center = 0 # 매우 짧은 경우 균등 적용 | |
else: | |
distance_from_center = abs(i - typhoon_center) / max(1, typhoon_duration // 2) | |
typhoon_strength = (1 - distance_from_center) * intensity | |
# DataFrame 인덱스 리셋 (안전한 접근을 위해) | |
actual_idx = df_noisy.index[i] if i < len(df_noisy) else df_noisy.index[-1] | |
# 1. 기압 급강하 (중심에서 최대 -80hPa) - 더 강하게 | |
if 'air_pres' in df_noisy.columns: | |
pressure_drop = -80 * typhoon_strength * (1 + np.random.normal(0, 0.3)) | |
df_noisy.at[actual_idx, 'air_pres'] += pressure_drop | |
# 2. 강풍 (최대 40m/s) - 더 강하게 | |
if 'wind_speed' in df_noisy.columns: | |
wind_boost = 35 * typhoon_strength * (1 + np.random.normal(0, 0.4)) | |
df_noisy.at[actual_idx, 'wind_speed'] += wind_boost | |
# 3. 풍향 변화 (태풍 회전) - 더 극적으로 | |
if 'wind_dir' in df_noisy.columns: | |
wind_dir_change = 270 * typhoon_strength * np.sin(distance_from_center * np.pi * 2) | |
current_dir = df_noisy.at[actual_idx, 'wind_dir'] | |
df_noisy.at[actual_idx, 'wind_dir'] = (current_dir + wind_dir_change) % 360 | |
# 4. 폭풍 해일로 인한 조위 상승 (최대 +150cm) - 더 강하게 | |
if 'tide_level' in df_noisy.columns: | |
storm_surge = 120 * typhoon_strength * (1 + np.random.normal(0, 0.5)) | |
df_noisy.at[actual_idx, 'tide_level'] += storm_surge | |
# 5. 기온 변화 (구름으로 인한 온도 하강) - 더 강하게 | |
if 'air_temp' in df_noisy.columns: | |
temp_drop = -12 * typhoon_strength * (1 + np.random.normal(0, 0.4)) | |
df_noisy.at[actual_idx, 'air_temp'] += temp_drop | |
return df_noisy | |
def generate_sensor_malfunction_scenario(df, intensity=1.0): | |
""" | |
센서 오작동 시나리오: 랜덤한 극값 + 스파이크 노이즈 | |
intensity: 0.5(약함) ~ 2.0(심각함) | |
""" | |
print(f"📡 센서 오작동 시나리오 생성 (강도: {intensity})") | |
df_noisy = df.copy() | |
n_points = len(df) | |
# 오작동 포인트 수 (전체의 5-20%) | |
malfunction_count = int(n_points * 0.05 * intensity) | |
malfunction_indices = random.sample(range(n_points), malfunction_count) | |
for idx in malfunction_indices: | |
# 안전한 인덱스 체크 | |
if idx >= len(df_noisy): | |
continue | |
# 랜덤하게 한 개 컬럼 선택하여 오작동 | |
numeric_cols = df_noisy.select_dtypes(include=[np.number]).columns.tolist() | |
if 'date' in numeric_cols: | |
numeric_cols.remove('date') | |
if not numeric_cols: | |
continue | |
malfunction_col = random.choice(numeric_cols) | |
# 오작동 유형 랜덤 선택 | |
malfunction_type = random.choice(['spike', 'stuck', 'drift', 'outlier']) | |
# 안전한 인덱스 접근 | |
actual_idx = df_noisy.index[idx] if idx < len(df_noisy) else df_noisy.index[-1] | |
if malfunction_type == 'spike': | |
# 급격한 스파이크 | |
spike_magnitude = intensity * 5 * random.choice([-1, 1]) | |
original_val = df_noisy.at[actual_idx, malfunction_col] | |
df_noisy.at[actual_idx, malfunction_col] = original_val * (1 + spike_magnitude) | |
elif malfunction_type == 'stuck': | |
# 값이 고정됨 (5-10개 포인트) | |
stuck_duration = random.randint(3, int(8 * intensity)) | |
stuck_value = df_noisy.at[actual_idx, malfunction_col] | |
end_idx = min(n_points - 1, idx + stuck_duration) | |
for i in range(idx, end_idx + 1): | |
if i < len(df_noisy): | |
df_noisy.iloc[i, df_noisy.columns.get_loc(malfunction_col)] = stuck_value | |
elif malfunction_type == 'drift': | |
# 점진적 드리프트 | |
drift_duration = random.randint(10, int(30 * intensity)) | |
drift_magnitude = intensity * 2 * random.choice([-1, 1]) | |
end_idx = min(n_points - 1, idx + drift_duration) | |
for i in range(idx, end_idx + 1): | |
if i < len(df_noisy): | |
drift_factor = (i - idx) / max(1, drift_duration) * drift_magnitude | |
original_val = df_noisy.iloc[i, df_noisy.columns.get_loc(malfunction_col)] | |
df_noisy.iloc[i, df_noisy.columns.get_loc(malfunction_col)] = original_val * (1 + drift_factor) | |
elif malfunction_type == 'outlier': | |
# 극값 아웃라이어 | |
col_std = df_noisy[malfunction_col].std() | |
col_mean = df_noisy[malfunction_col].mean() | |
outlier_val = col_mean + random.choice([-1, 1]) * col_std * 5 * intensity | |
df_noisy.at[actual_idx, malfunction_col] = outlier_val | |
return df_noisy | |
def generate_burst_missing_scenario(df, intensity=1.0): | |
""" | |
연속 결측치 시나리오: 센서 완전 실패 | |
intensity: 0.5(짧은 결측) ~ 2.0(긴 결측) | |
""" | |
print(f"❌ 연속 결측치 시나리오 생성 (강도: {intensity})") | |
df_noisy = df.copy() | |
n_points = len(df) | |
# 결측 구간 수 (1-3개) | |
num_missing_blocks = random.randint(1, 3) | |
for _ in range(num_missing_blocks): | |
# 결측 구간 길이 (30분 ~ 6시간), 하지만 전체 길이를 초과하지 않음 | |
max_duration = min(int(72 * intensity), n_points // 2) | |
missing_duration = random.randint(int(6 * intensity), max(int(6 * intensity) + 1, max_duration)) | |
start_idx = random.randint(0, max(1, n_points - missing_duration)) | |
end_idx = min(n_points, start_idx + missing_duration) | |
# 결측시킬 컬럼들 랜덤 선택 (tide_level 포함) | |
numeric_cols = df_noisy.select_dtypes(include=[np.number]).columns.tolist() | |
if 'date' in numeric_cols: | |
numeric_cols.remove('date') | |
# 강도에 따라 결측 컬럼 수 결정 | |
missing_cols_count = random.randint(1, min(len(numeric_cols), int(3 * intensity))) | |
missing_cols = random.sample(numeric_cols, missing_cols_count) | |
print(f" 📍 {start_idx}-{end_idx} 구간에서 {missing_cols} 결측 처리") | |
# 해당 구간을 NaN으로 설정 | |
for col in missing_cols: | |
df_noisy.loc[start_idx:end_idx, col] = np.nan | |
return df_noisy | |
def generate_extreme_weather_scenario(df, intensity=1.0): | |
""" | |
극한 기상 시나리오: 폭염, 한파, 폭설 등 | |
intensity: 0.5(보통) ~ 2.0(극한) | |
""" | |
print(f"🌡️ 극한 기상 시나리오 생성 (강도: {intensity})") | |
df_noisy = df.copy() | |
n_points = len(df) | |
# 극한 기상 유형 선택 | |
weather_type = random.choice(['heatwave', 'coldwave', 'highpressure', 'lowpressure']) | |
# 영향 구간 (2-8시간), 하지만 전체 길이를 초과하지 않음 | |
max_duration = min(int(96 * intensity), n_points // 2) | |
duration = random.randint(int(24 * intensity), max(int(24 * intensity) + 1, max_duration)) | |
start_idx = random.randint(0, max(1, n_points - duration)) | |
end_idx = min(n_points, start_idx + duration) | |
for i in range(start_idx, end_idx): | |
# 안전한 인덱스 체크 | |
if i >= len(df_noisy): | |
break | |
actual_idx = df_noisy.index[i] if i < len(df_noisy) else df_noisy.index[-1] | |
progress = (i - start_idx) / max(1, duration) # 0~1 | |
effect_strength = np.sin(progress * np.pi) * intensity # 중간에 최대 | |
if weather_type == 'heatwave': | |
# 폭염: 고온 + 저기압 + 약한 바람 | |
if 'air_temp' in df_noisy.columns: | |
df_noisy.at[actual_idx, 'air_temp'] += 15 * effect_strength | |
if 'air_pres' in df_noisy.columns: | |
df_noisy.at[actual_idx, 'air_pres'] -= 10 * effect_strength | |
if 'wind_speed' in df_noisy.columns: | |
current_wind = df_noisy.at[actual_idx, 'wind_speed'] | |
df_noisy.at[actual_idx, 'wind_speed'] = current_wind * (1 - 0.5 * effect_strength) | |
if 'tide_level' in df_noisy.columns: | |
# 열팽창으로 미세한 해수면 상승 | |
df_noisy.at[actual_idx, 'tide_level'] += 5 * effect_strength | |
elif weather_type == 'coldwave': | |
# 한파: 저온 + 고기압 + 강한 바람 | |
if 'air_temp' in df_noisy.columns: | |
df_noisy.at[actual_idx, 'air_temp'] -= 20 * effect_strength | |
if 'air_pres' in df_noisy.columns: | |
df_noisy.at[actual_idx, 'air_pres'] += 20 * effect_strength | |
if 'wind_speed' in df_noisy.columns: | |
df_noisy.at[actual_idx, 'wind_speed'] += 10 * effect_strength | |
if 'tide_level' in df_noisy.columns: | |
# 해수 수축으로 미세한 해수면 하강 | |
df_noisy.at[actual_idx, 'tide_level'] -= 3 * effect_strength | |
elif weather_type == 'highpressure': | |
# 고기압: 맑은 날씨, 약한 바람 | |
if 'air_pres' in df_noisy.columns: | |
df_noisy.at[actual_idx, 'air_pres'] += 25 * effect_strength | |
if 'wind_speed' in df_noisy.columns: | |
current_wind = df_noisy.at[actual_idx, 'wind_speed'] | |
df_noisy.at[actual_idx, 'wind_speed'] = current_wind * (1 - 0.7 * effect_strength) | |
elif weather_type == 'lowpressure': | |
# 저기압: 흐린 날씨, 강한 바람 | |
if 'air_pres' in df_noisy.columns: | |
df_noisy.at[actual_idx, 'air_pres'] -= 20 * effect_strength | |
if 'wind_speed' in df_noisy.columns: | |
df_noisy.at[actual_idx, 'wind_speed'] += 8 * effect_strength | |
if 'tide_level' in df_noisy.columns: | |
# 저기압으로 인한 조위 상승 | |
df_noisy.at[actual_idx, 'tide_level'] += 15 * effect_strength | |
return df_noisy | |
def create_noise_comparison_plot(df_original, df_noisy, scenario_name): | |
""" | |
원본 vs 노이즈 데이터 비교 시각화 (개선된 버전) | |
""" | |
fig = make_subplots( | |
rows=3, cols=2, | |
subplot_titles=['🌊 조위 (tide_level)', '🌬️ 기압 (air_pres)', | |
'💨 풍속 (wind_speed)', '🌡️ 기온 (air_temp)', | |
'🧭 풍향 (wind_dir)', '📊 조위 전체 비교'], | |
vertical_spacing=0.10, | |
horizontal_spacing=0.08 | |
) | |
# 안전한 DataFrame 정렬 - 같은 길이와 인덱스로 맞추기 | |
df_orig_slice = df_original.tail(len(df_noisy)).reset_index(drop=True) | |
df_noisy_reset = df_noisy.reset_index(drop=True) | |
# 최소 길이로 맞추기 | |
min_len = min(len(df_orig_slice), len(df_noisy_reset)) | |
df_orig_slice = df_orig_slice[:min_len] | |
df_noisy_reset = df_noisy_reset[:min_len] | |
# 시간축 (공통 길이 사용) | |
time_axis = list(range(len(df_orig_slice))) | |
# 색상 설정 (더 선명하게) | |
original_color = '#2E86AB' # 진한 파랑 | |
noise_color = '#F24236' # 진한 빨강 | |
# 각 변수별 비교 플롯 | |
variables = ['tide_level', 'air_pres', 'wind_speed', 'air_temp', 'wind_dir'] | |
positions = [(1,1), (1,2), (2,1), (2,2), (3,1)] | |
var_units = ['cm', 'hPa', 'm/s', '°C', '°'] | |
for var, (row, col), unit in zip(variables, positions, var_units): | |
if var in df_orig_slice.columns and var in df_noisy_reset.columns: | |
try: | |
# 원본 데이터 (실선, 두껍게) | |
fig.add_trace( | |
go.Scatter( | |
x=time_axis, y=df_orig_slice[var], | |
name=f'🔵 원본', | |
line=dict(color=original_color, width=3), | |
showlegend=(row==1 and col==1), | |
hovertemplate=f'원본 {var}: %{{y:.1f}}{unit}<br>시점: %{{x}}<extra></extra>' | |
), | |
row=row, col=col | |
) | |
# 노이즈 데이터 (점선, 두껍게) | |
fig.add_trace( | |
go.Scatter( | |
x=time_axis, y=df_noisy_reset[var], | |
name=f'🔴 노이즈', | |
line=dict(color=noise_color, width=2.5, dash='dash'), | |
showlegend=(row==1 and col==1), | |
hovertemplate=f'노이즈 {var}: %{{y:.1f}}{unit}<br>시점: %{{x}}<extra></extra>' | |
), | |
row=row, col=col | |
) | |
# Y축 범위 자동 조정 (변화를 더 명확히 보기 위해) - 안전하게 | |
orig_vals = df_orig_slice[var].dropna() | |
noisy_vals = df_noisy_reset[var].dropna() | |
if len(orig_vals) > 0 and len(noisy_vals) > 0: | |
original_range = orig_vals.max() - orig_vals.min() | |
noisy_range = noisy_vals.max() - noisy_vals.min() | |
# 더 큰 범위를 기준으로 패딩 적용 | |
total_range = max(original_range, noisy_range) | |
y_center = (orig_vals.mean() + noisy_vals.mean()) / 2 | |
if total_range > 0: | |
y_min = y_center - total_range * 0.6 | |
y_max = y_center + total_range * 0.6 | |
fig.update_yaxes(range=[y_min, y_max], row=row, col=col) | |
except Exception as e: | |
print(f"변수 {var} 플롯 생성 오류: {e}") | |
continue | |
# 전체 비교 (tide_level 중심) - 더 크고 선명하게 | |
if 'tide_level' in df_orig_slice.columns and 'tide_level' in df_noisy_reset.columns: | |
try: | |
fig.add_trace( | |
go.Scatter( | |
x=time_axis, y=df_orig_slice['tide_level'], | |
name='🔵 원본 조위', | |
line=dict(color=original_color, width=4), | |
showlegend=True, | |
hovertemplate='원본 조위: %{y:.1f}cm<br>시점: %{x}<extra></extra>' | |
), | |
row=3, col=2 | |
) | |
fig.add_trace( | |
go.Scatter( | |
x=time_axis, y=df_noisy_reset['tide_level'], | |
name='🔴 노이즈 조위', | |
line=dict(color=noise_color, width=3, dash='dash'), | |
showlegend=True, | |
hovertemplate='노이즈 조위: %{y:.1f}cm<br>시점: %{x}<extra></extra>' | |
), | |
row=3, col=2 | |
) | |
except Exception as e: | |
print(f"전체 조위 비교 플롯 생성 오류: {e}") | |
# 노이즈 시나리오 구간 하이라이트 추가 | |
def add_scenario_highlight(scenario_name): | |
if scenario_name == 'typhoon': | |
# 태풍 구간 계산 (정렬된 DataFrame 기준) | |
n_points = len(df_orig_slice) | |
if n_points >= 72: | |
typhoon_center = int(n_points * 0.7) # 70% 지점 | |
typhoon_duration = min(24, n_points // 3) # 최대 2시간 | |
else: | |
typhoon_center = n_points // 2 | |
typhoon_duration = min(12, n_points // 2) # 최대 1시간 | |
start_idx = max(0, typhoon_center - typhoon_duration // 2) | |
end_idx = min(n_points, typhoon_center + typhoon_duration // 2) | |
print(f" 📍 시각화 태풍 구간: {start_idx}-{end_idx}") | |
# 모든 서브플롯에 배경 영역 추가 | |
try: | |
for row in range(1, 4): | |
for col in range(1, 3): | |
fig.add_vrect( | |
x0=start_idx, x1=end_idx, | |
fillcolor="rgba(255,0,0,0.15)", | |
layer="below", line_width=0, | |
annotation_text="🌀 태풍 구간", | |
annotation_position="top left", | |
row=row, col=col | |
) | |
except Exception as e: | |
print(f"태풍 구간 하이라이트 오류: {e}") | |
add_scenario_highlight(scenario_name) | |
# 레이아웃 개선 | |
fig.update_layout( | |
title={ | |
'text': f"🌪️ 노이즈 시나리오: {scenario_name}", | |
'x': 0.5, | |
'font': {'size': 20, 'color': '#2E86AB'} | |
}, | |
height=900, | |
showlegend=True, | |
legend=dict( | |
x=0.02, # 왼쪽으로 이동 | |
y=0.98, # 위쪽으로 이동 | |
bgcolor='rgba(255,255,255,0.8)', | |
bordercolor='gray', | |
borderwidth=1, | |
font=dict(size=12) | |
), | |
plot_bgcolor='rgba(248,249,250,0.8)', | |
paper_bgcolor='white' | |
) | |
# X축 레이블 개선 | |
fig.update_xaxes(title_text="시간 순서", showgrid=True, gridcolor='lightgray') | |
fig.update_yaxes(showgrid=True, gridcolor='lightgray') | |
return fig | |
def apply_noise_scenario(df, scenario_type, intensity=1.0): | |
""" | |
선택된 노이즈 시나리오 적용 (144개 슬라이싱 후) | |
""" | |
scenario_functions = { | |
'typhoon': generate_typhoon_scenario, | |
'sensor_malfunction': generate_sensor_malfunction_scenario, | |
'burst_missing': generate_burst_missing_scenario, | |
'extreme_weather': generate_extreme_weather_scenario | |
} | |
if scenario_type not in scenario_functions: | |
raise ValueError(f"Unknown scenario type: {scenario_type}") | |
print(f"\n🌪️ {scenario_type} 시나리오 적용 중...") | |
# 1. 먼저 마지막 144개로 슬라이싱 (실제 모델 입력과 동일) | |
print(f"📊 원본 데이터: {len(df)}행") | |
df_sliced = df.tail(144).copy() | |
print(f"✂️ 슬라이싱 후: {len(df_sliced)}행 (마지막 144개)") | |
# 2. 슬라이싱된 데이터에 노이즈 적용 | |
df_noisy = scenario_functions[scenario_type](df_sliced, intensity) | |
# 3. 비교 시각화 생성 (슬라이싱된 원본 vs 노이즈) | |
plot = create_noise_comparison_plot(df_sliced, df_noisy, scenario_type) | |
return df_noisy, plot |