# TRIAL / app.py
# (Hugging Face Space page header retained as comments: author atz21,
#  commit "Update app.py", 6295c4b verified)
import os
import re
import json
import subprocess
import time
import shutil
import img2pdf
import gradio as gr
from google import genai # NEW SDK
from pdf2image import convert_from_path
from PIL import Image, ImageDraw, ImageFont
import cv2
import numpy as np
from PyPDF2 import PdfReader, PdfWriter
from prompts import QP_MS_TRANSCRIPTION_PROMPT, get_grading_prompt
from supabase import create_client, Client
# ---------------- CONFIG ----------------
# Multi-API Key Configuration for handling RESOURCE_EXHAUSTED errors
class GeminiClientManager:
    """Pool of Gemini API clients with automatic rotation on quota exhaustion.

    Reads up to three keys (GEMINI_API_KEY_1..3) from the environment and
    keeps one `genai.Client` per key. Index 0 is always the primary key.
    """

    def __init__(self):
        # Gather the configured key slots; unset slots are dropped.
        candidates = [
            os.getenv("GEMINI_API_KEY_1"),
            os.getenv("GEMINI_API_KEY_2"),
            os.getenv("GEMINI_API_KEY_3"),
        ]
        self.api_keys = [key for key in candidates if key]
        if not self.api_keys:
            raise ValueError("❌ No API keys found! Please set at least GEMINI_API_KEY_1")
        print(f"βœ… Loaded {len(self.api_keys)} Gemini API key(s)")
        # Index of the key currently in use (0 == primary).
        self.current_key_index = 0
        # One pre-built client per key, in the same order as api_keys.
        self.clients = [genai.Client(api_key=key) for key in self.api_keys]

    def get_current_client(self):
        """Return the client bound to the currently active API key."""
        return self.clients[self.current_key_index]

    def rotate_to_next_key(self):
        """Advance to the next key (wrapping around); False if only one key exists."""
        if len(self.api_keys) == 1:
            print("⚠️ Only one API key available, cannot rotate")
            return False
        old_index = self.current_key_index
        self.current_key_index = (old_index + 1) % len(self.api_keys)
        print(f"πŸ”„ Rotating from API key #{old_index + 1} to API key #{self.current_key_index + 1}")
        return True

    def reset_to_primary(self):
        """Switch back to the first key (called after a successful request)."""
        if self.current_key_index != 0:
            print(f"πŸ”™ Resetting to primary API key #1")
            self.current_key_index = 0
# Initialize the client manager at import time.
# NOTE(review): GeminiClientManager() raises ValueError here if no
# GEMINI_API_KEY_* environment variable is set, so the whole app fails fast.
client_manager = GeminiClientManager()
client = client_manager.get_current_client()  # For backward compatibility
# Grid geometry constant — presumably used by page-overlay/imprinting code
# elsewhere in this file; confirm against the consumers (not visible here).
GRID_ROWS, GRID_COLS = 20, 14
# Supabase configuration (service key grants full storage access).
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_KEY = os.getenv("SUPABASE_SERVICE_KEY")
SUPABASE_BUCKET = "examfiles"
# Initialize Supabase client (only if credentials are available).
# Uploads degrade gracefully: helpers below check `supabase_client` for None.
supabase_client = None
if SUPABASE_URL and SUPABASE_SERVICE_KEY:
    try:
        supabase_client = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY)
        print("βœ… Supabase client initialized successfully")
    except Exception as e:
        print(f"⚠️ Supabase initialization failed: {e}")
else:
    print("⚠️ Supabase credentials not found - file upload to storage disabled")
# ---------------- PROMPTS ----------------
# Prompts are now imported from prompts.py
# ---------------- SUPABASE HELPERS ----------------
def upload_file_to_supabase(local_path, file_type="unknown", timestamp=None):
    """
    Push a local file into the Supabase Storage bucket.

    Args:
        local_path (str): Path of the file on disk.
        file_type (str): Label used only for logging (qp, ms, ans, graded, imprinted).
        timestamp (str): Folder name in the bucket; defaults to the current unix time.

    Returns:
        str | None: Public URL of the stored object, or None when Supabase is
        unconfigured or the upload raised.
    """
    if not supabase_client:
        print("⚠️ Supabase not configured - skipping upload")
        return None
    try:
        folder = str(int(time.time())) if timestamp is None else timestamp
        # Keep the original basename so stored files stay recognizable.
        remote_path = f"{folder}/{os.path.basename(local_path)}"
        print(f"πŸ“€ Uploading {file_type} to Supabase: {remote_path}")
        with open(local_path, "rb") as handle:
            supabase_client.storage.from_(SUPABASE_BUCKET).upload(
                remote_path,
                handle,
                file_options={"upsert": "true"},
            )
        public_url = f"{SUPABASE_URL}/storage/v1/object/public/{SUPABASE_BUCKET}/{remote_path}"
        print(f"βœ… Uploaded successfully: {public_url}")
        return public_url
    except Exception as e:
        print(f"❌ Supabase upload failed for {file_type}: {e}")
        return None
def process_and_upload_input_files(qp_file_obj, ms_file_obj, ans_file_obj):
    """
    Resolve local paths for the three Gradio uploads and mirror them to Supabase.

    All three files share one run timestamp so they land in the same folder.

    Args:
        qp_file_obj: Gradio file object for the Question Paper.
        ms_file_obj: Gradio file object for the Markscheme.
        ans_file_obj: Gradio file object for the Answer Sheet.

    Returns:
        tuple: (qp_path, ms_path, ans_path, upload_urls_dict, timestamp)
    """
    print("\n" + "="*60)
    print("πŸ“ PROCESSING INPUT FILES")
    print("="*60)
    # One timestamp for the whole run keeps all artifacts grouped together.
    run_timestamp = str(int(time.time()))
    print(f"πŸ• Run timestamp: {run_timestamp}")
    upload_urls = {"qp_url": None, "ms_url": None, "ans_url": None}
    qp_path = qp_file_obj.name if qp_file_obj else None
    ms_path = ms_file_obj.name if ms_file_obj else None
    ans_path = ans_file_obj.name if ans_file_obj else None
    if supabase_client:
        # Upload in a fixed order: qp, ms, ans.
        for label, path in (("qp", qp_path), ("ms", ms_path), ("ans", ans_path)):
            if path:
                upload_urls[f"{label}_url"] = upload_file_to_supabase(path, label, run_timestamp)
    print("="*60 + "\n")
    return qp_path, ms_path, ans_path, upload_urls, run_timestamp
# ---------------- HELPERS ----------------
def parse_md_table(md):
    """Parse the body of a Markdown table into a list of stripped cell rows.

    The first two non-blank lines (header + separator) are discarded; empty
    cells are dropped from each row.
    """
    nonblank = [line for line in md.split("\n") if line.strip()]
    if len(nonblank) < 3:
        return []
    rows = []
    for line in nonblank[2:]:  # skip header + separator
        cells = [cell.strip() for cell in line.strip("|").split("|")]
        cells = [cell for cell in cells if cell]
        if cells:
            rows.append(cells)
    return rows
def convert_html_color_spans(md_text):
    """Rewrite inline HTML color spans as LaTeX \\textcolor commands."""
    span_pattern = r'<span\s+style="color:\s*([^"]+)">\s*(.*?)\s*</span>'

    def to_textcolor(match):
        color_name = match.group(1).strip()
        inner_text = match.group(2)
        return fr'\textcolor{{{color_name}}}{{{inner_text}}}'

    return re.sub(span_pattern, to_textcolor, md_text, flags=re.IGNORECASE)
def cleanup_markdown_for_latex(md_text):
    """Normalize markdown before Pandoc: fix table spacing and wrap common
    unicode math symbols in inline-math LaTeX equivalents (safety net)."""
    # Guarantee a blank line between the bold table header and the table itself.
    md_text = re.sub(r'(\*\*Markscheme vs Student Answer\*\*)\s*(\|)', r'\1\n\n\2', md_text)
    # Unicode symbol -> LaTeX command table (each result is wrapped in $...$).
    replacements = {
        '∫': r'\int ',
        'Β²': '^2',
        'Β³': '^3',
        'Β½': r'\frac{1}{2}',
        'ΒΌ': r'\frac{1}{4}',
        '∞': r'\infty',
        '≀': r'\leq',
        'β‰₯': r'\geq',
        'β‰ ': r'\neq',
        'Β±': r'\pm',
        'Γ—': r'\times',
        'Γ·': r'\div',
        '√': r'\sqrt',
        'βˆ‘': r'\sum',
        '∏': r'\prod',
        'βˆ‚': r'\partial',
        'Ο€': r'\pi',
        'ΞΈ': r'\theta',
        'Ξ±': r'\alpha',
        'Ξ²': r'\beta',
        'Ξ³': r'\gamma',
        'Ξ΄': r'\delta',
        'Ξ΅': r'\epsilon',
        'Ξ»': r'\lambda',
        'ΞΌ': r'\mu',
        'Οƒ': r'\sigma',
        'Ξ”': r'\Delta',
        'Ξ£': r'\Sigma',
        'Ξ©': r'\Omega'
    }
    for symbol, latex in replacements.items():
        md_text = md_text.replace(symbol, '$' + latex + '$')
    return md_text
