"""Automated urology exam runner: sends every question from the
SASLeaderboard/sas_opposition_exam_data dataset to an OpenRouter model and pushes
the graded results to the SASLeaderboard/results dataset on the Hugging Face Hub."""

import gradio as gr
import requests
import random
from datasets import load_dataset, Dataset
from typing import Dict, List
import re
import datetime
import pandas as pd
import os
from dotenv import load_dotenv

load_dotenv()


def sanitize_theme_name(theme: str) -> str:
    """Normalize a theme name into a lowercase, underscore-separated key."""
    sanitized = re.sub(r'[^\w\s-]', '', theme)
    sanitized = re.sub(r'[-\s]+', '_', sanitized)
    return sanitized.lower().strip('_')
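
# Example of the expected mapping (note: Python's \w is Unicode-aware, so accented
# characters are preserved rather than stripped):
#   sanitize_theme_name("FEA Urología") -> "fea_urología"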


def load_questions_from_dataset() -> Dict[str, List[Dict]]:
    """Load the urology questions from the Hub dataset, grouped by sanitized theme."""
    dataset = load_dataset("SASLeaderboard/sas_opposition_exam_data")
    dataset = dataset['train'].filter(lambda x: x['theme'] == 'FEA Urología')

    questions_by_theme = {}
    skipped = 0
    loaded = 0

    for item in dataset:
        theme = item['theme']
        answers = item.get('answers', [])
        correct_answer = item.get('correct_answer', '')

        # Skip malformed items; pad to four options when only three are present.
        if not answers or not correct_answer or len(answers) < 3:
            skipped += 1
            continue
        while len(answers) < 4:
            answers.append(answers[-1])

        sanitized_theme = sanitize_theme_name(theme)
        if sanitized_theme not in questions_by_theme:
            questions_by_theme[sanitized_theme] = []

        try:
            question = {
                "statement": item['statement'],
                "options": {
                    "A": answers[0],
                    "B": answers[1],
                    "C": answers[2],
                    "D": answers[3]
                },
                "real_answer": correct_answer,
                "theme": theme,
                "sanitized_theme": sanitized_theme,
                "version": item.get('version', 'Default')
            }
            questions_by_theme[sanitized_theme].append(question)
            loaded += 1
        except Exception:
            skipped += 1
            continue

    print(f"Loaded {loaded} questions, skipped {skipped} invalid questions")
    return questions_by_theme
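
# Hypothetical quick check of the loader's output shape:
#   questions = load_questions_from_dataset()
#   print(sum(len(qs) for qs in questions.values()), "questions in", len(questions), "theme(s)")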


def ask_ai_model(api_key: str, model: str, question: Dict) -> tuple:
    """Send one exam question to an OpenRouter model; return (raw response, parsed answer)."""
    prompt = f"""You are a medical expert taking a urology examination. Please analyze this question carefully and provide your answer.
Question: {question['statement']}
Options:
A) {question['options']['A']}
B) {question['options']['B']}
C) {question['options']['C']}
D) {question['options']['D']}
Please provide your answer in this exact format:
Answer: [A/B/C/D]
Then provide your reasoning."""

    try:
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        data = {
            "model": model,
            "messages": [
                {"role": "user", "content": prompt}
            ]
        }
        # Generous timeout so a slow generation fails cleanly instead of hanging the exam.
        response = requests.post(
            "https://openrouter.ai/api/v1/chat/completions",
            headers=headers, json=data, timeout=120
        )
        if response.status_code == 200:
            result = response.json()
            ai_response = result["choices"][0]["message"]["content"]
            ai_answer = extract_answer_from_response(ai_response)
            return ai_response, ai_answer
        else:
            error_msg = f"API Error {response.status_code}: {response.text}"
            return error_msg, "API_ERROR"
    except Exception as e:
        error_msg = f"Request Error: {str(e)}"
        return error_msg, "REQUEST_ERROR"
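
# The success branch assumes an OpenAI-style chat completions payload from OpenRouter,
# roughly {"choices": [{"message": {"content": "Answer: B\n..."}}]}; non-200 responses
# and network failures come back with the API_ERROR / REQUEST_ERROR sentinels instead.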


def extract_answer_from_response(ai_response: str) -> str:
    """Parse the model's reply and return 'A'-'D', or a sentinel string on failure."""
    if not ai_response:
        return "EMPTY_RESPONSE"

    lines = ai_response.split('\n')

    # 1) Preferred format: a line starting with "Answer: X".
    for line in lines:
        line_clean = line.strip().lower()
        if line_clean.startswith('answer:'):
            answer_part = line.split(':')[1].strip().upper()
            for char in answer_part:
                if char in ['A', 'B', 'C', 'D']:
                    return char

    # 2) Phrases like "the answer is B".
    for line in lines:
        line_clean = line.strip().lower()
        if 'answer is' in line_clean:
            for char in ['A', 'B', 'C', 'D']:
                if char.lower() in line_clean.split('answer is')[1][:5]:
                    return char

    # 3) Option markers such as "B)", "(B)" or "Option B" near the top of the reply.
    for line in lines[:5]:
        line_upper = line.upper()
        for char in ['A', 'B', 'C', 'D']:
            patterns = [f"{char})", f"{char}.", f"OPTION {char}", f"({char})", f"CHOICE {char}"]
            for pattern in patterns:
                if pattern in line_upper:
                    return char

    # 4) Last-resort fallbacks: any bare A-D in the first lines, then anywhere at all.
    for line in lines[:3]:
        for char in ['A', 'B', 'C', 'D']:
            if char in line.upper():
                return char
    for char in ['A', 'B', 'C', 'D']:
        if char in ai_response.upper():
            return char

    return "NO_ANSWER_FOUND"


def save_results_to_dataset(results: List[Dict], hf_token: str = None) -> str:
    """Append exam results to the SASLeaderboard/results dataset on the Hub."""
    if not results:
        return "No results to save"

    if not hf_token:
        hf_token = os.getenv("HF_TOKEN")
    if not hf_token:
        return "❌ HuggingFace token not found. Please provide it in the interface or set the HF_TOKEN environment variable"

    try:
        # Load any existing results so new rows are appended rather than replacing them.
        try:
            existing_dataset = load_dataset("SASLeaderboard/results", token=hf_token)
            existing_data = existing_dataset['train'].to_pandas()
        except Exception:
            existing_data = None

        new_data = pd.DataFrame(results)
        if existing_data is not None:
            combined_data = pd.concat([existing_data, new_data], ignore_index=True)
        else:
            combined_data = new_data

        new_dataset = Dataset.from_pandas(combined_data)
        new_dataset.push_to_hub(
            "SASLeaderboard/results",
            token=hf_token,
            commit_message=f"Automated exam results for {results[0]['model']} - {len(results)} questions"
        )
        return f"✅ Successfully saved {len(results)} results to the SASLeaderboard/results dataset"
    except Exception as e:
        return f"❌ Error saving results: {str(e)}"


def run_automated_exam(api_key: str, model: str, hf_token: str = ""):
    """Run the full exam against one model, yielding progress messages for the UI."""
    if not api_key:
        yield "❌ Please provide OpenRouter API key"
        return
    if not model:
        yield "❌ Please provide model name"
        return

    yield "🔄 Loading questions from dataset..."

    try:
        all_questions_by_theme = load_questions_from_dataset()
        all_questions = []
        for theme_questions in all_questions_by_theme.values():
            all_questions.extend(theme_questions)

        total_questions = len(all_questions)
        yield f"✅ Loaded {total_questions} questions from dataset"
        yield f"🚀 Starting automated exam with ALL {total_questions} questions for model: {model}"

        session_id = f"{model}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}"
        results = []
        correct_count = 0

        for i, question in enumerate(all_questions):
            ai_response, ai_answer = ask_ai_model(api_key, model, question)

            if ai_answer in ["API_ERROR", "REQUEST_ERROR", "EMPTY_RESPONSE", "NO_ANSWER_FOUND"]:
                yield f"⚠️ Question {i+1}: Error getting answer - {ai_answer}. Response: {ai_response[:100]}..."

            # Errors count as incorrect; the raw response is still recorded for review.
            is_correct = ai_answer == question['real_answer']
            if is_correct:
                correct_count += 1

            result = {
                "session_id": session_id,
                "model": model,
                "question": question['statement'],
                "theme": question['theme'],
                "correct_answer": question['real_answer'],
                "ai_answer": ai_answer,
                "ai_response": ai_response,
                "is_correct": is_correct,
                "timestamp": datetime.datetime.now().isoformat(),
                "options_a": question['options']['A'],
                "options_b": question['options']['B'],
                "options_c": question['options']['C'],
                "options_d": question['options']['D']
            }
            results.append(result)

            current_accuracy = (correct_count / (i + 1)) * 100
            status_emoji = "✅" if is_correct else "❌"
            yield f"{status_emoji} Q{i+1}/{total_questions}: Accuracy: {correct_count}/{i+1} ({current_accuracy:.1f}%) | AI: {ai_answer} vs Correct: {question['real_answer']} | {question['statement'][:80]}..."

        yield "💾 Saving results to HuggingFace dataset..."
        save_result = save_results_to_dataset(results, hf_token)

        final_accuracy = (correct_count / len(results)) * 100 if results else 0.0
        yield f"""
## 🎯 Exam Complete!
**Final Results:**
- Model: {model}
- Total Questions: {len(results)}
- Correct Answers: {correct_count}
- Final Accuracy: {final_accuracy:.1f}%
- Session ID: {session_id}
**Save Status:** {save_result}
The automated exam has been completed successfully!
"""
    except Exception as e:
        yield f"❌ Error during automated exam: {str(e)}"


with gr.Blocks(title="Automated Urology Exam System") as demo:
    gr.Markdown("# Automated Urology Exam System")
    gr.Markdown("This system automatically runs a complete urology exam for an AI model using ALL available questions (~150) and saves the results to the dataset.")

    with gr.Row():
        with gr.Column():
            gr.Markdown("**Get your API key:** [OpenRouter Keys](https://openrouter.ai/settings/keys)")
            api_key_input = gr.Textbox(
                label="OpenRouter API Key",
                type="password",
                placeholder="Enter your OpenRouter API key"
            )
        with gr.Column():
            gr.Markdown("**Find models:** [OpenRouter Models](https://openrouter.ai/models)")
            model_input = gr.Textbox(
                label="Model Name",
                placeholder="e.g., anthropic/claude-3-sonnet",
                value="anthropic/claude-3-sonnet"
            )

    with gr.Row():
        start_exam_btn = gr.Button("Start Automated Exam", variant="primary", size="lg")

    with gr.Row():
        progress_output = gr.Textbox(
            label="Exam Progress - Don't close this window",
            placeholder="Exam progress will be displayed here...",
            lines=15,
            max_lines=20,
            interactive=False
        )

    start_exam_btn.click(
        run_automated_exam,
        inputs=[api_key_input, model_input],
        outputs=[progress_output]
    )

if __name__ == "__main__":
    demo.launch()