# PPPDC_example / app.py
# Uploaded by: JUNGU
# Commit: "Update app.py" (8fd3221, verified)
import streamlit as st
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from io import StringIO
import openpyxl
import matplotlib.font_manager as fm
from scipy import stats
import os
import plotly.figure_factory as ff
#μ‚¬μ΄μ¦ˆ 크게
st.set_page_config(layout="wide")
# Korean font setup
def set_font():
    """Register the Pretendard font file with matplotlib and return font settings.

    Returns:
        dict: Settings keyed like matplotlib rcParams. ``axes.unicode_minus`` is
        always present; ``font.family`` is included only when the font file
        exists and was registered.
    """
    import warnings  # local import: only needed on the missing-font path

    font_path = "Pretendard-Bold.ttf"  # change this to the actual font file path

    # This function runs at import time; a missing font file must not take the
    # whole app down, so skip registration and fall back to default fonts.
    if not os.path.isfile(font_path):
        warnings.warn(
            f"Font file '{font_path}' was not found; skipping font registration.",
            RuntimeWarning,
            stacklevel=2,
        )
        return {'axes.unicode_minus': False}

    fm.fontManager.addfont(font_path)
    return {'font.family': 'Pretendard-Bold', 'axes.unicode_minus': False}
# Fetch the font settings once at import time (this also registers the font).
font_settings = set_font()
# μ„Έμ…˜ μƒνƒœ μ΄ˆκΈ°ν™” 및 관리
def manage_session_state():
if 'data' not in st.session_state:
st.session_state.data = None
if 'processed_data' not in st.session_state:
st.session_state.processed_data = None
if 'numeric_columns' not in st.session_state:
st.session_state.numeric_columns = []
if 'categorical_columns' not in st.session_state:
st.session_state.categorical_columns = []
if 'x_var' not in st.session_state:
st.session_state.x_var = None
if 'y_var' not in st.session_state:
st.session_state.y_var = None
if 'slicers' not in st.session_state:
st.session_state.slicers = {}
if 'analysis_performed' not in st.session_state:
st.session_state.analysis_performed = False
if 'filtered_data' not in st.session_state:
st.session_state.filtered_data = None
def reset_session_state():
    """Put the session back into its "no data loaded" state.

    ``x_var`` and ``y_var`` double as widget keys: the X/Y select boxes in
    ``perform_analysis`` are created with ``key='x_var'`` / ``key='y_var'``.
    Streamlit raises ``StreamlitAPIException`` when a widget-bound key is
    assigned after that widget has been instantiated in the current run, which
    would abort this reset halfway through. Those two keys are therefore
    deleted rather than set to ``None``; ``manage_session_state`` seeds them
    again on the next run.
    """
    # Reset the plain (non-widget) state values.
    st.session_state.data = None
    st.session_state.processed_data = None
    st.session_state.filtered_data = None
    st.session_state.numeric_columns = []
    st.session_state.categorical_columns = []

    # Widget-bound keys: delete instead of assigning (see docstring).
    for widget_key in ('x_var', 'y_var'):
        if widget_key in st.session_state:
            del st.session_state[widget_key]

    st.session_state.slicers = {}
    st.session_state.analysis_performed = False
# Example datasets offered in the sample-data picker.
# "name" is the label shown to the user; "file" is the file name that is
# passed to load_sample_data().
SAMPLE_DATA_FILES = [
    {"name": "κ³Όλͺ©λ³„ λ…Έλ ₯κ³Ό 성취도", "file": "subject.xlsx"},
    {"name": "채점", "file": "score.xlsx"},
    {"name": "μΆœμ„μΌμˆ˜μ™€ 성적", "file": "attendance.xlsx"}
]
def load_sample_data(file_name):
    """Load an example dataset from the ``sample_data`` directory.

    Args:
        file_name: File name (with extension) relative to ``sample_data``.

    Returns:
        pandas.DataFrame for ``.csv`` / ``.xls`` / ``.xlsx`` files, or ``None``
        (after showing an error in the UI) for any other extension.
    """
    # Path of the example data file
    file_path = os.path.join("sample_data", file_name)

    # Match the extension case-insensitively, as load_data() does, so that
    # names such as "DATA.CSV" are not rejected.
    lowered_name = file_name.lower()
    if lowered_name.endswith('.csv'):
        return pd.read_csv(file_path)
    if lowered_name.endswith(('.xls', '.xlsx')):
        return pd.read_excel(file_path)

    st.error("μ§€μ›λ˜μ§€ μ•ŠλŠ” 파일 ν˜•μ‹μž…λ‹ˆλ‹€.")
    return None
