import os import gradio as gr import requests import inspect import pandas as pd from dotenv import load_dotenv from smolagents import CodeAgent, DuckDuckGoSearchTool, OpenAIServerModel, HfApiModel from tools import ( ReverseTextTool, ExtractTextFromImageTool, AnalyzeCSVTool, AnalyzeExcelTool, DateCalculatorTool, DownloadFileTool ) # Load environment variables try: load_dotenv() print("Environment variables are loaded from .env file") except Exception as e: print(f"Could not load .env file - {e}") # (Keep Constants as is) # --- Constants --- DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" # --- Basic Agent Definition --- # ----- THIS IS WERE YOU CAN BUILD WHAT YOU WANT ------ class GAIAAgent: def __init__(self, verbose=False): self.verbose = verbose print("Initializing Agent...") # Get API Key api_key = os.environ.get("HF_API_KEY") if not api_key: raise ValueError("HF API key not found. Please set the HF_API_KEY variable.") # Initialize model with gpt-4o-mini model_id = os.environ.get("HF_MODEL_ID", "Qwen/Qwen3-32B") print(f"Using HF model: {model_id}") model = HfApiModel( model_id=model_id, api_key=api_key, temperature=0.6 ) # Initializing tools search_tool = DuckDuckGoSearchTool() self.tools = [search_tool, ReverseTextTool(), ExtractTextFromImageTool(), AnalyzeCSVTool(), AnalyzeExcelTool(), DateCalculatorTool(), DownloadFileTool()] # Authorised imports authorised_imports = ["PyPDF2", "pdf2image", "pillow", "nltk", "sklearn", "networkx", "matplotlib", "seaborn", "scipy", "time"] self.agent = CodeAgent( tools=self.tools, model=model, add_base_tools=True, planning_interval=3, verbosity_level=2 if self.verbose else 0, additional_authorized_imports=authorised_imports ) print("Agent ready to Go!") def _is_reversed_text(self, text): """Check if the text appears to be reversed""" return(text.startswith(".") or ".rewsna eht sa" in text or "esrever" in text or "sdrawkcab" in text) def __call__(self, question: str) -> str: """Process a question and return the answer""" if self.verbose: print(f"Processing question: {question[:100]}." if len(question) > 100 else f"Processing question: {question}") if self._is_reversed_text(question): if self.verbose: print("Detected reversed text, it will be hadle accordingly") prompt = f""" You are a general AI Assistant. Your purpose is to answer question. This question appears to be in reversed text. Here is the reversed version for clarity: {question[::-1]} Report your thoughts, and finish your answer with the following template: FINAL ANSWER: [YOUR FINAL ANSWER]. YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. - If you are asked for a number, don't use comma to write your number neither use units such as $ or percent sign unless specified otherwise. - If you are asked for a string, don't use articles, neither abbreviations(e.g. for cites), and write the digits in plain text unless specified otherwise. - If you are asked for a comma separated list, apply the above rules depending of whether the element to be put on the list is a number or a string. IMPORTANT NOTES TO LIMIT COSTS AND PREVENT ERRORS: - Use web search sparingly and only when absolutely necessary. - Limit to 1-2 web searches per question. - If the search fails due to rate limiting, add a 3-5 second delay using time.sleep() before retrying with a different search term. - Do not import libraries that aren't available - stick to basic Python and the tools provided. - Focus on answering directly with what you already know when possible. - If you have made more than 3 attempts to solve a problem, prioritize providing your best guess. - Always add a delay of 2-3 seconds between web searches using time.sleep() to avoid rate limiting. Remember to structure your response in Python code format using the final_answer() function. """ else: prompt = f""" You are a general AI Assistant. Your purpose is to answer question. Report your thoughts, and finish your answer with the following template: FINAL ANSWER: [YOUR FINAL ANSWER]. YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. - If you are asked for a number, don't use comma to write your number neither use units such as $ or percent sign unless specified otherwise. - If you are asked for a string, don't use articles, neither abbreviations(e.g. for cites), and write the digits in plain text unless specified otherwise. - If you are asked for a comma separated list, apply the above rules depending of whether the element to be put on the list is a number or a string. Question: {question} IMPORTANT NOTES TO LIMIT COSTS AND PREVENT ERRORS: - Use web search sparingly and only when absolutely necessary. - Limit to 1-2 web searches per question. - If the search fails due to rate limiting, add a 3-5 second delay using time.sleep() before retrying with a different search term. - Do not import libraries that aren't available - stick to basic Python and the tools provided. - Focus on answering directly with what you already know when possible. - If you have made more than 3 attempts to solve a problem, prioritize providing your best guess. - Always add a delay of 2-3 seconds between web searches using time.sleep() to avoid rate limiting. Remember to structure your response in Python code format using the final_answer() function. """ try: answer = self.agent.run(prompt) if self.verbose: print(f"Generated answer: {answer}") return answer except Exception as e: error_msg = f"Error processing question: {e}" if self.verbose: print(error_msg) return error_msg def run_and_submit_all( profile: gr.OAuthProfile | None): """ Fetches all questions, runs the Agent on them, submits all answers, and displays the results. Args: sample_size: Number of questions to process (0 for all questions) """ # --- Determine HF Space Runtime URL and Repo URL --- space_id = os.getenv("SPACE_ID") # Get the SPACE_ID for sending link to the code if profile: username= f"{profile.username}" print(f"User logged in: {username}") else: print("User not logged in.") return "Please Login to Hugging Face with the button.", None api_url = DEFAULT_API_URL questions_url = f"{api_url}/questions" submit_url = f"{api_url}/submit" # 1. Instantiate Agent ( modify this part to create your agent) try: agent = GAIAAgent(verbose=True) except Exception as e: print(f"Error instantiating agent: {e}") return f"Error initializing agent: {e}", None # In the case of an app running as a hugging Face space, this link points toward your codebase ( usefull for others so please keep it public) agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" print(f"Agent code URL: {agent_code}") # 2. Fetch Questions print(f"Fetching questions from: {questions_url}") try: response = requests.get(questions_url, timeout=15) response.raise_for_status() questions_data = response.json() if not questions_data: print("Fetched questions list is empty.") return "Fetched questions list is empty or invalid format.", None print(f"Fetched {len(questions_data)} questions.") except requests.exceptions.RequestException as e: print(f"Error fetching questions: {e}") return f"Error fetching questions: {e}", None except requests.exceptions.JSONDecodeError as e: print(f"Error decoding JSON response from questions endpoint: {e}") print(f"Response text: {response.text[:500]}") return f"Error decoding server response for questions: {e}", None except Exception as e: print(f"An unexpected error occurred fetching questions: {e}") return f"An unexpected error occurred fetching questions: {e}", None # 3. Run your Agent results_log = [] answers_payload = [] # Limit number of questions if sample_size is specified # if sample_size > 0 and sample_size < len(questions_data): # import random # print(f"Using a sample of {sample_size} questions from {len(questions_data)} total questions") # questions_data = random.sample(questions_data, sample_size) print(f"Running agent on {len(questions_data)} questions...") for i, item in enumerate(questions_data): task_id = item.get("task_id") question_text = item.get("question") if not task_id or question_text is None: print(f"Skipping item with missing task_id or question: {item}") continue try: print(f"Processing question {i+1}/{len(questions_data)}: Task ID {task_id}") submitted_answer = agent(question_text) answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer}) results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer}) print(f"Successfully processed question {i+1}") # Delays next question to avoid rate limiting if i< len(questions_data) - 1: import time print("Waiting 5 seconds before next question:)") time.sleep(5) except Exception as e: print(f"Error running agent on task {task_id}: {e}") results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"}) if not answers_payload: print("Agent did not produce any answers to submit.") return "Agent did not produce any answers to submit.", pd.DataFrame(results_log) # 4. Prepare Submission submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload} status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..." print(status_update) # 5. Submit print(f"Submitting {len(answers_payload)} answers to: {submit_url}") try: response = requests.post(submit_url, json=submission_data, timeout=60) response.raise_for_status() result_data = response.json() final_status = ( f"Submission Successful!\n" f"User: {result_data.get('username')}\n" f"Overall Score: {result_data.get('score', 'N/A')}% " f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n" f"Message: {result_data.get('message', 'No message received.')}" ) print("Submission successful.") results_df = pd.DataFrame(results_log) return final_status, results_df except requests.exceptions.HTTPError as e: error_detail = f"Server responded with status {e.response.status_code}." try: error_json = e.response.json() error_detail += f" Detail: {error_json.get('detail', e.response.text)}" except requests.exceptions.JSONDecodeError: error_detail += f" Response: {e.response.text[:500]}" status_message = f"Submission Failed: {error_detail}" print(status_message) results_df = pd.DataFrame(results_log) return status_message, results_df except requests.exceptions.Timeout: status_message = "Submission Failed: The request timed out." print(status_message) results_df = pd.DataFrame(results_log) return status_message, results_df except requests.exceptions.RequestException as e: status_message = f"Submission Failed: Network error - {e}" print(status_message) results_df = pd.DataFrame(results_log) return status_message, results_df except Exception as e: status_message = f"An unexpected error occurred during submission: {e}" print(status_message) results_df = pd.DataFrame(results_log) return status_message, results_df def test_single_question(question: str) -> str: """Test the agent on a single question""" try: agent = GAIAAgent(verbose=True) answer = agent(question) return answer except Exception as e: return f"Error: {e}" # --- Build Gradio Interface using Blocks --- with gr.Blocks() as demo: gr.Markdown("# Agent Evaluation Runner") gr.Markdown( """ ## Instructions: 1. Log in to your Hugging Face account using the button below. 2. Test your agent on individual questions in the Testing Tab. 3. Run the Evaluation on the GAIA benchmark in teh Evaluation Tab. This agent is designed to achieve a score of at least 30% on teh GAIA Benchmark. --- ## Disclaimers: Once clicking on the "submit button, it can take quite some time ( this is the time for the agent to go through all the questions). """ ) gr.LoginButton() with gr.Tab("Test for a single question"): test_input = gr.Textbox(label="Enter a question", lines=3) test_output = gr.Textbox(label="Answer", lines=5) test_button = gr.Button("Run Test") test_button.click( fn=test_single_question, inputs = test_input, outputs=test_output ) with gr.Tab("Final Evaluation"): with gr.Row(): sample_size = gr.Slider( minimum=0, maximum=20, value=0, step=1, label="Sample Size (0 for all questions)", info="Set a number to limit how many questions to process (reduces costs)" ) run_button = gr.Button("Run Evaluation & Submit All Answers") status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False) # Removed max_rows=10 from DataFrame constructor results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True) run_button.click( fn=run_and_submit_all, outputs=[status_output, results_table] ) if __name__ == "__main__": print("\n" + "-"*30 + " App Starting " + "-"*30) # Check for API key api_key = os.environ.get("HF_API_KEY") if not api_key: print("WARNING: HF API key is not found. Please set HF_API_KEY environment variable.") else: print("OpenAI API key was found.") # Check for SPACE_HOST and SPACE_ID at startup for information space_host_startup = os.getenv("SPACE_HOST") space_id_startup = os.getenv("SPACE_ID") # Get SPACE_ID at startup if space_host_startup: print(f"✅ SPACE_HOST found: {space_host_startup}") print(f" Runtime URL should be: https://{space_host_startup}.hf.space") else: print("ℹ️ SPACE_HOST environment variable not found (running locally?).") if space_id_startup: # Print repo URLs if SPACE_ID is found print(f"✅ SPACE_ID found: {space_id_startup}") print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}") print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main") else: print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.") print("-"*(60 + len(" App Starting ")) + "\n") print("Launching Gradio Interface for Agent Evaluation...") demo.launch(debug=True)