Spaces:
Sleeping
Sleeping
import numpy as np | |
import cv2 | |
from PIL import Image, ImageEnhance | |
from io import BytesIO | |
from pdf2image import convert_from_path | |
import json | |
from gapi_client import get_genai_client | |
from utils import extract_json_from_output | |
# Global GenAI client: populated by init_genai(); every parse_* helper below
# reads this module-level handle, so init_genai() must be called first.
CLIENT = None
def init_genai():
    """Create and store the module-level GenAI client.

    Must run once before any of the Gemini-calling helpers in this
    module, all of which read the global CLIENT handle.
    """
    global CLIENT
    CLIENT = get_genai_client()
def parse_all_answers(image_input: Image.Image) -> str:
    """
    Extracts answers from a full answer-sheet image using Gemini.

    Args:
        image_input: PIL image of a complete 25-question answer sheet
            (questions 1-20 multiple choice, 21-25 free text).

    Returns:
        The raw JSON string returned by the model; parse it with
        extract_json_from_output before use.

    Raises:
        RuntimeError: if init_genai() was never called, so CLIENT is None.
    """
    # Fail fast with a clear message instead of an opaque AttributeError
    # on CLIENT.models when initialization was skipped.
    if CLIENT is None:
        raise RuntimeError("GenAI client is not initialized; call init_genai() first.")
    output_format = '''
    Answer in the following JSON format. Do not write anything else:
    {
      "Paper name": {"name": "<paper Alphabet>"},
      "Answers": {
        "1": "<option or text>",
        "2": "<option or text>",
        "3": "<option or text>",
        "4": "<option or text>",
        "5": "<option or text>",
        "6": "<option or text>",
        "7": "<option or text>",
        "8": "<option or text>",
        "9": "<option or text>",
        "10": "<option or text>",
        "11": "<option or text>",
        "12": "<option or text>",
        "13": "<option or text>",
        "14": "<option or text>",
        "15": "<option or text>",
        "16": "<option or text>",
        "17": "<option or text>",
        "18": "<option or text>",
        "19": "<option or text>",
        "20": "<option or text>",
        "21": "<free text answer>",
        "22": "<free text answer>",
        "23": "<free text answer>",
        "24": "<free text answer>",
        "25": "<free text answer>"
      }
    }
    '''
    prompt = f"""
    You are an assistant that extracts answers from an image.
    Write only the Alphabet(A,B,C,D,E,F) of the paper in the "Paper name" field.
    The image is a screenshot of an answer sheet containing 25 questions.
    For questions 1 to 20, the answers are multiple-choice selections.
    For questions 21 to 25, the answers are free-text responses.
    Extract the answer for each question (1 to 25) and provide the result in JSON using the format below:
    {output_format}
    """
    response = CLIENT.models.generate_content(
        model="gemini-2.0-flash",
        contents=[prompt, image_input]
    )
    return response.text
def preprocess_pdf_last_page(image: Image.Image) -> Image.Image:
    """Prepare the header band of the last PDF page for OCR.

    Steps: convert the PIL image to OpenCV BGR, mask a horizontal band,
    crop to that band, apply an unsharp mask, then boost sharpness,
    contrast and brightness with PIL enhancers.
    """
    bgr = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
    height, width = bgr.shape[:2]
    # Keep only the band from 14% of the page height down to h - 73%*h.
    band = np.zeros((height, width), dtype="uint8")
    y_top = int(height * 0.14)
    y_bot = height - int(height * 0.73)
    cv2.rectangle(band, (0, y_top), (width, y_bot), 255, -1)
    kept = cv2.bitwise_and(bgr, bgr, mask=band)
    # Crop tightly around the non-zero mask area.
    x, y, bw, bh = cv2.boundingRect(cv2.findNonZero(band))
    region = kept[y:y + bh, x:x + bw]
    # Unsharp mask: blend the region against its Gaussian-blurred copy.
    soft = cv2.GaussianBlur(region, (0, 0), sigmaX=3)
    crisp = cv2.addWeighted(region, 1.5, soft, -0.5, 0)
    # Final PIL-side enhancement passes.
    result = Image.fromarray(cv2.cvtColor(crisp, cv2.COLOR_BGR2RGB))
    for enhancer, factor in ((ImageEnhance.Sharpness, 1.3),
                             (ImageEnhance.Contrast, 1.4),
                             (ImageEnhance.Brightness, 1.1)):
        result = enhancer(result).enhance(factor)
    return result
