# -*- coding: utf-8 -*-
"""AIE3final.py

Automated Grading System for the AIE3 Final Project.

Chainlit app flow:
  1. On chat start, the user uploads a zip archive (student submissions
     plus a reference/solution document), which is saved into the
     user-files directory.
  2. On a "yes" reply, the archive is processed: reference and student
     answers are extracted, chunked, embedded, and compared both by
     cosine similarity and by an LLM-assigned score.
  3. Results and token usage are reported back in the chat.
"""

# --- Standard library ---------------------------------------------------
import asyncio
import json
import logging
import os
import shutil
import sys
import tempfile
from typing import Dict, List, Tuple

# --- Third-party --------------------------------------------------------
import chainlit as cl
import numpy as np
import openai
from chainlit.types import AskFileResponse
from docx import Document as DocxDocument
from dotenv import load_dotenv
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_core.messages import AIMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from sklearn.metrics.pairwise import cosine_similarity

# --- Local modules ------------------------------------------------------
from calcscore import compute_cosine_similarity, llm_similarity
from extractjson import extract_json
from process_docs import process_reference, process_student
from prompt_templates import ref_prompt, student_prompt, llm_score_prompt_template
from promptsplitembed import create_prompt, split_documents, generate_embeddings, create_qamodel
from readfile import prepare_files, USER_FILES_DIR

# Load environment variables; fail fast (KeyError) if the API key is absent.
load_dotenv()
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
openai.api_key = OPENAI_API_KEY

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def compare_docs(answers, student_result):
    """Chunk and embed the reference answers and the student answers.

    Args:
        answers: mapping of question key -> reference answer text.
        student_result: mapping of question key -> student answer text.

    Returns:
        Tuple of (reference_embeddings, student_embeddings, ref_tokens,
        student_tokens) where the embedding dicts map each key to the
        embedding of its first chunk and the token counts are the totals
        reported by ``split_documents``.
    """
    split_reference_docs, ref_tokens = {}, 0
    split_student_docs, student_tokens = {}, 0

    for key, value in answers.items():
        chunks, tokens = split_documents([Document(page_content=value)])
        split_reference_docs[key] = chunks
        ref_tokens += tokens

    for key, value in student_result.items():
        chunks, tokens = split_documents([Document(page_content=value)])
        split_student_docs[key] = chunks
        student_tokens += tokens

    # Only the first chunk's embedding is kept per key ([0]) — assumes the
    # first chunk is representative of the whole answer.
    reference_embeddings = {
        key: generate_embeddings(value)[0]
        for key, value in split_reference_docs.items()
    }
    student_embeddings = {
        key: generate_embeddings(value)[0]
        for key, value in split_student_docs.items()
    }

    logger.info("Completed comparing student and solution answers.")
    return reference_embeddings, student_embeddings, ref_tokens, student_tokens


def process_data(zip_file_name: str, prompt_template) -> Tuple[float, float, int, int, int]:
    """Run the full grading pipeline on an uploaded zip archive.

    Args:
        zip_file_name: path/name of the uploaded zip file.
        prompt_template: unused; kept for interface compatibility with
            existing callers (the LLM scoring prompt is taken from
            ``llm_score_prompt_template`` directly).

    Returns:
        Tuple of (average_similarity, average_score, ref_total_tokens,
        student_total_tokens, llm_total_tokens).
    """
    documents, reference_document = prepare_files(zip_file_name)

    reference, answers, ref_gen_tokens = process_reference(reference_document, ref_prompt)
    student_result, student_gen_tokens = process_student(documents, reference, student_prompt)

    reference_embeddings, student_embeddings, ref_tokens, student_tokens = compare_docs(
        answers, student_result
    )

    student_total_tokens = student_gen_tokens + student_tokens
    ref_total_tokens = ref_gen_tokens + ref_tokens

    average_similarity = compute_cosine_similarity(reference_embeddings, student_embeddings)
    average_score, llm_score_tokens = llm_similarity(
        answers, student_result, llm_score_prompt_template
    )
    llm_total_tokens = ref_gen_tokens + student_gen_tokens + llm_score_tokens

    return average_similarity, average_score, ref_total_tokens, student_total_tokens, llm_total_tokens


# Conversation state. NOTE(review): module-level globals are shared across
# all chat sessions in one process; chainlit's user_session would be safer
# for multi-user deployments — confirm before deploying.
user_wants_to_continue = False
uploaded_file_name = None


