# Import necessary libraries
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import gspread
from google.oauth2.service_account import Credentials
import pandas as pd
import os
# Initialize the FastAPI app and allow cross-origin requests from the frontend
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Specify allowed domains instead of "*" to restrict access
    allow_credentials=True,
    allow_methods=["*"],  # Allow all HTTP methods (POST, GET, OPTIONS, etc.)
    allow_headers=["*"],  # Allow all headers
)
# Request model: the frontend sends the coaching code in the JSON body
class CoachingCodeRequest(BaseModel):
    coachingCode: str
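# A request body for this model looks like the following; "1919" is one of
# the two codes handled in select_files below, any other value is rejected:
#   {"coachingCode": "1919"}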
# Function to get credentials
def get_credentials():
    """Get Google Sheets API credentials from environment variables."""
    try:
        # Construct the service account info dictionary
        service_account_info = {
            "type": os.getenv("SERVICE_ACCOUNT_TYPE"),
            "project_id": os.getenv("PROJECT_ID"),
            "private_key_id": os.getenv("PRIVATE_KEY_ID"),
            # The key is stored with literal "\n" sequences; restore real newlines.
            # Default to "" so a missing variable doesn't raise AttributeError.
            "private_key": os.getenv("PRIVATE_KEY", "").replace('\\n', '\n'),
            "client_email": os.getenv("CLIENT_EMAIL"),
            "client_id": os.getenv("CLIENT_ID"),
            "auth_uri": os.getenv("AUTH_URI"),
            "token_uri": os.getenv("TOKEN_URI"),
            "auth_provider_x509_cert_url": os.getenv("AUTH_PROVIDER_X509_CERT_URL"),
            "client_x509_cert_url": os.getenv("CLIENT_X509_CERT_URL"),
            "universe_domain": os.getenv("UNIVERSE_DOMAIN")
        }
        scope = ['https://spreadsheets.google.com/feeds', 'https://www.googleapis.com/auth/drive']
        creds = Credentials.from_service_account_info(service_account_info, scopes=scope)
        return creds
    except Exception as e:
        print(f"Error getting credentials: {e}")
        return None
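# For reference, a minimal .env sketch with the variables read above (all
# values are placeholders; the real ones come from the service account JSON):
#   SERVICE_ACCOUNT_TYPE=service_account
#   PROJECT_ID=my-project
#   PRIVATE_KEY_ID=abc123
#   PRIVATE_KEY="-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n"
#   CLIENT_EMAIL=bot@my-project.iam.gserviceaccount.com
#   CLIENT_ID=1234567890
#   AUTH_URI=https://accounts.google.com/o/oauth2/auth
#   TOKEN_URI=https://oauth2.googleapis.com/token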
# Select files based on coaching code and open their worksheets
def select_files(coaching_code):
    creds = get_credentials()
    client = gspread.authorize(creds)
    if coaching_code == "1919":
        journal_file = client.open_by_url('https://docs.google.com/spreadsheets/d/1EFf2lr4A10nt4RhIqxCD_fxe-l3sXH09II0TEkMmvhA/edit?gid=0#gid=0').worksheet('Sheet1')
        panic_button_file = client.open_by_url('https://docs.google.com/spreadsheets/d/1nFZGkCvRV6qS-mhsORhX3dxI0JSge32_UwWgWKl3eyw/edit?gid=0#gid=0').worksheet('Sheet1')
        test_file = client.open_by_url('https://docs.google.com/spreadsheets/d/13PUHySUXWtKBusjugoe7Dbsm39PwBUfG4tGLipspIx4/edit?gid=0#gid=0').worksheet('Sheet1')
    elif coaching_code == "1099":
        journal_file = client.open_by_url('https://docs.google.com/spreadsheets/d/12UQzr7xy70-MvbKUuqM6YMUF-y2kY1rumX0vOj0hKXI/edit?gid=0#gid=0').worksheet('Sheet1')
        panic_button_file = client.open_by_url('https://docs.google.com/spreadsheets/d/1zaKSRKgf2Nd7lWIf315YzvQeTQ3gU_PIRIS_bEAhl90/edit?gid=0#gid=0').worksheet('Sheet1')
        test_file = client.open_by_url('https://docs.google.com/spreadsheets/d/1ms_SdloQqlXO85NK_xExhHT0LEeLsth0VBmdHQt55jc/edit?gid=0#gid=0').worksheet('Sheet1')
    else:
        raise HTTPException(status_code=404, detail="Invalid coaching code")
    return journal_file, panic_button_file, test_file
# Main route: report chapters that students of a coaching cohort need to revise
@app.post("/revision-chapters")  # NOTE: the original omitted the decorator; this route path is an assumption
async def get_revision_chapters(data: CoachingCodeRequest):
    journal_file, panic_button_file, test_file = select_files(data.coachingCode)
    # Load each sheet into a DataFrame (all cells arrive as strings; blank
    # cells are empty strings, so convert them to NA before dropping)
    journal_df = pd.DataFrame(journal_file.get_all_values())
    panic_button_df = pd.DataFrame(panic_button_file.get_all_values())
    test_df = pd.DataFrame(test_file.get_all_values())

    # Collect recent panic-button presses: the first cell of each row is the
    # user_id, the rest of the row holds the entries; keep the last 5 cells
    # and read every other one as a panic label
    panic_data = []
    for index, row in panic_button_df.iterrows():
        user_id = row[0]
        row_pairs = row[1:].replace('', pd.NA).dropna().to_list()[-5:]
        for i in range(0, len(row_pairs), 2):
            panic = row_pairs[i].upper().strip()
            if panic:  # skip blank cells
                panic_data.append({'user_id': user_id, 'panic_button': panic})
    panic_df_processed = pd.DataFrame(panic_data)
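    # Illustrative (assumed) panic sheet row matching the walk above; the
    # labels and dates are placeholders, not real data:
    #   ['U001', 'BACKLOGS', '05/01', 'LACK OF FOCUS', '07/01']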
    # Collect test scores: cells after the user_id alternate (chapter, score);
    # group scores by chapter and keep only the last 5 scores per chapter
    test_data = []
    for index, row in test_df.iterrows():
        user_id = row[0]
        row_pairs = row[1:].replace('', pd.NA).dropna().to_list()
        chapter_scores = {}
        for i in range(0, len(row_pairs) - 1, 2):  # stop before a trailing unpaired cell
            chapter = row_pairs[i].lower().strip()
            score = row_pairs[i + 1]
            if score:
                chapter_scores.setdefault(chapter, []).append(score)
        for chapter, scores in chapter_scores.items():
            last_5_scores = scores[-5:]
            for score in last_5_scores:
                test_data.append({'user_id': user_id, 'test_chapter': chapter, 'test_score': score})
    test_df_processed = pd.DataFrame(test_data)
    # Collect journal entries: cells after the user_id alternate
    # (productivity yes/no, productivity rate); keep the last 5 pairs
    journal_data = []
    for index, row in journal_df.iterrows():
        user_id = row[0]
        row_pairs = row[1:].replace('', pd.NA).dropna().to_list()[-10:]
        for i in range(0, len(row_pairs) - 1, 2):  # stop before a trailing unpaired cell
            productivity_yes_no = row_pairs[i].lower().strip()
            productivity_rate = row_pairs[i + 1]
            if productivity_rate:
                journal_data.append({'user_id': user_id, 'productivity_yes_no': productivity_yes_no, 'productivity_rate': productivity_rate})
    journal_df_processed = pd.DataFrame(journal_data)

    # Merge the three sources on user_id, then drop rows where the
    # productivity, panic, and test fields are all missing
    merged_journal_panic = pd.merge(panic_df_processed, journal_df_processed, on='user_id', how='outer')
    merged_data = pd.merge(merged_journal_panic, test_df_processed, on='user_id', how='outer')
    merged_data_cleaned = merged_data.dropna(subset=['productivity_yes_no', 'productivity_rate', 'panic_button', 'test_chapter'], how='all')
    # Aggregate per user: count panic presses and average test scores per chapter
    def process_group(group):
        # Panic button counts, e.g. {'BACKLOGS': 2, 'LACK OF FOCUS': 1}
        panic_button_series = group['panic_button'].dropna()
        panic_button_dict = panic_button_series.value_counts().to_dict()
        # Test scores: mean per chapter, excluding non-numeric values
        test_scores = group[['test_chapter', 'test_score']].dropna().copy()  # copy to avoid mutating a slice
        test_scores['test_score'] = pd.to_numeric(test_scores['test_score'], errors='coerce')
        test_scores_dict = test_scores.groupby('test_chapter')['test_score'].mean().dropna().to_dict()
        return pd.Series({
            'productivity_yes_no': group['productivity_yes_no'].iloc[0],
            'productivity_rate': group['productivity_rate'].iloc[0],
            'panic_button': panic_button_dict,
            'test_scores': test_scores_dict
        })
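    # Illustrative per-user aggregate produced by process_group (placeholder
    # values, assuming the sheet layouts sketched above):
    #   user_id='U001', productivity_yes_no='yes', productivity_rate='7',
    #   panic_button={'BACKLOGS': 2}, test_scores={'optics': 18.5}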
    merged_df = merged_data_cleaned.groupby('user_id').apply(process_group).reset_index()

    # Flag chapters for revision: any chapter whose average score for a
    # student falls below the threshold
    revision_dict = {}
    threshold = 16
    for index, row in merged_df.iterrows():
        user_id = row['user_id']
        test_scores = row['test_scores']
        # Check each chapter's average score for this student
        for chapter, score in test_scores.items():
            if score < threshold:
                revision_dict.setdefault(chapter, []).append(user_id)

    # Convert the dictionary into one record per chapter, with the affected
    # user_ids joined into a single comma-separated string
    revision_df = pd.DataFrame({
        'chapter_name': list(revision_dict.keys()),
        'user_ids_needing_revision': [', '.join(users) for users in revision_dict.values()]
    })
    revision_json = revision_df.to_dict(orient="records")
    return {"data": revision_json}
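
# A minimal local-run sketch, assuming uvicorn is installed alongside FastAPI:
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

# Example request against the route above (the path is the assumed one):
#   curl -X POST http://localhost:8000/revision-chapters \
#        -H "Content-Type: application/json" \
#        -d '{"coachingCode": "1919"}'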