# Data loading
@st.cache_data
def load_data(file):
    """Read an uploaded CSV/XLS/XLSX file into a DataFrame.

    The extension is taken from ``file.name`` (text after the last dot,
    lower-cased). Unsupported extensions show an error and return ``None``.
    """
    readers = {
        'csv': pd.read_csv,
        'xls': pd.read_excel,
        'xlsx': pd.read_excel,
    }
    suffix = file.name.rsplit('.', 1)[-1].lower()
    reader = readers.get(suffix)
    if reader is None:
        st.error("μ§€μ›λ˜μ§€ μ•ŠλŠ” 파일 ν˜•μ‹μž…λ‹ˆλ‹€. CSV, XLS, λ˜λŠ” XLSX νŒŒμΌμ„ μ—…λ‘œλ“œν•΄μ£Όμ„Έμš”.")
        return None

    frame = reader(file)

    # Give null column labels a positional placeholder name (Column_1, Column_2, ...).
    if frame.columns.isnull().any():
        frame.columns = [
            f'Column_{position}' if pd.isnull(label) else label
            for position, label in enumerate(frame.columns, start=1)
        ]
    return frame
def manual_data_entry():
    """Let the user type column names and fill in a blank table by hand.

    Returns the edited DataFrame, or ``None`` while no column name has been
    entered yet.
    """
    raw_names = st.text_input("μ—΄ 이름을 μ‰Όν‘œλ‘œ κ΅¬λΆ„ν•˜μ—¬ μž…λ ₯ν•˜μ„Έμš”:", key="manual_col_names")
    headers = [part.strip() for part in raw_names.split(',') if part.strip()]
    if not headers:
        return None

    row_count = st.number_input("초기 ν–‰μ˜ 수λ₯Ό μž…λ ₯ν•˜μ„Έμš”:", min_value=1, value=5, key="manual_num_rows")
    blank_table = pd.DataFrame(columns=headers, index=range(row_count))
    return st.data_editor(blank_table, num_rows="dynamic", key="manual_data_editor")
