File size: 14,619 Bytes
2cbacd8 0fe12f3 3413fb8 2cbacd8 3413fb8 5065308 3413fb8 2cbacd8 5065308 2cbacd8 0fe12f3 3413fb8 2cbacd8 3413fb8 2cbacd8 0fe12f3 5065308 2cbacd8 3413fb8 2cbacd8 3413fb8 2cbacd8 0fe12f3 2cbacd8 0fe12f3 2cbacd8 0fe12f3 3413fb8 5065308 3413fb8 5065308 3413fb8 0fe12f3 2cbacd8 3413fb8 2cbacd8 3413fb8 0fe12f3 5065308 2cbacd8 0fe12f3 3413fb8 2cbacd8 c75fc46 3413fb8 c75fc46 2cbacd8 3413fb8 c75fc46 3413fb8 2cbacd8 c75fc46 2cbacd8 35d7582 0fe12f3 c75fc46 3413fb8 c75fc46 0fe12f3 c75fc46 0fe12f3 c75fc46 2cbacd8 c75fc46 0fe12f3 2cbacd8 0fe12f3 3413fb8 2cbacd8 0fe12f3 c75fc46 0fe12f3 c75fc46 0fe12f3 2cbacd8 0fe12f3 2cbacd8 c75fc46 2cbacd8 c75fc46 2cbacd8 1a4236f 2cbacd8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 |
import os
import gradio as gr
import requests
import pandas as pd
import traceback
import time
import mimetypes
from tempfile import NamedTemporaryFile
# Import smol-agent and tool components
from smolagents import CodeAgent, LiteLLMModel, tool
from smolagents import DuckDuckGoSearchTool
from unstructured.partition.auto import partition
# Imports for advanced file processing
import speech_recognition as sr
from pydub import AudioSegment
# --- Constants ---
# Base URL of the scoring service; the `/questions` and `/submit` endpoints
# used by `run_and_submit_all` are derived from it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# --- Tool Definition (Upgraded for Full Multimodality with pydub) ---
@tool
def file_reader(file_path: str) -> str:
    """
    Reads and analyzes the content of a file and returns relevant text-based information.

    Supports:
    - Text files (PDF, TXT, CSV)
    - Images (PNG, JPG) with OCR
    - Audio (MP3, WAV) via speech recognition
    - Video (MP4, MOV) via speech recognition on audio track

    Can be used with a local file path or a web URL.

    Args:
        file_path (str): The local path or web URL of the file to be read.

    Returns:
        str: Extracted or transcribed content as text. If anything goes wrong, a message starting with "Error reading or processing file" is returned instead of raising.
    """
    # Local import: only needed to pull the extension out of a URL.
    from urllib.parse import urlparse

    # Extensions that sr.AudioFile can open directly (WAV/AIFF/FLAC); any other
    # audio container (e.g. MP3) is converted to WAV with pydub first.
    native_audio_exts = (".wav", ".aif", ".aiff", ".flac")

    temp_file_path = None
    audio_temp_path = None
    try:
        header_mime = None
        if file_path.startswith(("http://", "https://")):
            response = requests.get(file_path, timeout=20)
            response.raise_for_status()
            # Keep the URL's extension on the temp file so that type detection
            # (mimetypes below, and the partitioner) still works after download.
            suffix = os.path.splitext(urlparse(file_path).path)[1]
            with NamedTemporaryFile(suffix=suffix, delete=False) as downloaded:
                temp_file_path = downloaded.name
                downloaded.write(response.content)
            local_path = temp_file_path
            # Fallback type hint for URLs that carry no file extension.
            content_type = response.headers.get("Content-Type", "")
            header_mime = content_type.split(";", 1)[0].strip().lower() or None
        else:
            local_path = file_path

        mime_type, _ = mimetypes.guess_type(local_path)
        if mime_type is None:
            mime_type = header_mime

        if mime_type and mime_type.startswith(("audio/", "video/")):
            audio_path = local_path
            needs_conversion = (
                mime_type.startswith("video/")
                or not local_path.lower().endswith(native_audio_exts)
            )
            if needs_conversion:
                # Reserve a path, then close the handle before pydub writes to it.
                with NamedTemporaryFile(suffix=".wav", delete=False) as audio_temp:
                    audio_temp_path = audio_temp.name
                # No explicit `format`: the MIME subtype (e.g. "quicktime" for
                # .mov) is not a valid ffmpeg format name, so let ffmpeg detect
                # the container itself.
                AudioSegment.from_file(local_path).export(audio_temp_path, format="wav")
                audio_path = audio_temp_path

            recognizer = sr.Recognizer()
            with sr.AudioFile(audio_path) as source:
                audio = recognizer.record(source)
            return recognizer.recognize_whisper(audio)

        # Default to handling text and images with OCR if not audio/video.
        elements = partition(local_path)
        return "\n\n".join(str(el) for el in elements)
    except Exception as e:
        # The tool reports failures as text so the calling agent can react to them.
        return f"Error reading or processing file '{file_path}': {e}"
    finally:
        # Remove the downloaded file and the intermediate WAV, if they were created.
        for leftover in (temp_file_path, audio_temp_path):
            if leftover and os.path.exists(leftover):
                os.remove(leftover)
