import os
import re
import logging
import json
import tempfile

import streamlit as st
import pdfplumber
from docx import Document
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import PeftModel
import torch

# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Paths to rubric files
P1_RUBRICS_PATH = os.path.join("data", "rubrics", "p1_rubrics.json")
P2_RUBRICS_PATH = os.path.join("data", "rubrics", "p2_rubrics.json")


# Load rubrics from JSON files
def load_rubrics(project_type):
    rubric_file = P1_RUBRICS_PATH if project_type.lower() == "group" else P2_RUBRICS_PATH
    try:
        with open(rubric_file, "r") as f:
            return json.load(f)
    except FileNotFoundError:
        raise FileNotFoundError(f"Rubric file not found: {rubric_file}")
    except json.JSONDecodeError:
        raise ValueError(f"Error decoding JSON from {rubric_file}")


# Load model and tokenizer
@st.cache_resource
def load_model():
    adapter_model_name = "woyeso/fine_tuned_llama_3_2_assignment_grader"
    base_model_name = "unsloth/Llama-3.2-3B-Instruct"  # Adjust if the base model differs
    hf_token = os.getenv("HF_TOKEN")
    tokenizer = AutoTokenizer.from_pretrained(
        adapter_model_name,
        token=hf_token if hf_token else None,
    )
    # Llama tokenizers often ship without a pad token; fall back to EOS so
    # tokenizer(..., padding=True) does not fail.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    base_model = AutoModelForCausalLM.from_pretrained(
        base_model_name,
        torch_dtype=torch.float16,
        device_map="auto",
        token=hf_token if hf_token else None,
    )
    model = PeftModel.from_pretrained(base_model, adapter_model_name, token=hf_token if hf_token else None)
    return model, tokenizer


model, tokenizer = load_model()

# Subcomponent mappings (same as original)
P1_SUBCOMPONENTS = {
    '1.1': 'Information of the Service Recipients Found:',
    '1.2': 'Information Related to the Use of AI in Teaching and Learning:',
    '1.3': 'Service Project Title and Topics:',
    '1.4': 'Specific Project Objectives:',
    '2.1': 'Design of AI-Related Ice-breaking Games:',
    '2.2': 'Tasks of Each Team Member:',
    '3.1': 'Specific STEM Elements Covered:',
    '3.2': 'Student Abilities to Strengthen:',
    '3.3': 'Potential Learning Hurdles of Students:',
    '3.4': 'Facilitating STEM and Overcoming Hurdles:',
    '4.1': 'List of Materials and Parts:',
    '4.2': 'List of Tools:',
}

P2_SUBCOMPONENTS = {
    '1.1': 'Specific Learning Objectives:',
    '1.2': 'Content of Each Teaching Kit:',
    '2.1': 'Describe the Design of Each Teaching Kit:',
    '2.2': 'How to Prepare (or Make) Each Item of Your Teaching Kit:',
    '2.3': 'Explain Why Students Will Learn and Play Happily:',
    '3.1': 'Draw a Diagram to Illustrate Task Breakdown:',
    '4.1': 'How to Introduce the Specific Topic(s) to Arouse Interest in STEM:',
    '4.2': 'How to Identify and Overcome Learning Hurdles:',
    '5.1': 'How to React to Potential Uncertainties:',
    '5.2': 'How to Self-Evaluate Performance and Make Improvements:',
}
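# The rubric JSON files are expected to hold a list of entries keyed by
# subcomponent. A minimal sketch of the assumed shape, inferred from how the
# evaluation loop below reads rubric["subcomponent"], rubric["criteria"], and
# rubric.get("weight", 1.0); the exact fields in the shipped files may differ:
#
# [
#     {
#         "subcomponent": "1.1",
#         "criteria": "Clearly identifies the service recipients and ...",
#         "weight": 0.1
#     },
#     ...
# ]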
# Text extraction functions

# Bullet markers recognized in submissions (numbered, •, -, ◦, ➢).
BULLET_PATTERN = re.compile(r'^\s*(\d+\.|•|-|◦|➢)\s*(.+)$')


def _clean_section_lines(lines):
    """Re-space bullet lines, collapse runs of spaces/tabs while keeping line
    breaks, and replace the training placeholder 'XYZ students'."""
    formatted_lines = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        bullet_match = BULLET_PATTERN.match(line)
        if bullet_match:
            bullet, content = bullet_match.groups()
            formatted_lines.append(f"{bullet} {content}")
        else:
            formatted_lines.append(line)
    cleaned = "\n".join(formatted_lines).strip()
    # Collapse horizontal whitespace only; the original re.sub(r'\s+', ' ', ...)
    # also swallowed the newlines it had just inserted.
    cleaned = re.sub(r'[ \t]+', ' ', cleaned)
    return cleaned.replace("XYZ students", "Hong Chi students")


def extract_text_between_strings(text, start_keyword, end_keyword):
    """Return the cleaned text between two regex markers, or 'Not Found'."""
    try:
        start_match = re.search(start_keyword, text, re.MULTILINE)
        if not start_match:
            logger.debug(f"Start keyword '{start_keyword}' not found.")
            return "Not Found"
        start_index = start_match.end()
        end_match = re.search(end_keyword, text, re.MULTILINE)
        if end_match and end_match.start() > start_match.start():
            extracted_text = text[start_index:end_match.start()].strip()
        else:
            extracted_text = text[start_index:].strip()
        if not extracted_text:
            logger.debug(f"End keyword '{end_keyword}' not found or no content extracted.")
            return "Not Found"
        return _clean_section_lines(extracted_text.split('\n'))
    except Exception as e:
        logger.error(f"Error extracting text: {e}")
        return f"Error: {e}"


def extract_text_from_pdf(filepath, assignment_type='P1'):
    results = {}
    subcomponents = P1_SUBCOMPONENTS if assignment_type == 'P1' else P2_SUBCOMPONENTS
    sorted_codes = sorted(subcomponents.keys(), key=lambda x: [int(n) for n in x.split('.')])
    with pdfplumber.open(filepath) as pdf:
        text = ""
        for page in pdf.pages:
            page_text = page.extract_text() or ""
            text += page_text + "\n"
    for i, code in enumerate(sorted_codes):
        start_keyword = r"^{}\s*[.:]?\s*".format(re.escape(code))
        if i + 1 < len(sorted_codes):
            end_keyword = r"^{}\s*[.:]?\s*".format(re.escape(sorted_codes[i + 1]))
        else:
            # The last graded section runs until the next top-level heading.
            end_keyword = r"^5\.\s*" if assignment_type == 'P1' else r"^6\.\s*"
        logger.debug(f"Extracting section {code} with start_keyword={start_keyword}, end_keyword={end_keyword}")
        content = extract_text_between_strings(text, start_keyword, end_keyword)
        results[code] = {"title": subcomponents[code], "content": content}
    return results
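# The PDF extractor above and the DOCX extractor below both assume the
# submission labels its sections with the subcomponent codes at the start of
# a line, e.g. (illustrative layout, not a real submission):
#
#   1.1 Information of the Service Recipients Found:
#   - Our service recipients are ...
#   1.2 Information Related to the Use of AI in Teaching and Learning:
#   ...
#   5. ...
#
# A top-level "5." (P1) or "6." (P2) heading marks the end of the last
# graded section.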
def extract_text_from_docx(filepath, assignment_type='P1'):
    try:
        doc = Document(filepath)
        elements = []
        for para in doc.paragraphs:
            text = para.text.strip()
            if text:
                elements.append(('paragraph', text, para.style.name))
        for table in doc.tables:
            table_text = []
            for row in table.rows:
                row_text = [cell.text.strip() for cell in row.cells if cell.text.strip()]
                if row_text:
                    table_text.append(" ".join(row_text))
            if table_text:
                elements.append(('table', "\n".join(table_text), 'Table'))
        logger.debug(f"Extracted {len(elements)} elements from DOCX")

        results = {}
        subcomponents = P1_SUBCOMPONENTS if assignment_type == 'P1' else P2_SUBCOMPONENTS
        sorted_codes = sorted(subcomponents.keys(), key=lambda x: [int(n) for n in x.split('.')])
        current_section = None
        section_content = []
        section_pattern = re.compile(r'^\s*(\d+\.\d+\.?)\s*[.:]?\s*(.*)$')
        end_pattern = re.compile(r'^\s*5\.\s*' if assignment_type == 'P1' else r'^\s*6\.\s*')

        def flush_current_section():
            """Store the accumulated lines for the section currently open."""
            if current_section and current_section in subcomponents:
                cleaned_content = _clean_section_lines(section_content)
                results[current_section] = {
                    "title": subcomponents[current_section],
                    "content": cleaned_content if cleaned_content else "Not Found",
                }

        for i, (elem_type, text, style) in enumerate(elements):
            logger.debug(f"Processing element {i}: type={elem_type}, style={style}, text={text[:100]}...")
            for line in text.split('\n'):
                line = line.strip()
                if not line:
                    continue
                section_match = section_pattern.match(line)
                if section_match:
                    code, title = section_match.groups()
                    code = code.rstrip('.')
                    flush_current_section()
                    current_section = code
                    section_content = []
                    if title:
                        section_content.append(title)
                    logger.debug(f"Started section {code} at element {i}")
                    continue
                if end_pattern.match(line) and current_section:
                    flush_current_section()
                    current_section = None
                    section_content = []
                    logger.debug(f"Ended section at element {i} with end marker")
                    continue
                if current_section:
                    bullet_match = BULLET_PATTERN.match(line)
                    if style.startswith('List') or bullet_match:
                        if bullet_match:
                            bullet, content = bullet_match.groups()
                            section_content.append(f"{bullet} {content}")
                        else:
                            section_content.append(f"- {line}")
                    else:
                        section_content.append(line)

        # Flush whatever section was still open at the end of the document.
        flush_current_section()

        for code in sorted_codes:
            if code not in results:
                results[code] = {"title": subcomponents[code], "content": "Not Found"}
                logger.debug(f"Subcomponent {code} not found in DOCX")
        return results
    except Exception as e:
        logger.error(f"Error extracting text from DOCX: {e}")
        return {}


# Function to evaluate submission using the model
def evaluate_submission(subcomponent, project_type, rubric, submission, school_name):
    prompt = (
        f"Can you evaluate my project submission for Subcomponent {subcomponent} in a {project_type} project (P1 for group, P2 for individual).\n"
        f"Here is the rubric: {rubric}. Evaluate the submission against each rubric criterion. Focus on the rubric criteria as the primary basis for your evaluation.\n"
        f"My submission is {submission}.\n\n"
        f"If a school name is provided, use it in your evaluation: {school_name}. If no school name is provided, refer to the students generically as 'students'.\n"
        f"Do not use the placeholder 'XYZ students' in your evaluation, as it was used during training but should be replaced with the specific school name or 'students'.\n\n"
        f"Summarize the strengths of the submission (what it does well according to the rubric).\n"
        f"Summarize the weaknesses of the submission (where it falls short according to the rubric).\n"
        f"Provide specific suggestions for improvement to help the student improve their submission.\n\n"
        f"Give me an overall mark out of 10, and don't be too strict. Provide the score in the exact format 'Overall Mark: X/10'; do not omit it."
    )
    # max_length=512 in the original truncated long rubrics and submissions;
    # Llama 3.2 supports a far longer context, so allow more room here.
    inputs = tokenizer(prompt, return_tensors="pt", padding=True, truncation=True, max_length=2048)
    inputs = {k: v.to(model.device) for k, v in inputs.items()}
    with torch.no_grad():
        outputs = model.generate(
            input_ids=inputs["input_ids"],
            attention_mask=inputs["attention_mask"],
            max_new_tokens=256,
            temperature=0.7,
            top_p=0.9,
            do_sample=True,
        )
    # Decode only the newly generated tokens; decoding outputs[0] in full
    # would echo the prompt back into the feedback.
    generated = outputs[0][inputs["input_ids"].shape[1]:]
    feedback = tokenizer.decode(generated, skip_special_tokens=True)
    return feedback
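# Illustrative call (the rubric text and submission below are made-up
# placeholders, not real rubric entries):
#
#   feedback = evaluate_submission(
#       subcomponent="1.1",
#       project_type="Group",
#       rubric="Identifies the service recipients and their needs clearly.",
#       submission="Our service recipients are Primary 5 students at ...",
#       school_name="Hong Chi",
#   )
#
# The feedback is expected to end with a line like "Overall Mark: 7/10",
# which the evaluation loop below parses with a regex.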
# Streamlit app
st.title("Assignment Grader App")

# File upload
uploaded_file = st.file_uploader("Upload PDF/DOCX", type=["pdf", "docx"])
project_type = st.selectbox("Project Type", ["Group (P1)", "Individual (P2)"])
school_name = st.text_input("School Name (Optional)")
group_number = st.text_input("Group Number (Optional)")

# Manual text input as fallback
manual_text = st.text_area("Or enter your submission text manually (optional)")

if st.button("Evaluate"):
    if uploaded_file or manual_text:
        # Load rubrics
        project_type_short = "Group" if project_type == "Group (P1)" else "Individual"
        project = "P1" if project_type == "Group (P1)" else "P2"
        try:
            rubrics = load_rubrics(project_type_short)
        except Exception as e:
            st.error(f"Error loading rubrics: {str(e)}")
            st.stop()

        # Extract text from file or use manual input
        submission_dict = {}
        if uploaded_file:
            # A NamedTemporaryFile with the original suffix is safer than a
            # fixed /tmp path shared by all sessions.
            suffix = os.path.splitext(uploaded_file.name)[1].lower()
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
                tmp.write(uploaded_file.read())
                tmp_path = tmp.name
            try:
                if suffix == ".pdf":
                    results = extract_text_from_pdf(tmp_path, project)
                else:
                    results = extract_text_from_docx(tmp_path, project)
            finally:
                os.remove(tmp_path)
            for subcomponent, data in results.items():
                if data["content"] != "Not Found":
                    submission_dict[subcomponent] = data["content"]
        else:
            # Simplified handling for manual input: grade the pasted text as
            # subcomponent 1.1; adjust as needed.
            submission_dict["1.1"] = manual_text

        if not submission_dict:
            st.error("No text extracted from the file or provided manually.")
            st.stop()

        # Evaluate submissions
        evaluations = []
        total_score = 0
        total_weight = 0
        with st.spinner("Evaluating submission..."):
            for rubric in rubrics:
                subcomponent = rubric["subcomponent"]
                if subcomponent not in submission_dict:
                    continue
                submission = submission_dict[subcomponent]
                evaluation = evaluate_submission(
                    subcomponent,
                    project_type_short,
                    rubric["criteria"],
                    submission,
                    school_name if school_name else "Not provided",
                )
                if school_name:
                    evaluation = evaluation.replace("XYZ students", f"{school_name} students")
                else:
                    evaluation = evaluation.replace("XYZ students", "students")
                score_match = re.search(r"Overall Mark:\s*([\d.]+)(?:\s*/\s*10)?", evaluation, re.IGNORECASE)
                score = float(score_match.group(1)) if score_match else 0
                weight = rubric.get("weight", 1.0)
                total_score += score * weight
                total_weight += weight
                evaluations.append({
                    "subcomponent": subcomponent,
                    "evaluation": evaluation,
                    "score": score,
                    "weight": weight,
                })

        # Calculate final grade: scores are out of 10, so the weighted
        # average times 10 yields a percentage.
        final_grade = (total_score / total_weight) * 10 if total_weight > 0 else 0
        final_grade = round(final_grade, 2)

        # Display results
        group_display = f" {group_number}" if group_number else ""
        summary = f"**Summary of Evaluations for {project} Project (Group{group_display})**\n\n"
        separator = "********************************************************************\n"
        for i, item in enumerate(evaluations):
            summary += f"**Subcomponent {item['subcomponent']} (Weight: {item['weight'] * 100:.0f}%)**\n"
            summary += item["evaluation"]
            summary += "\n\n"
            if i < len(evaluations) - 1:
                summary += separator
        summary += f"**Final Total Grade: {final_grade}%**"

        st.subheader("Evaluation Results")
        st.markdown(summary)
    else:
        st.error("Please upload a file or enter text manually.")
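# Streamlit apps are launched from the command line, e.g. (assuming this
# module is saved as app.py):
#
#   streamlit run app.py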