def escape_latex_special_chars(text):
    """Escape LaTeX-special characters in plain text.

    Text that already contains '$' or a backslash is assumed to be math mode
    or an existing LaTeX command and is returned untouched.
    """
    if '$' in text or '\\' in text:
        return text
    # Ordered pairs: braces are escaped before '~'/'^' so the braces inserted
    # by \textasciitilde{} / \textasciicircum{} are left alone.
    escapes = (
        ('%', r'\%'),
        ('&', r'\&'),
        ('#', r'\#'),
        ('_', r'\_'),
        ('{', r'\{'),
        ('}', r'\}'),
        ('~', r'\textasciitilde{}'),
        ('^', r'\textasciicircum{}'),
    )
    for raw, escaped in escapes:
        text = text.replace(raw, escaped)
    return text
def save_as_pdf(text, filename="output.pdf"):
    """
    Convert Markdown text to PDF using Pandoc with pdflatex.

    Extracts the Examiner's Summary Report and re-injects it at the top of the
    document with enhanced formatting (zebra-striped table), and converts HTML
    color spans to LaTeX textcolor commands before compiling.

    Args:
        text (str): Markdown content to convert.
        filename (str): Output PDF filename (sanitized before use).

    Returns:
        str: Path to the generated PDF file.

    Raises:
        Exception: If Pandoc or pdflatex is not available, or conversion fails.
    """
    # Sanitize filename - replace spaces and special characters with underscores.
    # This prevents issues with pdflatex and file operations.
    import string
    valid_chars = f"-_.() {string.ascii_letters}{string.digits}"
    sanitized_filename = ''.join(c if c in valid_chars else '_' for c in filename)
    # Collapse whitespace runs, then underscore runs, to single underscores.
    sanitized_filename = re.sub(r'\s+', '_', sanitized_filename)
    sanitized_filename = re.sub(r'_+', '_', sanitized_filename)
    if sanitized_filename != filename:
        # BUGFIX: this log line previously printed a literal placeholder
        # instead of the original filename.
        print(f"ℹ️ Sanitized filename: '{filename}' β†’ '{sanitized_filename}'")
        filename = sanitized_filename
    base_name = os.path.splitext(filename)[0]
    temp_md_file = f"{base_name}_input.md"
    temp_tex_file = f"{base_name}_temp.tex"
    print("\n" + "="*60)
    print("πŸ“„ MARKDOWN TO PDF CONVERSION PROCESS")
    print("="*60)
    try:
        # Step 1: Extract Summary Report Table so it can be moved to the front.
        print("\n[STEP 1/6] Extracting Examiner's Summary Report...")
        summary_pattern = re.compile(
            r"### Examiner's Summary Report\s*\n\n(\|.*?\|)\s*\n\n\*\*Total:\s*(.*?)\*\*",
            re.DOTALL
        )
        summary_match = summary_pattern.search(text)
        if summary_match:
            summary_table_md = summary_match.group(1)
            summary_total = summary_match.group(2)
            # Remove the summary from the body; it is re-injected in Step 5.
            text = summary_pattern.sub("", text)
            print(f" βœ… SUCCESS: Extracted summary report with total: {summary_total}")
        else:
            summary_table_md = ""
            summary_total = ""
            print(" ⚠️ WARNING: No Examiner's Summary Report found in markdown")
        # Step 2: Clean up markdown (unicode math + color spans -> LaTeX).
        print("\n[STEP 2/6] Cleaning markdown and converting HTML to LaTeX...")
        text = cleanup_markdown_for_latex(text)
        text = convert_html_color_spans(text)
        print(" βœ… SUCCESS: Markdown cleaned and HTML color spans converted")
        # Save cleaned markdown for Pandoc.
        with open(temp_md_file, 'w', encoding='utf-8') as f:
            f.write(text)
        print(f" πŸ“ Saved cleaned markdown to: {temp_md_file}")
        # Step 3: Convert MD to LaTeX via Pandoc.
        print("\n[STEP 3/6] Converting markdown to LaTeX using Pandoc...")
        pandoc_cmd = [
            "pandoc",
            "--from=markdown",
            "--to=latex",
            "--standalone",
            temp_md_file,
            "-o", temp_tex_file
        ]
        print(f" πŸ”§ Running: {' '.join(pandoc_cmd)}")
        result = subprocess.run(pandoc_cmd, capture_output=True, check=False)
        if result.returncode != 0:
            try:
                stderr = result.stderr.decode('utf-8', errors='replace')
            except Exception:  # BUGFIX: was a bare except
                stderr = str(result.stderr)
            print(f" ❌ FAILED: Pandoc returned error code {result.returncode}")
            print(f" Error details: {stderr[:500]}")
            raise Exception(f"Pandoc conversion failed: {stderr}")
        if not os.path.exists(temp_tex_file):
            print(f" ❌ FAILED: LaTeX file not created at {temp_tex_file}")
            raise Exception("Pandoc did not create the expected LaTeX file")
        print(f" βœ… SUCCESS: LaTeX file created at {temp_tex_file}")
        # Step 4: Modify the generated LaTeX (12pt class, color/table packages).
        print("\n[STEP 4/6] Enhancing LaTeX document...")
        with open(temp_tex_file, "r", encoding="utf-8") as f:
            tex = f.read()
        tex = tex.replace(
            r"\documentclass{article}",
            r"\documentclass[12pt]{extarticle}"
        )
        insert_packages = r"""\usepackage[a4paper, margin=1in]{geometry}
\usepackage{xcolor}
\usepackage{colortbl}
\usepackage{booktabs}
\usepackage{array}
\usepackage{longtable}
\renewcommand{\arraystretch}{1.4}
\newcolumntype{L}[1]{>{\raggedright\arraybackslash}p{#1}}"""
        tex = tex.replace(r"\begin{document}", insert_packages + "\n\\begin{document}")
        print(" βœ… SUCCESS: Enhanced document class and added packages")
        # Step 5: Build enhanced LaTeX table for the summary and inject it
        # right after \begin{document}.
        if summary_table_md:
            print("\n[STEP 5/6] Building enhanced summary table...")
            summary_rows = parse_md_table(summary_table_md)
            print(f" πŸ“Š Parsed {len(summary_rows)} rows from summary table")
            summary_latex = r"""\section*{Examiner's Summary Report}
\begin{center}
\rowcolors{2}{gray!10}{white}
\begin{tabular}{|c|c|c|L{8cm}|}
\hline
\rowcolor{gray!30}
\textbf{Question} & \textbf{Marks} & \textbf{Remark} & \textbf{Feedback} \\ \hline
"""
            for row in summary_rows:
                if len(row) >= 4:
                    feedback = row[3]
                    # Escape only if the cell isn't already LaTeX/math.
                    if not ('$' in feedback or '\\textcolor' in feedback):
                        feedback = feedback.replace('%', r'\%').replace('&', r'\&').replace('#', r'\#')
                    summary_latex += f"{row[0]} & {row[1]} & {row[2]} & {feedback} \\\\ \\hline\n"
            summary_latex += r"\end{tabular}"
            summary_latex += "\n\\end{center}\n\n"
            summary_latex += f"\\vspace{{0.5cm}}\\noindent\\textbf{{\\Large Overall Score: {summary_total}}}\n\n"
            summary_latex += "\\hrulefill\n\\vspace{1cm}\n\n"
            summary_latex += "\\newpage\n\n"
            tex = tex.replace(
                r"\begin{document}",
                r"\begin{document}" + "\n\n" + summary_latex
            )
            print(" βœ… SUCCESS: Summary table with zebra striping injected at document top")
        else:
            print("\n[STEP 5/6] Skipping summary table (not found)")
        with open(temp_tex_file, "w", encoding="utf-8") as f:
            f.write(tex)
        # Step 6: Compile PDF with pdflatex (two passes so tables/refs settle).
        print("\n[STEP 6/6] Compiling PDF with pdflatex...")
        pdflatex_cmd = [
            "pdflatex",
            "-interaction=nonstopmode",
            f"-output-directory={os.path.dirname(os.path.abspath(temp_tex_file)) or '.'}",
            temp_tex_file
        ]
        print(" πŸ”§ Running pdflatex (pass 1/2)...")
        result1 = subprocess.run(pdflatex_cmd, capture_output=True, check=False)
        print(" πŸ”§ Running pdflatex (pass 2/2)...")
        result2 = subprocess.run(pdflatex_cmd, capture_output=True, check=False)
        temp_pdf = temp_tex_file.replace(".tex", ".pdf")
        if not os.path.exists(temp_pdf):
            print(f" ❌ FAILED: PDF not created at {temp_pdf}")
            try:
                stderr = result2.stderr.decode('utf-8', errors='replace')
            except Exception:  # BUGFIX: was a bare except
                stderr = str(result2.stderr)
            # Surface LaTeX errors from the .log file for easier debugging.
            log_file = temp_tex_file.replace(".tex", ".log")
            if os.path.exists(log_file):
                print(f" πŸ“‹ Checking LaTeX log file: {log_file}")
                try:
                    with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
                        log_content = f.read()
                    error_lines = [line for line in log_content.split('\n') if '!' in line]
                    if error_lines:
                        print(f" ❌ LaTeX Errors found ({len(error_lines)} lines):")
                        for err_line in error_lines[:10]:
                            print(f" {err_line}")
                        stderr += "\n\nLaTeX Errors:\n" + "\n".join(error_lines[:10])
                except Exception as log_err:
                    print(f" ⚠️ Could not read log file: {log_err}")
            raise Exception(f"pdflatex failed to create PDF. Error: {stderr[:1000]}")
        print(f" βœ… SUCCESS: PDF compiled at {temp_pdf}")
        # Move output PDF to final filename.
        if os.path.exists(filename):
            os.remove(filename)
        os.rename(temp_pdf, filename)
        # BUGFIX: this log line previously printed a literal placeholder.
        print(f" πŸ“¦ Moved to final location: {filename}")
        # Clean up temporary files (plain and _input/_temp variants).
        print("\n[CLEANUP] Removing temporary files...")
        cleaned_count = 0
        for ext in [".md", ".tex", ".aux", ".log", ".out"]:
            temp_file = base_name + ext
            if os.path.exists(temp_file):
                os.remove(temp_file)
                cleaned_count += 1
            for prefix in ["_input", "_temp"]:
                temp_file = base_name + prefix + ext
                if os.path.exists(temp_file):
                    os.remove(temp_file)
                    cleaned_count += 1
        print(f" 🧹 Cleaned up {cleaned_count} temporary files")
        print("\n" + "="*60)
        print("βœ… PDF CONVERSION COMPLETED SUCCESSFULLY")
        # BUGFIX: this log line previously printed a literal placeholder.
        print(f"πŸ“„ Output file: {filename}")
        print("="*60 + "\n")
        return filename
    except subprocess.CalledProcessError as e:
        print(f"\n❌ SUBPROCESS ERROR: {e}")
        print(f" STDOUT: {e.stdout}")
        print(f" STDERR: {e.stderr}")
        print("="*60 + "\n")
        raise Exception(f"PDF conversion failed: {e.stderr}")
    except FileNotFoundError as e:
        print(f"\n❌ FILE NOT FOUND ERROR: {e}")
        print("="*60)
        print("⚠️ REQUIRED TOOLS MISSING")
        print("Please install the following:")
        print(" β€’ pandoc")
        print(" β€’ texlive (or MiKTeX on Windows)")
        print(" β€’ texlive-latex-extra (for extarticle class)")
        print("="*60 + "\n")
        raise Exception(
            "Pandoc or pdflatex not found. Please install:\n"
            " - pandoc\n"
            " - texlive (or MiKTeX on Windows)\n"
            " - texlive-latex-extra (for extarticle class)"
        )
    except Exception as e:
        print(f"\n❌ UNEXPECTED ERROR: {e}")
        import traceback
        traceback.print_exc()
        print("="*60 + "\n")
        raise