def preprocess_data(data):
    """Infer numeric columns, handle missing values, and classify columns.

    Steps:
      1. Each ``object`` column is coerced with ``pd.to_numeric``; the converted
         column is kept unless every value became NaN.
      2. For each column that still has missing values, the user picks a
         strategy (drop rows / mean / median / mode) from a select box.
      3. ``st.session_state.numeric_columns`` and
         ``st.session_state.categorical_columns`` are refreshed from the result.

    Args:
        data: Input DataFrame. It is not modified; a copy is processed.

    Returns:
        The processed DataFrame.
    """
    # Work on a copy so the caller's frame is never mutated.
    data = data.copy()

    # Data type inference and conversion
    for column in data.columns:
        if data[column].dtype == 'object':
            try:
                # Try converting to numbers; unparseable values become NaN.
                numeric_converted = pd.to_numeric(data[column], errors='coerce')
            except (TypeError, ValueError):
                # Catch only conversion failures: a bare except would also
                # swallow Streamlit's own control-flow exceptions.
                st.write(f"'{column}' 열은 λ²”μ£Όν˜•μœΌλ‘œ μœ μ§€λ©λ‹ˆλ‹€.")
                continue
            # Use the converted column unless every value turned into NaN.
            if not numeric_converted.isna().all():
                data[column] = numeric_converted
                st.write(f"'{column}' 열을 μˆ«μžν˜•μœΌλ‘œ λ³€ν™˜ν–ˆμŠ΅λ‹ˆλ‹€.")

    # Missing-value handling
    if data.isnull().sum().sum() > 0:
        st.write("결츑치 처리:")
        for column in data.columns:
            if data[column].isnull().sum() > 0:
                method = st.selectbox(f"{column} μ—΄μ˜ 처리 방법 선택:",
                                      ["제거", "ν‰κ· μœΌλ‘œ λŒ€μ²΄", "μ€‘μ•™κ°’μœΌλ‘œ λŒ€μ²΄", "μ΅œλΉˆκ°’μœΌλ‘œ λŒ€μ²΄"],
                                      key=f"missing_{column}")
                if method == "제거":
                    data = data.dropna(subset=[column])
                elif method == "ν‰κ· μœΌλ‘œ λŒ€μ²΄":
                    if pd.api.types.is_numeric_dtype(data[column]):
                        # Assign the result back: in-place fillna on a selected
                        # column is chained assignment and may not update the frame.
                        data[column] = data[column].fillna(data[column].mean())
                    else:
                        st.warning(f"{column} 열은 μˆ«μžν˜•μ΄ μ•„λ‹ˆμ–΄μ„œ ν‰κ· κ°’μœΌλ‘œ λŒ€μ²΄ν•  수 μ—†μŠ΅λ‹ˆλ‹€.")
                elif method == "μ€‘μ•™κ°’μœΌλ‘œ λŒ€μ²΄":
                    if pd.api.types.is_numeric_dtype(data[column]):
                        data[column] = data[column].fillna(data[column].median())
                    else:
                        st.warning(f"{column} 열은 μˆ«μžν˜•μ΄ μ•„λ‹ˆμ–΄μ„œ μ€‘μ•™κ°’μœΌλ‘œ λŒ€μ²΄ν•  수 μ—†μŠ΅λ‹ˆλ‹€.")
                elif method == "μ΅œλΉˆκ°’μœΌλ‘œ λŒ€μ²΄":
                    modes = data[column].mode()
                    # mode() is empty when the column holds no non-null value;
                    # there is nothing to fill with in that case.
                    if not modes.empty:
                        data[column] = data[column].fillna(modes.iloc[0])

    # Split numeric and categorical columns
    st.session_state.numeric_columns = data.select_dtypes(include=['float64', 'int64']).columns.tolist()
    st.session_state.categorical_columns = data.select_dtypes(exclude=['float64', 'int64']).columns.tolist()
    return data
def update_filtered_data():
    """Recompute ``filtered_data`` from ``processed_data`` using the current slicers."""
    source_frame = st.session_state.processed_data
    st.session_state.filtered_data = apply_slicers(source_frame)
def create_slicers(data):
    """Render a multiselect filter for each categorical column with at most 10 distinct values.

    The selection of each widget is stored in ``st.session_state.slicers``
    under the column name.
    """
    for col in st.session_state.categorical_columns:
        # Skip columns that are absent or have too many distinct values.
        if col not in data.columns or data[col].nunique() > 10:
            continue

        choices = sorted(data[col].unique())
        st.session_state.slicers[col] = st.multiselect(
            f"{col} 선택",
            options=choices,
            default=list(choices),  # everything selected initially
            key=f"slicer_{col}",
            on_change=update_filtered_data,
        )
def apply_slicers(data):
    """Return a copy of ``data`` restricted to the values selected in each slicer.

    Slicers with an empty selection, or for columns not present in the frame,
    are ignored.
    """
    result = data.copy()
    for col, chosen in st.session_state.slicers.items():
        if not chosen or col not in result.columns:
            continue
        keep_rows = result[col].isin(chosen)
        result = result[keep_rows]
    return result