@cl.on_chat_start
async def start():
    """Ask the user for a zip upload and save it to the user-files directory."""
    global uploaded_file_name

    files = None
    # Wait for the user to upload a file
    while files is None:
        files = await cl.AskFileMessage(
            content="Please upload a zip file to begin!",
            accept={"application/zip": [".zip"]},
        ).send()

    zip_file: AskFileResponse = files[0]  # Assuming only one file is uploaded
    uploaded_file_name = zip_file.name

    # Log the attributes of the zip_file object for debugging
    logger.debug("zip_file attributes: %s", dir(zip_file))

    # Ensure the user files directory exists
    user_files_dir = os.environ.get('CHAINLIT_USER_FILES_DIR', '/tmp/chainlit_user_files')
    os.makedirs(user_files_dir, exist_ok=True)

    # Save the uploaded file directly to the user files directory
    file_path = os.path.join(user_files_dir, zip_file.name)
    try:
        with open(file_path, "wb") as f:
            # Older chainlit versions expose the raw bytes as `.content`.
            f.write(zip_file.content)
    except AttributeError as e:
        logger.warning("AttributeError: %s", e)
        # Newer chainlit versions expose a temp-file `.path` instead.
        try:
            with open(zip_file.path, "rb") as src_file, open(file_path, "wb") as dest_file:
                shutil.copyfileobj(src_file, dest_file)
        except Exception as e:
            logger.error("Error while copying file: %s", e)
            await cl.Message(content=f"Error while copying file: {e}").send()
            return

    # Let the user know that the system is ready
    await cl.Message(content=f"`{zip_file.name}` uploaded successfully!").send()
    # Ask if the user wants to proceed with grading
    await cl.Message(content="Do you want to proceed with the grading? (yes/no)").send()


async def process_grading():
    """Grade the previously uploaded zip file and report the results.

    Reads the file name from the module-level ``uploaded_file_name``;
    deletes the file after successful processing.
    """
    global uploaded_file_name

    if not uploaded_file_name:
        await cl.Message(
            content="No file has been uploaded yet. Please upload a ZIP file first."
        ).send()
        return

    try:
        user_files_dir = os.environ.get('CHAINLIT_USER_FILES_DIR', '/tmp/chainlit_user_files')
        file_path = os.path.join(user_files_dir, uploaded_file_name)

        # Process the uploaded ZIP file
        (
            average_similarity,
            average_score,
            ref_total_tokens,
            student_total_tokens,
            llm_total_tokens,
        ) = process_data(file_path, llm_score_prompt_template)

        # Send results
        await cl.Message(
            content=f"Processing complete. Results:\n"
                    f"Average Similarity: {average_similarity:.2f}\n"
                    f"Average Score: {average_score:.2f}\n"
                    f"Reference Total Tokens: {ref_total_tokens}\n"
                    f"Student Total Tokens: {student_total_tokens}\n"
                    f"LLM Total Tokens: {llm_total_tokens}"
        ).send()

        # Remove the file after processing
        os.remove(file_path)
    except Exception as e:
        await cl.Message(
            content=f"An error occurred while processing the zip file: {str(e)}"
        ).send()


@cl.on_message
async def on_message(message: cl.Message):
    """Handle chat replies: a 'yes' after an upload triggers grading."""
    global user_wants_to_continue, uploaded_file_name

    if message.content.lower() == 'yes' and not user_wants_to_continue:
        if uploaded_file_name:
            # Start processing
            processing_message = cl.Message(content="Processing files...")
            await processing_message.send()
            await asyncio.sleep(0.5)

            await process_grading()

            # Ask user if they want to continue after processing is done
            user_wants_to_continue = True
            await cl.Message(content="Do you want to continue? (yes/no)").send()
    # ... rest of the function ...
    # NOTE(review): 'no' replies and the post-grading continue/stop branch
    # are not handled here — the original was truncated at this point.


if __name__ == "__main__":
    # Ensure the user files directory exists
    os.makedirs(USER_FILES_DIR, exist_ok=True)
    # NOTE(review): chainlit apps are normally launched with
    # `chainlit run <file>`; `cl.run()` is not part of chainlit's public
    # API — confirm this entry point actually works in this deployment.
    cl.run()