def compress_pdf(input_path, output_path=None, max_size=20*1024*1024):
    """Compress a PDF with Ghostscript when it exceeds max_size bytes.

    Returns the path that should be used afterwards: the compressed file when
    compression succeeded and brought the size under the threshold, otherwise
    the original path (including when the input is missing/unreadable).
    """
    if output_path is None:
        stem, suffix = os.path.splitext(input_path)
        output_path = f"{stem}_compressed{suffix}"
    try:
        size = os.path.getsize(input_path)
    except Exception:
        # Unreadable/missing input: hand back the original path unchanged.
        return input_path
    if size <= max_size:
        print(f"ℹ️ Not compressing {input_path} ({size/1024/1024:.2f} MB <= {max_size/1024/1024} MB)")
        return input_path
    print(f"πŸ”Ž Compressing {input_path} ({size/1024/1024:.2f} MB) -> {output_path}")
    try:
        subprocess.run(
            [
                "gs", "-sDEVICE=pdfwrite",
                "-dCompatibilityLevel=1.4",
                "-dPDFSETTINGS=/ebook",
                "-dNOPAUSE", "-dQUIET", "-dBATCH",
                f"-sOutputFile={output_path}", input_path,
            ],
            check=True,
        )
        new_size = os.path.getsize(output_path)
        print(f"βœ… Compression done. New size: {new_size/1024/1024:.2f} MB")
        if new_size <= max_size:
            return output_path
        print("⚠️ Compressed file still larger than threshold; returning original")
        return input_path
    except Exception as e:
        print("❌ Compression error:", e)
        return input_path
def upload_to_gemini(path, display_name=None):
    """
    Upload a local file to Gemini via the new google-genai SDK and block until
    server-side processing finishes.

    Uses whichever API key is currently active in client_manager. Raises on
    upload failure or when processing ends in the FAILED state.
    """
    print(f"πŸ“€ Uploading {path} to Gemini...")
    try:
        active_client = client_manager.get_current_client()
        handle = active_client.files.upload(file=path)
        print(f"⏳ Waiting for file processing: {handle.name}")
        # Poll every 2s until the file leaves the PROCESSING state.
        while handle.state.name == "PROCESSING":
            time.sleep(2)
            handle = active_client.files.get(name=handle.name)
        if handle.state.name == "FAILED":
            raise Exception(f"File processing failed: {handle.name}")
        print(f"βœ… Uploaded and processed: {handle.name}")
        return handle
    except Exception as e:
        print(f"❌ Upload failed for {path}: {e}")
        raise
def merge_pdfs(paths, output_path):
    """Concatenate the pages of the given PDFs (in order) into output_path."""
    writer = PdfWriter()
    for source in paths:
        for page in PdfReader(source).pages:
            writer.add_page(page)
    with open(output_path, "wb") as out_file:
        writer.write(out_file)
    return output_path
