import streamlit as st | |
import pandas as pd | |
import seaborn as sns | |
import matplotlib.pyplot as plt | |
import numpy as np | |
from scipy.stats import zscore | |
st.set_page_config(layout="wide") | |
def categorize_marks(normalized_marks): | |
if normalized_marks >= 0.8: | |
return '80-100%' | |
elif normalized_marks >= 0.6: | |
return '60-80%' | |
else: | |
return '<60%' | |
def analyze_student_performance_by_li(question_data, student_performance_data): | |
# Merge the dataframes on question number | |
merged_data = pd.merge(student_performance_data, question_data, on='question_number') | |
merged_data = merged_data.groupby(["student_id","learning_indicator_id"])[["marks_obtained","maximum_marks"]].sum().reset_index() | |
merged_data['normalized_marks'] = merged_data['marks_obtained'] / merged_data['maximum_marks'] | |
# Categorize the normalized marks | |
merged_data['category'] = merged_data['normalized_marks'].apply(categorize_marks) | |
# Group by learning indicator ID and category, and count the number of students in each category | |
merged_data = merged_data.groupby(['learning_indicator_id', 'category']).size().unstack(fill_value=0) | |
# Rename the columns for better readability | |
merged_data.columns = ['<60%', '60-80%', '80-100%'] | |
merged_data = merged_data.sort_values(['<60%', '60-80%', '80-100%'],ascending=[False,False,False]).reset_index() | |
# Display the results | |
return merged_data | |
def prioritize_lis(category_counts): | |
# Add a rank column based on the order of rows | |
category_counts['Rank'] = category_counts.index + 1 | |
# Determine the number of LIs | |
total_lis = len(category_counts) | |
# Determine the cutoff points for high, medium, and low priority | |
high_priority_cutoff = int(total_lis * 0.3) | |
medium_priority_cutoff = int(total_lis * 0.6) | |
# Classify the LIs based on their rank | |
category_counts['Priority'] = 'Low' | |
category_counts.loc[category_counts['Rank'] <= high_priority_cutoff, 'Priority'] = 'High' | |
category_counts.loc[(category_counts['Rank'] > high_priority_cutoff) & (category_counts['Rank'] <= medium_priority_cutoff), 'Priority'] = 'Medium' | |
return category_counts | |
def mean_li_level_analysis(student_data, question_data): | |
merged_data = pd.merge(student_data, question_data, on='question_number',how="inner") | |
# Normalize the marks obtained for each learning indicator by each student | |
merged_data = merged_data.groupby(['learning_indicator_id'])[['marks_obtained', 'maximum_marks']].sum().reset_index() | |
merged_data["normalised_score"]=merged_data["marks_obtained"]/merged_data["maximum_marks"] | |
merged_data = pd.merge(merged_data,question_data[["learning_indicator_id","learning_indicator_text"]].drop_duplicates(),on="learning_indicator_id") | |
return merged_data | |
def student_level_analysis(student_data, question_data, prioritized_lis): | |
# Merge the student data with question data | |
merged_data = pd.merge(student_data, question_data, on='question_number',how="inner") | |
# Normalize the marks obtained for each learning indicator by each student | |
merged_data = merged_data.groupby(['student_id', 'learning_indicator_id'])[['marks_obtained', 'maximum_marks']].sum().reset_index() | |
merged_data['normalized_marks'] = merged_data['marks_obtained'] / merged_data['maximum_marks'] | |
merged_data = merged_data[merged_data["normalized_marks"]<0.80] | |
# Merge with prioritized_lis to get the priority and rank | |
merged_data = pd.merge(merged_data, prioritized_lis[['learning_indicator_id', 'Rank']], on='learning_indicator_id', how='left') | |
# Rank the LIs for each student based on normalized marks and class-level LI priority | |
merged_data['student_rank'] = merged_data.groupby('student_id')['normalized_marks'].rank(method='dense', ascending=False) | |
merged_data = merged_data.sort_values(by=['student_id', 'student_rank', 'Rank']) | |
# Ensure unique ranks by adding a secondary sort by Rank | |
merged_data['unique_rank'] = merged_data.groupby('student_id').cumcount() + 1 | |
# Create the final dataframe | |
student_ranking = merged_data.pivot(index='student_id', columns='unique_rank', values='learning_indicator_id').reset_index() | |
student_ranking.columns = ['student_id'] + [f'P{I+1}_li' for I in range(student_ranking.shape[1] - 1)] | |
li_text_mapping = question_data.drop_duplicates(subset='learning_indicator_id').set_index('learning_indicator_id')['learning_indicator_text'] | |
for col in student_ranking.columns[1:]: | |
student_ranking[col] = student_ranking[col].map(li_text_mapping) | |
return student_ranking | |
def prepare_data_for_ridge_plot(student_data, question_data): | |
# Merge the DataFrames | |
merged_data = pd.merge(student_data, question_data, on='question_number', how='inner') | |
# Normalize the marks obtained for each learning indicator by each student | |
normalized_data = merged_data.groupby(['student_id', 'learning_indicator_id'])[['marks_obtained', 'maximum_marks']].sum().reset_index() | |
normalized_data['normalized_marks'] = normalized_data['marks_obtained'] / normalized_data['maximum_marks'] | |
# Add learning_indicator_text to normalized_data | |
plot_data = pd.merge(normalized_data, question_data[['learning_indicator_id', 'learning_indicator_text']].drop_duplicates(), on='learning_indicator_id') | |
return plot_data | |
def calculate_logical_quantiles(data, num_quantiles=5): | |
""" | |
Calculate logical quantiles for a given data set to ensure they are informative. | |
Parameters: | |
data (array-like): The input data for which to calculate quantiles. | |
num_quantiles (int): The number of quantiles to calculate. Default is 5. | |
Returns: | |
list: A list of quantile values. | |
""" | |
# Ensure there are enough unique values to calculate the quantiles | |
if len(np.unique(data)) < num_quantiles: | |
# If not enough unique values, use unique values as quantiles | |
quantiles = np.unique(data) | |
else: | |
# Calculate evenly spaced quantiles | |
quantiles = np.percentile(data, np.linspace(0, 100, num_quantiles)) | |
return quantiles.tolist() | |
def create_ridge_plot(plot_data): | |
unique_learning_indicators = plot_data['learning_indicator_text'].unique() | |
n_indicators = len(unique_learning_indicators) | |
bandwidth = 0.5 # Adjust bandwidth for smoother graphs | |
darkgreen = '#9BC184' | |
midgreen = '#C2D6A4' | |
lightgreen = '#E7E5CB' | |
colors = [lightgreen, midgreen, darkgreen, midgreen, lightgreen] | |
fig, axs = plt.subplots(nrows=n_indicators, ncols=1, figsize=(10, n_indicators * 1.5), sharex=True) | |
axs = axs.flatten() # Flatten in case of single plot | |
for i, indicator in enumerate(unique_learning_indicators): | |
# Subset the data for each learning indicator | |
subset = plot_data[plot_data['learning_indicator_text'] == indicator] | |
# Plot the distribution of normalized marks | |
sns.kdeplot( | |
subset['normalized_marks'], | |
shade=True, | |
bw_adjust=bandwidth, | |
ax=axs[i], | |
color=sns.color_palette('coolwarm', n_colors=n_indicators)[i] | |
) | |
quantiles = calculate_logical_quantiles(subset["normalized_marks"].tolist()) | |
# fill space between each pair of quantiles | |
for j in range(len(quantiles) - 1): | |
axs[i].fill_between( | |
[quantiles[j], # lower bound | |
quantiles[j+1]], # upper bound | |
0.1, # max y=0 | |
0.3, # max y=0.0002 | |
color=colors[j] | |
) | |
mean = subset['marks_obtained'].sum()/subset['maximum_marks'].sum() | |
axs[i].scatter([mean], [0.3], color='black', s=15) | |
global_mean = plot_data['normalized_marks'].mean() | |
axs[i].axvline(global_mean, color='#525252', linestyle='--') | |
axs[i].set_xlim(0, 1) | |
axs[i].set_ylim(0,3) | |
# Add the learning indicator text as the title | |
axs[i].set_title(indicator, loc='left', fontsize=12, fontweight='bold') | |
# Remove y-axis label | |
axs[i].set_ylabel('') | |
# Add a horizontal line for the baseline | |
axs[i].axhline(0, color='black', linewidth=1.3, linestyle='-') | |
# Set common labels | |
plt.xlabel('Normalized Marks', fontsize=12, fontweight='bold') | |
plt.tight_layout() | |
return fig | |
def remediation_groups(student_df, question_df, z_threshold=-1.35): | |
# Merge student performance with question data to get full marks | |
student_df["student_id"]=student_df["student_id"].astype(str) | |
merged_df = pd.merge(student_df, question_df, on='question_number') | |
# Apply minimum marks validation and fill NaN with 0 | |
merged_df['marks_obtained'] = np.minimum(merged_df['marks_obtained'].fillna(0), merged_df['full_marks']) | |
# Calculate normalized scores | |
merged_df['normalized_score'] = merged_df['marks_obtained'] / merged_df['full_marks'] | |
# Calculate z-scores for each learning indicator | |
z_scores = merged_df.groupby('learning_indicator_id')['normalized_score'].transform(zscore) | |
merged_df['z_score'] = z_scores | |
# Identify students needing remediation | |
remediation_df = merged_df[merged_df['z_score'] < z_threshold] | |
# Group by learning indicator to find students needing remediation | |
li_remediation_groups = remediation_df.groupby(['learning_indicator_id', 'learning_indicator_text'])['student_id'].apply(lambda x: ', '.join(x.unique())).reset_index() | |
# Identify students who don't need remediation | |
students_needing_remediation = remediation_df['student_id'].unique() | |
students_no_remediation = merged_df[~merged_df['student_id'].isin(students_needing_remediation)]['student_id'].unique() | |
no_remediation_df = pd.DataFrame(students_no_remediation, columns=['student_id']) | |
return li_remediation_groups, no_remediation_df | |
def main(): | |
col_logo , col_name = st.columns([1,3]) | |
with col_logo: | |
st.image("Screenshot 2024-08-07 at 1.05.24 PM.png") | |
with col_name: | |
st.title("Learning Indicator Analysis") | |
st.subheader("Student and Class Remediation based on Question Data and Student Data. Upload Files") | |
# Upload the dataframes | |
col_A,colB = st.columns(2) | |
with col_A: | |
question_data_file = st.file_uploader("Upload Question Data CSV", type="csv") | |
with colB: | |
student_performance_data_file = st.file_uploader("Upload Student Performance Data CSV", type="csv") | |
st.write("----------------------") | |
if question_data_file and student_performance_data_file: | |
question_data = pd.read_csv(question_data_file) | |
student_performance_data = pd.read_csv(student_performance_data_file) | |
# Analyze performance and prioritize LIs | |
category_counts = analyze_student_performance_by_li(question_data, student_performance_data) | |
prioritized_lis = prioritize_lis(category_counts) | |
# Merge with original question data to get the learning indicator text | |
prioritized_lis = pd.merge(prioritized_lis, question_data[['learning_indicator_id', 'learning_indicator_text']].drop_duplicates(), on='learning_indicator_id', how='left') | |
# Display the results with filters | |
st.write("Learning Indicator Analysis with Priority") | |
def highlight_priority(row): | |
if row.Priority == 'High': | |
return ['background-color: red']*len(row) | |
elif row.Priority == 'Medium': | |
return ['background-color: yellow']*len(row) | |
elif row.Priority == 'Low': | |
return ['background-color: green']*len(row) | |
else: | |
return ['']*len(row) | |
col1,col2 = st.columns(2) | |
with col1: | |
st.dataframe(, axis=1)) | |
overall_li_level = mean_li_level_analysis(student_performance_data, question_data) | |
overall_li_level = overall_li_level.sort_values("normalised_score") | |
st.dataframe(overall_li_level) | |
with col2: | |
plt_data=prepare_data_for_ridge_plot(student_performance_data, question_data) | |
plt_fig = create_ridge_plot(plt_data) | |
st.pyplot(plt_fig) | |
st.write("---------------------------") | |
col3,col4 = st.columns(2) | |
li_remediation_groups, no_remediation_df = remediation_groups(student_performance_data,question_data) | |
with col3: | |
st.write("Student Group Remediation based on LI") | |
st.dataframe(li_remediation_groups) | |
with col4: | |
st.write("Students That are not part of group remediation") | |
st.dataframe(no_remediation_df) | |
# Filters for LI ID and Priority | |
li_id_filter = st.multiselect("Exclude Li_ids :", prioritized_lis['learning_indicator_id'].unique()) | |
priority_filter = st.multiselect("Exclude Priority:",prioritized_lis["Priority"].unique()) | |
if li_id_filter: | |
prioritized_lis = prioritized_lis[~prioritized_lis["learning_indicator_id"].isin(li_id_filter)] | |
question_data = question_data[~question_data["learning_indicator_id"].isin(li_id_filter)] | |
if priority_filter: | |
li_ids_out = prioritized_lis[prioritized_lis["Priority"].isin(priority_filter)]["learning_indicator_id"].unique().tolist() | |
question_data = question_data[~question_data["learning_indicator_id"].isin(li_ids_out)] | |
# Button to generate student-level ranking | |
if st.button("Generate Student Level Ranking"): | |
print(len(question_data),"==question") | |
print(len(prioritized_lis),"===priotisex") | |
student_ranking = student_level_analysis(student_performance_data, question_data, prioritized_lis) | |
st.write("Student Level Learning Indicator Ranking") | |
st.dataframe(student_ranking) | |
if __name__ == "__main__": | |
main() |