def parse_info_with_gemini(pil_img: Image.Image) -> dict:
    """
    Calls Gemini on a header image to extract candidate info fields.

    Args:
        pil_img: preprocessed (sharpened/cropped) exam-header image.

    Returns:
        Parsed dict from the model's JSON reply, via extract_json_from_output.

    Raises:
        RuntimeError: if init_genai() was never called, so CLIENT is None.
    """
    # Fail fast with a clear message instead of an opaque AttributeError
    # on CLIENT.models when initialization was skipped.
    if CLIENT is None:
        raise RuntimeError("GenAI client is not initialized; call init_genai() first.")
    output_format = '''
    Answer in the following JSON format. Do not write anything else:
    {
      "Candidate Info": {
        "Paper": "<paper>",
        "Level": "<level>",
        "Candidate Name": "<name>",
        "Candidate Number": "<number>",
        "School": "<school>",
        "Country": "<country>",
        "grade level": "<grade level>",
        "Date": "<date>"
      }
    }
    '''
    prompt = f"""
    You are a helper that accurately reads a sharpened exam header image and extracts exactly these fields:
    • Paper (e.g. "B")
    • Level (e.g. "MIDDLE PRIMARY")
    • Candidate Name
    • Candidate Number
    • School
    • Country
    • grade level
    • Date (with time)
    Return **only** valid JSON in this format:
    {output_format}
    """
    response = CLIENT.models.generate_content(
        model="gemini-2.0-flash",
        contents=[prompt, pil_img]
    )
    return extract_json_from_output(response.text)
def extract_candidate_data(image: Image.Image) -> dict:
    """Crop/enhance a page's header region, then read the candidate info.

    Thin composition of preprocess_pdf_last_page and parse_info_with_gemini.
    """
    return parse_info_with_gemini(preprocess_pdf_last_page(image))
def parse_mcq_answers(pil_image: Image.Image) -> str:
    """Send an MCQ-sheet image to Gemini and return its raw JSON reply.

    Covers questions 1-10 only; the caller parses the returned text.
    """
    expected_json = '''
    Answer in the following JSON format. Do not write anything else:
    {
      "Answers": {
        "1": "<option>",
        "2": "<option>",
        "3": "<option>",
        "4": "<option>",
        "5": "<option>",
        "6": "<option>",
        "7": "<option>",
        "8": "<option>",
        "9": "<option>",
        "10": "<option>"
      }
    }
    '''
    instructions = f"""
    You are an assistant that extracts MCQ answers from an image.
    The image is a screenshot of a 10-question multiple-choice answer sheet.
    Extract which option is marked for each question (1–10) and provide the answers in JSON:
    {expected_json}
    """
    reply = CLIENT.models.generate_content(
        model="gemini-2.0-flash",
        contents=[instructions, pil_image],
    )
    return reply.text
def get_mcqs1st(pil_image: Image.Image) -> dict:
    """Isolate the questions 1-10 region, enhance it, and parse the answers.

    Returns the dict parsed from the model's JSON reply.
    """
    frame = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
    height, width = frame.shape[:2]
    # Left 35% of the page, between 30% of the height and h - 44%*h.
    region_mask = np.zeros((height, width), dtype="uint8")
    y_top = int(height * 0.30)
    y_bot = height - int(height * 0.44)
    x_right = int(width * 0.35)
    cv2.rectangle(region_mask, (0, y_top), (x_right, y_bot), 255, -1)
    kept = cv2.bitwise_and(frame, frame, mask=region_mask)
    x, y, bw, bh = cv2.boundingRect(cv2.findNonZero(region_mask))
    region = kept[y:y + bh, x:x + bw]
    # Unsharp mask followed by PIL enhancement passes.
    soft = cv2.GaussianBlur(region, (0, 0), sigmaX=3)
    crisp = cv2.addWeighted(region, 1.5, soft, -0.5, 0)
    enhanced = Image.fromarray(cv2.cvtColor(crisp, cv2.COLOR_BGR2RGB))
    for enhancer, factor in ((ImageEnhance.Sharpness, 1.3),
                             (ImageEnhance.Contrast, 1.4),
                             (ImageEnhance.Brightness, 1.1)):
        enhanced = enhancer(enhanced).enhance(factor)
    return extract_json_from_output(parse_mcq_answers(enhanced))