def gemini_generate_content(prompt_text, file_upload_obj=None, image_obj=None, model_name="gemini-2.5-pro", fallback_model="gemini-2.5-flash", fallback_model_2="gemini-2.5-flash-lite", file_path=None):
    """
    Send prompt_text and optionally an uploaded file (or an image object/list) to the model using NEW SDK.
    Automatically rotates through available API keys on RESOURCE_EXHAUSTED errors.
    When rotating keys with file uploads, re-uploads the file with the new API key.

    Retry strategy per API key: primary model -> fallback_model -> fallback_model_2;
    only when all three are exhausted (HTTP 429) does it rotate to the next key.

    Args:
        prompt_text: The prompt to send
        file_upload_obj: Previously uploaded file object (optional)
        image_obj: Image or list of images (optional; str entries are treated as paths)
        model_name: Primary model to use
        fallback_model: First fallback model if primary fails
        fallback_model_2: Second fallback model if first fallback fails
        file_path: Local file path (needed for re-upload when rotating keys)
    Returns textual response and prints progress.
    """
    # NOTE(review): this pre-loop contents build is immediately rebuilt at the
    # top of the retry loop below; it is effectively redundant.
    contents = [prompt_text]
    current_file_obj = file_upload_obj
    if current_file_obj:
        contents.append(current_file_obj)
    if image_obj:
        # image_obj may be a single item or a list; string entries are file
        # paths opened as PIL images, everything else is passed through as-is.
        if isinstance(image_obj, list):
            for img_path in image_obj:
                if isinstance(img_path, str):
                    pil_img = Image.open(img_path)
                    contents.append(pil_img)
                else:
                    contents.append(img_path)
        else:
            if isinstance(image_obj, str):
                pil_img = Image.open(image_obj)
                contents.append(pil_img)
            else:
                contents.append(image_obj)
    print("πŸ“‘ Sending request to Gemini (prompt length:", len(prompt_text), "chars )")
    # Try with all available API keys
    max_attempts = len(client_manager.api_keys)
    attempt = 0
    while attempt < max_attempts:
        current_client = client_manager.get_current_client()
        current_key_num = client_manager.current_key_index + 1
        # Update contents with current file object (current_file_obj may have
        # been re-uploaded under a different key on a previous iteration).
        contents = [prompt_text]
        if current_file_obj:
            contents.append(current_file_obj)
        if image_obj:
            if isinstance(image_obj, list):
                for img_path in image_obj:
                    if isinstance(img_path, str):
                        pil_img = Image.open(img_path)
                        contents.append(pil_img)
                    else:
                        contents.append(img_path)
            else:
                if isinstance(image_obj, str):
                    pil_img = Image.open(image_obj)
                    contents.append(pil_img)
                else:
                    contents.append(image_obj)
        # Try primary model first
        try:
            print(f"πŸ”‘ Using API key #{current_key_num} with model {model_name}")
            response = current_client.models.generate_content(
                model=model_name,
                contents=contents
            )
            raw_text = response.text
            print(f"πŸ“₯ Received response (chars): {len(raw_text)}")
            # Success! Reset to primary key for next request
            client_manager.reset_to_primary()
            return raw_text
        except Exception as e:
            error_str = str(e)
            print(f"❌ Generation failed with API key #{current_key_num} and model {model_name}: {e}")
            # Check if it's a RESOURCE_EXHAUSTED error (matched by substring
            # on the exception text — "429" or "RESOURCE_EXHAUSTED").
            if "429" in error_str or "RESOURCE_EXHAUSTED" in error_str:
                print(f"⚠️ Quota exhausted for API key #{current_key_num} with model {model_name}")
                # Try first fallback model with SAME API key
                print(f"⚑ Trying fallback model {fallback_model} with same API key #{current_key_num}")
                try:
                    response = current_client.models.generate_content(
                        model=fallback_model,
                        contents=contents
                    )
                    raw_text = response.text
                    print(f"πŸ“₯ Received response (chars): {len(raw_text)}")
                    client_manager.reset_to_primary()
                    return raw_text
                except Exception as e_fallback:
                    error_fallback_str = str(e_fallback)
                    print(f"❌ Fallback model {fallback_model} also failed: {e_fallback}")
                    # Check if first fallback also exhausted
                    if "429" in error_fallback_str or "RESOURCE_EXHAUSTED" in error_fallback_str:
                        print(f"⚠️ First fallback model also exhausted for API key #{current_key_num}")
                        # Try second fallback model with SAME API key
                        print(f"⚑ Trying second fallback model {fallback_model_2} with same API key #{current_key_num}")
                        try:
                            response = current_client.models.generate_content(
                                model=fallback_model_2,
                                contents=contents
                            )
                            raw_text = response.text
                            print(f"πŸ“₯ Received response (chars): {len(raw_text)}")
                            client_manager.reset_to_primary()
                            return raw_text
                        except Exception as e_fallback_2:
                            error_fallback_2_str = str(e_fallback_2)
                            print(f"❌ Second fallback model {fallback_model_2} also failed: {e_fallback_2}")
                            # Check if second fallback also exhausted
                            if "429" in error_fallback_2_str or "RESOURCE_EXHAUSTED" in error_fallback_2_str:
                                print(f"⚠️ All 3 models exhausted for API key #{current_key_num}")
                                # Now try next API key if available
                                if attempt < max_attempts - 1:
                                    # Check if we have file uploads and can re-upload
                                    # (files are scoped to the key they were uploaded with).
                                    if file_upload_obj and file_path:
                                        print(f"πŸ”„ Rotating to next API key and re-uploading file...")
                                        client_manager.rotate_to_next_key()
                                        # Re-upload file with new API key
                                        try:
                                            print(f"πŸ“€ Re-uploading file with API key #{client_manager.current_key_index + 1}...")
                                            current_file_obj = upload_to_gemini(file_path)
                                            print(f"βœ… File re-uploaded successfully")
                                        except Exception as upload_error:
                                            print(f"❌ Failed to re-upload file: {upload_error}")
                                            raise Exception(f"Failed to re-upload file with new API key: {upload_error}")
                                        attempt += 1
                                        print(f"πŸ”„ Retrying with next API key (attempt {attempt + 1}/{max_attempts})...")
                                        continue
                                    elif file_upload_obj and not file_path:
                                        print("⚠️ WARNING: Cannot rotate API keys - file_path not provided for re-upload!")
                                        print(" To enable API key rotation with file uploads, pass file_path parameter.")
                                        raise Exception(f"All 3 models exhausted for API key #{current_key_num}. Cannot rotate without file_path.")
                                    else:
                                        # No file uploads, safe to rotate
                                        client_manager.rotate_to_next_key()
                                        attempt += 1
                                        print(f"πŸ”„ Trying next API key (attempt {attempt + 1}/{max_attempts})...")
                                        continue
                                else:
                                    raise Exception(f"All {max_attempts} API key(s) exhausted with all 3 models.")
                            else:
                                # Second fallback failed with different error
                                raise Exception(f"Second fallback model failed: {e_fallback_2}")
                    else:
                        # First fallback failed with different error
                        raise Exception(f"First fallback model failed: {e_fallback}")
            elif "403" in error_str or "PERMISSION_DENIED" in error_str:
                # This happens when trying to access a file uploaded with a different API key
                print(f"⚠️ Permission denied - likely due to file uploaded with different API key")
                # Try to re-upload if we have the file path
                if file_path and attempt < max_attempts - 1:
                    print(f"πŸ”„ Attempting to re-upload file with next API key...")
                    client_manager.rotate_to_next_key()
                    try:
                        print(f"πŸ“€ Re-uploading file with API key #{client_manager.current_key_index + 1}...")
                        current_file_obj = upload_to_gemini(file_path)
                        print(f"βœ… File re-uploaded successfully")
                        attempt += 1
                        print(f"πŸ”„ Retrying with next API key (attempt {attempt + 1}/{max_attempts})...")
                        continue
                    except Exception as upload_error:
                        print(f"❌ Failed to re-upload file: {upload_error}")
                        raise Exception(f"Failed to re-upload file with new API key: {upload_error}")
                else:
                    raise Exception(f"File access denied. Cannot re-upload without file_path. Error: {e}")
            else:
                # Other error - try fallback models with same key
                print(f"⚑ Trying fallback model {fallback_model} with same API key #{current_key_num}")
                try:
                    response = current_client.models.generate_content(
                        model=fallback_model,
                        contents=contents
                    )
                    raw_text = response.text
                    print(f"πŸ“₯ Received response (chars): {len(raw_text)}")
                    client_manager.reset_to_primary()
                    return raw_text
                except Exception as e2:
                    print(f"❌ First fallback also failed: {e2}")
                    # Try second fallback
                    print(f"⚑ Trying second fallback model {fallback_model_2} with same API key #{current_key_num}")
                    try:
                        response = current_client.models.generate_content(
                            model=fallback_model_2,
                            contents=contents
                        )
                        raw_text = response.text
                        print(f"πŸ“₯ Received response (chars): {len(raw_text)}")
                        client_manager.reset_to_primary()
                        return raw_text
                    except Exception as e3:
                        print(f"❌ Second fallback also failed: {e3}")
                        # If we have more keys, try them
                        if attempt < max_attempts - 1:
                            if file_upload_obj and file_path:
                                print(f"πŸ”„ Rotating to next API key and re-uploading file...")
                                client_manager.rotate_to_next_key()
                                try:
                                    print(f"πŸ“€ Re-uploading file with API key #{client_manager.current_key_index + 1}...")
                                    current_file_obj = upload_to_gemini(file_path)
                                    print(f"βœ… File re-uploaded successfully")
                                except Exception as upload_error:
                                    print(f"❌ Failed to re-upload file: {upload_error}")
                                    raise Exception(f"Failed to re-upload file with new API key: {upload_error}")
                                attempt += 1
                                print(f"πŸ”„ Retrying with next API key (attempt {attempt + 1}/{max_attempts})...")
                                continue
                            elif file_upload_obj and not file_path:
                                raise Exception(f"All models failed. Cannot rotate keys without file_path. Last error: {e3}")
                            else:
                                client_manager.rotate_to_next_key()
                                attempt += 1
                                print(f"πŸ”„ Trying next API key (attempt {attempt + 1}/{max_attempts})...")
                                continue
                        else:
                            raise Exception(f"All attempts failed. Last error: {e3}")
    # If we exhausted all attempts
    raise Exception(f"❌ All {max_attempts} API key(s) exhausted. Please check your quota or try again later.")
# ---------------- PARSERS ----------------
def extract_question_ids_from_qpms(text: str):
    """Pull question IDs out of a QP+MS transcript.

    Prefers explicit 'Question X' lines; falls back to lines that begin with a
    numbered-list style ID. Returns a (possibly empty) list of ID strings.
    """
    print("πŸ”Ž Extracting question IDs from QP+MS transcript using regex...")
    normalized = text.replace("\u00A0", " ").replace("\t", " ")
    ids = re.findall(r"^\s*Question\s*[:\s]\s*([\dA-Za-z.()]+)", normalized, re.MULTILINE)
    if ids:
        print(f"βœ… Extracted {len(ids)} question IDs from explicit 'Question X' lines.")
        print("IDs:", ids)
        return ids
    ids = re.findall(r"^\s*(\d+(?:[.)]|\([a-zA-Z0-9]+\))?[a-zA-Z0-9]*)", normalized, re.MULTILINE)
    if ids:
        print(f"βœ… Extracted {len(ids)} question IDs (fallback numbered lists).")
        print("IDs:", ids)
    else:
        print("⚠️ No question IDs extracted; will send NA placeholder.")
    return ids
