import json
import os

import gradio as gr
import pandas as pd
import requests
import yaml
from datasets import load_dataset
from smolagents import CodeAgent, DuckDuckGoSearchTool, LiteLLMModel, WikipediaSearchTool

from cache_manager import CacheManager
from tools.final_answer import final_answer
from tools.get_file import get_file
from tools.web_scraping import (
    scrape_webpage_content,
    extract_links_from_webpage,
    get_webpage_metadata
)

# Pre-load the GAIA level-1 split so its files are cached locally in ./GAIA
dataset = load_dataset("gaia-benchmark/GAIA", "2023_level1", trust_remote_code=True, cache_dir="GAIA")
print("GAIA dataset loaded successfully.")

cache_manager = CacheManager()
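
# Interface assumed for CacheManager throughout this module -- a sketch of the
# calls made below, not the authoritative cache_manager.py API:
#   cache_manager.get_cached_answer(question) -> dict | None
#       (hits are dicts with at least 'answer' and 'cache_valid' keys)
#   cache_manager.cache_answer(question=..., answer=..., iterations=..., file_name=...) -> bool
#   cache_manager.clear_cache() -> None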

DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
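
# Shapes assumed for the scoring API, inferred from how responses are used below
# (illustrative, not the server's documented schema):
#   GET  /questions -> [{"task_id": str, "question": str, "file_name": str | null}, ...]
#   POST /submit    <- {"username": str, "agent_code": str,
#                       "answers": [{"task_id": str, "submitted_answer": str}, ...]}
#                   -> {"username", "score", "correct_count", "total_attempted", "message"}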


class QAAgent:
    def __init__(self, temperature=None, max_tokens=None, max_steps=None):
        """
        Initialize the QA Agent with configuration from config.yaml.

        Args:
            temperature: Temperature for text generation (overrides config)
            max_tokens: Maximum number of tokens for the model (overrides config)
            max_steps: Maximum number of steps the agent can take (overrides config)
        """
        print("Initializing QA Agent with configuration...")

        try:
            # Load configuration and prompts from their YAML files
            config = self._load_config()
            prompts = self._load_prompts()

            # Model settings: explicit arguments override config.yaml values
            model_config = config.get('model', {})
            model_id = model_config.get('model_id', 'anthropic/claude-sonnet-4-20250514')
            temp = temperature if temperature is not None else model_config.get('temperature', 0.2)
            max_tok = max_tokens if max_tokens is not None else model_config.get('max_tokens', 2096)

            # Agent settings
            agent_config = config.get('agent', {})
            self.max_steps = max_steps if max_steps is not None else agent_config.get('max_steps', 5)

            print(f"Model: {model_id}")
            print(f"Temperature: {temp}")
            print(f"Max tokens: {max_tok}")
            print(f"Max steps: {self.max_steps}")

            model_params = {
                'model_id': model_id,
                'temperature': temp,
                'max_tokens': max_tok
            }

            # Vertex AI models need extra parameters: project, location, credentials
            if model_id.startswith('vertex_ai/'):
                print("Configuring Vertex AI parameters...")
                vertex_config = config.get('vertex_ai', {})

                # Only pass values whose placeholders have been replaced
                if 'vertex_project' in vertex_config and vertex_config['vertex_project'] != 'your-gcp-project-id':
                    model_params['vertex_project'] = vertex_config['vertex_project']
                    print(f" Vertex Project: {vertex_config['vertex_project']}")

                if 'vertex_location' in vertex_config:
                    model_params['vertex_location'] = vertex_config['vertex_location']
                    print(f" Vertex Location: {vertex_config['vertex_location']}")

                # Validate the service-account file before handing it to the model
                creds_path = vertex_config.get('vertex_credentials')
                if creds_path and creds_path not in ['/path/to/service-account.json', './google.json']:
                    if os.path.exists(creds_path):
                        try:
                            with open(creds_path, 'r') as f:
                                json.load(f)  # sanity-check: file must be valid JSON
                            model_params['vertex_credentials'] = creds_path
                            print(f" Vertex Credentials: {creds_path}")
                        except (json.JSONDecodeError, OSError) as e:
                            print(f" Warning: Invalid credentials file {creds_path}: {e}")
                    else:
                        print(f" Warning: Credentials file not found: {creds_path}")

                if 'safety_settings' in vertex_config:
                    model_params['safety_settings'] = vertex_config['safety_settings']
                    print(f" Safety Settings: {len(vertex_config['safety_settings'])} categories configured")

            # LiteLLM routes by the model_id prefix (e.g. 'anthropic/', 'vertex_ai/')
            # and reads provider API keys from the environment
            model = LiteLLMModel(**model_params)

            # Assemble the agent's toolset
            tools = [
                DuckDuckGoSearchTool(),
                WikipediaSearchTool(),
                get_file,
                scrape_webpage_content,
                extract_links_from_webpage,
                get_webpage_metadata,
                final_answer
            ]

            self.agent = CodeAgent(
                tools=tools,
                model=model,
                max_steps=self.max_steps
            )

            self.prompts = prompts

            print("Agent initialized successfully!")

        except Exception as e:
            error_msg = f"Error initializing QA Agent: {e}"

            # For auth-related failures, append provider-specific setup hints
            if "authentication" in str(e).lower() or "api" in str(e).lower() or "credentials" in str(e).lower():
                model_id = self._load_config().get('model', {}).get('model_id', '')

                if "vertex_ai" in model_id.lower() or "gemini" in model_id.lower():
                    error_msg += "\n\nFor Vertex AI models, please:"
                    error_msg += "\n1. Set up authentication:"
                    error_msg += "\n Option A: gcloud auth application-default login"
                    error_msg += "\n Option B: export GOOGLE_APPLICATION_CREDENTIALS='/path/to/service-account.json'"
                    error_msg += "\n Option C: Set vertex_credentials in config.yaml"
                    error_msg += "\n2. Update config.yaml with your:"
                    error_msg += "\n - vertex_project: 'your-gcp-project-id'"
                    error_msg += "\n - vertex_location: 'us-central1' (or your preferred region)"
                elif "anthropic" in model_id.lower():
                    error_msg += "\n\nFor Anthropic models, please set: export ANTHROPIC_API_KEY='your-key-here'"
                elif "openai" in model_id.lower() or "gpt" in model_id.lower():
                    error_msg += "\n\nFor OpenAI models, please set: export OPENAI_API_KEY='your-key-here'"

            print(error_msg)
            raise RuntimeError(error_msg) from e

    def _load_config(self):
        """Load configuration from config.yaml."""
        try:
            with open('config.yaml', 'r') as f:
                return yaml.safe_load(f) or {}  # an empty file parses to None; normalize to {}
        except FileNotFoundError:
            print("Warning: config.yaml not found, using default configuration")
            return {}
        except Exception as e:
            print(f"Error loading config.yaml: {e}")
            return {}
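
    # Illustrative config.yaml shape, inferred from the keys read in __init__
    # (values are placeholders; the vertex_ai section only matters for
    # vertex_ai/ model ids):
    #
    #   model:
    #     model_id: anthropic/claude-sonnet-4-20250514
    #     temperature: 0.2
    #     max_tokens: 2096
    #   agent:
    #     max_steps: 5
    #   vertex_ai:
    #     vertex_project: your-gcp-project-id
    #     vertex_location: us-central1
    #     vertex_credentials: /path/to/service-account.json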

    def _load_prompts(self):
        """Load prompts from prompts.yaml."""
        try:
            with open('prompts.yaml', 'r') as f:
                return yaml.safe_load(f) or {}  # normalize None (empty file) to {}
        except FileNotFoundError:
            print("Warning: prompts.yaml not found, using default prompts")
            return {}
        except Exception as e:
            print(f"Error loading prompts.yaml: {e}")
            return {}
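
    # Illustrative prompts.yaml shape; only 'system_prompt' is read by __call__
    # (the prompt text below is an example, not the one shipped with this repo):
    #
    #   system_prompt: |
    #     You are a precise question-answering agent. Use the available tools,
    #     then call final_answer with a concise answer.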

    def __call__(self, question: str) -> str:
        print(f"Agent received question (first 50 chars): {question[:50]}...")
        try:
            # Prepend the configured system prompt, if any
            system_prompt = self.prompts.get('system_prompt', '')
            if system_prompt:
                enhanced_question = f"{system_prompt}\n\n{question}"
            else:
                enhanced_question = question

            answer = self.agent.run(enhanced_question)
            print(f"Agent returning answer (first 100 chars): {str(answer)[:100]}...")
            return str(answer)
        except Exception as e:
            print(f"Error running agent: {e}")
            return f"Error processing question: {e}"


def run_questions(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the QAAgent on them, and caches the answers.
    """
    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"

    # Instantiate the agent; fail fast if configuration or credentials are wrong
    try:
        agent = QAAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

print(f"Fetching questions from: {questions_url}") |
|
try: |
|
response = requests.get(questions_url, timeout=15) |
|
response.raise_for_status() |
|
questions_data = response.json() |
|
if not questions_data: |
|
print("Fetched questions list is empty.") |
|
return "Fetched questions list is empty or invalid format.", None |
|
print(f"Fetched {len(questions_data)} questions.") |
|
except requests.exceptions.JSONDecodeError as e: |
|
print(f"Error decoding JSON response from questions endpoint: {e}") |
|
print(f"Response text: {response.text[:500]}") |
|
return f"Error decoding server response for questions: {e}", None |
|
except requests.exceptions.RequestException as e: |
|
print(f"Error fetching questions: {e}") |
|
return f"Error fetching questions: {e}", None |
|
except Exception as e: |
|
print(f"An unexpected error occurred fetching questions: {e}") |
|
return f"An unexpected error occurred fetching questions: {e}", None |
|
|
|
|
|
results_log = [] |
|
cached_count = 0 |
|
processed_count = 0 |
|
print(f"Running agent on {len(questions_data)} questions...") |
|
|
|
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        file_name = item.get("file_name")

        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue

        # Reuse a previously cached answer when a valid one exists
        cached_result = cache_manager.get_cached_answer(question_text)
        if cached_result and cached_result.get('cache_valid', False):
            print(f"Using cached answer for task {task_id}")
            submitted_answer = cached_result['answer']
            cached_count += 1
            results_log.append({
                "Task ID": task_id,
                "Question": question_text,
                "Submitted Answer": submitted_answer,
                "Status": "Cached"
            })
        else:
            try:
                print(f"Processing task {task_id} with agent...")

                # If the task references an attached file, tell the agent to fetch it
                enhanced_question = question_text
                if file_name:
                    enhanced_question = f"{question_text}\n\nNote: This question references a file named '{file_name}'. Use the get_file tool to retrieve its content."

                submitted_answer = agent(enhanced_question)

                # Cache the answer so it can be submitted later without re-running
                cache_success = cache_manager.cache_answer(
                    question=question_text,
                    answer=submitted_answer,
                    iterations=1,
                    file_name=file_name
                )

                processed_count += 1
                status = "Processed & Cached" if cache_success else "Processed (Cache Failed)"
                results_log.append({
                    "Task ID": task_id,
                    "Question": question_text,
                    "Submitted Answer": submitted_answer,
                    "Status": status
                })

            except Exception as e:
                print(f"Error running agent on task {task_id}: {e}")
                error_answer = f"AGENT ERROR: {e}"

                # Cache the error marker too, so failed tasks are visible on review
                cache_manager.cache_answer(
                    question=question_text,
                    answer=error_answer,
                    iterations=1,
                    file_name=file_name
                )

                results_log.append({
                    "Task ID": task_id,
                    "Question": question_text,
                    "Submitted Answer": error_answer,
                    "Status": "Error"
                })

    status_message = (
        f"Questions processing completed!\n"
        f"Total questions: {len(questions_data)}\n"
        f"Used cached answers: {cached_count}\n"
        f"Newly processed: {processed_count}\n"
        f"Answers are cached and ready for submission."
    )

    print(status_message)
    results_df = pd.DataFrame(results_log)
    return status_message, results_df


def submit_answers(profile: gr.OAuthProfile | None):
    """
    Loads cached answers and submits them to the evaluation server.
    """
    space_id = os.getenv("SPACE_ID")

    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # Link to this Space's code so the scoring server can verify the agent
    # (SPACE_ID is unset when running locally, so the link may be a placeholder)
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None

    # Build the submission payload purely from cached answers
    answers_payload = []
    results_log = []
    missing_answers = []

    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")

        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue

        cached_result = cache_manager.get_cached_answer(question_text)
        if cached_result and cached_result.get('cache_valid', False):
            submitted_answer = cached_result['answer']
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({
                "Task ID": task_id,
                "Question": question_text,
                "Submitted Answer": submitted_answer,
                "Status": "Ready for Submission"
            })
        else:
            missing_answers.append(task_id)
            results_log.append({
                "Task ID": task_id,
                "Question": question_text,
                "Submitted Answer": "NO CACHED ANSWER",
                "Status": "Missing Answer"
            })

    # Refuse to submit a partial answer set; ask the user to run questions first
    if missing_answers:
        status_message = (
            f"Cannot submit: Missing cached answers for {len(missing_answers)} questions.\n"
            f"Missing task IDs: {missing_answers[:5]}{'...' if len(missing_answers) > 5 else ''}\n"
            f"Please run the questions first to generate and cache answers."
        )
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df

    if not answers_payload:
        print("No valid cached answers found for submission.")
        return "No valid cached answers found for submission.", pd.DataFrame(results_log)

    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Submitting {len(answers_payload)} cached answers for user '{username}'..."
    print(status_update)

    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        results_df = pd.DataFrame(results_log)
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df


def clear_cache():
    """Clear all cached answers."""
    cache_manager.clear_cache()
    return "Cache cleared successfully!", pd.DataFrame()


with gr.Blocks() as demo:
    gr.Markdown("# QA Agent Evaluation Runner")
    gr.Markdown(
        """
        **Instructions:**

        1. Clone this Space, then modify the code to define your agent's logic, tools, and required packages.
        2. Log in to your Hugging Face account using the button below; your HF username is used for submission.
        3. Click 'Run Questions' to fetch questions and run your agent (answers will be cached).
        4. Click 'Submit Answers' to submit the cached answers and see your score.
        5. Use 'Clear Cache' to remove all cached answers if needed.

        ---
        **Benefits of Separate Run/Submit:**
        - Answers are cached, so you can run questions once and submit multiple times
        - Faster submission, since answers are pre-computed
        - Better error handling and recovery
        - Ability to review answers before submission
        """
    )

    gr.LoginButton()

    with gr.Row():
        run_button = gr.Button("Run Questions", variant="primary")
        submit_button = gr.Button("Submit Answers", variant="secondary")
        clear_button = gr.Button("Clear Cache", variant="stop")

    status_output = gr.Textbox(label="Status / Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    # Gradio injects the gr.OAuthProfile argument automatically on click
    run_button.click(
        fn=run_questions,
        outputs=[status_output, results_table]
    )

    submit_button.click(
        fn=submit_answers,
        outputs=[status_output, results_table]
    )

    clear_button.click(
        fn=clear_cache,
        outputs=[status_output, results_table]
    )

if __name__ == "__main__": |
|
print("\n" + "-"*30 + " App Starting " + "-"*30) |
|
|
|
space_host_startup = os.getenv("SPACE_HOST") |
|
space_id_startup = os.getenv("SPACE_ID") |
|
|
|
if space_host_startup: |
|
print(f"✅ SPACE_HOST found: {space_host_startup}") |
|
print(f" Runtime URL should be: https://{space_host_startup}.hf.space") |
|
else: |
|
print("ℹ️ SPACE_HOST environment variable not found (running locally?).") |
|
|
|
if space_id_startup: |
|
print(f"✅ SPACE_ID found: {space_id_startup}") |
|
print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}") |
|
print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main") |
|
else: |
|
print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.") |
|
|
|
print("-"*(60 + len(" App Starting ")) + "\n") |
|
|
|
print("Launching Gradio Interface for QA Agent Evaluation...") |
|
demo.launch(debug=True, share=False) |