def plot_correlation_heatmap(data):
    """Draw a correlation heatmap of the numeric columns, or warn when there is nothing to plot."""
    numeric_only = data[st.session_state.numeric_columns]
    if numeric_only.empty:
        st.warning("상관관계 νžˆνŠΈλ§΅μ„ 그릴 수 μžˆλŠ” μˆ«μžν˜• 열이 μ—†μŠ΅λ‹ˆλ‹€.")
        return

    correlations = numeric_only.corr()
    # Fixed colour range of [-1, 1] on a diverging red/blue scale.
    heatmap = px.imshow(correlations, color_continuous_scale='RdBu_r', zmin=-1, zmax=1)
    heatmap.update_layout(title='상관관계 히트맡')
    st.plotly_chart(heatmap)
def check_normality(data, column):
    """Show a Q-Q plot and Shapiro-Wilk test result for ``data[column]``."""
    sample = data[column]

    # Visual check: Q-Q plot
    (theoretical_q, ordered_values), fit = stats.probplot(sample, dist="norm")
    fit_slope, fit_intercept = fit[0], fit[1]
    reference_line = fit_slope * theoretical_q + fit_intercept

    fig = go.Figure()
    fig.add_trace(go.Scatter(x=theoretical_q, y=ordered_values, mode='markers', name='Sample Quantiles'))
    fig.add_trace(go.Scatter(x=theoretical_q, y=reference_line, mode='lines', name='Theoretical Quantiles'))
    fig.update_layout(
        title=f'Q-Q Plot for {column}',
        xaxis_title='Theoretical Quantiles',
        yaxis_title='Sample Quantiles',
    )
    st.plotly_chart(fig)

    # Statistical check: Shapiro-Wilk test
    stat, p = stats.shapiro(sample)
    st.write(f"Shapiro-Wilk Test for {column}:")
    st.write(f"ν†΅κ³„λŸ‰: {stat:.4f}")
    st.write(f"p-value: {p:.4f}")

    # p > 0.05: the null hypothesis of normality is not rejected.
    verdict = (
        "데이터가 μ •κ·œ 뢄포λ₯Ό λ”°λ₯΄λŠ” κ²ƒμœΌλ‘œ λ³΄μž…λ‹ˆλ‹€ (귀무가섀을 κΈ°κ°ν•˜μ§€ λͺ»ν•¨)"
        if p > 0.05
        else "데이터가 μ •κ·œ 뢄포λ₯Ό λ”°λ₯΄μ§€ μ•ŠλŠ” κ²ƒμœΌλ‘œ λ³΄μž…λ‹ˆλ‹€ (귀무가섀 기각)"
    )
    st.write(verdict)
def perform_independent_ttest(data, group_column, value_column):
    """Run an independent-samples t-test between the two groups found in ``group_column``.

    Shows an error and returns when the column does not hold exactly two
    distinct values.

    NOTE: a second function with the same name (taking explicit ``group1`` /
    ``group2`` arguments) is defined further down in this module; being later,
    that definition is the one bound to the name at call time.
    """
    labels = data[group_column].unique()
    if len(labels) != 2:
        st.error("독립 ν‘œλ³Έ t-검정은 μ •ν™•νžˆ 두 그룹이 ν•„μš”ν•©λ‹ˆλ‹€.")
        return

    first_label, second_label = labels[0], labels[1]
    first_sample = data.loc[data[group_column] == first_label, value_column]
    second_sample = data.loc[data[group_column] == second_label, value_column]
    t_stat, p_value = stats.ttest_ind(first_sample, second_sample)

    st.write(f"독립 ν‘œλ³Έ T-κ²€μ • κ²°κ³Ό ({group_column} κΈ°μ€€, {value_column} 비ꡐ):")
    st.write(f"κ·Έλ£Ή: {first_label} vs {second_label}")
    st.write(f"t-ν†΅κ³„λŸ‰: {t_stat:.4f}")
    st.write(f"p-value: {p_value:.4f}")

    # Significance threshold: 0.05
    conclusion = (
        "두 κ·Έλ£Ή 간에 ν†΅κ³„μ μœΌλ‘œ μœ μ˜ν•œ 차이가 μžˆμŠ΅λ‹ˆλ‹€."
        if p_value < 0.05
        else "두 κ·Έλ£Ή 간에 ν†΅κ³„μ μœΌλ‘œ μœ μ˜ν•œ 차이가 μ—†μŠ΅λ‹ˆλ‹€."
    )
    st.write(conclusion)
