# Scraped file-viewer header (not part of the program), kept as comments:
# ATK20's picture
# Update app.py
# 1d77c90 verified
# raw
# history blame
# 21.7 kB
import os
import gradio as gr
import requests
import inspect
import pandas as pd
import re
import json
import math
import time
from typing import Dict, Any, List, Optional, Union
# (Keep Constants as is)
# --- Constants ---
# Base URL of the scoring service. run_and_submit_all() appends "/questions"
# and "/submit" to it to build the two endpoints it talks to.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# --- Tool Definitions ---
class Tools:
    """Stateless helper tools used by the agent.

    Every tool is a static method, so the class is never instantiated.
    """

    @staticmethod
    def calculator(expression: str) -> Union[float, str]:
        """Evaluate the arithmetic contained in *expression*.

        Every character other than digits, ``+ - * / ( ) . %`` and whitespace
        is removed before evaluation, so surrounding words are ignored.

        Args:
            expression: Text containing the arithmetic to evaluate.

        Returns:
            The numeric result, or a string starting with
            ``"Error in calculation: "`` when evaluation fails.
        """
        try:
            # Keep only digits, arithmetic operators, parentheses, the decimal
            # point and whitespace.
            safe_expr = re.sub(r'[^0-9+\-*/().%\s]', '', expression)
            # NOTE(security): this is still eval() on caller-supplied text.
            # The filter above removes every letter and underscore, so no name
            # can be referenced (which is also why no math functions are
            # exposed here: they could never be reached). It does NOT bound
            # the cost of the computation: an input such as "9**9**9**9" can
            # hang the process. Replace with an AST-based evaluator before
            # exposing this to untrusted users.
            return eval(safe_expr, {"__builtins__": {}}, {})
        except Exception as e:
            return f"Error in calculation: {str(e)}"

    @staticmethod
    def search(query: str) -> str:
        """Answer *query* from a small built-in knowledge base (mock search).

        This is a stand-in for a real search API. The first knowledge-base key
        that occurs in the query as a whole word or phrase wins.

        Args:
            query: The user's question or search text.

        Returns:
            The stored fact for the first matching key, or a fixed
            "not found" message when nothing matches.
        """
        knowledge_base = {
            "population": "The current world population is approximately 8 billion people.",
            "capital of france": "The capital of France is Paris.",
            "largest planet": "Jupiter is the largest planet in our solar system.",
            "tallest mountain": "Mount Everest is the tallest mountain above sea level at 8,848.86 meters.",
            "deepest ocean": "The Mariana Trench is the deepest ocean trench, located in the Pacific Ocean.",
            "president": "The current president of the United States is Joe Biden (as of 2024).",
            "water boiling point": "Water boils at 100 degrees Celsius (212 degrees Fahrenheit) at standard pressure.",
            "pi": "The mathematical constant pi (π) is approximately 3.14159.",
            "speed of light": "The speed of light in vacuum is approximately 299,792,458 meters per second.",
            "human body temperature": "Normal human body temperature is around 37 degrees Celsius (98.6 degrees Fahrenheit).",
        }
        normalized_query = query.lower()
        for key, value in knowledge_base.items():
            # Match on word boundaries: a plain substring test lets short keys
            # fire inside unrelated words (e.g. "pi" inside "capital").
            if re.search(rf'\b{re.escape(key)}\b', normalized_query):
                return value
        return "No relevant information found in the knowledge base."

    @staticmethod
    def date_info() -> str:
        """Return the current local date formatted as ``YYYY-MM-DD``."""
        return time.strftime("%Y-%m-%d")