def build_as_cot_prompt_with_expected_ids(expected_ids, qpms_text=None):
    """
    Build the answer-sheet transcription prompt.

    Injects the expected question IDs block (or an NA placeholder), an optional
    QP+MS transcript section for disambiguation, Chain-of-Thought (<think>)
    instructions, LaTeX math delimiters, and graph-detection output markers.
    """
    # IDs block: literal "{NA}" when nothing was extracted upstream.
    ids_block = "{NA}" if not expected_ids else "{\n" + "\n".join(expected_ids) + "\n}"
    # Optional QP+MS context block (only present when a transcript is supplied).
    if qpms_text is None:
        qpms_section = ""
    else:
        qpms_section = (
            "\nYou are also provided with the full transcript of the Question Paper and Markscheme (QP+MS) below."
            "\nUse it primarily to resolve ambiguous handwriting and to confirm expected answers when needed."
            "\n--- BEGIN QP+MS TRANSCRIPT ---\n"
            f"{qpms_text.strip()}\n"
            "--- END QP+MS TRANSCRIPT ---\n"
        )
    return f"""You are a high-quality handwritten transcription assistant, performing transcription with a Chain-of-Thought process.
INPUT: This PDF contains a student's handwritten answer sheet.
{qpms_section}
TASK:
1. **THINKING:** Before transcribing each answer, document your thought process inside a **<think>** tag.
- Identify the question ID. If inferred, note why.
- Detail any ambiguities (unclear numbers, symbols, or structures).
- Explain how ambiguities were resolved, including whether the QP+MS transcript was consulted.
- If QP+MS was consulted but you chose not to change the transcription, state this.
- If the initial question label was incorrect (e.g., 2.a vs 2.b), correct it and briefly explain the reasoning in <think>.
*Example Thinking:*
<think>
- Found Question 3(a).
- The term could be '$2x$' or '21x'.
- Markscheme uses '$21x$', but handwriting matches '$2x$'.
- Decision: transcribe '$2x$'.
</think>
2. **TRANSCRIPTION:** Transcribe the student's answers directly and faithfully.
- Assign each answer to a labelled question ID when present.
- For unlabeled answers, segment logically and mark inferred IDs as "**INFERRED: <id>**".
- **Mathematical expressions and standalone variables must appear inside LaTeX dollar delimiters ($...$).**
- If a diagram/graph is omitted, write **[Graph omitted]**.
- If handwriting is unreadable: **[illegible]**.
**ANSWER-INTERPRETATION RULES:**
- If the student writes β€œNA”, β€œN/A”, β€œNot Applicable”, or clear equivalents β†’ record exactly as **NA**.
- If the student leaves the space blank, crosses it out, makes no meaningful attempt, or provides no answer β†’ record **[No response]**.
Ensure deterministic formatting so subsequent models can grade directly from this aligned format.
Expected questions (if missing, write NA):
{ids_block}
-----------------------
OUTPUT FORMAT:
<think>...</think>
Question <id>
AS:<transcribed answer or placeholder>
<think>...</think>
Question <id>
AS:<transcribed answer or placeholder>
...
==== GRAPH FOUND ANSWERS ====
Graph found in:
- Answer <number> β†’ Page <number>
(one per line)
==== END GRAPH FOUND ===="""
def extract_graph_questions_from_ms(text: str):
    """Map question IDs to page numbers from the MS transcript's
    'GRAPH EXPECTED QUESTIONS' marker block; {} when the block is absent."""
    normalized = text.replace("\u00A0", " ").replace("\t", " ")
    section = re.search(r"==== GRAPH EXPECTED QUESTIONS ====\s*(.*?)\s*==== END GRAPH EXPECTED ====",
                        normalized, re.S)
    pages_by_question = {}
    if section:
        for raw_line in section.group(1).splitlines():
            raw_line = raw_line.strip()
            if not raw_line.startswith("- Question"):
                continue
            hit = re.match(r"- Question\s+([\dA-Za-z.()]+)\s*β†’\s*Page\s*(\d+)", raw_line)
            if hit:
                pages_by_question[hit.group(1)] = int(hit.group(2))
    return pages_by_question
def extract_graph_answers_from_as(text: str):
    """Map answer IDs to page numbers from the AS transcript's
    'GRAPH FOUND ANSWERS' marker block; {} when the block is absent."""
    normalized = text.replace("\u00A0", " ").replace("\t", " ")
    section = re.search(r"==== GRAPH FOUND ANSWERS ====\s*(.*?)\s*==== END GRAPH FOUND ====",
                        normalized, re.S)
    pages_by_answer = {}
    if section:
        for raw_line in section.group(1).splitlines():
            raw_line = raw_line.strip()
            if not raw_line.startswith("- Answer"):
                continue
            hit = re.match(r"- Answer\s+([\dA-Za-z.()]+)\s*β†’\s*Page\s*(\d+)", raw_line)
            if hit:
                pages_by_answer[hit.group(1)] = int(hit.group(2))
    return pages_by_answer
def extract_marks_from_grading(grading_text):
    """
    Parse the grading markdown and collect mark codes per question, reading
    only the 'Awarded' column (4th table column) of each question block.

    Returns a dict of the form {"grading": [{"question": ..., "marks_awarded": [...]}]}.
    """
    print("πŸ”Ž Extracting awarded marks from grading output...")
    result = {"grading": []}
    # Blocks are delimited by '### Question <id>' headings; index 0 is preamble.
    for block in re.split(r"###\s*Question\s+", grading_text)[1:]:
        block_lines = block.strip().splitlines()
        first_line = block_lines[0].strip() if block_lines else ""
        id_match = re.match(r"([0-9]+(?:[a-zA-Z]|\([^)]+\)|(?:\.[a-zA-Z0-9]+))*)", first_line)
        if id_match:
            q_id = id_match.group(1).strip()
        else:
            q_id = first_line.split()[0] if first_line else ""
        awarded = []
        for line in block.split('\n'):
            if '|' not in line:
                continue
            cells = [cell.strip() for cell in line.split('|')]
            # Data rows only (skip the '---' separator); cells[0] is the empty
            # string produced by the leading pipe, so the Awarded column is index 4.
            if len(cells) >= 5 and not cells[1].startswith('-'):
                awarded.extend(re.findall(r"\b([MABCR]\d+|[MABCR]0)\b", cells[4]))
        result["grading"].append({
            "question": q_id,
            "marks_awarded": awarded
        })
    print("βœ… Extracted grading marks for", len(result["grading"]), "question blocks.")
    print(json.dumps(result, indent=2))
    return result
def check_and_correct_total_marks(grading_text):
    """
    Verifies the total marks in the Examiner's Summary Report against
    the sum of individual question marks. Corrects if discrepancy found.

    Args:
        grading_text (str): The full grading markdown text

    Returns:
        tuple: (corrected_text, calculated_awarded, calculated_possible, was_corrected)
    """
    print("\n" + "="*60)
    print("πŸ” VERIFYING TOTAL MARKS IN SUMMARY REPORT")
    print("="*60)
    question_marks = {}
    calculated_total_awarded = 0
    calculated_total_possible = 0
    # Pattern matches BOTH formats:
    #   ### Question <1.a>  (with angle brackets)
    #   ### Question 1.a    (without angle brackets)
    # <? / >? make the brackets optional; the lazy [\s\S]*? stops at the
    # FIRST per-question "**Total: a/p**" line after the heading.
    question_block_pattern = re.compile(
        r"### Question\s*<?([0-9]+(?:[.()][a-z0-9]+)*)>?\s*[\s\S]*?\*\*Total:\s*(\d+)/(\d+)\*\*",
        re.DOTALL | re.IGNORECASE
    )
    matches = question_block_pattern.finditer(grading_text)
    for match in matches:
        question_id = match.group(1).strip()
        awarded = int(match.group(2))
        possible = int(match.group(3))
        question_marks[question_id] = {'awarded': awarded, 'possible': possible}
        calculated_total_awarded += awarded
        calculated_total_possible += possible
    # FIX: this log line was mojibake ("οΏ½ Exltracted ...") β€” restored to readable text.
    print(f"\nπŸ“Š Extracted marks from {len(question_marks)} questions:")
    for q_id, marks in question_marks.items():
        print(f" Question {q_id}: {marks['awarded']}/{marks['possible']}")
    print(f"\nπŸ“ˆ Calculated totals from individual questions:")
    print(f" Awarded: {calculated_total_awarded}")
    print(f" Possible: {calculated_total_possible}")
    # Find the summary report section; without it there is nothing to verify.
    summary_report_start = grading_text.find("### Examiner's Summary Report")
    if summary_report_start == -1:
        print("⚠️ Warning: Could not find '### Examiner's Summary Report' section.")
        return grading_text, calculated_total_awarded, calculated_total_possible, False
    summary_section = grading_text[summary_report_start:]
    summary_total_pattern = re.compile(r"(\*\*Total:\s*)(\d+)/(\d+)(\*\*)")
    summary_match = summary_total_pattern.search(summary_section)
    original_summary_awarded = 0
    original_summary_possible = 0
    if summary_match:
        original_summary_awarded = int(summary_match.group(2))
        original_summary_possible = int(summary_match.group(3))
        print(f"\nπŸ“‹ Original summary report total: {original_summary_awarded}/{original_summary_possible}")
    else:
        print("⚠️ Warning: Could not find overall total in summary report.")
        return grading_text, calculated_total_awarded, calculated_total_possible, False
    # Compare the report's totals against the recomputed sums.
    corrected_report_text = grading_text
    total_mismatch = False
    if calculated_total_awarded != original_summary_awarded:
        print(f"\n❌ DISCREPANCY FOUND in awarded marks!")
        print(f" Calculated: {calculated_total_awarded}")
        print(f" Reported: {original_summary_awarded}")
        total_mismatch = True
    if calculated_total_possible != original_summary_possible:
        print(f"\n❌ DISCREPANCY FOUND in possible marks!")
        print(f" Calculated: {calculated_total_possible}")
        print(f" Reported: {original_summary_possible}")
        total_mismatch = True
    if total_mismatch:
        print(f"\nπŸ”§ CORRECTING summary total:")
        print(f" FROM: {original_summary_awarded}/{original_summary_possible}")
        print(f" TO: {calculated_total_awarded}/{calculated_total_possible}")
        # Rewrite only the FIRST total inside the summary section so the
        # per-question totals earlier in the document are left untouched.
        corrected_summary_section = re.sub(
            summary_total_pattern,
            rf"\g<1>{calculated_total_awarded}/{calculated_total_possible}\g<4>",
            summary_section,
            count=1
        )
        corrected_report_text = grading_text[:summary_report_start] + corrected_summary_section
        print("βœ… Total marks corrected successfully!")
    else:
        print("\nβœ… Total marks are CORRECT - no correction needed!")
    print("="*60 + "\n")
    return corrected_report_text, calculated_total_awarded, calculated_total_possible, total_mismatch