def perform_paired_ttest(data, column1, column2):
    """Run a paired-samples t-test between two columns of ``data`` and report the result."""
    first_sample = data[column1]
    second_sample = data[column2]

    # Both samples must have the same number of observations.
    if len(first_sample) != len(second_sample):
        st.error("λŒ€μ‘ ν‘œλ³Έ t-검정을 μœ„ν•΄μ„œλŠ” 두 μ—΄μ˜ 데이터 μˆ˜κ°€ κ°™μ•„μ•Ό ν•©λ‹ˆλ‹€.")
        return

    t_stat, p_value = stats.ttest_rel(first_sample, second_sample)
    st.write(f"λŒ€μ‘ ν‘œλ³Έ T-κ²€μ • κ²°κ³Ό ({column1} vs {column2}):")
    st.write(f"t-ν†΅κ³„λŸ‰: {t_stat:.4f}")
    st.write(f"p-value: {p_value:.4f}")

    # Significance threshold: 0.05
    conclusion = (
        f"{column1}κ³Ό {column2} 간에 ν†΅κ³„μ μœΌλ‘œ μœ μ˜ν•œ 차이가 μžˆμŠ΅λ‹ˆλ‹€."
        if p_value < 0.05
        else f"{column1}κ³Ό {column2} 간에 ν†΅κ³„μ μœΌλ‘œ μœ μ˜ν•œ 차이가 μ—†μŠ΅λ‹ˆλ‹€."
    )
    st.write(conclusion)
def perform_onesample_ttest(data, column, test_value):
    """Run a one-sample t-test of ``data[column]`` against ``test_value`` and report the result."""
    result = stats.ttest_1samp(data[column], test_value)
    t_stat, p_value = result[0], result[1]

    st.write("단일 ν‘œλ³Έ T-κ²€μ • κ²°κ³Ό:")
    st.write(f"t-ν†΅κ³„λŸ‰: {t_stat:.4f}")
    st.write(f"p-value: {p_value:.4f}")

    # Significance threshold: 0.05
    conclusion = (
        f"ν‘œλ³Έ 평균이 {test_value}와 μœ μ˜ν•˜κ²Œ λ‹€λ¦…λ‹ˆλ‹€."
        if p_value < 0.05
        else f"ν‘œλ³Έ 평균이 {test_value}와 μœ μ˜ν•˜κ²Œ λ‹€λ₯΄μ§€ μ•ŠμŠ΅λ‹ˆλ‹€."
    )
    st.write(conclusion)