def parse_mcq_answers_11_20(pil_image: Image.Image) -> str:
    """Send the questions 11-20 image to Gemini; return its raw JSON reply."""
    expected_json = '''
    Answer in the following JSON format. Do not write anything else:
    {
      "Answers": {
        "11": "<option>",
        "12": "<option>",
        "13": "<option>",
        "14": "<option>",
        "15": "<option>",
        "16": "<option>",
        "17": "<option>",
        "18": "<option>",
        "19": "<option>",
        "20": "<option>"
      }
    }
    '''
    instructions = f"""
    You are an assistant that extracts MCQ answers from an image.
    The image is a screenshot of questions 11–20.
    Extract the marked option for each and return JSON:
    {expected_json}
    """
    reply = CLIENT.models.generate_content(
        model="gemini-2.0-flash",
        contents=[instructions, pil_image],
    )
    return reply.text
def get_mcqs2nd(pil_image: Image.Image) -> dict:
    """Isolate the questions 11-20 region, enhance it, and parse the answers.

    Returns the dict parsed from the model's JSON reply.
    """
    frame = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
    height, width = frame.shape[:2]
    # Left 35% of the page, between 56% of the height and h - 21%*h.
    region_mask = np.zeros((height, width), dtype="uint8")
    y_top = int(height * 0.56)
    y_bot = height - int(height * 0.21)
    x_right = int(width * 0.35)
    cv2.rectangle(region_mask, (0, y_top), (x_right, y_bot), 255, -1)
    kept = cv2.bitwise_and(frame, frame, mask=region_mask)
    x, y, bw, bh = cv2.boundingRect(cv2.findNonZero(region_mask))
    region = kept[y:y + bh, x:x + bw]
    # Unsharp mask followed by PIL enhancement passes.
    soft = cv2.GaussianBlur(region, (0, 0), sigmaX=3)
    crisp = cv2.addWeighted(region, 1.5, soft, -0.5, 0)
    enhanced = Image.fromarray(cv2.cvtColor(crisp, cv2.COLOR_BGR2RGB))
    for enhancer, factor in ((ImageEnhance.Sharpness, 1.3),
                             (ImageEnhance.Contrast, 1.4),
                             (ImageEnhance.Brightness, 1.1)):
        enhanced = enhancer(enhanced).enhance(factor)
    return extract_json_from_output(parse_mcq_answers_11_20(enhanced))
def parse_text_answers(pil_image: Image.Image) -> str:
    """Send the free-text answers image to Gemini; return its raw JSON reply.

    Covers questions 21-25 only; the caller parses the returned text.
    """
    expected_json = '''
    Answer in the following JSON format. Do not write anything else:
    {
      "Answers": {
        "21": "<text>",
        "22": "<text>",
        "23": "<text>",
        "24": "<text>",
        "25": "<text>"
      }
    }
    '''
    instructions = f"""
    You are an assistant that extracts free-text answers from an image.
    The image shows answers to questions 21–25.
    Extract the text for each and return JSON:
    {expected_json}
    """
    reply = CLIENT.models.generate_content(
        model="gemini-2.0-flash",
        contents=[instructions, pil_image],
    )
    return reply.text
def get_answer(pil_image: Image.Image) -> dict:
    """Isolate the free-text 21-25 region, enhance it, and parse the answers.

    Returns the dict parsed from the model's JSON reply.
    """
    frame = cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
    height, width = frame.shape[:2]
    # Central column (35%-66% of the width), from 31% of the height
    # down to h - 31%*h.
    region_mask = np.zeros((height, width), dtype="uint8")
    y_top = int(height * 0.31)
    y_bot = height - int(height * 0.31)
    x_left = int(width * 0.35)
    x_right = int(width * 0.66)
    cv2.rectangle(region_mask, (x_left, y_top), (x_right, y_bot), 255, -1)
    kept = cv2.bitwise_and(frame, frame, mask=region_mask)
    x, y, bw, bh = cv2.boundingRect(cv2.findNonZero(region_mask))
    region = kept[y:y + bh, x:x + bw]
    # Unsharp mask followed by PIL enhancement passes.
    soft = cv2.GaussianBlur(region, (0, 0), sigmaX=3)
    crisp = cv2.addWeighted(region, 1.5, soft, -0.5, 0)
    enhanced = Image.fromarray(cv2.cvtColor(crisp, cv2.COLOR_BGR2RGB))
    for enhancer, factor in ((ImageEnhance.Sharpness, 1.3),
                             (ImageEnhance.Contrast, 1.4),
                             (ImageEnhance.Brightness, 1.1)):
        enhanced = enhancer(enhanced).enhance(factor)
    return extract_json_from_output(parse_text_answers(enhanced))