# ---------------- MAPPING/IMPRINT HELPERS ----------------
def ask_gemini_for_mapping_batch(image_paths, grading_json, expected_ids=None, rows=GRID_ROWS, cols=GRID_COLS):
    """
    Send multiple page images together to Gemini for batch mapping processing.

    Args:
        image_paths: Paths of grid-numbered page images (one per PDF page).
        grading_json: Dict with a "grading" list of {"question", "marks_awarded"}.
        expected_ids: Optional list of question IDs the pages may contain;
            forwarded to the prompt as a hint.
        rows, cols: Grid dimensions drawn on the page images.

    Returns:
        list[dict]: Parsed mapping entries like
        {"page": 1, "question": "1(a)", "cell_number": 15}; [] on failure.
    """
    ids_block = "{NA}"
    if expected_ids:
        ids_block = "{\n" + "\n".join(expected_ids) + "\n}"
    prompt = f"""You are an exam marker. Your role is to identify where each question begins on each page.
The pages are divided into a {rows} x {cols} grid. Each cell has a RUNNING NUMBER label.
For each question in the grading JSON, return the cell NUMBER where the FIRST STEP of that question begins.
⚠ IMPORTANT RULES:
- Do not place marks inside another question's answer area.
- Prefer placing the marks in a BLANK cell immediately to the RIGHT of the answer step. If no blank cell is available to the right, then place in a blank cell to the LEFT.
- Never place marks above or below the answer.
- Each question should have unique cell number
- If a question serial number is visible in the answer image, you must mandatorily identify the corresponding question using the grading JSON.
IMPORTANT: For your help i have provided u questions that u can expect in the images:
{ids_block}
Return JSON only, like:
[{{"page": 1, "question": "1(a)", "cell_number": 15}}, ...]
Grading JSON:
{json.dumps(grading_json, indent=2)}"""
    images = [Image.open(p) for p in image_paths]
    print(f"πŸ“‘ Sending batch mapping request for {len(image_paths)} pages to Gemini...")
    try:
        contents = [prompt] + images
        response = client.models.generate_content(
            model="gemini-2.5-flash",
            contents=contents
        )
        raw_text = response.text
    except Exception as e:
        # FIX: was a bare `except:` which also swallowed KeyboardInterrupt /
        # SystemExit and hid the actual failure reason. Narrowed and logged.
        print(f"⚠️ Primary mapping model failed: {e}")
        print("⚠️ Trying fallback model for mapping...")
        contents = [prompt] + images
        response = client.models.generate_content(
            model="gemini-2.5-flash-preview-09-2025",
            contents=contents
        )
        raw_text = response.text
    print("πŸ“₯ Batch mapping response (chars):", len(raw_text))
    print("πŸ”Ž Gemini raw batch output:")
    print(raw_text)
    # The model may wrap the JSON in prose; grab the outermost [...] array.
    try:
        match = re.search(r'(\[.*\])', raw_text, re.DOTALL)
        if match:
            mapping = json.loads(match.group(1))
            print(f"βœ… Parsed Gemini batch mapping for {len(image_paths)} pages")
            return mapping
        else:
            print("❌ Failed to find JSON array in response")
            return []
    except Exception as e:
        print(f"❌ Failed to parse Gemini JSON mapping: {e}")
        return []
def normalize_question_id(qid):
    """
    Canonicalize a question ID so different notations compare equal.

    Examples:
        "1(a)"   -> "1.a"
        "2(c).i" -> "2.c.i"
        "3.d.ii" -> "3.d.ii"  (already canonical)
    """
    if not qid:
        return qid
    # "1(a)" -> "1.a": single parenthesized letter after the question number.
    canonical = re.sub(r'(\d+)\(([a-zA-Z])\)', r'\1.\2', qid)
    # "2(cd)." -> "2.cd.": multi-letter part immediately followed by a dot.
    canonical = re.sub(r'(\d+)\(([a-zA-Z]+)\)\.', r'\1.\2.', canonical)
    return canonical
def imprint_marks_using_mapping(pdf_path, grading_json, output_pdf, expected_ids=None, rows=GRID_ROWS, cols=GRID_COLS):
    """
    Convert PDF to images, create grid-numbered images for batch sending to Gemini,
    then annotate and produce imprinted PDF.

    Three passes: (1) overlay a numbered rows x cols grid on each page image,
    (2) ask Gemini which grid cell each question's answer starts in,
    (3) draw the awarded mark codes in red near that cell with OpenCV, then
    merge the annotated pages into a compressed PDF.

    Args:
        pdf_path: Student answer-sheet PDF to annotate.
        grading_json: {"grading": [{"question", "marks_awarded"}]} as produced
            by extract_marks_from_grading().
        output_pdf: Destination path for the annotated PDF.
        expected_ids: Optional question IDs forwarded to the mapping prompt.
        rows, cols: Grid dimensions overlaid on each page.

    Returns:
        Path of the compressed annotated PDF (as returned by compress_pdf).

    NOTE(review): the per-page "page_*_grid.png"/"annotated_page_*.png" files
    are written to the working directory and never cleaned up β€” presumably
    intentional for debugging; confirm.
    """
    print("πŸ“„ Converting answer PDF to images for imprinting...")
    # 100 dpi keeps the grid images small enough to batch-upload.
    pages = convert_from_path(pdf_path, dpi=100)
    annotated_page_paths = []
    temp_grid_images = []
    # Pass 1: draw a running cell number at the center of every grid cell so
    # the model can reference positions by a single integer.
    for p_index, page in enumerate(pages):
        img = page.convert("RGB")
        w, h = img.size
        cell_w, cell_h = w / cols, h / rows
        draw = ImageDraw.Draw(img)
        try:
            num_font = ImageFont.truetype("arial.ttf", 20)
        except Exception:
            # Arial may be unavailable (e.g. Linux); fall back to PIL's default.
            num_font = ImageFont.load_default()
        cell_num = 1
        for r in range(rows):
            for c in range(cols):
                # Center of cell (r, c) in pixels.
                x = int(c * cell_w + cell_w / 2)
                y = int(r * cell_h + cell_h / 2)
                text = str(cell_num)
                bbox = draw.textbbox((0, 0), text, font=num_font)
                tw = bbox[2] - bbox[0]
                th = bbox[3] - bbox[1]
                draw.text((x - tw/2, y - th/2), text, fill="black", font=num_font)
                cell_num += 1
        temp_path = f"page_{p_index+1}_grid.png"
        img.save(temp_path, "PNG")
        temp_grid_images.append(temp_path)
        print("πŸ›° Created grid image:", temp_path)
    print("πŸ“‘ Sending page images to Gemini in batches for mapping...")
    # Pass 2: query Gemini 10 pages at a time; results accumulate as
    # {"page", "question", "cell_number"} dicts across all batches.
    batch_size = 10
    all_mappings = []
    for start in range(0, len(temp_grid_images), batch_size):
        batch_paths = temp_grid_images[start:start+batch_size]
        batch_mapping = ask_gemini_for_mapping_batch(batch_paths, grading_json, expected_ids, rows, cols)
        all_mappings.extend(batch_mapping)
        print(f"βœ… Processed batch {start//batch_size + 1}: pages {start+1}-{start+len(batch_paths)}")
    print("πŸ–Š Annotating pages with marks...")
    # Pass 3: render the awarded marks onto clean (grid-free) page images.
    for p_index, page in enumerate(pages):
        page_num = p_index + 1
        page_img = page.convert("RGB")
        img_cv = np.array(page_img)
        img_cv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2BGR)
        h, w, _ = img_cv.shape
        cell_w_px, cell_h_px = w / cols, h / rows
        page_mappings = [m for m in all_mappings if m.get("page") == page_num]
        for item in page_mappings:
            qid = item.get("question")
            cell_number = item.get("cell_number")
            if qid is None or cell_number is None:
                continue
            # Normalize the question ID from Gemini mapping
            normalized_qid = normalize_question_id(qid)
            # Try exact match first with normalized ID
            marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", [])
                               if g["question"] == normalized_qid), [])
            # If no match, try case-insensitive match
            if not marks_list:
                marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", [])
                                   if g["question"].lower() == normalized_qid.lower()), [])
            # If still no match, try with original qid
            if not marks_list:
                marks_list = next((g["marks_awarded"] for g in grading_json.get("grading", [])
                                   if g["question"] == qid), [])
            marks_text = ",".join(marks_list) if marks_list else "?"
            if marks_text == "?":
                print(f"⚠️ No marks found for question '{qid}' (normalized: '{normalized_qid}') on page {page_num}")
            # Convert the 1-based cell number back to (row, col), then place
            # the text toward the right edge of that cell.
            row = (cell_number - 1) // cols
            col = (cell_number - 1) % cols
            x_c = int((col + 1) * cell_w_px - cell_w_px / 4)
            y_c = int((row + 0.5) * cell_h_px)
            # Scale font/stroke with cell height so marks stay legible.
            font_scale = max(1.0, min(2.0, cell_h_px / 40.0))
            thickness = max(2, int(font_scale * 2))
            cv2.putText(img_cv, marks_text, (x_c, y_c), cv2.FONT_HERSHEY_SIMPLEX,
                        font_scale, (0, 0, 255), thickness, cv2.LINE_AA)
            print(f"πŸ–Š Marks annotated for page {page_num}, question {qid}: {marks_text}")
        annotated_path = f"annotated_page_{page_num}.png"
        cv2.imwrite(annotated_path, img_cv)
        annotated_page_paths.append(annotated_path)
        print("βœ… Annotated page saved:", annotated_path)
    print("πŸ“‘ Merging annotated pages into final PDF...")
    with open(output_pdf, "wb") as f:
        f.write(img2pdf.convert(annotated_page_paths))
    compressed = compress_pdf(output_pdf)
    print("πŸ“‘ Imprinted PDF saved to:", compressed)
    return compressed