def plot_scatter_with_regression(data, x_var, y_var):
    """Plot ``y_var`` against ``x_var`` with a fitted regression line and print fit statistics.

    Each point carries an error bar whose length is the absolute residual of
    that point from the fitted line.
    """
    # Run the linear regression
    x = data[x_var]
    y = data[y_var]
    slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)

    # Predicted values and residuals
    fitted = slope * x + intercept
    residual_size = abs(y - fitted)
    r_squared = r_value ** 2

    # Scatter trace (with error bars)
    points_trace = go.Scatter(
        x=x,
        y=y,
        mode='markers',
        name='Data Points',
        marker={'color': 'rgba(0, 0, 255, 0.7)', 'size': 10},
        error_y={
            'type': 'data',
            'array': residual_size,
            'visible': True,
            'color': 'rgba(0, 0, 0, 0.1)',
            'thickness': 0.5,
            'width': 0,
        },
    )

    # Regression line trace
    line_trace = go.Scatter(
        x=x,
        y=fitted,
        mode='lines',
        name='Regression Line',
        line={'color': 'red', 'width': 2},
    )

    # Annotation box showing the fitted equation, anchored near the top-left corner.
    equation_note = {
        'x': 0.05,
        'y': 0.95,
        'xref': 'paper',
        'yref': 'paper',
        'text': f'y = {slope:.2f}x + {intercept:.2f}<br>RΒ² = {r_squared:.3f}',
        'showarrow': False,
        'bgcolor': 'rgba(255, 255, 255, 0.8)',
        'bordercolor': 'rgba(0, 0, 0, 0.3)',
        'borderwidth': 1,
    }

    # Build the figure and apply the layout
    fig = go.Figure()
    fig.add_trace(points_trace)
    fig.add_trace(line_trace)
    fig.update_layout(
        title=f'{x_var}와 {y_var}의 관계 (R-squared: {r_squared:.3f})',
        xaxis_title=x_var,
        yaxis_title=y_var,
        showlegend=True,
        annotations=[equation_note],
    )
    st.plotly_chart(fig)

    # Additional statistics
    st.write(f"μƒκ΄€κ³„μˆ˜: {r_value:.4f}")
    st.write(f"p-value: {p_value:.4f}")
    st.write(f"ν‘œμ€€ 였차: {std_err:.4f}")
def get_active_slicers():
    """Return the slicers that currently have at least one selected value."""
    active = {}
    for col, values in st.session_state.slicers.items():
        if values:
            active[col] = values
    return active
def perform_independent_ttest(data, group_column, group1, group2, value_column):
    """Run an independent-samples t-test on ``value_column`` between two chosen groups.

    ``group1`` and ``group2`` are values of ``group_column`` that select the
    rows of each sample.
    """
    group_labels = data[group_column]
    first_sample = data.loc[group_labels == group1, value_column]
    second_sample = data.loc[group_labels == group2, value_column]
    t_stat, p_value = stats.ttest_ind(first_sample, second_sample)

    st.write(f"독립 ν‘œλ³Έ T-κ²€μ • κ²°κ³Ό ({group_column}: {group1} vs {group2}, {value_column} 비ꡐ):")
    st.write(f"t-ν†΅κ³„λŸ‰: {t_stat:.4f}")
    st.write(f"p-value: {p_value:.4f}")

    # Significance threshold: 0.05
    conclusion = (
        f"{group1}κ³Ό {group2} 간에 ν†΅κ³„μ μœΌλ‘œ μœ μ˜ν•œ 차이가 μžˆμŠ΅λ‹ˆλ‹€."
        if p_value < 0.05
        else f"{group1}κ³Ό {group2} 간에 ν†΅κ³„μ μœΌλ‘œ μœ μ˜ν•œ 차이가 μ—†μŠ΅λ‹ˆλ‹€."
    )
    st.write(conclusion)