# --- Agent Class (Updated with More Powerful Model and Tools) ---
class GaiaSmolAgent:
    """
    Question-answering agent that wraps a smolagents `CodeAgent` driven by a
    Gemini model (through LiteLLM) with web-search and file-reading tools.

    Instances are callable: `agent(question)` returns the answer as a string.
    """

    def __init__(self):
        """
        Initializes the optimized agent.
        Now uses a more powerful model and the agent's native conversation memory.

        Raises:
            ValueError: If the `GEMINI_API_KEY` environment variable is not set.
        """
        print("Initializing Optimized GaiaSmolAgent...")
        api_key = os.getenv("GEMINI_API_KEY")
        if not api_key:
            raise ValueError("API key 'GEMINI_API_KEY' not found in environment secrets.")

        # Use a more powerful, "clever" model for better reasoning.
        model = LiteLLMModel(
            model_id="gemini/gemini-1.5-pro-latest",
            api_key=api_key,
            temperature=0.0,
            timeout=120.0,  # Add a timeout to prevent hanging
        )

        search_tool = DuckDuckGoSearchTool()

        # Store the sophisticated system prompt as an instance variable.
        # The search tool is advertised under the name it is actually registered
        # with (`search_tool.name`), so the model does not call a function that
        # does not exist in the execution environment.
        self.system_prompt = f"""
You are an expert-level research assistant AI. Your sole purpose is to answer the user's question by breaking it down into logical steps and using the provided tools. You will have access to the conversation history, so use it for context.
**Available Tools:**
- `{search_tool.name}(query: str) -> str`: Use this to find information, file URLs, or anything on the web.
- `file_reader(file_path: str) -> str`: Use this to read the contents of a file from a local path or a web URL. It can read text, extract text from images (OCR), and transcribe audio from audio/video files.
**Your Thought Process:**
1. **Deconstruct the Goal:** Carefully analyze the question to understand what information is needed, considering the previous turns in the conversation.
2. **Formulate a Plan:** Think step-by-step about which tools to use in what order. For example, you might need to search for a URL first, then read the content of that URL.
3. **Execute & Analyze:** Call the necessary tools. Carefully examine the output of each tool to extract the required facts. You can write Python code to process the data returned by the tools.
4. **Synthesize the Answer:** Once you have gathered sufficient information, formulate a final, concise answer to the original question.
**CRITICAL INSTRUCTIONS:**
- Your final action MUST be a single call to the `final_answer(answer: str)` function.
- The `answer` argument must be a string containing only the definitive answer.
- All code you write is executed in a restricted Python environment. You can define variables and write logic to process the tool outputs before calling `final_answer`.
- Do not ask for clarification. Directly proceed to solve the problem.
"""

        # Initialize the agent with the file_reader tool and memory settings.
        self.agent = CodeAgent(
            model=model,
            tools=[file_reader, search_tool],
            add_base_tools=True,  # Also registers smolagents' default toolbox
            planning_interval=3,  # Re-plan every 3 steps, considering memory.
        )
        print("Optimized GaiaSmolAgent initialized successfully with native memory and full multimodal capabilities.")

    def __call__(self, question: str, reset_memory: bool = False) -> str:
        """
        Directly runs the agent to generate and execute a plan to answer the question.
        It leverages the agent's built-in memory, controlled by the `reset` parameter.

        Args:
            question (str): The user's question.
            reset_memory (bool): If True, the agent's conversation memory will be cleared
                before running. Maps to the agent's `reset` parameter.

        Returns:
            str: The agent's final answer converted to a string. If the run raises,
            the exception is logged and a string starting with
            "FATAL AGENT ERROR:" is returned instead of propagating the error.
        """
        print(f"Optimized Agent received question: {question[:100]}...")
        # Combine the system prompt with the current question. The agent will handle the history.
        full_prompt = f"{self.system_prompt}\n\nCURRENT TASK:\nUser Question: \"{question}\""
        try:
            # `reset=False` keeps the memory from previous calls.
            final_answer = self.agent.run(full_prompt, reset=reset_memory)
        except Exception as e:
            print(f"FATAL AGENT ERROR: An exception occurred during agent execution: {e}")
            print(traceback.format_exc())  # Print full traceback for easier debugging
            return f"FATAL AGENT ERROR: {e}"
        print(f"Optimized Agent returning final answer: {final_answer}")
        return str(final_answer)