def infer_page(pil_image: Image.Image) -> dict:
    """Run the full extraction pipeline on a single exam page.

    Combines candidate info with MCQ (1-20) and free-text (21-25)
    answers into one dict: {"Candidate Info": ..., "Answers": ...}.
    """
    candidate = extract_candidate_data(pil_image)
    merged_answers = {}
    for chunk in (get_mcqs1st(pil_image),
                  get_mcqs2nd(pil_image),
                  get_answer(pil_image)):
        merged_answers.update((chunk or {}).get("Answers", {}))
    return {
        "Candidate Info": candidate.get("Candidate Info", {}),
        "Answers": merged_answers,
    }
def infer_all_pages(pdf_path: str) -> dict:
    """
    Processes every page in the PDF and infers student data.

    Args:
        pdf_path: path to the scanned students' PDF.

    Returns:
        Dict mapping candidate number (or "Page_<n>" when the number is
        missing) to that page's {"Candidate Info", "Answers"} dict.
        Pages with no extracted answers are skipped.
    """
    results = {}
    pages = convert_from_path(pdf_path)
    for idx, page in enumerate(pages, start=1):
        data = infer_page(page)
        if not data.get("Answers"):
            # Nothing usable extracted from this page; don't emit an entry.
            continue
        info = data.get("Candidate Info", {})
        key = info.get("Candidate Number") or f"Page_{idx}"
        # Duplicate candidate numbers used to silently overwrite earlier
        # pages; disambiguate with the page index instead of losing data.
        if key in results:
            key = f"{key}_Page_{idx}"
        results[key] = data
    return results
def load_answer_key(pdf_path: str) -> dict:
    """
    Parses the official answer-key PDF into a dict of paper -> answers.

    Args:
        pdf_path: path to the answer-key PDF (one paper per page).

    Returns:
        Dict mapping paper letter (e.g. "A") to its {question: answer} dict.
        Pages whose paper name cannot be read are skipped: previously they
        were stored under the key None, which a student page with an
        unreadable "Paper" field would then falsely match during grading.
    """
    images = convert_from_path(pdf_path)
    key_dict = {}
    for page in images:
        raw = parse_all_answers(page)
        parsed = extract_json_from_output(raw)
        name = parsed.get("Paper name", {}).get("name")
        if not name:
            continue
        key_dict[name] = parsed.get("Answers", {})
    return key_dict
def grade_page(student_page_data: dict, answer_key_dict: dict) -> dict:
    """Grade one student's page against the loaded answer key.

    Comparison is case-insensitive and whitespace-trimmed. Questions the
    student left blank count as wrong; percentage is 0.0 when the paper's
    key is empty or the paper letter is unknown.
    """
    info = student_page_data.get("Candidate Info", {})
    key_answers = answer_key_dict.get(info.get("Paper"), {})
    given_answers = student_page_data.get("Answers", {})
    breakdown = {}
    marks = 0
    for question, expected in key_answers.items():
        given = given_answers.get(question, "")
        matched = str(given).strip().upper() == str(expected).strip().upper()
        if matched:
            marks += 1
        breakdown[question] = {
            "Correct Answer": expected,
            "Student Answer": given,
            "Is Correct": matched,
        }
    total = len(key_answers)
    pct = round(marks / total * 100, 2) if total else 0.0
    return {
        "Candidate Info": info,
        "Total Marks": marks,
        "Total Questions": total,
        "Percentage": pct,
        "Detailed Results": breakdown,
    }
def grade_all_students(answer_key_pdf: str, student_pdf: str, out_json: str = "results.json") -> dict:
    """
    Loads the key, infers all students, grades them, and writes JSON.

    Args:
        answer_key_pdf: path to the official answer-key PDF.
        student_pdf: path to the scanned students' PDF.
        out_json: path of the results file to write (default "results.json").

    Returns:
        Dict mapping candidate key -> grade_page() result; the same data
        is also written to out_json.
    """
    key_dict = load_answer_key(answer_key_pdf)
    students = infer_all_pages(student_pdf)
    results = {cand: grade_page(data, key_dict) for cand, data in students.items()}
    # Explicit UTF-8 plus ensure_ascii=False keeps non-ASCII candidate
    # names/schools/countries readable instead of \uXXXX-escaped, and
    # avoids depending on the platform's default file encoding.
    with open(out_json, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, ensure_ascii=False)
    return results