my-tide-env / noise_scenarios.py
alwaysgood's picture
Update noise_scenarios.py
ea729fc verified
# noise_scenarios.py - 노이즈 시나리오 생성 엔진
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import random
def generate_typhoon_scenario(df, intensity=1.0):
"""
태풍 시나리오: 급격한 기압 하강 + 강풍 + 조위 상승
intensity: 0.5(약함) ~ 2.0(매우 강함)
df는 이미 144개로 슬라이싱된 상태
"""
print(f"🌀 태풍 시나리오 생성 (강도: {intensity}, 데이터: {len(df)}개)")
df_noisy = df.copy()
n_points = len(df) # 144개
# 태풍 시나리오 설정 (144개 기준)
if n_points >= 72:
# 충분한 데이터가 있으면 중간-후반에 태풍 배치
typhoon_center = int(n_points * 0.7) # 70% 지점 (약 100번째)
typhoon_duration = min(int(24 * intensity), n_points // 3) # 최대 2시간
else:
# 적은 데이터면 중간에 배치
typhoon_center = n_points // 2
typhoon_duration = min(int(12 * intensity), n_points // 2) # 최대 1시간
start_idx = max(0, typhoon_center - typhoon_duration // 2)
end_idx = min(n_points, typhoon_center + typhoon_duration // 2)
print(f" 🌪️ 태풍 구간: {start_idx}-{end_idx} ({end_idx-start_idx}개 포인트)")
for i in range(start_idx, end_idx):
# 안전한 인덱스 체크
if i >= len(df_noisy):
break
# 태풍 중심으로부터의 거리 (0~1)
if typhoon_duration <= 2:
distance_from_center = 0 # 매우 짧은 경우 균등 적용
else:
distance_from_center = abs(i - typhoon_center) / max(1, typhoon_duration // 2)
typhoon_strength = (1 - distance_from_center) * intensity
# DataFrame 인덱스 리셋 (안전한 접근을 위해)
actual_idx = df_noisy.index[i] if i < len(df_noisy) else df_noisy.index[-1]
# 1. 기압 급강하 (중심에서 최대 -80hPa) - 더 강하게
if 'air_pres' in df_noisy.columns:
pressure_drop = -80 * typhoon_strength * (1 + np.random.normal(0, 0.3))
df_noisy.at[actual_idx, 'air_pres'] += pressure_drop
# 2. 강풍 (최대 40m/s) - 더 강하게
if 'wind_speed' in df_noisy.columns:
wind_boost = 35 * typhoon_strength * (1 + np.random.normal(0, 0.4))
df_noisy.at[actual_idx, 'wind_speed'] += wind_boost
# 3. 풍향 변화 (태풍 회전) - 더 극적으로
if 'wind_dir' in df_noisy.columns:
wind_dir_change = 270 * typhoon_strength * np.sin(distance_from_center * np.pi * 2)
current_dir = df_noisy.at[actual_idx, 'wind_dir']
df_noisy.at[actual_idx, 'wind_dir'] = (current_dir + wind_dir_change) % 360
# 4. 폭풍 해일로 인한 조위 상승 (최대 +150cm) - 더 강하게
if 'tide_level' in df_noisy.columns:
storm_surge = 120 * typhoon_strength * (1 + np.random.normal(0, 0.5))
df_noisy.at[actual_idx, 'tide_level'] += storm_surge
# 5. 기온 변화 (구름으로 인한 온도 하강) - 더 강하게
if 'air_temp' in df_noisy.columns:
temp_drop = -12 * typhoon_strength * (1 + np.random.normal(0, 0.4))
df_noisy.at[actual_idx, 'air_temp'] += temp_drop
return df_noisy
def generate_sensor_malfunction_scenario(df, intensity=1.0):
"""
센서 오작동 시나리오: 랜덤한 극값 + 스파이크 노이즈
intensity: 0.5(약함) ~ 2.0(심각함)
"""
print(f"📡 센서 오작동 시나리오 생성 (강도: {intensity})")
df_noisy = df.copy()
n_points = len(df)
# 오작동 포인트 수 (전체의 5-20%)
malfunction_count = int(n_points * 0.05 * intensity)
malfunction_indices = random.sample(range(n_points), malfunction_count)
for idx in malfunction_indices:
# 안전한 인덱스 체크
if idx >= len(df_noisy):
continue
# 랜덤하게 한 개 컬럼 선택하여 오작동
numeric_cols = df_noisy.select_dtypes(include=[np.number]).columns.tolist()
if 'date' in numeric_cols:
numeric_cols.remove('date')
if not numeric_cols:
continue
malfunction_col = random.choice(numeric_cols)
# 오작동 유형 랜덤 선택
malfunction_type = random.choice(['spike', 'stuck', 'drift', 'outlier'])
# 안전한 인덱스 접근
actual_idx = df_noisy.index[idx] if idx < len(df_noisy) else df_noisy.index[-1]
if malfunction_type == 'spike':
# 급격한 스파이크
spike_magnitude = intensity * 5 * random.choice([-1, 1])
original_val = df_noisy.at[actual_idx, malfunction_col]
df_noisy.at[actual_idx, malfunction_col] = original_val * (1 + spike_magnitude)
elif malfunction_type == 'stuck':
# 값이 고정됨 (5-10개 포인트)
stuck_duration = random.randint(3, int(8 * intensity))
stuck_value = df_noisy.at[actual_idx, malfunction_col]
end_idx = min(n_points - 1, idx + stuck_duration)
for i in range(idx, end_idx + 1):
if i < len(df_noisy):
df_noisy.iloc[i, df_noisy.columns.get_loc(malfunction_col)] = stuck_value
elif malfunction_type == 'drift':
# 점진적 드리프트
drift_duration = random.randint(10, int(30 * intensity))
drift_magnitude = intensity * 2 * random.choice([-1, 1])
end_idx = min(n_points - 1, idx + drift_duration)
for i in range(idx, end_idx + 1):
if i < len(df_noisy):
drift_factor = (i - idx) / max(1, drift_duration) * drift_magnitude
original_val = df_noisy.iloc[i, df_noisy.columns.get_loc(malfunction_col)]
df_noisy.iloc[i, df_noisy.columns.get_loc(malfunction_col)] = original_val * (1 + drift_factor)
elif malfunction_type == 'outlier':
# 극값 아웃라이어
col_std = df_noisy[malfunction_col].std()
col_mean = df_noisy[malfunction_col].mean()
outlier_val = col_mean + random.choice([-1, 1]) * col_std * 5 * intensity
df_noisy.at[actual_idx, malfunction_col] = outlier_val
return df_noisy
def generate_burst_missing_scenario(df, intensity=1.0):
"""
연속 결측치 시나리오: 센서 완전 실패
intensity: 0.5(짧은 결측) ~ 2.0(긴 결측)
"""
print(f"❌ 연속 결측치 시나리오 생성 (강도: {intensity})")
df_noisy = df.copy()
n_points = len(df)
# 결측 구간 수 (1-3개)
num_missing_blocks = random.randint(1, 3)
for _ in range(num_missing_blocks):
# 결측 구간 길이 (30분 ~ 6시간), 하지만 전체 길이를 초과하지 않음
max_duration = min(int(72 * intensity), n_points // 2)
missing_duration = random.randint(int(6 * intensity), max(int(6 * intensity) + 1, max_duration))
start_idx = random.randint(0, max(1, n_points - missing_duration))
end_idx = min(n_points, start_idx + missing_duration)
# 결측시킬 컬럼들 랜덤 선택 (tide_level 포함)
numeric_cols = df_noisy.select_dtypes(include=[np.number]).columns.tolist()
if 'date' in numeric_cols:
numeric_cols.remove('date')
# 강도에 따라 결측 컬럼 수 결정
missing_cols_count = random.randint(1, min(len(numeric_cols), int(3 * intensity)))
missing_cols = random.sample(numeric_cols, missing_cols_count)
print(f" 📍 {start_idx}-{end_idx} 구간에서 {missing_cols} 결측 처리")
# 해당 구간을 NaN으로 설정
for col in missing_cols:
df_noisy.loc[start_idx:end_idx, col] = np.nan
return df_noisy
def generate_extreme_weather_scenario(df, intensity=1.0):
"""
극한 기상 시나리오: 폭염, 한파, 폭설 등
intensity: 0.5(보통) ~ 2.0(극한)
"""
print(f"🌡️ 극한 기상 시나리오 생성 (강도: {intensity})")
df_noisy = df.copy()
n_points = len(df)
# 극한 기상 유형 선택
weather_type = random.choice(['heatwave', 'coldwave', 'highpressure', 'lowpressure'])
# 영향 구간 (2-8시간), 하지만 전체 길이를 초과하지 않음
max_duration = min(int(96 * intensity), n_points // 2)
duration = random.randint(int(24 * intensity), max(int(24 * intensity) + 1, max_duration))
start_idx = random.randint(0, max(1, n_points - duration))
end_idx = min(n_points, start_idx + duration)
for i in range(start_idx, end_idx):
# 안전한 인덱스 체크
if i >= len(df_noisy):
break
actual_idx = df_noisy.index[i] if i < len(df_noisy) else df_noisy.index[-1]
progress = (i - start_idx) / max(1, duration) # 0~1
effect_strength = np.sin(progress * np.pi) * intensity # 중간에 최대
if weather_type == 'heatwave':
# 폭염: 고온 + 저기압 + 약한 바람
if 'air_temp' in df_noisy.columns:
df_noisy.at[actual_idx, 'air_temp'] += 15 * effect_strength
if 'air_pres' in df_noisy.columns:
df_noisy.at[actual_idx, 'air_pres'] -= 10 * effect_strength
if 'wind_speed' in df_noisy.columns:
current_wind = df_noisy.at[actual_idx, 'wind_speed']
df_noisy.at[actual_idx, 'wind_speed'] = current_wind * (1 - 0.5 * effect_strength)
if 'tide_level' in df_noisy.columns:
# 열팽창으로 미세한 해수면 상승
df_noisy.at[actual_idx, 'tide_level'] += 5 * effect_strength
elif weather_type == 'coldwave':
# 한파: 저온 + 고기압 + 강한 바람
if 'air_temp' in df_noisy.columns:
df_noisy.at[actual_idx, 'air_temp'] -= 20 * effect_strength
if 'air_pres' in df_noisy.columns:
df_noisy.at[actual_idx, 'air_pres'] += 20 * effect_strength
if 'wind_speed' in df_noisy.columns:
df_noisy.at[actual_idx, 'wind_speed'] += 10 * effect_strength
if 'tide_level' in df_noisy.columns:
# 해수 수축으로 미세한 해수면 하강
df_noisy.at[actual_idx, 'tide_level'] -= 3 * effect_strength
elif weather_type == 'highpressure':
# 고기압: 맑은 날씨, 약한 바람
if 'air_pres' in df_noisy.columns:
df_noisy.at[actual_idx, 'air_pres'] += 25 * effect_strength
if 'wind_speed' in df_noisy.columns:
current_wind = df_noisy.at[actual_idx, 'wind_speed']
df_noisy.at[actual_idx, 'wind_speed'] = current_wind * (1 - 0.7 * effect_strength)
elif weather_type == 'lowpressure':
# 저기압: 흐린 날씨, 강한 바람
if 'air_pres' in df_noisy.columns:
df_noisy.at[actual_idx, 'air_pres'] -= 20 * effect_strength
if 'wind_speed' in df_noisy.columns:
df_noisy.at[actual_idx, 'wind_speed'] += 8 * effect_strength
if 'tide_level' in df_noisy.columns:
# 저기압으로 인한 조위 상승
df_noisy.at[actual_idx, 'tide_level'] += 15 * effect_strength
return df_noisy
def create_noise_comparison_plot(df_original, df_noisy, scenario_name):
"""
원본 vs 노이즈 데이터 비교 시각화 (개선된 버전)
"""
fig = make_subplots(
rows=3, cols=2,
subplot_titles=['🌊 조위 (tide_level)', '🌬️ 기압 (air_pres)',
'💨 풍속 (wind_speed)', '🌡️ 기온 (air_temp)',
'🧭 풍향 (wind_dir)', '📊 조위 전체 비교'],
vertical_spacing=0.10,
horizontal_spacing=0.08
)
# 안전한 DataFrame 정렬 - 같은 길이와 인덱스로 맞추기
df_orig_slice = df_original.tail(len(df_noisy)).reset_index(drop=True)
df_noisy_reset = df_noisy.reset_index(drop=True)
# 최소 길이로 맞추기
min_len = min(len(df_orig_slice), len(df_noisy_reset))
df_orig_slice = df_orig_slice[:min_len]
df_noisy_reset = df_noisy_reset[:min_len]
# 시간축 (공통 길이 사용)
time_axis = list(range(len(df_orig_slice)))
# 색상 설정 (더 선명하게)
original_color = '#2E86AB' # 진한 파랑
noise_color = '#F24236' # 진한 빨강
# 각 변수별 비교 플롯
variables = ['tide_level', 'air_pres', 'wind_speed', 'air_temp', 'wind_dir']
positions = [(1,1), (1,2), (2,1), (2,2), (3,1)]
var_units = ['cm', 'hPa', 'm/s', '°C', '°']
for var, (row, col), unit in zip(variables, positions, var_units):
if var in df_orig_slice.columns and var in df_noisy_reset.columns:
try:
# 원본 데이터 (실선, 두껍게)
fig.add_trace(
go.Scatter(
x=time_axis, y=df_orig_slice[var],
name=f'🔵 원본',
line=dict(color=original_color, width=3),
showlegend=(row==1 and col==1),
hovertemplate=f'원본 {var}: %{{y:.1f}}{unit}<br>시점: %{{x}}<extra></extra>'
),
row=row, col=col
)
# 노이즈 데이터 (점선, 두껍게)
fig.add_trace(
go.Scatter(
x=time_axis, y=df_noisy_reset[var],
name=f'🔴 노이즈',
line=dict(color=noise_color, width=2.5, dash='dash'),
showlegend=(row==1 and col==1),
hovertemplate=f'노이즈 {var}: %{{y:.1f}}{unit}<br>시점: %{{x}}<extra></extra>'
),
row=row, col=col
)
# Y축 범위 자동 조정 (변화를 더 명확히 보기 위해) - 안전하게
orig_vals = df_orig_slice[var].dropna()
noisy_vals = df_noisy_reset[var].dropna()
if len(orig_vals) > 0 and len(noisy_vals) > 0:
original_range = orig_vals.max() - orig_vals.min()
noisy_range = noisy_vals.max() - noisy_vals.min()
# 더 큰 범위를 기준으로 패딩 적용
total_range = max(original_range, noisy_range)
y_center = (orig_vals.mean() + noisy_vals.mean()) / 2
if total_range > 0:
y_min = y_center - total_range * 0.6
y_max = y_center + total_range * 0.6
fig.update_yaxes(range=[y_min, y_max], row=row, col=col)
except Exception as e:
print(f"변수 {var} 플롯 생성 오류: {e}")
continue
# 전체 비교 (tide_level 중심) - 더 크고 선명하게
if 'tide_level' in df_orig_slice.columns and 'tide_level' in df_noisy_reset.columns:
try:
fig.add_trace(
go.Scatter(
x=time_axis, y=df_orig_slice['tide_level'],
name='🔵 원본 조위',
line=dict(color=original_color, width=4),
showlegend=True,
hovertemplate='원본 조위: %{y:.1f}cm<br>시점: %{x}<extra></extra>'
),
row=3, col=2
)
fig.add_trace(
go.Scatter(
x=time_axis, y=df_noisy_reset['tide_level'],
name='🔴 노이즈 조위',
line=dict(color=noise_color, width=3, dash='dash'),
showlegend=True,
hovertemplate='노이즈 조위: %{y:.1f}cm<br>시점: %{x}<extra></extra>'
),
row=3, col=2
)
except Exception as e:
print(f"전체 조위 비교 플롯 생성 오류: {e}")
# 노이즈 시나리오 구간 하이라이트 추가
def add_scenario_highlight(scenario_name):
if scenario_name == 'typhoon':
# 태풍 구간 계산 (정렬된 DataFrame 기준)
n_points = len(df_orig_slice)
if n_points >= 72:
typhoon_center = int(n_points * 0.7) # 70% 지점
typhoon_duration = min(24, n_points // 3) # 최대 2시간
else:
typhoon_center = n_points // 2
typhoon_duration = min(12, n_points // 2) # 최대 1시간
start_idx = max(0, typhoon_center - typhoon_duration // 2)
end_idx = min(n_points, typhoon_center + typhoon_duration // 2)
print(f" 📍 시각화 태풍 구간: {start_idx}-{end_idx}")
# 모든 서브플롯에 배경 영역 추가
try:
for row in range(1, 4):
for col in range(1, 3):
fig.add_vrect(
x0=start_idx, x1=end_idx,
fillcolor="rgba(255,0,0,0.15)",
layer="below", line_width=0,
annotation_text="🌀 태풍 구간",
annotation_position="top left",
row=row, col=col
)
except Exception as e:
print(f"태풍 구간 하이라이트 오류: {e}")
add_scenario_highlight(scenario_name)
# 레이아웃 개선
fig.update_layout(
title={
'text': f"🌪️ 노이즈 시나리오: {scenario_name}",
'x': 0.5,
'font': {'size': 20, 'color': '#2E86AB'}
},
height=900,
showlegend=True,
legend=dict(
x=0.02, # 왼쪽으로 이동
y=0.98, # 위쪽으로 이동
bgcolor='rgba(255,255,255,0.8)',
bordercolor='gray',
borderwidth=1,
font=dict(size=12)
),
plot_bgcolor='rgba(248,249,250,0.8)',
paper_bgcolor='white'
)
# X축 레이블 개선
fig.update_xaxes(title_text="시간 순서", showgrid=True, gridcolor='lightgray')
fig.update_yaxes(showgrid=True, gridcolor='lightgray')
return fig
def apply_noise_scenario(df, scenario_type, intensity=1.0):
"""
선택된 노이즈 시나리오 적용 (144개 슬라이싱 후)
"""
scenario_functions = {
'typhoon': generate_typhoon_scenario,
'sensor_malfunction': generate_sensor_malfunction_scenario,
'burst_missing': generate_burst_missing_scenario,
'extreme_weather': generate_extreme_weather_scenario
}
if scenario_type not in scenario_functions:
raise ValueError(f"Unknown scenario type: {scenario_type}")
print(f"\n🌪️ {scenario_type} 시나리오 적용 중...")
# 1. 먼저 마지막 144개로 슬라이싱 (실제 모델 입력과 동일)
print(f"📊 원본 데이터: {len(df)}행")
df_sliced = df.tail(144).copy()
print(f"✂️ 슬라이싱 후: {len(df_sliced)}행 (마지막 144개)")
# 2. 슬라이싱된 데이터에 노이즈 적용
df_noisy = scenario_functions[scenario_type](df_sliced, intensity)
# 3. 비교 시각화 생성 (슬라이싱된 원본 vs 노이즈)
plot = create_noise_comparison_plot(df_sliced, df_noisy, scenario_type)
return df_noisy, plot