# --- Main Application Logic (Unchanged) ---
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the GaiaSmolAgent on them, submits all answers,
    and displays the results.

    Args:
        profile: The logged-in user's OAuth profile, or None when nobody is
            logged in. Only `profile.username` is read.

    Returns:
        tuple: `(status_message, results)` where `results` is a pandas DataFrame
        with one row per processed question, or None when the run stopped
        before any question was processed.
    """
    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None
    username = f"{profile.username}"
    print(f"User logged in: {username}")

    # --- Determine HF Space Runtime URL and Repo URL ---
    space_id = os.getenv("SPACE_ID")  # Get the SPACE_ID for sending link to the code
    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate Agent (modify this part to create your agent)
    try:
        agent = GaiaSmolAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    # When running as a Hugging Face Space, this link points to your codebase
    # (useful for others, so please keep it public).
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
    # JSONDecodeError must be handled before RequestException: it is a subclass,
    # so the broader handler would otherwise shadow this one.
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    if not questions_data or not isinstance(questions_data, list):
        print("Fetched questions list is empty.")
        return "Fetched questions list is empty or invalid format.", None
    print(f"Fetched {len(questions_data)} questions.")

    # 3. Run your Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        if not isinstance(item, dict):
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            # Each question is an independent task: clear the agent's memory so
            # earlier questions neither leak into this one nor inflate the context.
            submitted_answer = agent(question_text, reset_memory=True)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4. Prepare Submission
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        status_message = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except (requests.exceptions.JSONDecodeError, AttributeError):
            # Body is not JSON, or is JSON but not an object: show the raw text.
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)

    # Success and failure alike report the per-question log alongside the status.
    return status_message, pd.DataFrame(results_log)
# --- Gradio Interface (Updated Instructions) ---
# Static text shown at the top of the page.
_PAGE_TITLE_MD = "# GAIA Agent Evaluation Runner (smol-agent)"
_INSTRUCTIONS_MD = """
**Instructions:**
1. Ensure you have added your **GEMINI API key** (as `GEMINI_API_KEY`) in the Space's secrets.
2. Log in to your Hugging Face account using the button below.
3. Click 'Run Evaluation & Submit All Answers' to run your agent and see the score.
"""

# Build the page: heading, instructions, login button, run button and the two
# output widgets that receive the tuple returned by `run_and_submit_all`.
with gr.Blocks() as demo:
    gr.Markdown(_PAGE_TITLE_MD)
    gr.Markdown(_INSTRUCTIONS_MD)
    gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(
        label="Run Status / Submission Result",
        lines=5,
        interactive=False,
    )
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    # First returned value goes to the status box, second to the results table.
    output_components = [status_output, results_table]
    run_button.click(fn=run_and_submit_all, outputs=output_components)
if __name__ == "__main__":
    # Script entry point: announce startup, then start the Gradio app in debug
    # mode without requesting a public share link.
    print("Launching Gradio Interface for GAIA Agent Evaluation...")
    launch_options = {"debug": True, "share": False}
    demo.launch(**launch_options)
|