def extract_pdf_pages_as_images(pdf_path, page_numbers, prefix):
    """
    Extracts unique pages (1-based) from a PDF as images, saves as PNG, returns list of file paths.
    Handles cases where requested pages don't exist in the PDF.

    Args:
        pdf_path: Path to the source PDF.
        page_numbers: Iterable of 1-based page numbers (duplicates allowed).
        prefix: Filename prefix for the saved "<prefix>_page_<n>.png" files.

    Returns:
        list[str]: Paths of the PNGs actually written (may be empty).
    """
    if not page_numbers:
        print(f"⚠️ No page numbers provided for extraction")
        return []
    unique_pages = sorted(set(page_numbers))
    # Validate requested pages against the real page count so we never ask
    # pdf2image for pages that are out of range.
    try:
        # PdfReader is imported at module level (from PyPDF2); the previous
        # redundant in-function import was removed.
        reader = PdfReader(pdf_path)
        total_pages = len(reader.pages)
        print(f"πŸ“„ PDF has {total_pages} total pages")
        valid_pages = [p for p in unique_pages if 1 <= p <= total_pages]
        # Set difference instead of the former O(n^2) "p not in valid_pages" scan.
        invalid_pages = sorted(set(unique_pages) - set(valid_pages))
        if invalid_pages:
            print(f"⚠️ Skipping invalid page numbers (out of range): {invalid_pages}")
        if not valid_pages:
            print(f"❌ No valid pages to extract from {pdf_path}")
            return []
        unique_pages = valid_pages
    except Exception as e:
        # Best-effort: if validation fails we still try the extraction below.
        print(f"⚠️ Could not validate page numbers: {e}. Proceeding with extraction...")
    # Convert only the [min, max] page span; indices below are relative to it.
    try:
        images = convert_from_path(pdf_path, dpi=200, first_page=min(unique_pages), last_page=max(unique_pages))
    except Exception as e:
        print(f"❌ Failed to convert PDF pages to images: {e}")
        return []
    out_paths = []
    for idx, page_num in enumerate(unique_pages):
        img_idx = page_num - min(unique_pages)
        # Bounds check: the converter may return fewer images than the span.
        if img_idx >= len(images):
            print(f"⚠️ Page {page_num} not found in extracted images (index {img_idx} >= {len(images)}). Skipping...")
            continue
        try:
            img = images[img_idx]
            out_path = f"{prefix}_page_{page_num}.png"
            img.save(out_path, "PNG")
            print(f"πŸ“€ Extracted graph page {page_num} from {pdf_path} as {out_path}")
            out_paths.append(out_path)
        except Exception as e:
            print(f"❌ Failed to save page {page_num}: {e}")
            continue
    return out_paths
# ---------------- PIPELINE ----------------
def align_and_grade_pipeline(qp_path, ms_path, ans_path, subject="Maths", imprint=False, run_timestamp=None):
    """
    Final pipeline with graph-aware grading logic using NEW SDK.

    Steps: compress & merge QP+MS -> upload to Gemini -> transcribe QP+MS and
    answer sheet (with graph-page detection) -> grade the transcripts (with
    extracted graph pages as visual context) -> verify/correct totals ->
    render PDFs, optionally imprint marks, optionally upload to Supabase.

    Args:
        qp_path: Path to Question Paper PDF
        ms_path: Path to Markscheme PDF
        ans_path: Path to Answer Sheet PDF
        subject: Subject name (Maths or Science)
        imprint: Whether to generate imprinted PDF
        run_timestamp: Unix timestamp for organizing files in Supabase

    Returns:
        tuple: (qpms_text, as_text, grading_text, grading_pdf_path,
        imprinted_pdf_path, output_urls). On any exception the first element
        is an error string and the remaining values are None / {}.
    """
    try:
        print("πŸ” Starting pipeline...")
        # Compress inputs first to keep uploads small.
        qp_path = compress_pdf(qp_path)
        ms_path = compress_pdf(ms_path)
        ans_path = compress_pdf(ans_path)
        # QP and MS are merged so one transcription call sees both.
        merged_qpms_path = os.path.splitext(qp_path)[0] + "_merged_qp_ms.pdf"
        merge_pdfs([qp_path, ms_path], merged_qpms_path)
        print("πŸ“Ž Merged QP + MS ->", merged_qpms_path)
        print("πŸ”Ό Uploading files to Gemini...")
        merged_uploaded = upload_to_gemini(merged_qpms_path)
        ans_uploaded = upload_to_gemini(ans_path)
        print("βœ… Upload complete.")
        # Step 1.i: transcribe QP+MS; the appended instruction asks the model
        # to flag questions whose markscheme expects a graph.
        print("1.i) Transcribing QP+MS (questions first, then full markscheme, with graph detection)...")
        qpms_prompt = QP_MS_TRANSCRIPTION_PROMPT["content"] + "\nAt the end, also list all questions in the markscheme where a graph is expected, in the format:\nGraph expected in:\n- Question <number> β†’ Page <number>\n(One per line, after ==== MARKSCHEME END ====)"
        qpms_text = gemini_generate_content(qpms_prompt, file_upload_obj=merged_uploaded, model_name="gemini-2.5-flash", fallback_model="gemini-2.5-flash-preview-09-2025", fallback_model_2="gemini-2.5-flash-lite", file_path=merged_qpms_path)
        print("πŸ“„ QP+MS transcription received. Saving debug file: debug_qpms_transcript.txt")
        with open("debug_qpms_transcript.txt", "w", encoding="utf-8") as f:
            f.write(qpms_text)
        # Extract graph-bearing MS pages as images for later visual grading.
        ms_graph_mapping = extract_graph_questions_from_ms(qpms_text)
        print("πŸ–ΌοΈ Graph-expected questions in MS:", ms_graph_mapping)
        ms_graph_pages = list(ms_graph_mapping.values())
        ms_graph_images = []
        if ms_graph_pages:
            ms_graph_images = extract_pdf_pages_as_images(merged_qpms_path, ms_graph_pages, prefix="qpms_graph")
        # Expected question IDs guide the answer-sheet transcription.
        extracted_ids = extract_question_ids_from_qpms(qpms_text)
        if not extracted_ids:
            extracted_ids = ["NA"]
        # Step 1.ii: transcribe the student's answer sheet with the same
        # graph-detection instruction appended.
        print("1.ii) Building AS transcription prompt with expected question IDs and graph detection, sending to Gemini...")
        as_prompt = build_as_cot_prompt_with_expected_ids(extracted_ids, qpms_text) + "\nAt the end, also list all answers where a graph is found, in the format:\nGraph found in:\n- Answer <number> β†’ Page <number>\n(One per line, after all answers)"
        as_text = gemini_generate_content(as_prompt, file_upload_obj=ans_uploaded, model_name="gemini-2.5-flash", fallback_model="gemini-2.5-flash-preview-09-2025", fallback_model_2="gemini-2.5-flash-lite", file_path=ans_path)
        print("πŸ“ AS transcription received. Saving debug file: debug_as_transcript.txt")
        with open("debug_as_transcript.txt", "w", encoding="utf-8") as f:
            f.write(as_text)
        # Extract graph-bearing answer-sheet pages as images too.
        as_graph_mapping = extract_graph_answers_from_as(as_text)
        print("πŸ–ΌοΈ Graph-attempted answers in AS:", as_graph_mapping)
        as_graph_pages = list(as_graph_mapping.values())
        as_graph_images = []
        if as_graph_pages:
            as_graph_images = extract_pdf_pages_as_images(ans_path, as_graph_pages, prefix="as_graph")
        # Step 2: grade both transcripts together, attaching any graph pages
        # as visual context.
        print("2) Preparing grading input and sending to Gemini for grading...")
        grading_input = (
            "=== QP+MS TRANSCRIPT BEGIN ===\n"
            + qpms_text
            + "\n=== QP+MS TRANSCRIPT END ===\n\n"
            + "=== ANSWER SHEET TRANSCRIPT BEGIN ===\n"
            + as_text
            + "\n=== ANSWER SHEET TRANSCRIPT END ===\n"
        )
        if ms_graph_images or as_graph_images:
            graph_note = "\n\n---\nSome questions require graphs. I've attached the relevant graph pages from QP+MS and from the Answer Sheet. Use them as visual context when grading.\n---\n"
            grading_input += graph_note
        grading_prompt_obj = get_grading_prompt(subject.lower())
        grading_prompt_system = grading_prompt_obj["content"]
        grading_images = ms_graph_images + as_graph_images
        grading_text = gemini_generate_content(grading_prompt_system + "\n\nPlease grade the following transcripts:\n" + grading_input, image_obj=grading_images if grading_images else None, model_name="gemini-2.5-pro", fallback_model="gemini-2.5-flash")
        print("🧾 Grading output received. Saving debug file: debug_grading.md")
        with open("debug_grading.md", "w", encoding="utf-8") as f:
            f.write(grading_text)
        # Verify and correct total marks if needed
        grading_text, calc_awarded, calc_possible, was_corrected = check_and_correct_total_marks(grading_text)
        if was_corrected:
            print("πŸ“ Saving corrected grading to debug file: debug_grading_corrected.md")
            with open("debug_grading_corrected.md", "w", encoding="utf-8") as f:
                f.write(grading_text)
        # Render the grading markdown to PDF (Pandoc/pdflatex downstream).
        base_name = os.path.splitext(os.path.basename(ans_path))[0]
        grading_pdf_path = save_as_pdf(grading_text, f"{base_name}_graded.pdf")
        print("πŸ“„ Grading PDF saved:", grading_pdf_path)
        grading_json = extract_marks_from_grading(grading_text)
        with open("debug_grading_json.json", "w", encoding="utf-8") as f:
            json.dump(grading_json, f, indent=2, ensure_ascii=False)
        print("πŸ”§ Grading marks extraction complete.")
        # Optional: imprint the awarded marks back onto the answer sheet.
        imprinted_pdf_path = None
        if imprint:
            print("✍ Imprint option enabled. Starting imprinting process...")
            imprinted_pdf_path = f"{base_name}_imprinted.pdf"
            imprinted_pdf_path = imprint_marks_using_mapping(ans_path, grading_json, imprinted_pdf_path, extracted_ids)
            print("βœ… Imprinting finished. Imprinted PDF at:", imprinted_pdf_path)
        # Upload output files to Supabase (using same timestamp as input files)
        output_urls = {
            "graded_pdf_url": None,
            "imprinted_pdf_url": None
        }
        if supabase_client:
            print("\nπŸ“€ Uploading output files to Supabase...")
            if grading_pdf_path:
                output_urls["graded_pdf_url"] = upload_file_to_supabase(grading_pdf_path, "graded", run_timestamp)
            if imprinted_pdf_path:
                output_urls["imprinted_pdf_url"] = upload_file_to_supabase(imprinted_pdf_path, "imprinted", run_timestamp)
        print("🏁 Pipeline finished successfully.")
        return qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path, output_urls
    except Exception as e:
        # Any failure is reported in-band so the UI can display it.
        print("❌ Pipeline error:", e)
        import traceback
        traceback.print_exc()
        return f"❌ Error: {e}", None, None, None, None, {}
