#!/usr/bin/env python3
"""
SmolAgent Test Client

A Gradio-based test client for the BasicSmolAgent that:
1. Fetches random questions from the evaluation API
2. Executes the agent with detailed tracking
3. Displays comprehensive execution information
4. Supports custom question testing
5. Tests against evaluation questions from questions_evaluated.py

Usage: python agent_test_client.py
"""

import io
import json
import time
import traceback
from contextlib import redirect_stdout, redirect_stderr
from typing import Dict, Optional

import gradio as gr
import pandas as pd
import requests

from agent import BasicSmolAgent
from questions_evaluated import questions

# Configuration
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
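# Only the GET /random-question endpoint of this API is used by this client
# (see SmolAgentTester.fetch_random_question below).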


class AgentExecutionTracker:
    """Tracks and logs agent execution with detailed information"""

    def __init__(self):
        self.reset()

    def reset(self):
        """Reset tracking for new execution"""
        self.logs = []
        self.start_time = ""
        self.end_time = ""
        self.question = ""
        self.agent_response = ""
        self.final_answer = ""
        self.captured_stdout = ""
        self.captured_stderr = ""

    def log(self, level, message):
        """Add a log entry with timestamp"""
        timestamp = time.strftime("%H:%M:%S")
        self.logs.append(f"[{timestamp}] {level}: {message}")

    def get_formatted_log(self):
        """Get comprehensive formatted execution log"""
        lines = [
            "🤖 AGENT EXECUTION LOG",
            "=" * 60,
            f"📝 Question: {self.question}",
            f"⏰ Started: {self.start_time}",
            f"⏱️ Ended: {self.end_time}",
            "",
            "📋 EXECUTION STEPS:",
            "-" * 40
        ]

        # Add all log entries
        for log_entry in self.logs:
            lines.append(log_entry)

        # Add captured outputs if any
        if self.captured_stdout.strip():
            lines.extend([
                "",
                "📤 CAPTURED STDOUT:",
                "-" * 30,
                self.captured_stdout,
                "-" * 30
            ])

        if self.captured_stderr.strip():
            lines.extend([
                "",
                "⚠️ CAPTURED STDERR:",
                "-" * 30,
                self.captured_stderr,
                "-" * 30
            ])

        # Add final results
        lines.extend([
            "",
            "🎯 RESULTS:",
            "-" * 20,
            f"Agent Response Length: {len(self.agent_response)} characters",
        ])

        if self.final_answer:
            lines.append(f"Final Answer: {self.final_answer}")

        return "\n".join(lines)


class SmolAgentTester:
    """Main tester class that handles agent execution and API calls"""

    def __init__(self):
        self.agent = None
        self.tracker = AgentExecutionTracker()
        self.api_url = DEFAULT_API_URL
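
    # The agent is created lazily on first use so the Gradio UI can start even
    # if constructing BasicSmolAgent would fail; initialization errors are
    # logged to the tracker instead of crashing the app.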
    def _initialize_agent(self):
        """Initialize the BasicSmolAgent if not already done"""
        if self.agent is None:
            try:
                self.tracker.log("INIT", "Initializing BasicSmolAgent...")
                self.agent = BasicSmolAgent()
                self.tracker.log("INIT", "✅ BasicSmolAgent initialized successfully")
                return True
            except Exception as e:
                self.tracker.log("ERROR", f"Failed to initialize agent: {str(e)}")
                return False
        return True
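
    # The /random-question endpoint is expected to return a JSON object with at
    # least "task_id" and "question" fields; missing fields are replaced with
    # placeholder values below.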
    def fetch_random_question(self):
        """Fetch a random question from the evaluation API"""
        try:
            self.tracker.log("API", "Fetching random question from evaluation API...")
            response = requests.get(f"{self.api_url}/random-question", timeout=15)
            response.raise_for_status()

            question_data = response.json()
            task_id = question_data.get("task_id", "Unknown")
            question_text = question_data.get("question", "No question available")

            self.tracker.log("API", f"✅ Successfully fetched question (Task ID: {task_id})")
            return question_data
        except requests.exceptions.Timeout:
            self.tracker.log("ERROR", "Request timeout - API may be slow or unavailable")
            return None
        except requests.exceptions.ConnectionError:
            self.tracker.log("ERROR", "Connection error - Check internet connection")
            return None
        except requests.exceptions.HTTPError as e:
            self.tracker.log("ERROR", f"HTTP error {e.response.status_code} - API may be unavailable")
            return None
        except Exception as e:
            self.tracker.log("ERROR", f"Unexpected error fetching question: {str(e)}")
            return None

    def execute_agent(self, question):
        """Execute the agent with comprehensive tracking"""
        # Reset tracker for new execution
        self.tracker.reset()
        self.tracker.question = question
        self.tracker.start_time = time.strftime("%H:%M:%S")

        try:
            # Initialize agent if needed
            if not self._initialize_agent():
                self.tracker.end_time = time.strftime("%H:%M:%S")
                return "Failed to initialize agent"

            self.tracker.log("EXEC", "Starting agent execution...")
            self.tracker.log("QUESTION", f"Processing: {question[:100]}{'...' if len(question) > 100 else ''}")

            # Capture stdout and stderr during execution
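            # (The agent typically prints its reasoning steps and tool calls to
            # stdout/stderr while it runs; capturing them here lets the UI show
            # them in the execution log instead of losing them in the console.)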
            stdout_buffer = io.StringIO()
            stderr_buffer = io.StringIO()

            with redirect_stdout(stdout_buffer), redirect_stderr(stderr_buffer):
                result = self.agent(question)

            # Store captured outputs
            self.tracker.captured_stdout = stdout_buffer.getvalue()
            self.tracker.captured_stderr = stderr_buffer.getvalue()

            self.tracker.log("EXEC", "✅ Agent execution completed successfully")

            # Handle non-string results from the agent
            original_type = type(result).__name__
            if isinstance(result, str):
                result_str = result
                self.tracker.log("RESPONSE", f"Agent returned string ({len(result_str)} characters)")
            else:
                result_str = str(result)
                self.tracker.log("RESPONSE", f"Agent returned {original_type}: {result}")
                self.tracker.log("RESPONSE", f"Converted to string ({len(result_str)} characters)")
            # Extract final answer if present in string version
            if "FINAL ANSWER:" in result_str:
                final_answer = result_str.split("FINAL ANSWER:")[-1].strip()
                self.tracker.final_answer = final_answer
                self.tracker.log("ANSWER", f"Extracted final answer: {final_answer[:50]}{'...' if len(final_answer) > 50 else ''}")
            else:
                # If no "FINAL ANSWER:" format and original was not a string, use the converted string
                if not isinstance(result, str):
                    self.tracker.final_answer = result_str
                    self.tracker.log("ANSWER", f"No FINAL ANSWER format, using converted {original_type}: {result_str}")
                else:
                    self.tracker.final_answer = "No explicit final answer found"
                    self.tracker.log("ANSWER", "No explicit FINAL ANSWER format detected")

            self.tracker.agent_response = result_str
            self.tracker.end_time = time.strftime("%H:%M:%S")
            return result_str

        except Exception as e:
            error_msg = f"Agent execution failed: {str(e)}"
            self.tracker.log("ERROR", error_msg)
            self.tracker.log("ERROR", f"Traceback: {traceback.format_exc()}")
            self.tracker.end_time = time.strftime("%H:%M:%S")
            return f"ERROR: {error_msg}"


# Global tester instance
tester = SmolAgentTester()
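# A single module-level instance is shared by all Gradio callbacks; its tracker
# only holds the state of the most recent execution.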


def test_random_question():
    """Handle random question testing"""
    try:
        # Fetch random question
        question_data = tester.fetch_random_question()
        if not question_data:
            return (
                "❌ Failed to fetch random question from API",
                "Please check your internet connection and try again.\nThe evaluation API might be temporarily unavailable.",
                "No response available"
            )

        question_text = question_data.get("question", "No question text available")
        task_id = question_data.get("task_id", "Unknown")

        # Execute agent
        agent_response = tester.execute_agent(question_text)

        # Format outputs
        question_info = f"📋 Task ID: {task_id}\n\n📝 Question:\n{question_text}"
        execution_log = tester.tracker.get_formatted_log()

        result_summary = f"🤖 Agent Response:\n{agent_response}\n\n"
        if tester.tracker.final_answer:
            result_summary += f"🎯 Final Answer: {tester.tracker.final_answer}"

        return question_info, execution_log, result_summary
    except Exception as e:
        error_msg = f"Unexpected error in random question test: {str(e)}\n{traceback.format_exc()}"
        return f"❌ Error: {error_msg}", "", ""


def test_custom_question(custom_question):
    """Handle custom question testing"""
    if not custom_question.strip():
        return "❌ Please enter a question to test", "", ""

    try:
        # Execute agent with custom question
        agent_response = tester.execute_agent(custom_question.strip())

        # Format outputs
        question_info = f"📝 Custom Question:\n{custom_question.strip()}"
        execution_log = tester.tracker.get_formatted_log()

        result_summary = f"🤖 Agent Response:\n{agent_response}\n\n"
        if tester.tracker.final_answer:
            result_summary += f"🎯 Final Answer: {tester.tracker.final_answer}"

        return question_info, execution_log, result_summary
    except Exception as e:
        error_msg = f"Unexpected error in custom question test: {str(e)}\n{traceback.format_exc()}"
        return f"❌ Error: {error_msg}", "", ""
def get_evaluation_questions():
    """Get list of evaluation questions for dropdown"""
    question_choices = []
    for i, q in enumerate(questions):
        task_id = q.get("task_id", "Unknown")
        question_text = q.get("question", "No question")
        level = q.get("Level", "Unknown")

        # Truncate long questions for display
        display_text = question_text[:100] + "..." if len(question_text) > 100 else question_text
        label = f"[Level {level}] {task_id[:8]}... - {display_text}"
        question_choices.append((label, i))
    return question_choices


def test_evaluation_question(question_index):
    """Handle evaluation question testing"""
    if question_index is None:
        return "❌ Please select a question to test", "", "", ""

    try:
        selected_question = questions[question_index]
        question_text = selected_question.get("question", "No question text")
        task_id = selected_question.get("task_id", "Unknown")
        level = selected_question.get("Level", "Unknown")
        file_name = selected_question.get("file_name", "")

        # Execute agent
        agent_response = tester.execute_agent(question_text)

        # Format outputs
        question_info = f"📋 Task ID: {task_id}\n📊 Level: {level}\n📁 File: {file_name if file_name else 'None'}\n\n📝 Question:\n{question_text}"
        execution_log = tester.tracker.get_formatted_log()

        result_summary = f"🤖 Agent Response:\n{agent_response}\n\n"
        if tester.tracker.final_answer:
            result_summary += f"🎯 Final Answer: {tester.tracker.final_answer}"

        # Get the correct answer
        correct_answer = get_correct_answer(task_id)
        if correct_answer:
            correct_answer_display = f"✅ **Correct Answer:**\n{correct_answer}\n\n📋 **Task ID:** {task_id}\n📊 **Level:** {level}"
        else:
            correct_answer_display = f"❌ **Correct Answer:**\nNot found in metadata\n\n📋 **Task ID:** {task_id}\n📊 **Level:** {level}"

        return question_info, execution_log, result_summary, correct_answer_display
    except Exception as e:
        error_msg = f"Unexpected error in evaluation question test: {str(e)}\n{traceback.format_exc()}"
        return f"❌ Error: {error_msg}", "", "", ""


def test_all_evaluation_questions():
    """Run all evaluation questions and return results"""
    try:
        results = []
        total_questions = len(questions)
        progress_info = f"🚀 Running {total_questions} evaluation questions...\n\n"

        for i, question_data in enumerate(questions):
            question_text = question_data.get("question", "No question text")
            task_id = question_data.get("task_id", "Unknown")
            level = question_data.get("Level", "Unknown")

            progress_info += f"Processing question {i+1}/{total_questions}: {task_id[:8]}...\n"

            try:
                # Execute agent
                agent_response = tester.execute_agent(question_text)

                # Extract final answer
                final_answer = tester.tracker.final_answer if tester.tracker.final_answer else "No answer extracted"

                # Get correct answer
                correct_answer = get_correct_answer(task_id)
                correct_answer_display = correct_answer if correct_answer else "Not found"

                results.append({
                    "Task ID": task_id,
                    "Level": level,
                    "Question": question_text[:100] + "..." if len(question_text) > 100 else question_text,
                    "Agent Answer": final_answer,
                    "Correct Answer": correct_answer_display,
                    "Response Length": len(agent_response),
                    "Status": "Success"
                })
            except Exception as e:
                # Get correct answer even if agent failed
                correct_answer = get_correct_answer(task_id)
                correct_answer_display = correct_answer if correct_answer else "Not found"

                results.append({
                    "Task ID": task_id,
                    "Level": level,
                    "Question": question_text[:100] + "..." if len(question_text) > 100 else question_text,
                    "Agent Answer": f"ERROR: {str(e)}",
                    "Correct Answer": correct_answer_display,
                    "Response Length": 0,
                    "Status": "Failed"
                })

        # Create DataFrame for results
        results_df = pd.DataFrame(results)

        # Summary statistics
        success_count = len([r for r in results if r["Status"] == "Success"])
        failure_count = total_questions - success_count

        summary = f"""
✅ EVALUATION COMPLETE

📊 Summary:
- Total Questions: {total_questions}
- Successful: {success_count}
- Failed: {failure_count}
- Success Rate: {(success_count/total_questions)*100:.1f}%
"""

        return summary, results_df, "All evaluation questions processed!"
    except Exception as e:
        error_msg = f"Error running all evaluation questions: {str(e)}\n{traceback.format_exc()}"
        return f"❌ Error: {error_msg}", pd.DataFrame(), ""
def create_interface():
    """Create the main Gradio interface"""

    # Custom CSS for better styling
    css = """
    .gradio-container {
        max-width: 1200px !important;
    }
    .tab-nav {
        font-size: 16px !important;
    }
    """

    with gr.Blocks(
        title="SmolAgent Test Client",
        css=css,
        theme=gr.themes.Base(
            primary_hue="blue",
            secondary_hue="gray"
        )
    ) as interface:

        # Header
        gr.Markdown("""
        # 🧪 SmolAgent Test Client

        **Interactive testing environment for the BasicSmolAgent**

        This tool allows you to thoroughly test the agent's capabilities with detailed execution tracking.
        You can fetch random questions from the evaluation API, test with custom questions, or run specific evaluation questions.
        """)

        # Main tabs
        with gr.Tabs():
            # Random Question Tab
            with gr.TabItem("🎲 Random Question Test", elem_id="random-tab"):
                gr.Markdown("### Fetch and test a random question from the evaluation API")
                gr.Markdown("Click the button below to fetch a random question and run the agent on it.")

                random_btn = gr.Button(
                    "🎲 Fetch Random Question & Execute Agent",
                    variant="primary",
                    size="lg",
                    scale=1
                )

                # Output sections
                with gr.Row():
                    with gr.Column(scale=1):
                        question_display = gr.Textbox(
                            label="📋 Question Information",
                            lines=6,
                            max_lines=10,
                            interactive=False,
                            show_copy_button=True
                        )
                    with gr.Column(scale=1):
                        result_display = gr.Textbox(
                            label="🎯 Agent Response & Final Answer",
                            lines=6,
                            max_lines=10,
                            interactive=False,
                            show_copy_button=True
                        )

                execution_log_display = gr.Textbox(
                    label="📜 Detailed Execution Log",
                    lines=20,
                    max_lines=30,
                    interactive=False,
                    show_copy_button=True,
                    placeholder="Execution log will appear here after running the agent..."
                )

                # Wire up the random question functionality
                random_btn.click(
                    fn=test_random_question,
                    inputs=[],
                    outputs=[question_display, execution_log_display, result_display]
                )

            # Custom Question Tab
            with gr.TabItem("✏️ Custom Question Test", elem_id="custom-tab"):
                gr.Markdown("### Test the agent with your own custom question")
                gr.Markdown("Enter any question you'd like to test the agent with.")

                custom_input = gr.Textbox(
                    label="📝 Your Question",
                    lines=3,
                    max_lines=5,
                    placeholder="Enter your question here...\n\nExample: What is the square root of 144?",
                    show_copy_button=True
                )

                custom_btn = gr.Button(
                    "🚀 Execute Agent on Custom Question",
                    variant="secondary",
                    size="lg"
                )

                # Output sections for custom questions
                with gr.Row():
                    with gr.Column(scale=1):
                        custom_question_display = gr.Textbox(
                            label="📋 Question Information",
                            lines=4,
                            max_lines=8,
                            interactive=False,
                            show_copy_button=True
                        )
                    with gr.Column(scale=1):
                        custom_result_display = gr.Textbox(
                            label="🎯 Agent Response & Final Answer",
                            lines=4,
                            max_lines=8,
                            interactive=False,
                            show_copy_button=True
                        )

                custom_execution_log_display = gr.Textbox(
                    label="📜 Detailed Execution Log",
                    lines=20,
                    max_lines=30,
                    interactive=False,
                    show_copy_button=True,
                    placeholder="Execution log will appear here after running the agent..."
                )

                # Wire up the custom question functionality
                custom_btn.click(
                    fn=test_custom_question,
                    inputs=[custom_input],
                    outputs=[custom_question_display, custom_execution_log_display, custom_result_display]
                )

            # Evaluation Questions Tab
            with gr.TabItem("📚 Evaluation Questions", elem_id="eval-tab"):
                gr.Markdown("### Test with specific evaluation questions")
                gr.Markdown(f"Select from {len(questions)} evaluation questions or run all of them.")

                with gr.Row():
                    with gr.Column(scale=2):
                        question_dropdown = gr.Dropdown(
                            choices=get_evaluation_questions(),
                            label="📋 Select Evaluation Question",
                            value=None
                        )
                    with gr.Column(scale=1):
                        eval_single_btn = gr.Button(
                            "🚀 Run Selected Question",
                            variant="secondary",
                            size="lg"
                        )
                        eval_all_btn = gr.Button(
                            "🚀 Run ALL Evaluation Questions",
                            variant="primary",
                            size="lg"
                        )
                        gr.Markdown("⚠️ **Warning**: Running all questions may take a long time!")

                # Single question results
                with gr.Row():
                    with gr.Column(scale=1):
                        eval_question_display = gr.Textbox(
                            label="📋 Question Information",
                            lines=6,
                            max_lines=10,
                            interactive=False,
                            show_copy_button=True
                        )
                    with gr.Column(scale=1):
                        eval_result_display = gr.Textbox(
                            label="🎯 Agent Response & Final Answer",
                            lines=6,
                            max_lines=10,
                            interactive=False,
                            show_copy_button=True
                        )
                    with gr.Column(scale=1):
                        eval_correct_answer_display = gr.Textbox(
                            label="✅ Correct Answer",
                            lines=6,
                            max_lines=10,
                            interactive=False,
                            show_copy_button=True,
                            placeholder="Correct answer will appear here..."
                        )

                eval_execution_log_display = gr.Textbox(
                    label="📜 Detailed Execution Log",
                    lines=15,
                    max_lines=25,
                    interactive=False,
                    show_copy_button=True,
                    placeholder="Execution log will appear here after running a question..."
                )

                # All questions results
                gr.Markdown("### 📊 Batch Results")

                batch_summary_display = gr.Textbox(
                    label="📊 Batch Summary",
                    lines=8,
                    interactive=False,
                    show_copy_button=True,
                    placeholder="Summary will appear here after running all questions..."
                )

                batch_results_display = gr.DataFrame(
                    label="📋 Detailed Results Table",
                    headers=["Task ID", "Level", "Question", "Agent Answer", "Correct Answer", "Response Length", "Status"],
                    datatype=["str", "str", "str", "str", "str", "number", "str"],
                    interactive=False,
                    wrap=True
                )

                batch_status_display = gr.Textbox(
                    label="📌 Status",
                    lines=2,
                    interactive=False,
                    placeholder="Status updates will appear here..."
                )

                # Wire up evaluation question functionality
                eval_single_btn.click(
                    fn=test_evaluation_question,
                    inputs=[question_dropdown],
                    outputs=[eval_question_display, eval_execution_log_display, eval_result_display, eval_correct_answer_display]
                )
                eval_all_btn.click(
                    fn=test_all_evaluation_questions,
                    inputs=[],
                    outputs=[batch_summary_display, batch_results_display, batch_status_display]
                )

        # Footer information
        gr.Markdown("---")
        gr.Markdown("""
        ### 📋 Features & Information

        **📊 Execution Tracking:**
        - Comprehensive step-by-step logging with timestamps
        - Capture of stdout/stderr during agent execution
        - Detailed error reporting and stack traces
        - Performance timing information

        **🎯 Response Analysis:**
        - Full agent response display
        - Automatic final answer extraction
        - Response length and format analysis

        **⚡ Testing Capabilities:**
        - Random questions from the evaluation API endpoint
        - Custom question testing with any input
        - Individual evaluation question testing
        - Batch processing of all evaluation questions
        - Copy-friendly logs for external analysis
        - Real-time execution monitoring

        **🔧 Technical Details:**
        - Uses the existing BasicSmolAgent from agent.py
        - Connects to: `https://agents-course-unit4-scoring.hf.space/random-question`
        - Processes questions from questions_evaluated.py
        - Captures all agent tool usage and reasoning steps
        - Provides detailed execution diagnostics
        """)

        gr.Markdown("""
        ### 🚀 Quick Start Guide

        1. **Random Questions**: Click "Fetch Random Question & Execute Agent" to test with API questions
        2. **Custom Questions**: Enter your own question and click "Execute Agent on Custom Question"
        3. **Evaluation Questions**: Select a specific evaluation question or run all of them
        4. **Review Results**: Check execution logs for detailed insights into agent processing
        5. **Batch Analysis**: Use the "Run ALL" feature to get comprehensive performance metrics
        """)

    return interface


def main():
    """Main function to launch the test client"""
    print("🚀 Starting SmolAgent Test Client...")
    print("📡 API Endpoint:", DEFAULT_API_URL)
    print("🤖 Agent Type: BasicSmolAgent")
    print(f"📊 Evaluation Questions: {len(questions)} loaded")
    print("-" * 50)

    # Create and launch interface
    interface = create_interface()
    interface.launch(
        debug=True,
        share=False,
        show_error=True,
        server_name="0.0.0.0",  # Allow external connections
        server_port=7860
    )
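

# Each line of metadata.jsonl is expected to be a standalone JSON object with
# at least a "task_id" and a "Final answer" field, e.g. (illustrative):
#   {"task_id": "abc123", "Final answer": "42"}
# Blank lines and lines that fail to parse are skipped with a warning.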
def load_metadata() -> Dict[str, str]:
    """Load metadata from metadata.jsonl and return a mapping of task_id to final answer"""
    metadata = {}
    try:
        with open('metadata.jsonl', 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if line:
                    try:
                        data = json.loads(line)
                        task_id = data.get('task_id')
                        final_answer = data.get('Final answer')
                        if task_id and final_answer is not None:
                            metadata[task_id] = str(final_answer)
                    except json.JSONDecodeError as e:
                        print(f"Warning: Could not parse JSON line ({e}): {line[:100]}...")
                        continue
    except FileNotFoundError:
        print("Warning: metadata.jsonl file not found")
    except Exception as e:
        print(f"Warning: Error loading metadata: {e}")
    return metadata
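

# Note: load_metadata() re-reads metadata.jsonl on every call, so a batch run
# reloads the file once per question; the mapping could be cached if that ever
# becomes a bottleneck.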
def get_correct_answer(task_id: str) -> Optional[str]:
    """Get the correct answer for a given task_id"""
    metadata = load_metadata()
    return metadata.get(task_id)


if __name__ == "__main__":
    main()
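

# Example of programmatic use (a minimal sketch; assumes agent.py,
# questions_evaluated.py and this file live in the same working directory):
#
#     from agent_test_client import SmolAgentTester
#
#     tester = SmolAgentTester()
#     answer = tester.execute_agent("What is the square root of 144?")
#     print(answer)
#     print(tester.tracker.get_formatted_log())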