import streamlit as st import pandas as pd import seaborn as sns import matplotlib.pyplot as plt import numpy as np from scipy.stats import zscore st.set_page_config(layout="wide") def categorize_marks(normalized_marks): if normalized_marks >= 0.8: return '80-100%' elif normalized_marks >= 0.6: return '60-80%' else: return '<60%' def analyze_student_performance_by_li(question_data, student_performance_data): # Merge the dataframes on question number merged_data = pd.merge(student_performance_data, question_data, on='question_number') merged_data = merged_data.groupby(["student_id","learning_indicator_id"])[["marks_obtained","maximum_marks"]].sum().reset_index() merged_data['normalized_marks'] = merged_data['marks_obtained'] / merged_data['maximum_marks'] # Categorize the normalized marks merged_data['category'] = merged_data['normalized_marks'].apply(categorize_marks) # Group by learning indicator ID and category, and count the number of students in each category merged_data = merged_data.groupby(['learning_indicator_id', 'category']).size().unstack(fill_value=0) # Rename the columns for better readability merged_data.columns = ['<60%', '60-80%', '80-100%'] merged_data = merged_data.sort_values(['<60%', '60-80%', '80-100%'],ascending=[False,False,False]).reset_index() # Display the results return merged_data def prioritize_lis(category_counts): # Add a rank column based on the order of rows category_counts['Rank'] = category_counts.index + 1 # Determine the number of LIs total_lis = len(category_counts) # Determine the cutoff points for high, medium, and low priority high_priority_cutoff = int(total_lis * 0.3) medium_priority_cutoff = int(total_lis * 0.6) # Classify the LIs based on their rank category_counts['Priority'] = 'Low' category_counts.loc[category_counts['Rank'] <= high_priority_cutoff, 'Priority'] = 'High' category_counts.loc[(category_counts['Rank'] > high_priority_cutoff) & (category_counts['Rank'] <= medium_priority_cutoff), 'Priority'] = 'Medium' return category_counts def mean_li_level_analysis(student_data, question_data): merged_data = pd.merge(student_data, question_data, on='question_number',how="inner") # Normalize the marks obtained for each learning indicator by each student merged_data = merged_data.groupby(['learning_indicator_id'])[['marks_obtained', 'maximum_marks']].sum().reset_index() merged_data["normalised_score"]=merged_data["marks_obtained"]/merged_data["maximum_marks"] merged_data = pd.merge(merged_data,question_data[["learning_indicator_id","learning_indicator_text"]].drop_duplicates(),on="learning_indicator_id") return merged_data def student_level_analysis(student_data, question_data, prioritized_lis): # Merge the student data with question data merged_data = pd.merge(student_data, question_data, on='question_number',how="inner") # Normalize the marks obtained for each learning indicator by each student merged_data = merged_data.groupby(['student_id', 'learning_indicator_id'])[['marks_obtained', 'maximum_marks']].sum().reset_index() merged_data['normalized_marks'] = merged_data['marks_obtained'] / merged_data['maximum_marks'] merged_data = merged_data[merged_data["normalized_marks"]<0.80] # Merge with prioritized_lis to get the priority and rank merged_data = pd.merge(merged_data, prioritized_lis[['learning_indicator_id', 'Rank']], on='learning_indicator_id', how='left') # Rank the LIs for each student based on normalized marks and class-level LI priority merged_data['student_rank'] = merged_data.groupby('student_id')['normalized_marks'].rank(method='dense', ascending=False) merged_data = merged_data.sort_values(by=['student_id', 'student_rank', 'Rank']) # Ensure unique ranks by adding a secondary sort by Rank merged_data['unique_rank'] = merged_data.groupby('student_id').cumcount() + 1 # Create the final dataframe student_ranking = merged_data.pivot(index='student_id', columns='unique_rank', values='learning_indicator_id').reset_index() student_ranking.columns = ['student_id'] + [f'P{I+1}_li' for I in range(student_ranking.shape[1] - 1)] li_text_mapping = question_data.drop_duplicates(subset='learning_indicator_id').set_index('learning_indicator_id')['learning_indicator_text'] for col in student_ranking.columns[1:]: student_ranking[col] = student_ranking[col].map(li_text_mapping) return student_ranking def prepare_data_for_ridge_plot(student_data, question_data): # Merge the DataFrames merged_data = pd.merge(student_data, question_data, on='question_number', how='inner') # Normalize the marks obtained for each learning indicator by each student normalized_data = merged_data.groupby(['student_id', 'learning_indicator_id'])[['marks_obtained', 'maximum_marks']].sum().reset_index() normalized_data['normalized_marks'] = normalized_data['marks_obtained'] / normalized_data['maximum_marks'] # Add learning_indicator_text to normalized_data plot_data = pd.merge(normalized_data, question_data[['learning_indicator_id', 'learning_indicator_text']].drop_duplicates(), on='learning_indicator_id') return plot_data def calculate_logical_quantiles(data, num_quantiles=5): """ Calculate logical quantiles for a given data set to ensure they are informative. Parameters: data (array-like): The input data for which to calculate quantiles. num_quantiles (int): The number of quantiles to calculate. Default is 5. Returns: list: A list of quantile values. """ # Ensure there are enough unique values to calculate the quantiles if len(np.unique(data)) < num_quantiles: # If not enough unique values, use unique values as quantiles quantiles = np.unique(data) else: # Calculate evenly spaced quantiles quantiles = np.percentile(data, np.linspace(0, 100, num_quantiles)) return quantiles.tolist() def create_ridge_plot(plot_data): unique_learning_indicators = plot_data['learning_indicator_text'].unique() n_indicators = len(unique_learning_indicators) bandwidth = 0.5 # Adjust bandwidth for smoother graphs darkgreen = '#9BC184' midgreen = '#C2D6A4' lightgreen = '#E7E5CB' colors = [lightgreen, midgreen, darkgreen, midgreen, lightgreen] fig, axs = plt.subplots(nrows=n_indicators, ncols=1, figsize=(10, n_indicators * 1.5), sharex=True) axs = axs.flatten() # Flatten in case of single plot for i, indicator in enumerate(unique_learning_indicators): # Subset the data for each learning indicator subset = plot_data[plot_data['learning_indicator_text'] == indicator] # Plot the distribution of normalized marks sns.kdeplot( subset['normalized_marks'], shade=True, bw_adjust=bandwidth, ax=axs[i], color=sns.color_palette('coolwarm', n_colors=n_indicators)[i] ) quantiles = calculate_logical_quantiles(subset["normalized_marks"].tolist()) # fill space between each pair of quantiles for j in range(len(quantiles) - 1): axs[i].fill_between( [quantiles[j], # lower bound quantiles[j+1]], # upper bound 0.1, # max y=0 0.3, # max y=0.0002 color=colors[j] ) mean = subset['marks_obtained'].sum()/subset['maximum_marks'].sum() axs[i].scatter([mean], [0.3], color='black', s=15) global_mean = plot_data['normalized_marks'].mean() axs[i].axvline(global_mean, color='#525252', linestyle='--') axs[i].set_xlim(0, 1) axs[i].set_ylim(0,3) # Add the learning indicator text as the title axs[i].set_title(indicator, loc='left', fontsize=12, fontweight='bold') # Remove y-axis label axs[i].set_ylabel('') # Add a horizontal line for the baseline axs[i].axhline(0, color='black', linewidth=1.3, linestyle='-') # Set common labels plt.xlabel('Normalized Marks', fontsize=12, fontweight='bold') plt.tight_layout() return fig def remediation_groups(student_df, question_df, z_threshold=-1.35): # Merge student performance with question data to get full marks student_df["student_id"]=student_df["student_id"].astype(str) merged_df = pd.merge(student_df, question_df, on='question_number') # Apply minimum marks validation and fill NaN with 0 merged_df['marks_obtained'] = np.minimum(merged_df['marks_obtained'].fillna(0), merged_df['full_marks']) # Calculate normalized scores merged_df['normalized_score'] = merged_df['marks_obtained'] / merged_df['full_marks'] # Calculate z-scores for each learning indicator z_scores = merged_df.groupby('learning_indicator_id')['normalized_score'].transform(zscore) merged_df['z_score'] = z_scores # Identify students needing remediation remediation_df = merged_df[merged_df['z_score'] < z_threshold] # Group by learning indicator to find students needing remediation li_remediation_groups = remediation_df.groupby(['learning_indicator_id', 'learning_indicator_text'])['student_id'].apply(lambda x: ', '.join(x.unique())).reset_index() # Identify students who don't need remediation students_needing_remediation = remediation_df['student_id'].unique() students_no_remediation = merged_df[~merged_df['student_id'].isin(students_needing_remediation)]['student_id'].unique() no_remediation_df = pd.DataFrame(students_no_remediation, columns=['student_id']) return li_remediation_groups, no_remediation_df def main(): col_logo , col_name = st.columns([1,3]) with col_logo: st.image("Screenshot 2024-08-07 at 1.05.24 PM.png") with col_name: st.title("Learning Indicator Analysis") st.subheader("Student and Class Remediation based on Question Data and Student Data. Upload Files") # Upload the dataframes col_A,colB = st.columns(2) with col_A: question_data_file = st.file_uploader("Upload Question Data CSV", type="csv") with colB: student_performance_data_file = st.file_uploader("Upload Student Performance Data CSV", type="csv") st.write("----------------------") if question_data_file and student_performance_data_file: question_data = pd.read_csv(question_data_file) student_performance_data = pd.read_csv(student_performance_data_file) # Analyze performance and prioritize LIs category_counts = analyze_student_performance_by_li(question_data, student_performance_data) prioritized_lis = prioritize_lis(category_counts) # Merge with original question data to get the learning indicator text prioritized_lis = pd.merge(prioritized_lis, question_data[['learning_indicator_id', 'learning_indicator_text']].drop_duplicates(), on='learning_indicator_id', how='left') # Display the results with filters st.write("Learning Indicator Analysis with Priority") def highlight_priority(row): if row.Priority == 'High': return ['background-color: red']*len(row) elif row.Priority == 'Medium': return ['background-color: yellow']*len(row) elif row.Priority == 'Low': return ['background-color: green']*len(row) else: return ['']*len(row) col1,col2 = st.columns(2) with col1: st.dataframe(prioritized_lis.style.apply(highlight_priority, axis=1)) overall_li_level = mean_li_level_analysis(student_performance_data, question_data) overall_li_level = overall_li_level.sort_values("normalised_score") st.dataframe(overall_li_level) with col2: plt_data=prepare_data_for_ridge_plot(student_performance_data, question_data) plt_fig = create_ridge_plot(plt_data) st.pyplot(plt_fig) st.write("---------------------------") col3,col4 = st.columns(2) li_remediation_groups, no_remediation_df = remediation_groups(student_performance_data,question_data) with col3: st.write("Student Group Remediation based on LI") st.dataframe(li_remediation_groups) with col4: st.write("Students That are not part of group remediation") st.dataframe(no_remediation_df) # Filters for LI ID and Priority li_id_filter = st.multiselect("Exclude Li_ids :", prioritized_lis['learning_indicator_id'].unique()) priority_filter = st.multiselect("Exclude Priority:",prioritized_lis["Priority"].unique()) if li_id_filter: prioritized_lis = prioritized_lis[~prioritized_lis["learning_indicator_id"].isin(li_id_filter)] question_data = question_data[~question_data["learning_indicator_id"].isin(li_id_filter)] if priority_filter: li_ids_out = prioritized_lis[prioritized_lis["Priority"].isin(priority_filter)]["learning_indicator_id"].unique().tolist() question_data = question_data[~question_data["learning_indicator_id"].isin(li_ids_out)] # Button to generate student-level ranking if st.button("Generate Student Level Ranking"): print(len(question_data),"==question") print(len(prioritized_lis),"===priotisex") student_ranking = student_level_analysis(student_performance_data, question_data, prioritized_lis) st.write("Student Level Learning Indicator Ranking") st.dataframe(student_ranking) if __name__ == "__main__": main()