# --- LLM Interface ---
class LLMInterface:
    """Client for a hosted Hugging Face inference model, with canned fallbacks."""

    @staticmethod
    def _extract_text(result: Any) -> str:
        """Pull the model's text out of a decoded inference-API response.

        Args:
            result: The JSON-decoded response body.

        Returns:
            The stripped text from the first ``generated_text`` or
            ``summary_text`` field found, ``""`` when a dict carries neither,
            or ``str(result)`` stripped for any other shape.
        """
        candidate = result
        # Responses are usually a list of dicts; unwrap (possibly nested)
        # non-empty lists down to their first element.
        while isinstance(candidate, list) and candidate:
            candidate = candidate[0]
        if isinstance(candidate, dict):
            # Text-generation models answer under "generated_text", while
            # summarization models (such as the one configured in query_llm)
            # answer under "summary_text".
            for field in ("generated_text", "summary_text"):
                text = candidate.get(field)
                if isinstance(text, str) and text.strip():
                    return text.strip()
            return ""
        return str(candidate).strip()

    @staticmethod
    def query_llm(prompt: str) -> str:
        """Query a free LLM through Hugging Face's inference API.

        Never raises: every failure is turned into a human-readable fallback
        string.

        Args:
            prompt: The full prompt text sent as the model input.

        Returns:
            The model's text on HTTP 200, a fixed message for HTTP 503 (model
            loading), other HTTP statuses and timeouts, or a keyword-based
            canned answer when any other exception occurs.
        """
        try:
            # Using a smaller, more reliable free model
            API_URL = "https://api-inference.huggingface.co/models/facebook/bart-large-cnn"
            # Alternative models you can try if this one doesn't work:
            # - "distilbert-base-uncased-finetuned-sst-2-english"
            # - "gpt2"
            # - "microsoft/DialoGPT-medium"
            headers = {"Content-Type": "application/json"}
            payload = {
                "inputs": prompt,
                "parameters": {"max_length": 100, "do_sample": False},
            }
            response = requests.post(API_URL, headers=headers, json=payload, timeout=30)
            if response.status_code == 200:
                return LLMInterface._extract_text(response.json())
            if response.status_code == 503:
                # Model is loading
                return "I need more time to think about this. The model is currently loading."
            # Fallback for other API issues
            return "I don't have enough information to answer that question precisely."
        except requests.exceptions.Timeout:
            return "The model is taking too long to respond. Let me give a simpler answer instead."
        except Exception:
            # Deliberate best-effort boundary: any other failure (network
            # error, undecodable body, ...) falls back to canned answers.
            common_answers = {
                "population": "The current world population is approximately 8 billion people.",
                "capital": "I can tell you about many capitals. For example, Paris is the capital of France.",
                "math": "I can help with mathematical calculations.",
                "weather": "I don't have access to current weather information.",
                "date": "I can tell you that a day has 24 hours.",
                "time": "I can't check the current time.",
            }
            lowered_prompt = prompt.lower()
            # The first keyword found anywhere in the prompt wins.
            for keyword, answer in common_answers.items():
                if keyword in lowered_prompt:
                    return answer
            return "I'm sorry, I couldn't process that request properly. Please try asking in a simpler way."