def perform_analysis():
    """Render the exploratory-analysis page for ``st.session_state.processed_data``.

    Layout: slicer widgets on top, then three columns (summary statistics and
    correlation heatmap / scatter plot with regression / statistical tests),
    followed by a button that resets the session and reruns the script.

    Does nothing when no processed data is available.
    """
    # Nothing to analyse (e.g. the session was reset): avoid calling .copy() on None.
    if st.session_state.processed_data is None:
        return

    if st.session_state.filtered_data is None:
        st.session_state.filtered_data = st.session_state.processed_data.copy()

    st.header("탐색적 데이터 뢄석")

    # Create the slicers
    create_slicers(st.session_state.processed_data)

    # Refresh the filtered data on every run so it reflects the current slicers
    st.session_state.filtered_data = apply_slicers(st.session_state.processed_data)

    # Three-column layout
    col1, col2, col3 = st.columns(3)

    with col1:
        # Summary statistics
        st.write("μš”μ•½ 톡계:")
        st.write(st.session_state.filtered_data.describe())

        # Correlation heatmap
        st.subheader("상관관계 히트맡")
        plot_correlation_heatmap(st.session_state.filtered_data)

    with col2:
        # Scatter plot and regression for two user-selected variables
        st.subheader("두 λ³€μˆ˜ κ°„μ˜ 관계 뢄석")
        x_var = st.selectbox("XμΆ• λ³€μˆ˜ 선택", options=st.session_state.numeric_columns, key='x_var')
        y_var = st.selectbox("YμΆ• λ³€μˆ˜ 선택", options=[col for col in st.session_state.numeric_columns if col != x_var], key='y_var')
        # Compare against None rather than truthiness so that falsy column
        # labels (e.g. 0) are still accepted.
        if x_var is not None and y_var is not None:
            plot_scatter_with_regression(st.session_state.filtered_data, x_var, y_var)

    with col3:
        st.subheader("톡계적 κ²€μ •")

        # Normality test
        st.write("μ •κ·œμ„± κ²€μ •")
        normality_column = st.selectbox("μ •κ·œμ„± 검정을 μˆ˜ν–‰ν•  μ—΄ 선택:", st.session_state.numeric_columns, key='normality_column')
        if st.button("μ •κ·œμ„± κ²€μ • μˆ˜ν–‰"):
            check_normality(st.session_state.filtered_data, normality_column)

        # T-test
        st.write("T-κ²€μ •")
        test_type = st.radio("T-κ²€μ • μœ ν˜• 선택:", ["독립 ν‘œλ³Έ", "λŒ€μ‘ ν‘œλ³Έ", "단일 ν‘œλ³Έ"], key="test_type_radio")

        if test_type == "독립 ν‘œλ³Έ":
            active_slicers = get_active_slicers()
            if active_slicers:
                group_column = st.selectbox("κ·Έλ£Ή ꡬ뢄을 μœ„ν•œ μ—΄ 선택:", options=list(active_slicers.keys()))
                available_groups = active_slicers[group_column]
                group1 = st.selectbox("첫 번째 κ·Έλ£Ή 선택:", options=available_groups, key="group1")
                group2 = st.selectbox("두 번째 κ·Έλ£Ή 선택:",
                                      options=[g for g in available_groups if g != group1],
                                      key="group2")
                value_column = st.selectbox("비ꡐ할 값이 μžˆλŠ” μ—΄ 선택:", st.session_state.numeric_columns)
                if st.button("독립 ν‘œλ³Έ T-κ²€μ • μˆ˜ν–‰"):
                    # "No selection" is None; a group label such as False or 0
                    # is a valid choice and must not be rejected.
                    if group1 is not None and group2 is not None:
                        perform_independent_ttest(st.session_state.filtered_data, group_column, group1, group2, value_column)
                    else:
                        st.error("두 개의 μ„œλ‘œ λ‹€λ₯Έ 그룹을 μ„ νƒν•΄μ£Όμ„Έμš”.")
            else:
                st.warning("ν™œμ„±ν™”λœ μŠ¬λΌμ΄μ„œκ°€ μ—†μŠ΅λ‹ˆλ‹€. λ¨Όμ € μŠ¬λΌμ΄μ„œμ—μ„œ 그룹을 μ„ νƒν•΄μ£Όμ„Έμš”.")
        elif test_type == "λŒ€μ‘ ν‘œλ³Έ":
            column1 = st.selectbox("첫 번째 μ—΄ 선택:", st.session_state.numeric_columns, key="paired_col1")
            column2 = st.selectbox("두 번째 μ—΄ 선택:",
                                   [col for col in st.session_state.numeric_columns if col != column1],
                                   key="paired_col2")
            if st.button("λŒ€μ‘ ν‘œλ³Έ T-κ²€μ • μˆ˜ν–‰"):
                perform_paired_ttest(st.session_state.filtered_data, column1, column2)
        elif test_type == "단일 ν‘œλ³Έ":
            test_column = st.selectbox("κ²€μ •ν•  μ—΄ 선택:", st.session_state.numeric_columns, key="one_sample_col")
            test_value = st.number_input("κ²€μ • κ°’ μž…λ ₯:", key="one_sample_value")
            if st.button("단일 ν‘œλ³Έ T-κ²€μ • μˆ˜ν–‰"):
                perform_onesample_ttest(st.session_state.filtered_data, test_column, test_value)

    # "Analyse another dataset" button
    # NOTE(review): the source's indentation was lost; this button is assumed to
    # sit at function level, below the three columns - confirm.
    if st.button("λ‹€λ₯Έ 데이터 λΆ„μ„ν•˜κΈ°(였λ₯˜κ°€ λ‚˜λ©΄ λ‹€μ‹œ λˆŒλŸ¬μ£Όμ„Έμš”)"):
        reset_session_state()
        # Prefer st.rerun; fall back to the older experimental name on
        # Streamlit versions that do not provide it.
        rerun = getattr(st, "rerun", None) or st.experimental_rerun
        rerun()
