Spaces:
Runtime error
Runtime error
import streamlit as st | |
import pandas as pd | |
import numpy as np | |
import plotly.express as px | |
import plotly.graph_objects as go | |
from io import StringIO | |
import openpyxl | |
import matplotlib.font_manager as fm | |
from scipy import stats | |
import os | |
import plotly.figure_factory as ff | |
#μ¬μ΄μ¦ ν¬κ² | |
st.set_page_config(layout="wide") | |
# νκΈ ν°νΈ μ€μ | |
def set_font(): | |
font_path = "Pretendard-Bold.ttf" # μ€μ ν°νΈ νμΌ κ²½λ‘λ‘ λ³κ²½ν΄μ£ΌμΈμ | |
fm.fontManager.addfont(font_path) | |
return {'font.family': 'Pretendard-Bold', 'axes.unicode_minus': False} | |
# ν°νΈ μ€μ μ κ°μ Έμ΅λλ€ | |
font_settings = set_font() | |
# μΈμ μν μ΄κΈ°ν λ° κ΄λ¦¬ | |
def manage_session_state(): | |
if 'data' not in st.session_state: | |
st.session_state.data = None | |
if 'processed_data' not in st.session_state: | |
st.session_state.processed_data = None | |
if 'numeric_columns' not in st.session_state: | |
st.session_state.numeric_columns = [] | |
if 'categorical_columns' not in st.session_state: | |
st.session_state.categorical_columns = [] | |
if 'x_var' not in st.session_state: | |
st.session_state.x_var = None | |
if 'y_var' not in st.session_state: | |
st.session_state.y_var = None | |
if 'slicers' not in st.session_state: | |
st.session_state.slicers = {} | |
if 'analysis_performed' not in st.session_state: | |
st.session_state.analysis_performed = False | |
if 'filtered_data' not in st.session_state: | |
st.session_state.filtered_data = None | |
def reset_session_state(): | |
# μΈμ μν μ΄κΈ°ν | |
st.session_state.data = None | |
st.session_state.processed_data = None | |
st.session_state.filtered_data = None | |
st.session_state.numeric_columns = [] | |
st.session_state.categorical_columns = [] | |
st.session_state.x_var = None | |
st.session_state.y_var = None | |
st.session_state.slicers = {} | |
st.session_state.analysis_performed = False | |
SAMPLE_DATA_FILES = [ | |
{"name": "κ³Όλͺ©λ³ λ Έλ ₯κ³Ό μ±μ·¨λ", "file": "subject.xlsx"}, | |
{"name": "μ±μ ", "file": "score.xlsx"}, | |
{"name": "μΆμμΌμμ μ±μ ", "file": "attendance.xlsx"} | |
] | |
def load_sample_data(file_name): | |
# μμ λ°μ΄ν° νμΌ κ²½λ‘ | |
file_path = os.path.join("sample_data", file_name) | |
if file_name.endswith('.csv'): | |
return pd.read_csv(file_path) | |
elif file_name.endswith(('.xls', '.xlsx')): | |
return pd.read_excel(file_path) | |
else: | |
st.error("μ§μλμ§ μλ νμΌ νμμ λλ€.") | |
return None | |
# λ°μ΄ν° λ‘λ | |
def load_data(file): | |
file_extension = file.name.split('.')[-1].lower() | |
if file_extension == 'csv': | |
data = pd.read_csv(file) | |
elif file_extension in ['xls', 'xlsx']: | |
data = pd.read_excel(file) | |
else: | |
st.error("μ§μλμ§ μλ νμΌ νμμ λλ€. CSV, XLS, λλ XLSX νμΌμ μ λ‘λν΄μ£ΌμΈμ.") | |
return None | |
# λΉ μ΄ μ΄λ¦μ κΈ°λ³Έκ° λΆμ¬ | |
if data.columns.isnull().any(): | |
data.columns = [f'Column_{i+1}' if pd.isnull(col) else col for i, col in enumerate(data.columns)] | |
return data | |
def manual_data_entry(): | |
col_names = st.text_input("μ΄ μ΄λ¦μ μΌνλ‘ κ΅¬λΆνμ¬ μ λ ₯νμΈμ:", key="manual_col_names").split(',') | |
col_names = [name.strip() for name in col_names if name.strip()] | |
if col_names: | |
num_rows = st.number_input("μ΄κΈ° νμ μλ₯Ό μ λ ₯νμΈμ:", min_value=1, value=5, key="manual_num_rows") | |
data = pd.DataFrame(columns=col_names, index=range(num_rows)) | |
edited_data = st.data_editor(data, num_rows="dynamic", key="manual_data_editor") | |
return edited_data | |
return None | |
def preprocess_data(data): | |
# λ°μ΄ν° νμ μΆλ‘ λ° λ³ν | |
for column in data.columns: | |
if data[column].dtype == 'object': | |
try: | |
# NaN κ°μ 무μνκ³ μ«μλ‘ λ³ν μλ | |
numeric_converted = pd.to_numeric(data[column], errors='coerce') | |
# λͺ¨λ κ°μ΄ NaNμ΄ μλλΌλ©΄ λ³νλ μ΄μ μ¬μ© | |
if not numeric_converted.isna().all(): | |
data[column] = numeric_converted | |
st.write(f"'{column}' μ΄μ μ«μνμΌλ‘ λ³ννμ΅λλ€.") | |
except: | |
st.write(f"'{column}' μ΄μ λ²μ£ΌνμΌλ‘ μ μ§λ©λλ€.") | |
# κ²°μΈ‘μΉ μ²λ¦¬ (κΈ°μ‘΄ μ½λ μ μ§) | |
if data.isnull().sum().sum() > 0: | |
st.write("κ²°μΈ‘μΉ μ²λ¦¬:") | |
for column in data.columns: | |
if data[column].isnull().sum() > 0: | |
method = st.selectbox(f"{column} μ΄μ μ²λ¦¬ λ°©λ² μ ν:", | |
["μ κ±°", "νκ· μΌλ‘ λ체", "μ€μκ°μΌλ‘ λ체", "μ΅λΉκ°μΌλ‘ λ체"], | |
key=f"missing_{column}") | |
if method == "μ κ±°": | |
data = data.dropna(subset=[column]) | |
elif method == "νκ· μΌλ‘ λ체": | |
if pd.api.types.is_numeric_dtype(data[column]): | |
data[column].fillna(data[column].mean(), inplace=True) | |
else: | |
st.warning(f"{column} μ΄μ μ«μνμ΄ μλμ΄μ νκ· κ°μΌλ‘ λ체ν μ μμ΅λλ€.") | |
elif method == "μ€μκ°μΌλ‘ λ체": | |
if pd.api.types.is_numeric_dtype(data[column]): | |
data[column].fillna(data[column].median(), inplace=True) | |
else: | |
st.warning(f"{column} μ΄μ μ«μνμ΄ μλμ΄μ μ€μκ°μΌλ‘ λ체ν μ μμ΅λλ€.") | |
elif method == "μ΅λΉκ°μΌλ‘ λ체": | |
data[column].fillna(data[column].mode()[0], inplace=True) | |
# μ«μν μ΄κ³Ό λ²μ£Όν μ΄ λΆλ¦¬ | |
st.session_state.numeric_columns = data.select_dtypes(include=['float64', 'int64']).columns.tolist() | |
st.session_state.categorical_columns = data.select_dtypes(exclude=['float64', 'int64']).columns.tolist() | |
return data | |
def update_filtered_data(): | |
st.session_state.filtered_data = apply_slicers(st.session_state.processed_data) | |
def create_slicers(data): | |
for col in st.session_state.categorical_columns: | |
if col in data.columns and data[col].nunique() <= 10: | |
st.session_state.slicers[col] = st.multiselect( | |
f"{col} μ ν", | |
options=sorted(data[col].unique()), | |
default=sorted(data[col].unique()), | |
key=f"slicer_{col}", | |
on_change=update_filtered_data | |
) | |
def apply_slicers(data): | |
filtered_data = data.copy() | |
for col, selected_values in st.session_state.slicers.items(): | |
if col in filtered_data.columns and selected_values: | |
filtered_data = filtered_data[filtered_data[col].isin(selected_values)] | |
return filtered_data | |
def plot_correlation_heatmap(data): | |
numeric_data = data[st.session_state.numeric_columns] | |
if not numeric_data.empty: | |
corr = numeric_data.corr() | |
fig = px.imshow(corr, color_continuous_scale='RdBu_r', zmin=-1, zmax=1) | |
fig.update_layout(title='μκ΄κ΄κ³ ννΈλ§΅') | |
st.plotly_chart(fig) | |
else: | |
st.warning("μκ΄κ΄κ³ ννΈλ§΅μ 그릴 μ μλ μ«μν μ΄μ΄ μμ΅λλ€.") | |
def check_normality(data, column): | |
# μκ°μ κ²μ¬: Q-Q plot | |
fig = go.Figure() | |
qq = stats.probplot(data[column], dist="norm") | |
fig.add_trace(go.Scatter(x=qq[0][0], y=qq[0][1], mode='markers', name='Sample Quantiles')) | |
fig.add_trace(go.Scatter(x=qq[0][0], y=qq[1][0] * qq[0][0] + qq[1][1], mode='lines', name='Theoretical Quantiles')) | |
fig.update_layout(title=f'Q-Q Plot for {column}', xaxis_title='Theoretical Quantiles', yaxis_title='Sample Quantiles') | |
st.plotly_chart(fig) | |
# ν΅κ³μ κ²μ¬: Shapiro-Wilk test | |
stat, p = stats.shapiro(data[column]) | |
st.write(f"Shapiro-Wilk Test for {column}:") | |
st.write(f"ν΅κ³λ: {stat:.4f}") | |
st.write(f"p-value: {p:.4f}") | |
if p > 0.05: | |
st.write("λ°μ΄ν°κ° μ κ· λΆν¬λ₯Ό λ°λ₯΄λ κ²μΌλ‘ 보μ λλ€ (κ·λ¬΄κ°μ€μ κΈ°κ°νμ§ λͺ»ν¨)") | |
else: | |
st.write("λ°μ΄ν°κ° μ κ· λΆν¬λ₯Ό λ°λ₯΄μ§ μλ κ²μΌλ‘ 보μ λλ€ (κ·λ¬΄κ°μ€ κΈ°κ°)") | |
def perform_independent_ttest(data, group_column, value_column): | |
groups = data[group_column].unique() | |
if len(groups) != 2: | |
st.error("λ 립 νλ³Έ t-κ²μ μ μ νν λ κ·Έλ£Ήμ΄ νμν©λλ€.") | |
return | |
group1 = data[data[group_column] == groups[0]][value_column] | |
group2 = data[data[group_column] == groups[1]][value_column] | |
t_stat, p_value = stats.ttest_ind(group1, group2) | |
st.write(f"λ 립 νλ³Έ T-κ²μ κ²°κ³Ό ({group_column} κΈ°μ€, {value_column} λΉκ΅):") | |
st.write(f"κ·Έλ£Ή: {groups[0]} vs {groups[1]}") | |
st.write(f"t-ν΅κ³λ: {t_stat:.4f}") | |
st.write(f"p-value: {p_value:.4f}") | |
if p_value < 0.05: | |
st.write("λ κ·Έλ£Ή κ°μ ν΅κ³μ μΌλ‘ μ μν μ°¨μ΄κ° μμ΅λλ€.") | |
else: | |
st.write("λ κ·Έλ£Ή κ°μ ν΅κ³μ μΌλ‘ μ μν μ°¨μ΄κ° μμ΅λλ€.") | |
def perform_paired_ttest(data, column1, column2): | |
if len(data[column1]) != len(data[column2]): | |
st.error("λμ νλ³Έ t-κ²μ μ μν΄μλ λ μ΄μ λ°μ΄ν° μκ° κ°μμΌ ν©λλ€.") | |
return | |
t_stat, p_value = stats.ttest_rel(data[column1], data[column2]) | |
st.write(f"λμ νλ³Έ T-κ²μ κ²°κ³Ό ({column1} vs {column2}):") | |
st.write(f"t-ν΅κ³λ: {t_stat:.4f}") | |
st.write(f"p-value: {p_value:.4f}") | |
if p_value < 0.05: | |
st.write(f"{column1}κ³Ό {column2} κ°μ ν΅κ³μ μΌλ‘ μ μν μ°¨μ΄κ° μμ΅λλ€.") | |
else: | |
st.write(f"{column1}κ³Ό {column2} κ°μ ν΅κ³μ μΌλ‘ μ μν μ°¨μ΄κ° μμ΅λλ€.") | |
def perform_onesample_ttest(data, column, test_value): | |
t_stat, p_value = stats.ttest_1samp(data[column], test_value) | |
st.write(f"λ¨μΌ νλ³Έ T-κ²μ κ²°κ³Ό:") | |
st.write(f"t-ν΅κ³λ: {t_stat:.4f}") | |
st.write(f"p-value: {p_value:.4f}") | |
if p_value < 0.05: | |
st.write(f"νλ³Έ νκ· μ΄ {test_value}μ μ μνκ² λ€λ¦ λλ€.") | |
else: | |
st.write(f"νλ³Έ νκ· μ΄ {test_value}μ μ μνκ² λ€λ₯΄μ§ μμ΅λλ€.") | |
def plot_scatter_with_regression(data, x_var, y_var): | |
# νκ· λΆμ μν | |
x = data[x_var] | |
y = data[y_var] | |
slope, intercept, r_value, p_value, std_err = stats.linregress(x, y) | |
# μμΈ‘κ° κ³μ° | |
y_pred = slope * x + intercept | |
# μμ°¨ κ³μ° | |
residuals = y - y_pred | |
# κ·Έλν μμ± | |
fig = go.Figure() | |
# μ°μ λ μΆκ° (μ€μ°¨ λ§λ ν¬ν¨) | |
fig.add_trace(go.Scatter( | |
x=x, | |
y=y, | |
mode='markers', | |
name='Data Points', | |
marker=dict(color='rgba(0, 0, 255, 0.7)', size=10), | |
error_y=dict( | |
type='data', | |
array=abs(residuals), | |
visible=True, | |
color='rgba(0, 0, 0, 0.1)', | |
thickness=0.5, | |
width=0 | |
) | |
)) | |
# νκ·μ μΆκ° | |
fig.add_trace(go.Scatter( | |
x=x, | |
y=y_pred, | |
mode='lines', | |
name='Regression Line', | |
line=dict(color='red', width=2) | |
)) | |
# λ μ΄μμ μ€μ | |
r_squared = r_value ** 2 | |
fig.update_layout( | |
title=f'{x_var}μ {y_var}μ κ΄κ³ (R-squared: {r_squared:.3f})', | |
xaxis_title=x_var, | |
yaxis_title=y_var, | |
showlegend=True, | |
annotations=[ | |
dict( | |
x=0.05, | |
y=0.95, | |
xref='paper', | |
yref='paper', | |
text=f'y = {slope:.2f}x + {intercept:.2f}<br>RΒ² = {r_squared:.3f}', | |
showarrow=False, | |
bgcolor='rgba(255, 255, 255, 0.8)', | |
bordercolor='rgba(0, 0, 0, 0.3)', | |
borderwidth=1 | |
) | |
] | |
) | |
st.plotly_chart(fig) | |
# μΆκ° ν΅κ³ μ 보 | |
st.write(f"μκ΄κ³μ: {r_value:.4f}") | |
st.write(f"p-value: {p_value:.4f}") | |
st.write(f"νμ€ μ€μ°¨: {std_err:.4f}") | |
def get_active_slicers(): | |
return {col: values for col, values in st.session_state.slicers.items() if values} | |
def perform_independent_ttest(data, group_column, group1, group2, value_column): | |
group1_data = data[data[group_column] == group1][value_column] | |
group2_data = data[data[group_column] == group2][value_column] | |
t_stat, p_value = stats.ttest_ind(group1_data, group2_data) | |
st.write(f"λ 립 νλ³Έ T-κ²μ κ²°κ³Ό ({group_column}: {group1} vs {group2}, {value_column} λΉκ΅):") | |
st.write(f"t-ν΅κ³λ: {t_stat:.4f}") | |
st.write(f"p-value: {p_value:.4f}") | |
if p_value < 0.05: | |
st.write(f"{group1}κ³Ό {group2} κ°μ ν΅κ³μ μΌλ‘ μ μν μ°¨μ΄κ° μμ΅λλ€.") | |
else: | |
st.write(f"{group1}κ³Ό {group2} κ°μ ν΅κ³μ μΌλ‘ μ μν μ°¨μ΄κ° μμ΅λλ€.") | |
def perform_analysis(): | |
if st.session_state.filtered_data is None: | |
st.session_state.filtered_data = st.session_state.processed_data.copy() | |
st.header("νμμ λ°μ΄ν° λΆμ") | |
# μ¬λΌμ΄μ μμ± | |
create_slicers(st.session_state.processed_data) | |
# λ°μ΄ν°κ° λ³κ²½λ λλ§λ€ νν°λ§λ λ°μ΄ν° μ λ°μ΄νΈ | |
st.session_state.filtered_data = apply_slicers(st.session_state.processed_data) | |
# 3μ΄ λ μ΄μμ μμ± | |
col1, col2, col3 = st.columns(3) | |
with col1: | |
# μμ½ ν΅κ³ | |
st.write("μμ½ ν΅κ³:") | |
st.write(st.session_state.filtered_data.describe()) | |
# μκ΄κ΄κ³ ννΈλ§΅ | |
st.subheader("μκ΄κ΄κ³ ννΈλ§΅") | |
plot_correlation_heatmap(st.session_state.filtered_data) | |
with col2: | |
# μ¬μ©μκ° μ νν λ λ³μμ λν μ°μ λ λ° νκ· λΆμ | |
st.subheader("λ λ³μ κ°μ κ΄κ³ λΆμ") | |
x_var = st.selectbox("XμΆ λ³μ μ ν", options=st.session_state.numeric_columns, key='x_var') | |
y_var = st.selectbox("YμΆ λ³μ μ ν", options=[col for col in st.session_state.numeric_columns if col != x_var], key='y_var') | |
if x_var and y_var: | |
plot_scatter_with_regression(st.session_state.filtered_data, x_var, y_var) | |
with col3: | |
st.subheader("ν΅κ³μ κ²μ ") | |
# μ κ·μ± κ²μ | |
st.write("μ κ·μ± κ²μ ") | |
normality_column = st.selectbox("μ κ·μ± κ²μ μ μνν μ΄ μ ν:", st.session_state.numeric_columns, key='normality_column') | |
if st.button("μ κ·μ± κ²μ μν"): | |
check_normality(st.session_state.filtered_data, normality_column) | |
# T-κ²μ | |
st.write("T-κ²μ ") | |
test_type = st.radio("T-κ²μ μ ν μ ν:", ["λ 립 νλ³Έ", "λμ νλ³Έ", "λ¨μΌ νλ³Έ"], key="test_type_radio") | |
if test_type == "λ 립 νλ³Έ": | |
active_slicers = get_active_slicers() | |
if active_slicers: | |
group_column = st.selectbox("κ·Έλ£Ή ꡬλΆμ μν μ΄ μ ν:", options=list(active_slicers.keys())) | |
available_groups = active_slicers[group_column] | |
group1 = st.selectbox("첫 λ²μ§Έ κ·Έλ£Ή μ ν:", options=available_groups, key="group1") | |
group2 = st.selectbox("λ λ²μ§Έ κ·Έλ£Ή μ ν:", | |
options=[g for g in available_groups if g != group1], | |
key="group2") | |
value_column = st.selectbox("λΉκ΅ν κ°μ΄ μλ μ΄ μ ν:", st.session_state.numeric_columns) | |
if st.button("λ 립 νλ³Έ T-κ²μ μν"): | |
if group1 and group2: | |
perform_independent_ttest(st.session_state.filtered_data, group_column, group1, group2, value_column) | |
else: | |
st.error("λ κ°μ μλ‘ λ€λ₯Έ κ·Έλ£Ήμ μ νν΄μ£ΌμΈμ.") | |
else: | |
st.warning("νμ±νλ μ¬λΌμ΄μκ° μμ΅λλ€. λ¨Όμ μ¬λΌμ΄μμμ κ·Έλ£Ήμ μ νν΄μ£ΌμΈμ.") | |
elif test_type == "λμ νλ³Έ": | |
column1 = st.selectbox("첫 λ²μ§Έ μ΄ μ ν:", st.session_state.numeric_columns, key="paired_col1") | |
column2 = st.selectbox("λ λ²μ§Έ μ΄ μ ν:", | |
[col for col in st.session_state.numeric_columns if col != column1], | |
key="paired_col2") | |
if st.button("λμ νλ³Έ T-κ²μ μν"): | |
perform_paired_ttest(st.session_state.filtered_data, column1, column2) | |
elif test_type == "λ¨μΌ νλ³Έ": | |
test_column = st.selectbox("κ²μ ν μ΄ μ ν:", st.session_state.numeric_columns, key="one_sample_col") | |
test_value = st.number_input("κ²μ κ° μ λ ₯:", key="one_sample_value") | |
if st.button("λ¨μΌ νλ³Έ T-κ²μ μν"): | |
perform_onesample_ttest(st.session_state.filtered_data, test_column, test_value) | |
# 'λ€λ₯Έ λ°μ΄ν° λΆμνκΈ°' λ²νΌ μΆκ° | |
if st.button("λ€λ₯Έ λ°μ΄ν° λΆμνκΈ°(μ€λ₯κ° λλ©΄ λ€μ λλ¬μ£ΌμΈμ)"): | |
reset_session_state() | |
st.experimental_rerun() | |
## λ©μΈ | |
def main(): | |
st.title("λͺ¨λκ° ν μ μλ λ°μ΄ν° λΆμ ν΄ν· Data Analysis for Everyone") | |
st.link_button("λ§λ μ΄ μ½λμ€", "https://www.youtube.com/@conanssam") | |
manage_session_state() | |
if st.session_state.data is None: | |
data_input_method = st.radio("λ°μ΄ν° μ λ ₯ λ°©λ² μ ν:", ("νμΌ μ λ‘λ", "μμ λ°μ΄ν° μ¬μ©", "μλ μ λ ₯"), key="data_input_method") | |
if data_input_method == "νμΌ μ λ‘λ": | |
uploaded_file = st.file_uploader("CSV, XLS, λλ XLSX νμΌμ μ ννμΈμ", type=["csv", "xls", "xlsx"], key="file_uploader") | |
if uploaded_file is not None: | |
st.session_state.data = load_data(uploaded_file) | |
elif data_input_method == "μμ λ°μ΄ν° μ¬μ©": | |
sample_choice = st.selectbox( | |
"μμ λ°μ΄ν° μ ν", | |
options=[sample["name"] for sample in SAMPLE_DATA_FILES], | |
format_func=lambda x: x | |
) | |
if st.button("μ νν μμ λ°μ΄ν° λ‘λ"): | |
selected_file = next(sample["file"] for sample in SAMPLE_DATA_FILES if sample["name"] == sample_choice) | |
st.session_state.data = load_sample_data(selected_file) | |
else: | |
st.session_state.data = manual_data_entry() | |
if st.session_state.data is not None: | |
st.subheader("λ°μ΄ν° 미리보기 λ° μμ ") | |
st.write("λ°μ΄ν°λ₯Ό νμΈνκ³ νμν κ²½μ° μμ νμΈμ:") | |
edited_data = st.data_editor( | |
st.session_state.data, | |
num_rows="dynamic", | |
key="main_data_editor" | |
) | |
if st.button("λ°μ΄ν° λΆμ μμ", key="start_analysis") or st.session_state.analysis_performed: | |
st.session_state.processed_data = preprocess_data(edited_data) | |
st.session_state.analysis_performed = True | |
if st.session_state.analysis_performed: | |
perform_analysis() | |
if __name__ == "__main__": | |
main() |