# --- Advanced Agent Implementation ---
class BasicAgent:
    """Rule-based agent that routes a question to a tool or to the LLM.

    Routing is driven by regular expressions over the lower-cased question:
    calculator first, then date, then search; anything else goes to a table of
    hard-coded answers and finally to the LLM.
    """

    # Patterns that mark a question as arithmetic.
    _MATH_PATTERNS = (
        r'calculate', r'compute', r'what is \d+', r'how much is',
        r'sum of', r'multiply', r'divide', r'subtract', r'plus', r'minus',
        r'\d+\s*[\+\-\*\/\%]\s*\d+', r'squared', r'cubed', r'square root',
    )
    # Patterns that ask for today's date.
    _DATE_PATTERNS = (r'what day is today', r'current date', r"today's date")
    # Patterns that mark a factual question for the search tool.
    _SEARCH_PATTERNS = (
        r'^what is', r'^who is', r'^where is', r'^when', r'^how many',
        r'capital of', r'largest', r'tallest', r'population', r'president',
        r'temperature', r'boiling point', r'freezing point', r'speed of',
    )
    # Lead-in phrases whose remainder is taken as the math expression.
    _EXPRESSION_PATTERNS = (
        r'calculate\s+(.*?)(?:\?|$)',
        r'what is\s+(.*?)(?:\?|$)',
        r'compute\s+(.*?)(?:\?|$)',
        r'find\s+(.*?)(?:\?|$)',
        r'how much is\s+(.*?)(?:\?|$)',
    )
    # Hard-coded answers used to reduce dependence on the LLM API.
    _COMMON_ANSWERS = {
        "what color is the sky": "Blue.",
        "how many days in a week": "7 days.",
        "how many months in a year": "12 months.",
        "what is the capital of france": "Paris.",
        "what is the capital of japan": "Tokyo.",
        "what is the capital of italy": "Rome.",
        "what is the capital of germany": "Berlin.",
        "what is the capital of spain": "Madrid.",
        "what is the capital of united states": "Washington, D.C.",
        "what is the capital of china": "Beijing.",
        "what is the capital of russia": "Moscow.",
        "what is the capital of canada": "Ottawa.",
        "what is the capital of australia": "Canberra.",
        "what is the capital of brazil": "Brasília.",
        "what is water made of": "H2O (hydrogen and oxygen).",
        "who wrote romeo and juliet": "William Shakespeare.",
        "who painted the mona lisa": "Leonardo da Vinci.",
        "what is the largest ocean": "The Pacific Ocean.",
        "what is the smallest planet": "Mercury.",
        "what is the largest planet": "Jupiter.",
        "who invented electricity": "Electricity wasn't invented but discovered through contributions from many scientists including Benjamin Franklin, Michael Faraday, and Thomas Edison.",
        "how many continents are there": "There are 7 continents: Africa, Antarctica, Asia, Europe, North America, Australia/Oceania, and South America.",
        "what is the largest country": "Russia is the largest country by land area.",
        "what is the most spoken language": "Mandarin Chinese is the most spoken native language in the world.",
        "what is the tallest mountain": "Mount Everest is the tallest mountain above sea level at 8,848.86 meters.",
    }

    def __init__(self):
        """Register the available tools and the LLM client."""
        print("Advanced Agent initialized.")
        self.tools = {
            "calculator": Tools.calculator,
            "search": Tools.search,
            "date": Tools.date_info,
        }
        self.llm = LLMInterface()

    def __call__(self, question: str) -> str:
        """Answer *question* using the matching tool or the LLM fallback.

        Args:
            question: The question text.

        Returns:
            The answer as a string.
        """
        print(f"Agent received question: {question[:50]}...")
        tool_needed, tool_name = self._analyze_question(question)

        if not tool_needed:
            # Direct answer for simpler questions
            answer = self._get_answer_from_llm(question)
        elif tool_name == "calculator":
            answer = self._answer_with_calculator(question)
        elif tool_name == "search":
            result = self.tools["search"](question)
            answer = self._extract_direct_answer(question, result)
        elif tool_name == "date":
            answer = self.tools["date"]()
        else:
            # Use LLM for other types of questions
            answer = self._get_answer_from_llm(question)

        print(f"Agent returning answer: {answer[:50]}...")
        return answer

    def _answer_with_calculator(self, question: str) -> str:
        """Extract the expression from *question*, evaluate it, format the result."""
        expression = self._extract_math_expression(question)
        if not expression:
            return "Unable to extract a mathematical expression from the question."
        result = self.tools["calculator"](expression)
        # Drop the ".0" of whole-valued floats. float.is_integer() is False for
        # inf/nan, which int() would refuse to convert.
        if isinstance(result, float) and result.is_integer():
            return str(int(result))
        return str(result)

    def _analyze_question(self, question: str) -> tuple:
        """Determine if the question requires a tool and which one.

        Returns:
            ``(True, tool_name)`` with ``tool_name`` one of ``"calculator"``,
            ``"date"`` or ``"search"``, or ``(False, None)`` when no pattern
            matches.
        """
        lowered = question.lower()
        # Date is checked before search: otherwise "what is today's date" is
        # swallowed by the generic "^what is" search pattern and can never
        # reach the date tool.
        routing = (
            ("calculator", self._MATH_PATTERNS),
            ("date", self._DATE_PATTERNS),
            ("search", self._SEARCH_PATTERNS),
        )
        for tool_name, patterns in routing:
            if any(re.search(pattern, lowered) for pattern in patterns):
                return True, tool_name
        # Default to direct answer
        return False, None

    def _extract_math_expression(self, question: str) -> str:
        """Extract a mathematical expression from the question.

        Returns:
            The expression reduced to digits, operators, parentheses, ``.``,
            ``%`` and inner whitespace, or ``""`` when nothing usable is found.
        """
        lowered = question.lower()
        # Look for common pattern: "Calculate X" or "What is X"
        for pattern in self._EXPRESSION_PATTERNS:
            match = re.search(pattern, lowered)
            if not match:
                continue
            expression = re.sub(r'[^0-9+\-*/().%\s]', '', match.group(1)).strip()
            # A lead-in followed only by words leaves nothing to evaluate;
            # keep looking instead of returning whitespace.
            if expression:
                return expression
        # If no clear pattern, attempt to extract any mathematical operation
        nums_and_ops = re.findall(r'(\d+(?:\.\d+)?|\+|\-|\*|\/|\(|\)|\%)', question)
        return ''.join(nums_and_ops)

    def _extract_direct_answer(self, question: str, search_result: str) -> str:
        """Return the search result unchanged (hook for future answer extraction)."""
        return search_result

    def _get_answer_from_llm(self, question: str) -> str:
        """Answer from the hard-coded table when possible, otherwise ask the LLM.

        Args:
            question: The question text.

        Returns:
            A hard-coded answer for an exact or close match, otherwise the
            LLM's response to a concise-answer prompt.
        """
        # Clean up the question for better matching
        clean_question = question.lower().strip('?').strip()

        # Check if we have a hardcoded answer
        if clean_question in self._COMMON_ANSWERS:
            return self._COMMON_ANSWERS[clean_question]

        # Try partial matching for more flexibility. An empty question is a
        # substring of every key, so it must be excluded explicitly.
        if clean_question:
            for key, answer in self._COMMON_ANSWERS.items():
                if clean_question in key or key in clean_question:
                    # Only a close match counts: the shorter text must be more
                    # than 70% as long as the longer one.
                    shorter, longer = sorted((len(clean_question), len(key)))
                    if shorter > longer * 0.7:
                        return answer

        # If no hardcoded answer, use the LLM
        prompt = (
            "\n"
            "Answer the following question with a very concise, direct response:\n"
            f"Question: {question}\n"
            "Answer in 1-2 sentences maximum, focusing only on the specific information requested.\n"
        )
        return self.llm.query_llm(prompt)
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the BasicAgent on them, submits all answers,
    and displays the results.

    Args:
        profile: The logged-in Hugging Face OAuth profile, or None when the
            user is not logged in.

    Returns:
        A ``(status_message, results)`` tuple. ``results`` is a pandas
        DataFrame of the per-question log, or None when the run stops before
        any question is processed.
    """
    # --- Determine HF Space Runtime URL and Repo URL ---
    space_id = os.getenv("SPACE_ID")  # Get the SPACE_ID for sending link to the code

    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate Agent
    try:
        agent = BasicAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    # In the case of an app running as a Hugging Face space, this link points toward your codebase
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.JSONDecodeError as e:
        # Must precede RequestException: requests' JSONDecodeError derives
        # from it, so the broader clause would otherwise shadow this one.
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run your Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        # Entries that are not mappings cannot carry a task; skip them
        # instead of crashing on .get().
        task_id = item.get("task_id") if isinstance(item, dict) else None
        question_text = item.get("question") if isinstance(item, dict) else None
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            submitted_answer = agent(question_text)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            # One failing question must not abort the run; it is logged but
            # not submitted.
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    results_df = pd.DataFrame(results_log)
    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", results_df

    # 4. Prepare Submission
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"

    # Every failure path reports the same way: message plus the run log.
    print(status_message)
    return status_message, results_df
# --- Build Gradio Interface using Blocks ---
with gr.Blocks() as demo:
    # Page title.
    gr.Markdown("# Advanced Agent Evaluation Runner")

    # Usage instructions and disclaimers, assembled line by line.
    gr.Markdown(
        "\n"
        "**Instructions:**\n"
        "1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ...\n"
        "2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.\n"
        "3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.\n"
        "---\n"
        "**Disclaimers:**\n"
        'Once clicking on the "submit button, it can take quite some time ( this is the time for the agent to go through all the questions).\n'
        "This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance for the delay process of the submit button, a solution could be to cache the answers and submit in a seperate action or even to answer the questions in async.\n"
    )

    # Login control followed by the trigger button.
    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")

    # Output widgets filled from the two values run_and_submit_all returns.
    status_output = gr.Textbox(
        label="Run Status / Submission Result",
        lines=5,
        interactive=False,
    )
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    run_button.click(fn=run_and_submit_all, outputs=[status_output, results_table])
if __name__ == "__main__":
    banner_label = " App Starting "
    dashes = "-" * 30
    print(f"\n{dashes}{banner_label}{dashes}")

    # Read both Space variables up front; they are only reported, never required.
    host = os.getenv("SPACE_HOST")
    repo_id = os.getenv("SPACE_ID")

    if not host:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
    else:
        print(f"✅ SPACE_HOST found: {host}")
        print(f" Runtime URL should be: https://{host}.hf.space")

    if not repo_id:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")
    else:
        # Repository links can be printed only when the Space id is known.
        repo_url = f"https://huggingface.co/spaces/{repo_id}"
        print(f"✅ SPACE_ID found: {repo_id}")
        print(f" Repo URL: {repo_url}")
        print(f" Repo Tree URL: {repo_url}/tree/main")

    # Closing rule as wide as the opening banner, followed by a blank line.
    print("-" * (60 + len(banner_label)) + "\n")

    print("Launching Gradio Interface for Advanced Agent Evaluation...")
    demo.launch(debug=True, share=False)