## Main
def main():
    """App entry point: collect a dataset, let the user edit it, then run the analysis.

    NOTE(review): the source's indentation was lost; the final
    ``analysis_performed`` check is assumed to belong to the
    "data is available" branch - confirm.
    """
    st.title("λͺ¨λ‘κ°€ ν•  수 μžˆλŠ” 데이터 뢄석 νˆ΄ν‚· Data Analysis for Everyone")
    st.link_button("λ§Œλ“ μ΄ μ½”λ‚œμŒ€", "https://www.youtube.com/@conanssam")
    manage_session_state()

    # Step 1: obtain a dataset when none is loaded yet.
    if st.session_state.data is None:
        input_choices = ("파일 μ—…λ‘œλ“œ", "μ˜ˆμ‹œ 데이터 μ‚¬μš©", "μˆ˜λ™ μž…λ ₯")
        data_input_method = st.radio("데이터 μž…λ ₯ 방법 선택:", input_choices, key="data_input_method")

        if data_input_method == "파일 μ—…λ‘œλ“œ":
            uploaded_file = st.file_uploader(
                "CSV, XLS, λ˜λŠ” XLSX νŒŒμΌμ„ μ„ νƒν•˜μ„Έμš”",
                type=["csv", "xls", "xlsx"],
                key="file_uploader",
            )
            if uploaded_file is not None:
                st.session_state.data = load_data(uploaded_file)
        elif data_input_method == "μ˜ˆμ‹œ 데이터 μ‚¬μš©":
            sample_names = [entry["name"] for entry in SAMPLE_DATA_FILES]
            sample_choice = st.selectbox(
                "μ˜ˆμ‹œ 데이터 선택",
                options=sample_names,
                format_func=lambda x: x
            )
            if st.button("μ„ νƒν•œ μ˜ˆμ‹œ 데이터 λ‘œλ“œ"):
                # First entry whose display name matches the selection.
                selected_file = next(
                    entry["file"] for entry in SAMPLE_DATA_FILES if entry["name"] == sample_choice
                )
                st.session_state.data = load_sample_data(selected_file)
        else:
            st.session_state.data = manual_data_entry()

    # Nothing loaded yet: stop here until the user provides data.
    if st.session_state.data is None:
        return

    # Step 2: preview / edit the dataset.
    st.subheader("데이터 미리보기 및 μˆ˜μ •")
    st.write("데이터λ₯Ό ν™•μΈν•˜κ³  ν•„μš”ν•œ 경우 μˆ˜μ •ν•˜μ„Έμš”:")
    edited_data = st.data_editor(
        st.session_state.data,
        num_rows="dynamic",
        key="main_data_editor"
    )

    # Step 3: preprocess on the first click and on every later rerun of an
    # analysis that was already started.
    start_clicked = st.button("데이터 뢄석 μ‹œμž‘", key="start_analysis")
    if start_clicked or st.session_state.analysis_performed:
        st.session_state.processed_data = preprocess_data(edited_data)
        st.session_state.analysis_performed = True

    if st.session_state.analysis_performed:
        perform_analysis()
# Run the app only when this file is executed as the main script.
if __name__ == "__main__":
    main()