import pandas as pd import numpy as np import plotly.express as px import plotly.graph_objs as go import streamlit as st import joblib import re def get_year(student_id): year_str = "" for char in student_id: if char.isdigit(): year_str += char if len(year_str) == 2: break return int(year_str) @st.cache_data() def process_data(raw_data): raw_data = raw_data[ ~raw_data["TenMH"].str.contains("IE|Intensive English|IE2|IE1|IE3|IE0") ] pivot_df = pd.pivot_table( raw_data, values="DiemHP", index="MaSV", columns="TenMH", aggfunc="first" ) pivot_df = pivot_df.reset_index().rename_axis(None, axis=1) pivot_df.columns.name = None pivot_df = pivot_df.dropna(thresh=50, axis=1) pivot_df = pivot_df.rename(columns=lambda x: x.strip()) df = pd.merge(pivot_df, raw_data[["MaSV"]], on="MaSV") df.drop_duplicates(subset="MaSV", keep="last", inplace=True) dfid = df["MaSV"] df.drop(["MaSV"], axis=1, inplace=True) df.replace(["WH", "VT", "I"], np.nan, inplace=True) df.iloc[:, :-1] = df.iloc[:, :-1].apply(pd.to_numeric) df = pd.merge(dfid, df, left_index=True, right_index=True) df["MaSV_school"] = df["MaSV"].str.slice(2, 4) df["Major"] = df["MaSV"].str.slice(0, 2) df["Year"] = 2000 + df["MaSV"].apply(get_year) df["Year"] = df["Year"].astype(str) df = pd.merge(df, raw_data[["MaSV", "DTBTK"]].drop_duplicates(), on="MaSV") df = df.drop(columns="MaSV") return df def process_data_per(raw_data): raw_data = raw_data[ ~raw_data["TenMH"].str.contains("IE|Intensive English|IE2|IE1|IE3|IE0") ] pivot_df = pd.pivot_table( raw_data, values="DiemHP", index="MaSV", columns="TenMH", aggfunc="first" ) pivot_df = pivot_df.reset_index().rename_axis(None, axis=1) pivot_df.columns.name = None pivot_df = pivot_df.dropna(thresh=50, axis=1) pivot_df = pivot_df.rename(columns=lambda x: x.strip()) pivot_df.replace(["WH", "VT", "I"], np.nan, inplace=True) pivot_df.iloc[:, 1:] = pivot_df.iloc[:, 1:].apply(pd.to_numeric) return pivot_df def process_predict_data(raw_data): dtk = raw_data[["MaSV", "DTBTKH4"]].copy() dtk.drop_duplicates(subset="MaSV", keep="last", inplace=True) count_duplicates = ( raw_data.groupby(["MaSV", "MaMH"]).size().reset_index(name="Times") ) courses = raw_data[ raw_data["MaMH"].str.startswith( ("IT", "BA", "BM", "BT", "MA", "CE", "EE", "EL", "ENEE", "IS", "MAFE", "PH") ) ] courses_list = courses["MaMH"].unique().tolist() count_duplicates["fail_courses_list"] = ( (count_duplicates["MaMH"].isin(courses_list)) & (count_duplicates["Times"] >= 2) ).astype(int) count_duplicates["fail_not_courses_list"] = ( (~count_duplicates["MaMH"].isin(courses_list)) & (count_duplicates["Times"] >= 2) ).astype(int) count_duplicates["pass_courses"] = ( (~count_duplicates["MaMH"].isin(courses_list)) & (count_duplicates["Times"] == 1) ).astype(int) fail = ( count_duplicates.groupby("MaSV")[["fail_courses_list", "fail_not_courses_list"]] .sum() .reset_index() ) fail.columns = ["MaSV", "fail_courses_list_count", "fail_not_courses_list_count"] df = pd.merge(dtk, fail, on="MaSV") df = df.rename(columns={"DTBTKH4": "GPA"}) data = raw_data[["MaSV", "NHHK", "SoTCDat"]] data = ( data.groupby(["MaSV"])["SoTCDat"].mean().reset_index(name="Mean_Cre").round(2) ) df = pd.merge(df, data, on="MaSV") df1 = raw_data[["MaSV", "MaMH", "NHHK"]] courses_list = raw_data[ (raw_data["MaMH"].str.startswith("EN")) & ~(raw_data["MaMH"].str.contains("EN007|EN008|EN011|EN012")) ].MaMH.tolist() filtered_df = df1[df1["MaMH"].isin(courses_list)] nhhk_counts = ( filtered_df.groupby("MaSV")["NHHK"].nunique().reset_index(name="EPeriod") ) df = pd.merge(df, nhhk_counts, on="MaSV", how="left").fillna(0) df = df[ [ "MaSV", "GPA", "Mean_Cre", "fail_courses_list_count", "fail_not_courses_list_count", "EPeriod", ] ] return df def predict_late_student(test_df): model = joblib.load("model/Time/Late.joblib") model1 = joblib.load("model/Time/Sem.joblib") test_dfed = process_predict_data(test_df) std_id = test_dfed.iloc[:, 0] test_dfed = test_dfed.drop(test_dfed.columns[0], axis=1) prediction = model.predict(test_dfed) prediction1 = model1.predict(test_dfed) test_dfed["Semeters"] = prediction1 test_dfed["Progress"] = ["late" if p == 1 else "not late" for p in prediction] test_dfed.insert(0, "MaSV", std_id) for index, row in test_dfed.iterrows(): if row["Semeters"] <= 9 and row["Progress"] == "late": test_dfed.loc[index, "Semeters"] = row["Semeters"] / 2 test_dfed.loc[index, "Progress"] = "may late" else: test_dfed.loc[index, "Semeters"] = row["Semeters"] / 2 return test_dfed def get_major(raw_data): major_mapping = { "BA": "BA", "BE": "BM", "BT": "BT", "CE": "CE", "EE": "EE", "EN": "EL", "EV": "ENEE", "IE": "IS", "IT": "IT", "MA": "MAFE", "SE": "PH", } for major, ma_mh in major_mapping.items(): if raw_data["MaSV"].str[:2].str.contains(major).any(): return major, ma_mh return None, None def create_pivot_table(raw_data): pivot_df = pd.pivot_table( raw_data, values="DiemHP", index="MaSV", columns="MaMH", aggfunc="first" ) pivot_df = pivot_df.reset_index().rename_axis(None, axis=1) pivot_df.columns.name = None return pivot_df def drop_nan_columns(pivot_df): pivot_df = pivot_df.rename(columns=lambda x: x.strip()) pivot_df.replace(["WH", "VT", "I", "P", "F"], np.nan, inplace=True) pivot_df.iloc[:, 1:] = pivot_df.iloc[:, 1:].apply(pd.to_numeric) return pivot_df def merge_with_xeploainh(pivot_df, raw_data): df = pd.merge(pivot_df, raw_data[["MaSV", "DTBTK"]], on="MaSV") df.drop_duplicates(subset="MaSV", keep="last", inplace=True) return df def fill_missing_values(df): col = df.drop(["MaSV", "DTBTK"], axis=1) columns_data = get_column_data(df) dup = pd.DataFrame(columns=columns_data) df = pd.merge(dup, df, on=col.columns.tolist(), how="outer") for col in df.columns: if df[col].isnull().values.any(): df[col].fillna(value=df["DTBTK"], inplace=True) return df def get_column_data(df): major = df["MaSV"].str[:2].unique()[0] column_file = f"Columns/column_{major}.txt" columns_data = [] with open(column_file, "r") as f: for line in f: columns_data.append(str(line.strip())) return columns_data def prepare_data(df): std_id = df["MaSV"].copy() df = df.drop(["MaSV", "DTBTK"], axis=1) df.sort_index(axis=1, inplace=True) return df def predict_rank(raw_data): major, ma_mh = get_major(raw_data) if major: raw_data["MaMH"] = raw_data["MaMH"].str[:-2] raw_data = raw_data[raw_data["MaMH"].str.startswith(ma_mh)] pivot_df = create_pivot_table(raw_data) pivot_df = drop_nan_columns(pivot_df) df = merge_with_xeploainh(pivot_df, raw_data) df = fill_missing_values(df) std_id = df["MaSV"].copy() df = prepare_data(df) model = joblib.load(f"model/{major}_rank.joblib") prediction = model.predict(df) new_columns = pd.concat( [pd.Series(std_id, name="MaSV"), pd.Series(prediction, name="Pred Rank")], axis=1, ) df = pd.concat([new_columns, df], axis=1) newframe = df.copy() df = newframe[["MaSV", "Pred Rank"]] return df else: return None def predict_one_student(raw_data, student_id): student = process_data_per(raw_data) filtered_df = student[student["MaSV"] == student_id] if len(filtered_df) > 0: selected_row = filtered_df.iloc[0, 1:].dropna() values = selected_row.values.tolist() course_data_filtered = [x for x in selected_row if not np.isnan(x)] counts, bins = np.histogram(course_data_filtered, bins=np.arange(0, 110, 10)) grade_bins = [f"{bins[i]}-{bins[i+1]}" for i in range(len(bins) - 1)] total_count = len(selected_row) frequencies_percentage = (counts / total_count) * 100 fig1 = go.Figure() fig1.add_trace( go.Scatter( x=bins[:-1], y=frequencies_percentage, mode="lines", name="Frequency" ) ) fig1.update_layout( title="Frequency Range for", xaxis_title="Score", yaxis_title="Percentage", height=400, width=400, ) data = raw_data[["MaSV", "NHHK", "TenMH", "DiemHP"]] data["TenMH"] = data["TenMH"].str.lstrip() data["NHHK"] = data["NHHK"].apply(lambda x: str(x)[:4] + " S " + str(x)[4:]) rows_to_drop = [] with open("rows_to_drop.txt", "r") as f: for line in f: rows_to_drop.append(str(line.strip())) data = data[~data["TenMH"].isin(rows_to_drop)] student_data = data[data["MaSV"] == student_id][["NHHK", "TenMH", "DiemHP"]] student_data["DiemHP"] = pd.to_numeric(student_data["DiemHP"], errors="coerce") fig2 = px.bar( student_data, x="TenMH", y="DiemHP", color="NHHK", title="Student Score vs. Course", ) fig2.update_layout( title="Student Score vs. Course", xaxis_title=None, yaxis_title="Score", ) fig2.add_shape( type="line", x0=0, y0=50, x1=len(student_data["TenMH"]) - 1, y1=50, line=dict(color="red", width=3), ) col1, col2 = st.columns(2) with col1: st.plotly_chart(fig1, use_container_width=True) with col2: st.plotly_chart(fig2, use_container_width=True) else: st.write("No data found for student {}".format(student_id)) def show_boxplot1( new1_df, new1_dfa, major, school, year, additional_selection="", year_a="" ): if additional_selection != " ": show_boxplot = st.checkbox( "Show Boxplot for student's performance", key="checkbox2" ) if show_boxplot: fig = px.box(new1_df) fig1 = px.box(new1_dfa) fig.update_layout( title="Boxplot of " + major + school + " student in " + year ) fig1.update_layout( title="Boxplot of " + major + additional_selection + " student in " + year_a ) col1, col2 = st.columns(2) with col1: st.plotly_chart(fig, use_container_width=True) with col2: st.plotly_chart(fig1, use_container_width=True) elif additional_selection == " " and year_a != " ": show_boxplot = st.checkbox( "Show Boxplot for student's performance", key="checkbox2" ) if show_boxplot: fig = px.box(new1_df) fig1 = px.box(new1_dfa) fig.update_layout( title="Boxplot of " + major + school + " student in " + year ) fig1.update_layout( title="Boxplot of " + major + school + " student in " + year_a ) col1, col2 = st.columns(2) with col1: st.plotly_chart(fig, use_container_width=True) with col2: st.plotly_chart(fig1, use_container_width=True) elif additional_selection == " ": show_boxplot = st.checkbox( "Show Boxplot for student's performance", key="checkbox2" ) if show_boxplot: fig = px.box(new1_df) fig.update_layout(title="Boxplot of " + major + " student in " + year) st.plotly_chart(fig, use_container_width=True)