# ---------------- GRADIO UI ----------------
# Top-level Gradio app definition; `demo` is launched by the __main__ guard.
with gr.Blocks(title="AI Grading (Pandoc + pdflatex)") as demo:
    gr.Markdown("## πŸ“˜ AI Grading β€” Using Pandoc + pdflatex for PDF Generation")
    gr.Markdown("**βœ… Now using Pandoc with pdflatex for professional-quality PDF outputs!**")
    # Surface whether cloud storage is active so users know where files go.
    if supabase_client:
        gr.Markdown("**☁️ Supabase Storage: Enabled** - All files will be uploaded to cloud storage")
    else:
        gr.Markdown("**⚠️ Supabase Storage: Disabled** - Files will only be processed locally")
    with gr.Row():
        qp_file = gr.File(label="πŸ“„ Upload Question Paper (PDF)")
        ms_file = gr.File(label="πŸ“„ Upload Markscheme (PDF)")
        ans_file = gr.File(label="πŸ“ Upload Student Answer Sheet (PDF)")
    with gr.Row():
        subject_dropdown = gr.Dropdown(
            choices=["Maths", "Science", "Economics"],
            value="Maths",
            label="πŸ“š Subject",
            info="Select the subject to apply appropriate grading guidelines"
        )
        imprint_toggle = gr.Checkbox(label="✍ Imprint Marks on Student Answer Sheet", value=False)
    run_button = gr.Button("πŸš€ Run Pipeline")
    # File URLs section (only shown if Supabase is enabled)
    if supabase_client:
        with gr.Accordion("☁️ Uploaded File URLs", open=False):
            file_urls_box = gr.Textbox(label="Cloud Storage URLs", lines=8, interactive=False)
    with gr.Row():
        qpms_box = gr.Textbox(label="πŸ“‘ QP+MS Transcript", lines=12)
        as_box = gr.Textbox(label="πŸ“ AS Transcript", lines=12)
    grading_output_box = gr.Textbox(label="🧾 Grading (Markdown)", lines=20)
    grading_pdf_file = gr.File(label="πŸ“₯ Download Grading PDF")
    imprint_pdf_file = gr.File(label="πŸ“₯ Download Imprinted PDF (Optional)")

    def run_pipeline(qp_file_obj, ms_file_obj, ans_file_obj, subject_choice, imprint_flag):
        # Click handler: validate uploads, run the grading pipeline, then
        # shape the outputs to match the wired Gradio components.
        # NOTE(review): the return arity differs depending on supabase_client
        # β€” keep every return in sync with the run_button.click wiring below.
        if not qp_file_obj or not ms_file_obj or not ans_file_obj:
            error_msg = "❌ Please upload all three files"
            if supabase_client:
                return error_msg, "", "", None, None, ""
            else:
                return error_msg, "", "", None, None
        # Process and upload input files (generates shared timestamp)
        qp_path, ms_path, ans_path, input_urls, run_timestamp = process_and_upload_input_files(
            qp_file_obj, ms_file_obj, ans_file_obj
        )
        # Run the grading pipeline (pass timestamp to keep all files together)
        qpms_text, as_text, grading_text, grading_pdf_path, imprinted_pdf_path, output_urls = align_and_grade_pipeline(
            qp_path, ms_path, ans_path, subject=subject_choice, imprint=imprint_flag, run_timestamp=run_timestamp
        )
        # Build URLs summary for the cloud-storage textbox (Supabase only).
        urls_summary = ""
        if supabase_client:
            urls_summary = f"πŸ“€ UPLOADED FILES (Timestamp: {run_timestamp}):\n\n"
            urls_summary += "INPUT FILES:\n"
            if input_urls.get("qp_url"):
                urls_summary += f"β€’ Question Paper: {input_urls['qp_url']}\n"
            if input_urls.get("ms_url"):
                urls_summary += f"β€’ Markscheme: {input_urls['ms_url']}\n"
            if input_urls.get("ans_url"):
                urls_summary += f"β€’ Answer Sheet: {input_urls['ans_url']}\n"
            urls_summary += "\nOUTPUT FILES:\n"
            if output_urls.get("graded_pdf_url"):
                urls_summary += f"β€’ Graded PDF: {output_urls['graded_pdf_url']}\n"
            if output_urls.get("imprinted_pdf_url"):
                urls_summary += f"β€’ Imprinted PDF: {output_urls['imprinted_pdf_url']}\n"
            urls_summary += f"\nπŸ“ All files stored in: examfiles/{run_timestamp}/\n"
            if not any(input_urls.values()) and not any(output_urls.values()):
                urls_summary += "\n⚠️ No files were uploaded to Supabase"
        if supabase_client:
            return (
                qpms_text or "",
                as_text or "",
                grading_text or "",
                grading_pdf_path,
                imprinted_pdf_path,
                urls_summary
            )
        else:
            return (
                qpms_text or "",
                as_text or "",
                grading_text or "",
                grading_pdf_path,
                imprinted_pdf_path
            )

    # Set up the click handler based on whether Supabase is enabled
    # (the extra output is the cloud-URLs textbox, which only exists then).
    if supabase_client:
        run_button.click(
            fn=run_pipeline,
            inputs=[qp_file, ms_file, ans_file, subject_dropdown, imprint_toggle],
            outputs=[qpms_box, as_box, grading_output_box, grading_pdf_file, imprint_pdf_file, file_urls_box]
        )
    else:
        run_button.click(
            fn=run_pipeline,
            inputs=[qp_file, ms_file, ans_file, subject_dropdown, imprint_toggle],
            outputs=[qpms_box, as_box, grading_output_box, grading_pdf_file, imprint_pdf_file]
        )
# Launch the Gradio app only when executed as a script (not on import).
if __name__ == "__main__":
    demo.launch()