# LeadSchoolRemediation / li_analysys.py
import streamlit as st
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
from scipy.stats import zscore
st.set_page_config(layout="wide")
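# Expected input schemas, inferred from the column names used below (an
# assumption about the CSVs, not a published contract):
#   question_data CSV:            question_number, learning_indicator_id,
#                                 learning_indicator_text, maximum_marks
#   student_performance_data CSV: student_id, question_number, marks_obtained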
def categorize_marks(normalized_marks):
    if normalized_marks >= 0.8:
        return '80-100%'
    elif normalized_marks >= 0.6:
        return '60-80%'
    else:
        return '<60%'
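# Quick sanity check (hypothetical values):
#   categorize_marks(0.85)  -> '80-100%'
#   categorize_marks(0.60)  -> '60-80%'   (band boundaries are inclusive below)
#   categorize_marks(0.59)  -> '<60%'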
def analyze_student_performance_by_li(question_data, student_performance_data):
    # Merge the dataframes on question number
    merged_data = pd.merge(student_performance_data, question_data, on='question_number')
    merged_data = merged_data.groupby(["student_id", "learning_indicator_id"])[["marks_obtained", "maximum_marks"]].sum().reset_index()
    merged_data['normalized_marks'] = merged_data['marks_obtained'] / merged_data['maximum_marks']
    # Categorize the normalized marks
    merged_data['category'] = merged_data['normalized_marks'].apply(categorize_marks)
    # Group by learning indicator ID and category, and count the number of students in each category
    merged_data = merged_data.groupby(['learning_indicator_id', 'category']).size().unstack(fill_value=0)
    # Reindex rather than rename positionally: unstack() orders the columns
    # alphabetically ('60-80%', '80-100%', '<60%'), so assigning new labels by
    # position would mislabel the counts; reindex also restores any band that
    # has no students at all.
    merged_data = merged_data.reindex(columns=['<60%', '60-80%', '80-100%'], fill_value=0)
    # Worst-performing LIs (most students below 60%) first
    merged_data = merged_data.sort_values(['<60%', '60-80%', '80-100%'], ascending=False).reset_index()
    return merged_data
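# Output sketch (hypothetical numbers): one row per learning indicator with
# per-band student counts, worst LIs first, e.g.
#   learning_indicator_id  <60%  60-80%  80-100%
#   LI_3                     12       5        3
#   LI_1                      4      10        6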
def prioritize_lis(category_counts):
    # Add a rank column based on the order of rows
    category_counts['Rank'] = category_counts.index + 1
    # Determine the number of LIs
    total_lis = len(category_counts)
    # Determine the cutoff points for high, medium, and low priority
    high_priority_cutoff = int(total_lis * 0.3)
    medium_priority_cutoff = int(total_lis * 0.6)
    # Classify the LIs based on their rank
    category_counts['Priority'] = 'Low'
    category_counts.loc[category_counts['Rank'] <= high_priority_cutoff, 'Priority'] = 'High'
    category_counts.loc[(category_counts['Rank'] > high_priority_cutoff) & (category_counts['Rank'] <= medium_priority_cutoff), 'Priority'] = 'Medium'
    return category_counts
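# Worked example of the cutoffs (hypothetical): with 10 LIs,
# high_priority_cutoff = int(10 * 0.3) = 3 and medium_priority_cutoff = 6,
# so ranks 1-3 become 'High', 4-6 'Medium', and 7-10 'Low'. Note that with
# fewer than four LIs, int(total_lis * 0.3) is 0 and nothing is marked 'High'.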
def mean_li_level_analysis(student_data, question_data):
    merged_data = pd.merge(student_data, question_data, on='question_number', how="inner")
    # Aggregate marks across all students for each learning indicator
    merged_data = merged_data.groupby(['learning_indicator_id'])[['marks_obtained', 'maximum_marks']].sum().reset_index()
    merged_data["normalised_score"] = merged_data["marks_obtained"] / merged_data["maximum_marks"]
    merged_data = pd.merge(merged_data, question_data[["learning_indicator_id", "learning_indicator_text"]].drop_duplicates(), on="learning_indicator_id")
    return merged_data
def student_level_analysis(student_data, question_data, prioritized_lis):
    # Merge the student data with question data
    merged_data = pd.merge(student_data, question_data, on='question_number', how="inner")
    # Normalize the marks obtained for each learning indicator by each student
    merged_data = merged_data.groupby(['student_id', 'learning_indicator_id'])[['marks_obtained', 'maximum_marks']].sum().reset_index()
    merged_data['normalized_marks'] = merged_data['marks_obtained'] / merged_data['maximum_marks']
    # Keep only the LIs on which the student scored below 80%
    merged_data = merged_data[merged_data["normalized_marks"] < 0.80]
    # Merge with prioritized_lis to get the class-level rank
    merged_data = pd.merge(merged_data, prioritized_lis[['learning_indicator_id', 'Rank']], on='learning_indicator_id', how='left')
    # Rank the LIs for each student by normalized marks, breaking ties with the class-level LI priority
    merged_data['student_rank'] = merged_data.groupby('student_id')['normalized_marks'].rank(method='dense', ascending=False)
    merged_data = merged_data.sort_values(by=['student_id', 'student_rank', 'Rank'])
    # Assign unique per-student ranks so the pivot below has no duplicate columns
    merged_data['unique_rank'] = merged_data.groupby('student_id').cumcount() + 1
    # Create the final dataframe: one column per priority slot
    student_ranking = merged_data.pivot(index='student_id', columns='unique_rank', values='learning_indicator_id').reset_index()
    student_ranking.columns = ['student_id'] + [f'P{i+1}_li' for i in range(student_ranking.shape[1] - 1)]
    # Replace LI ids with their human-readable text
    li_text_mapping = question_data.drop_duplicates(subset='learning_indicator_id').set_index('learning_indicator_id')['learning_indicator_text']
    for col in student_ranking.columns[1:]:
        student_ranking[col] = student_ranking[col].map(li_text_mapping)
    return student_ranking
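# Output sketch (hypothetical): one row per student, columns P1_li, P2_li, ...
# holding learning-indicator text for that student's weak LIs in priority
# order; students with fewer weak LIs get NaN in the trailing columns.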
def prepare_data_for_ridge_plot(student_data, question_data):
    # Merge the DataFrames
    merged_data = pd.merge(student_data, question_data, on='question_number', how='inner')
    # Normalize the marks obtained for each learning indicator by each student
    normalized_data = merged_data.groupby(['student_id', 'learning_indicator_id'])[['marks_obtained', 'maximum_marks']].sum().reset_index()
    normalized_data['normalized_marks'] = normalized_data['marks_obtained'] / normalized_data['maximum_marks']
    # Add learning_indicator_text to normalized_data
    plot_data = pd.merge(normalized_data, question_data[['learning_indicator_id', 'learning_indicator_text']].drop_duplicates(), on='learning_indicator_id')
    return plot_data
def calculate_logical_quantiles(data, num_quantiles=5):
    """
    Calculate logical quantiles for a given data set to ensure they are informative.

    Parameters:
        data (array-like): The input data for which to calculate quantiles.
        num_quantiles (int): The number of quantiles to calculate. Default is 5.

    Returns:
        list: A list of quantile values.
    """
    # Ensure there are enough unique values to calculate the quantiles
    if len(np.unique(data)) < num_quantiles:
        # If not enough unique values, use the unique values themselves as quantiles
        quantiles = np.unique(data)
    else:
        # Calculate evenly spaced quantiles
        quantiles = np.percentile(data, np.linspace(0, 100, num_quantiles))
    return quantiles.tolist()
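# Worked example (hypothetical data): with the default num_quantiles=5,
# np.linspace(0, 100, 5) is [0, 25, 50, 75, 100], so
#   calculate_logical_quantiles([0.1, 0.2, 0.3, 0.4, 0.5])
# returns [0.1, 0.2, 0.3, 0.4, 0.5] (min, quartiles, max), i.e. four bands
# for the fill_between loop in create_ridge_plot below.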
def create_ridge_plot(plot_data):
    unique_learning_indicators = plot_data['learning_indicator_text'].unique()
    n_indicators = len(unique_learning_indicators)
    bandwidth = 0.5  # Adjust bandwidth for smoother graphs
    darkgreen = '#9BC184'
    midgreen = '#C2D6A4'
    lightgreen = '#E7E5CB'
    colors = [lightgreen, midgreen, darkgreen, midgreen, lightgreen]
    fig, axs = plt.subplots(nrows=n_indicators, ncols=1, figsize=(10, n_indicators * 1.5), sharex=True)
    axs = np.atleast_1d(axs).flatten()  # plt.subplots returns a bare Axes when nrows=1
    for i, indicator in enumerate(unique_learning_indicators):
        # Subset the data for each learning indicator
        subset = plot_data[plot_data['learning_indicator_text'] == indicator]
        # Plot the distribution of normalized marks
        sns.kdeplot(
            subset['normalized_marks'],
            fill=True,  # 'shade' was deprecated in seaborn 0.11 in favour of 'fill'
            bw_adjust=bandwidth,
            ax=axs[i],
            color=sns.color_palette('coolwarm', n_colors=n_indicators)[i]
        )
        quantiles = calculate_logical_quantiles(subset["normalized_marks"].tolist())
        # Fill a colored band between each pair of adjacent quantiles
        for j in range(len(quantiles) - 1):
            axs[i].fill_between(
                [quantiles[j],       # lower bound
                 quantiles[j + 1]],  # upper bound
                0.1,  # band bottom (y)
                0.3,  # band top (y)
                color=colors[j]
            )
        # Class mean for this LI (total marks obtained / total maximum marks)
        mean = subset['marks_obtained'].sum() / subset['maximum_marks'].sum()
        axs[i].scatter([mean], [0.3], color='black', s=15)
        global_mean = plot_data['normalized_marks'].mean()
        axs[i].axvline(global_mean, color='#525252', linestyle='--')
        axs[i].set_xlim(0, 1)
        axs[i].set_ylim(0, 3)
        # Add the learning indicator text as the title
        axs[i].set_title(indicator, loc='left', fontsize=12, fontweight='bold')
        # Remove y-axis label
        axs[i].set_ylabel('')
        # Add a horizontal line for the baseline
        axs[i].axhline(0, color='black', linewidth=1.3, linestyle='-')
    # Set the common x label (lands on the last axes; the x-axis is shared)
    plt.xlabel('Normalized Marks', fontsize=12, fontweight='bold')
    plt.tight_layout()
    return fig
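# Usage sketch (assuming plot_data came from prepare_data_for_ridge_plot):
#   fig = create_ridge_plot(plot_data)
#   fig.savefig("li_ridge.png", dpi=150)   # or st.pyplot(fig) inside Streamlit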
def remediation_groups(student_df, question_df, z_threshold=-1.35):
    # Merge student performance with question data to get maximum marks
    student_df["student_id"] = student_df["student_id"].astype(str)
    merged_df = pd.merge(student_df, question_df, on='question_number')
    # Cap marks at the maximum and fill NaN with 0. Note: this function
    # originally read a 'full_marks' column; the rest of this module uses
    # 'maximum_marks', so that name is used here for consistency.
    merged_df['marks_obtained'] = np.minimum(merged_df['marks_obtained'].fillna(0), merged_df['maximum_marks'])
    # Calculate normalized scores
    merged_df['normalized_score'] = merged_df['marks_obtained'] / merged_df['maximum_marks']
    # Calculate z-scores of the normalized score within each learning indicator
    merged_df['z_score'] = merged_df.groupby('learning_indicator_id')['normalized_score'].transform(zscore)
    # Identify students needing remediation
    remediation_df = merged_df[merged_df['z_score'] < z_threshold]
    # Group by learning indicator to list the students needing remediation
    li_remediation_groups = remediation_df.groupby(['learning_indicator_id', 'learning_indicator_text'])['student_id'].apply(lambda x: ', '.join(x.unique())).reset_index()
    # Identify students who don't need remediation on any LI
    students_needing_remediation = remediation_df['student_id'].unique()
    students_no_remediation = merged_df[~merged_df['student_id'].isin(students_needing_remediation)]['student_id'].unique()
    no_remediation_df = pd.DataFrame(students_no_remediation, columns=['student_id'])
    return li_remediation_groups, no_remediation_df
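# Interpretation note: z_threshold=-1.35 flags a score more than 1.35 standard
# deviations below the class mean for that LI (roughly the bottom ~9% under a
# normal assumption). LIs where every score is identical produce NaN z-scores
# and are never flagged.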
def main():
    col_logo, col_name = st.columns([1, 3])
    with col_logo:
        st.image("Screenshot 2024-08-07 at 1.05.24 PM.png")
    with col_name:
        st.title("Learning Indicator Analysis")
        st.subheader("Student and class remediation based on question data and student data. Upload files below.")
    # Upload the dataframes
    col_a, col_b = st.columns(2)
    with col_a:
        question_data_file = st.file_uploader("Upload Question Data CSV", type="csv")
    with col_b:
        student_performance_data_file = st.file_uploader("Upload Student Performance Data CSV", type="csv")
    st.write("----------------------")
    if question_data_file and student_performance_data_file:
        question_data = pd.read_csv(question_data_file)
        student_performance_data = pd.read_csv(student_performance_data_file)
        # Analyze performance and prioritize LIs
        category_counts = analyze_student_performance_by_li(question_data, student_performance_data)
        prioritized_lis = prioritize_lis(category_counts)
        # Merge with original question data to get the learning indicator text
        prioritized_lis = pd.merge(prioritized_lis, question_data[['learning_indicator_id', 'learning_indicator_text']].drop_duplicates(), on='learning_indicator_id', how='left')
        # Display the results with filters
        st.write("Learning Indicator Analysis with Priority")

        def highlight_priority(row):
            if row.Priority == 'High':
                return ['background-color: red'] * len(row)
            elif row.Priority == 'Medium':
                return ['background-color: yellow'] * len(row)
            elif row.Priority == 'Low':
                return ['background-color: green'] * len(row)
            else:
                return [''] * len(row)

        col1, col2 = st.columns(2)
        with col1:
            st.dataframe(prioritized_lis.style.apply(highlight_priority, axis=1))
            overall_li_level = mean_li_level_analysis(student_performance_data, question_data)
            overall_li_level = overall_li_level.sort_values("normalised_score")
            st.dataframe(overall_li_level)
        with col2:
            plt_data = prepare_data_for_ridge_plot(student_performance_data, question_data)
            plt_fig = create_ridge_plot(plt_data)
            st.pyplot(plt_fig)
        st.write("---------------------------")
        col3, col4 = st.columns(2)
        li_remediation_groups, no_remediation_df = remediation_groups(student_performance_data, question_data)
        with col3:
            st.write("Student Group Remediation based on LI")
            st.dataframe(li_remediation_groups)
        with col4:
            st.write("Students that are not part of group remediation")
            st.dataframe(no_remediation_df)
        # Filters for LI ID and Priority
        li_id_filter = st.multiselect("Exclude LI ids:", prioritized_lis['learning_indicator_id'].unique())
        priority_filter = st.multiselect("Exclude Priority:", prioritized_lis["Priority"].unique())
        if li_id_filter:
            prioritized_lis = prioritized_lis[~prioritized_lis["learning_indicator_id"].isin(li_id_filter)]
            question_data = question_data[~question_data["learning_indicator_id"].isin(li_id_filter)]
        if priority_filter:
            li_ids_out = prioritized_lis[prioritized_lis["Priority"].isin(priority_filter)]["learning_indicator_id"].unique().tolist()
            question_data = question_data[~question_data["learning_indicator_id"].isin(li_ids_out)]
        # Button to generate student-level ranking
        if st.button("Generate Student Level Ranking"):
            student_ranking = student_level_analysis(student_performance_data, question_data, prioritized_lis)
            st.write("Student Level Learning Indicator Ranking")
            st.dataframe(student_ranking)


if __name__ == "__main__":
    main()