qa-agent / app.py
Jan Krüger
QA Agent for Certification
8d4d62e
import os
import gradio as gr
import requests
import pandas as pd
import yaml
from smolagents import CodeAgent, LiteLLMModel, DuckDuckGoSearchTool, WikipediaSearchTool
from datasets import load_dataset
from cache_manager import CacheManager
from tools.final_answer import final_answer
from tools.get_file import get_file
from tools.web_scraping import (
scrape_webpage_content,
extract_links_from_webpage,
get_webpage_metadata
)
# Load the GAIA dataset
# This runs at import time, so importing the module triggers the dataset load.
# NOTE(review): `dataset` is not referenced again in the visible part of this
# file -- confirm it is still needed.
dataset = load_dataset("gaia-benchmark/GAIA", "2023_level1", trust_remote_code=True, cache_dir="GAIA")
print("GAIA dataset loaded successfully.")
# Initialize cache manager
# Module-level instance shared by run_questions, submit_answers and clear_cache.
cache_manager = CacheManager()
# --- Constants ---
# Base URL of the scoring service; "/questions" and "/submit" are appended to it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# --- QA Agent Definition ---
class QAAgent:
    """Question-answering agent backed by a smolagents ``CodeAgent``.

    Model and agent settings are read from ``config.yaml`` and the system
    prompt from ``prompts.yaml`` (both opened relative to the working
    directory). Calling an instance with a question returns the answer as a
    string; failures while answering are reported in the returned string
    instead of being raised.
    """

    # Vertex values that are skipped instead of being passed to the model.
    # They look like template placeholders -- TODO confirm against the sample
    # config.yaml.
    _IGNORED_VERTEX_PROJECT = 'your-gcp-project-id'
    _IGNORED_VERTEX_CREDENTIALS = ('/path/to/service-account.json', './google.json')

    def __init__(self, temperature=None, max_tokens=None, max_steps=None):
        """
        Initialize the QA Agent with configuration from config.yaml.

        Args:
            temperature: Temperature for text generation (overrides config)
            max_tokens: Maximum number of tokens for the model (overrides config)
            max_steps: Maximum number of steps the agent can take (overrides config)

        Raises:
            Exception: If the model or the agent cannot be created. The message
                carries setup hints for authentication-related failures and
                the original error is attached as ``__cause__``.
        """
        print("Initializing QA Agent with configuration...")
        # Tracked outside the try block so the error handler can tailor its
        # hint to the model that was actually selected (including the default).
        model_id = ''
        try:
            config = self._load_config()
            prompts = self._load_prompts()

            # "or {}" also covers a section that is present but empty (null).
            model_config = config.get('model') or {}
            model_id = model_config.get('model_id', 'anthropic/claude-sonnet-4-20250514')
            temp = temperature if temperature is not None else model_config.get('temperature', 0.2)
            max_tok = max_tokens if max_tokens is not None else model_config.get('max_tokens', 2096)

            agent_config = config.get('agent') or {}
            self.max_steps = max_steps if max_steps is not None else agent_config.get('max_steps', 5)

            print(f"Model: {model_id}")
            print(f"Temperature: {temp}")
            print(f"Max tokens: {max_tok}")
            print(f"Max steps: {self.max_steps}")

            model_params = {
                'model_id': model_id,
                'temperature': temp,
                'max_tokens': max_tok,
            }
            # Vertex AI models take extra project/location/credential settings.
            if model_id.startswith('vertex_ai/'):
                print("Configuring Vertex AI parameters...")
                model_params.update(self._vertex_params(config.get('vertex_ai') or {}))

            model = LiteLLMModel(**model_params)

            tools = [
                DuckDuckGoSearchTool(),
                WikipediaSearchTool(),
                get_file,
                scrape_webpage_content,
                extract_links_from_webpage,
                get_webpage_metadata,
                final_answer,
            ]
            # The agent is created without prompt templates; the system prompt
            # is prepended to each question in __call__ instead.
            self.agent = CodeAgent(
                tools=tools,
                model=model,
                max_steps=self.max_steps,
            )
            self.prompts = prompts
            print("Agent initialized successfully!")
        except Exception as e:
            error_msg = f"Error initializing QA Agent: {e}" + self._auth_hint(str(e), model_id)
            print(error_msg)
            # Keep the generic exception type callers already catch, but chain
            # the original error so its traceback is not lost.
            raise Exception(error_msg) from e

    @staticmethod
    def _vertex_params(vertex_config):
        """Return the extra model parameters taken from the ``vertex_ai`` config section."""
        import json  # function-scope import: json is not imported at file level

        params = {}
        if ('vertex_project' in vertex_config
                and vertex_config['vertex_project'] != QAAgent._IGNORED_VERTEX_PROJECT):
            params['vertex_project'] = vertex_config['vertex_project']
            print(f" Vertex Project: {vertex_config['vertex_project']}")
        if 'vertex_location' in vertex_config:
            params['vertex_location'] = vertex_config['vertex_location']
            print(f" Vertex Location: {vertex_config['vertex_location']}")

        creds_path = vertex_config.get('vertex_credentials')
        if creds_path and creds_path not in QAAgent._IGNORED_VERTEX_CREDENTIALS:
            if os.path.exists(creds_path):
                try:
                    # Only check that the file parses as JSON; the path itself
                    # is what gets handed to the model.
                    with open(creds_path, 'r', encoding='utf-8') as f:
                        json.load(f)
                except (OSError, ValueError) as e:
                    # ValueError covers json.JSONDecodeError and decode errors.
                    print(f" Warning: Invalid credentials file {creds_path}: {e}")
                else:
                    params['vertex_credentials'] = creds_path
                    print(f" Vertex Credentials: {creds_path}")
            else:
                print(f" Warning: Credentials file not found: {creds_path}")

        if 'safety_settings' in vertex_config:
            params['safety_settings'] = vertex_config['safety_settings']
            print(f" Safety Settings: {len(vertex_config['safety_settings'])} categories configured")
        return params

    @staticmethod
    def _auth_hint(error_text, model_id):
        """Return provider-specific setup advice for auth-looking errors, else ''."""
        lowered = error_text.lower()
        if not any(word in lowered for word in ("authentication", "api", "credentials")):
            return ""
        model = str(model_id).lower()
        if "vertex_ai" in model or "gemini" in model:
            return (
                "\n\nFor Vertex AI models, please:"
                "\n1. Set up authentication:"
                "\n Option A: gcloud auth application-default login"
                "\n Option B: export GOOGLE_APPLICATION_CREDENTIALS='/path/to/service-account.json'"
                "\n Option C: Set vertex_credentials in config.yaml"
                "\n2. Update config.yaml with your:"
                "\n - vertex_project: 'your-gcp-project-id'"
                "\n - vertex_location: 'us-central1' (or your preferred region)"
            )
        if "anthropic" in model:
            return "\n\nFor Anthropic models, please set: export ANTHROPIC_API_KEY='your-key-here'"
        if "openai" in model or "gpt" in model:
            return "\n\nFor OpenAI models, please set: export OPENAI_API_KEY='your-key-here'"
        return ""

    @staticmethod
    def _load_yaml(path, label):
        """Read a YAML mapping from *path*; return {} when missing, unreadable or not a mapping."""
        try:
            with open(path, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
        except FileNotFoundError:
            print(f"Warning: {path} not found, using default {label}")
            return {}
        except Exception as e:
            # Deliberate best-effort: a broken file falls back to defaults.
            print(f"Error loading {path}: {e}")
            return {}
        if isinstance(data, dict):
            return data
        # safe_load yields None for an empty file and may yield a list/scalar;
        # callers rely on .get(), so normalise anything else to an empty dict.
        if data is not None:
            print(f"Warning: {path} does not contain a mapping, using default {label}")
        return {}

    def _load_config(self):
        """Load configuration from config.yaml"""
        return self._load_yaml('config.yaml', 'configuration')

    def _load_prompts(self):
        """Load prompts from prompts.yaml"""
        return self._load_yaml('prompts.yaml', 'prompts')

    def __call__(self, question: str) -> str:
        """Answer *question* with the agent.

        The configured ``system_prompt`` (if any) is prepended to the
        question, separated by a blank line.

        Returns:
            The agent's answer converted to ``str``, or a string starting with
            "Error processing question:" if running the agent failed.
        """
        print(f"Agent received question (first 50 chars): {question[:50]}...")
        try:
            # Tolerate prompts being None so a missing prompt set never masks
            # the real outcome of the run.
            system_prompt = (self.prompts or {}).get('system_prompt', '')
            if system_prompt:
                enhanced_question = f"{system_prompt}\n\n{question}"
            else:
                enhanced_question = question
            answer = self.agent.run(enhanced_question)
            print(f"Agent returning answer (first 100 chars): {str(answer)[:100]}...")
            return str(answer)
        except Exception as e:
            print(f"Error running agent: {e}")
            return f"Error processing question: {e}"
def run_questions(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the QAAgent on them, and caches the answers.

    Questions whose answer is already cached and marked valid are not re-run.

    Args:
        profile: The logged-in Hugging Face profile, or None when the user is
            not logged in.

    Returns:
        A ``(status_message, results)`` pair. ``results`` is a DataFrame with
        one row per handled question, or None when processing could not start
        (not logged in, agent failed to initialize, questions not fetched).
    """
    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None
    username = f"{profile.username}"
    print(f"User logged in: {username}")

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"

    # 1. Instantiate Agent
    try:
        agent = QAAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        # The loop below needs a non-empty list of dicts; reject anything else
        # up front instead of failing on .get() later.
        if not isinstance(questions_data, list) or not questions_data:
            print("Fetched questions payload is empty or not a list.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run Agent and Cache Results
    results_log = []
    cached_count = 0
    processed_count = 0
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        if not isinstance(item, dict):
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        task_id = item.get("task_id")
        question_text = item.get("question")
        file_name = item.get("file_name")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue

        # Reuse a cached answer when one exists and is flagged valid.
        cached_result = cache_manager.get_cached_answer(question_text)
        if cached_result and cached_result.get('cache_valid', False):
            print(f"Using cached answer for task {task_id}")
            submitted_answer = cached_result['answer']
            cached_count += 1
            results_log.append({
                "Task ID": task_id,
                "Question": question_text,
                "Submitted Answer": submitted_answer,
                "Status": "Cached"
            })
            continue

        try:
            print(f"Processing task {task_id} with agent...")
            # Tell the agent about an attached file so it can fetch it.
            enhanced_question = question_text
            if file_name:
                enhanced_question = f"{question_text}\n\nNote: This question references a file named '{file_name}'. Use the get_file tool to retrieve its content."
            submitted_answer = agent(enhanced_question)
            # The cache is keyed on the original question text, not the
            # enhanced one, so later lookups match.
            cache_success = cache_manager.cache_answer(
                question=question_text,
                answer=submitted_answer,
                iterations=1,
                file_name=file_name
            )
            processed_count += 1
            status = "Processed & Cached" if cache_success else "Processed (Cache Failed)"
            results_log.append({
                "Task ID": task_id,
                "Question": question_text,
                "Submitted Answer": submitted_answer,
                "Status": status
            })
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            error_answer = f"AGENT ERROR: {e}"
            # Cache the error (will be marked as invalid)
            # NOTE(review): invalid-marking happens inside CacheManager, which
            # is not visible here -- confirm.
            try:
                cache_manager.cache_answer(
                    question=question_text,
                    answer=error_answer,
                    iterations=1,
                    file_name=file_name
                )
            except Exception as cache_error:
                # A cache failure here must not abort the remaining questions
                # or lose the results collected so far.
                print(f"Error caching failure for task {task_id}: {cache_error}")
            results_log.append({
                "Task ID": task_id,
                "Question": question_text,
                "Submitted Answer": error_answer,
                "Status": "Error"
            })

    status_message = (
        f"Questions processing completed!\n"
        f"Total questions: {len(questions_data)}\n"
        f"Used cached answers: {cached_count}\n"
        f"Newly processed: {processed_count}\n"
        f"Answers are cached and ready for submission."
    )
    print(status_message)
    results_df = pd.DataFrame(results_log)
    return status_message, results_df
def submit_answers(profile: gr.OAuthProfile | None):
    """
    Loads cached answers and submits them to the evaluation server.

    Submission only happens when every fetched question has a cached answer
    that is marked valid; otherwise the missing task ids are reported.

    Args:
        profile: The logged-in Hugging Face profile, or None when the user is
            not logged in.

    Returns:
        A ``(status_message, results)`` pair. ``results`` is a DataFrame with
        one row per question, or None when the questions could not be fetched
        or the user is not logged in.
    """
    space_id = os.getenv("SPACE_ID")
    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None
    username = f"{profile.username}"
    print(f"User logged in: {username}")

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"
    # In the case of an app running as a Hugging Face space, this link points toward your codebase
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    if not space_id:
        # The submitted value is left unchanged; only make the problem visible.
        print("Warning: SPACE_ID is not set; the agent_code link will not point to a real Space.")
    print(agent_code)

    # 1. Fetch Questions to get task_ids
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        # The loop below needs a non-empty list of dicts.
        if not isinstance(questions_data, list) or not questions_data:
            print("Fetched questions payload is empty or not a list.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        # Same fallback as run_questions, e.g. for a decode error that is not
        # a RequestException.
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # 2. Load Cached Answers
    answers_payload = []
    results_log = []
    missing_answers = []
    for item in questions_data:
        if not isinstance(item, dict):
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        cached_result = cache_manager.get_cached_answer(question_text)
        if cached_result and cached_result.get('cache_valid', False):
            submitted_answer = cached_result['answer']
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({
                "Task ID": task_id,
                "Question": question_text,
                "Submitted Answer": submitted_answer,
                "Status": "Ready for Submission"
            })
        else:
            missing_answers.append(task_id)
            results_log.append({
                "Task ID": task_id,
                "Question": question_text,
                "Submitted Answer": "NO CACHED ANSWER",
                "Status": "Missing Answer"
            })

    # results_log is complete from here on, so the table is built once and
    # returned by every remaining exit path.
    results_df = pd.DataFrame(results_log)

    if missing_answers:
        status_message = (
            f"Cannot submit: Missing cached answers for {len(missing_answers)} questions.\n"
            f"Missing task IDs: {missing_answers[:5]}{'...' if len(missing_answers) > 5 else ''}\n"
            f"Please run the questions first to generate and cache answers."
        )
        print(status_message)
        return status_message, results_df
    if not answers_payload:
        print("No valid cached answers found for submission.")
        return "No valid cached answers found for submission.", results_df

    # 3. Prepare Submission
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Submitting {len(answers_payload)} cached answers for user '{username}'..."
    print(status_update)

    # 4. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except (ValueError, AttributeError):
            # ValueError: body is not JSON; AttributeError: JSON body is not
            # an object. Either way fall back to the raw text.
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
    # Shared tail of all failure branches above.
    print(status_message)
    return status_message, results_df
def clear_cache():
    """Drop every cached answer.

    Returns:
        A ``(status_message, empty DataFrame)`` pair.
    """
    cache_manager.clear_cache()
    message = "Cache cleared successfully!"
    empty_table = pd.DataFrame()
    return message, empty_table
# --- Build Gradio Interface using Blocks ---
# Components are rendered in the order they are created, so statement order
# below defines the page layout.
with gr.Blocks() as demo:
    gr.Markdown("# QA Agent Evaluation Runner")
    gr.Markdown(
        """
**Instructions:**
1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ...
2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
3. Click 'Run Questions' to fetch questions and run your agent (answers will be cached).
4. Click 'Submit Answers' to submit the cached answers and see your score.
5. Use 'Clear Cache' to remove all cached answers if needed.
---
**Benefits of Separate Run/Submit:**
- Answers are cached, so you can run questions once and submit multiple times
- Faster submission since answers are pre-computed
- Better error handling and recovery
- Ability to review answers before submission
"""
    )
    gr.LoginButton()
    # The three action buttons share one row.
    with gr.Row():
        run_button = gr.Button("Run Questions", variant="primary")
        submit_button = gr.Button("Submit Answers", variant="secondary")
        clear_button = gr.Button("Clear Cache", variant="stop")
    # Every handler returns (status text, table) into these two outputs.
    status_output = gr.Textbox(label="Status / Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)
    # No `inputs` are wired: the handlers' `profile` argument is presumably
    # injected by Gradio from the login state via the gr.OAuthProfile
    # annotation -- TODO confirm against the Gradio version in use.
    run_button.click(
        fn=run_questions,
        outputs=[status_output, results_table]
    )
    submit_button.click(
        fn=submit_answers,
        outputs=[status_output, results_table]
    )
    clear_button.click(
        fn=clear_cache,
        outputs=[status_output, results_table]
    )
if __name__ == "__main__":
    # Startup banner: title framed by two 30-character rules.
    rule = "-" * 30
    title = " App Starting "
    print("\n" + rule + title + rule)

    # Informational only: report the Space environment variables, if present.
    host = os.getenv("SPACE_HOST")
    repo_id = os.getenv("SPACE_ID")

    if not host:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
    else:
        print(f"✅ SPACE_HOST found: {host}")
        print(f" Runtime URL should be: https://{host}.hf.space")

    if not repo_id:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")
    else:
        repo_url = f"https://huggingface.co/spaces/{repo_id}"
        print(f"✅ SPACE_ID found: {repo_id}")
        print(f" Repo URL: {repo_url}")
        print(f" Repo Tree URL: {repo_url}/tree/main")

    # Closing rule spans the full banner width (both rules plus the title).
    print("-" * (2 * len(rule) + len(title)) + "\n")
    print("Launching Gradio Interface for QA Agent Evaluation...")
    demo.launch(debug=True, share=False)