import os import gradio as gr import requests import pandas as pd import yaml from smolagents import CodeAgent, LiteLLMModel, DuckDuckGoSearchTool, WikipediaSearchTool from datasets import load_dataset from cache_manager import CacheManager from tools.final_answer import final_answer from tools.get_file import get_file from tools.web_scraping import ( scrape_webpage_content, extract_links_from_webpage, get_webpage_metadata ) # Load the GAIA dataset dataset = load_dataset("gaia-benchmark/GAIA", "2023_level1", trust_remote_code=True, cache_dir="GAIA") print("GAIA dataset loaded successfully.") # Initialize cache manager cache_manager = CacheManager() # --- Constants --- DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" # --- QA Agent Definition --- class QAAgent: def __init__(self, temperature=None, max_tokens=None, max_steps=None): """ Initialize the QA Agent with configuration from config.yaml. Args: temperature: Temperature for text generation (overrides config) max_tokens: Maximum number of tokens for the model (overrides config) max_steps: Maximum number of steps the agent can take (overrides config) """ print("Initializing QA Agent with configuration...") try: # Load configuration config = self._load_config() # Load prompts prompts = self._load_prompts() # Get model configuration with overrides model_config = config.get('model', {}) model_id = model_config.get('model_id', 'anthropic/claude-sonnet-4-20250514') temp = temperature if temperature is not None else model_config.get('temperature', 0.2) max_tok = max_tokens if max_tokens is not None else model_config.get('max_tokens', 2096) # Get agent configuration with overrides agent_config = config.get('agent', {}) self.max_steps = max_steps if max_steps is not None else agent_config.get('max_steps', 5) print(f"Model: {model_id}") print(f"Temperature: {temp}") print(f"Max tokens: {max_tok}") print(f"Max steps: {self.max_steps}") # Prepare model initialization parameters model_params = { 'model_id': model_id, 'temperature': temp, 'max_tokens': max_tok } # Add Vertex AI specific parameters if using a vertex_ai model if model_id.startswith('vertex_ai/'): print("Configuring Vertex AI parameters...") vertex_config = config.get('vertex_ai', {}) # Add vertex project if specified if 'vertex_project' in vertex_config and vertex_config['vertex_project'] != 'your-gcp-project-id': model_params['vertex_project'] = vertex_config['vertex_project'] print(f" Vertex Project: {vertex_config['vertex_project']}") # Add vertex location if specified if 'vertex_location' in vertex_config: model_params['vertex_location'] = vertex_config['vertex_location'] print(f" Vertex Location: {vertex_config['vertex_location']}") # Add vertex credentials if specified and valid creds_path = vertex_config.get('vertex_credentials') if creds_path and creds_path not in ['/path/to/service-account.json', './google.json']: if os.path.exists(creds_path): try: # Validate it's a proper JSON file import json with open(creds_path, 'r') as f: json.load(f) model_params['vertex_credentials'] = creds_path print(f" Vertex Credentials: {creds_path}") except (json.JSONDecodeError, Exception) as e: print(f" Warning: Invalid credentials file {creds_path}: {e}") else: print(f" Warning: Credentials file not found: {creds_path}") # Add safety settings if specified if 'safety_settings' in vertex_config: model_params['safety_settings'] = vertex_config['safety_settings'] print(f" Safety Settings: {len(vertex_config['safety_settings'])} categories configured") # Initialize the LiteLLM model model = LiteLLMModel(**model_params) # Available tools for the agent tools = [ DuckDuckGoSearchTool(), WikipediaSearchTool(), get_file, scrape_webpage_content, extract_links_from_webpage, get_webpage_metadata, final_answer ] # Create the agent without prompt_templates (they'll be used in question processing) self.agent = CodeAgent( tools=tools, model=model, max_steps=self.max_steps ) # Store prompts for use in question processing self.prompts = prompts print("Agent initialized successfully!") except Exception as e: # Provide helpful error messages based on the model type error_msg = f"Error initializing QA Agent: {e}" if "authentication" in str(e).lower() or "api" in str(e).lower() or "credentials" in str(e).lower(): if hasattr(self, '_load_config'): config = self._load_config() model_id = config.get('model', {}).get('model_id', '') if "vertex_ai" in model_id.lower() or "gemini" in model_id.lower(): error_msg += "\n\nFor Vertex AI models, please:" error_msg += "\n1. Set up authentication:" error_msg += "\n Option A: gcloud auth application-default login" error_msg += "\n Option B: export GOOGLE_APPLICATION_CREDENTIALS='/path/to/service-account.json'" error_msg += "\n Option C: Set vertex_credentials in config.yaml" error_msg += "\n2. Update config.yaml with your:" error_msg += "\n - vertex_project: 'your-gcp-project-id'" error_msg += "\n - vertex_location: 'us-central1' (or your preferred region)" elif "anthropic" in model_id.lower(): error_msg += "\n\nFor Anthropic models, please set: export ANTHROPIC_API_KEY='your-key-here'" elif "openai" in model_id.lower() or "gpt" in model_id.lower(): error_msg += "\n\nFor OpenAI models, please set: export OPENAI_API_KEY='your-key-here'" print(error_msg) raise Exception(error_msg) def _load_config(self): """Load configuration from config.yaml""" try: with open('config.yaml', 'r') as f: return yaml.safe_load(f) except FileNotFoundError: print("Warning: config.yaml not found, using default configuration") return {} except Exception as e: print(f"Error loading config.yaml: {e}") return {} def _load_prompts(self): """Load prompts from prompts.yaml""" try: with open('prompts.yaml', 'r') as f: return yaml.safe_load(f) except FileNotFoundError: print("Warning: prompts.yaml not found, using default prompts") return {} except Exception as e: print(f"Error loading prompts.yaml: {e}") return {} def __call__(self, question: str) -> str: print(f"Agent received question (first 50 chars): {question[:50]}...") try: # Get system prompt from loaded prompts and combine with question system_prompt = self.prompts.get('system_prompt', '') if system_prompt: enhanced_question = f"{system_prompt}\n\n{question}" else: enhanced_question = question # Use the agent to run and answer the enhanced question answer = self.agent.run(enhanced_question) print(f"Agent returning answer (first 100 chars): {str(answer)[:100]}...") return str(answer) except Exception as e: print(f"Error running agent: {e}") return f"Error processing question: {e}" def run_questions(profile: gr.OAuthProfile | None): """ Fetches all questions, runs the QAAgent on them, and caches the answers. """ # --- Determine HF Space Runtime URL and Repo URL --- space_id = os.getenv("SPACE_ID") # Get the SPACE_ID for sending link to the code if profile: username = f"{profile.username}" print(f"User logged in: {username}") else: print("User not logged in.") return "Please Login to Hugging Face with the button.", None api_url = DEFAULT_API_URL questions_url = f"{api_url}/questions" # 1. Instantiate Agent try: agent = QAAgent() except Exception as e: print(f"Error instantiating agent: {e}") return f"Error initializing agent: {e}", None # 2. Fetch Questions print(f"Fetching questions from: {questions_url}") try: response = requests.get(questions_url, timeout=15) response.raise_for_status() questions_data = response.json() if not questions_data: print("Fetched questions list is empty.") return "Fetched questions list is empty or invalid format.", None print(f"Fetched {len(questions_data)} questions.") except requests.exceptions.JSONDecodeError as e: print(f"Error decoding JSON response from questions endpoint: {e}") print(f"Response text: {response.text[:500]}") return f"Error decoding server response for questions: {e}", None except requests.exceptions.RequestException as e: print(f"Error fetching questions: {e}") return f"Error fetching questions: {e}", None except Exception as e: print(f"An unexpected error occurred fetching questions: {e}") return f"An unexpected error occurred fetching questions: {e}", None # 3. Run Agent and Cache Results results_log = [] cached_count = 0 processed_count = 0 print(f"Running agent on {len(questions_data)} questions...") for item in questions_data: task_id = item.get("task_id") question_text = item.get("question") file_name = item.get("file_name") if not task_id or question_text is None: print(f"Skipping item with missing task_id or question: {item}") continue # Check if answer is already cached cached_result = cache_manager.get_cached_answer(question_text) if cached_result and cached_result.get('cache_valid', False): print(f"Using cached answer for task {task_id}") submitted_answer = cached_result['answer'] cached_count += 1 results_log.append({ "Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer, "Status": "Cached" }) else: # Run agent and cache the result try: print(f"Processing task {task_id} with agent...") # Enhance question with file information if file is present enhanced_question = question_text if file_name: enhanced_question = f"{question_text}\n\nNote: This question references a file named '{file_name}'. Use the get_file tool to retrieve its content." submitted_answer = agent(enhanced_question) # Cache the answer cache_success = cache_manager.cache_answer( question=question_text, answer=submitted_answer, iterations=1, file_name=file_name ) processed_count += 1 status = "Processed & Cached" if cache_success else "Processed (Cache Failed)" results_log.append({ "Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer, "Status": status }) except Exception as e: print(f"Error running agent on task {task_id}: {e}") error_answer = f"AGENT ERROR: {e}" # Cache the error (will be marked as invalid) cache_manager.cache_answer( question=question_text, answer=error_answer, iterations=1, file_name=file_name ) results_log.append({ "Task ID": task_id, "Question": question_text, "Submitted Answer": error_answer, "Status": "Error" }) status_message = ( f"Questions processing completed!\n" f"Total questions: {len(questions_data)}\n" f"Used cached answers: {cached_count}\n" f"Newly processed: {processed_count}\n" f"Answers are cached and ready for submission." ) print(status_message) results_df = pd.DataFrame(results_log) return status_message, results_df def submit_answers(profile: gr.OAuthProfile | None): """ Loads cached answers and submits them to the evaluation server. """ # --- Determine HF Space Runtime URL and Repo URL --- space_id = os.getenv("SPACE_ID") if profile: username = f"{profile.username}" print(f"User logged in: {username}") else: print("User not logged in.") return "Please Login to Hugging Face with the button.", None api_url = DEFAULT_API_URL questions_url = f"{api_url}/questions" submit_url = f"{api_url}/submit" # In the case of an app running as a Hugging Face space, this link points toward your codebase agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" print(agent_code) # 1. Fetch Questions to get task_ids print(f"Fetching questions from: {questions_url}") try: response = requests.get(questions_url, timeout=15) response.raise_for_status() questions_data = response.json() if not questions_data: print("Fetched questions list is empty.") return "Fetched questions list is empty or invalid format.", None print(f"Fetched {len(questions_data)} questions.") except requests.exceptions.RequestException as e: print(f"Error fetching questions: {e}") return f"Error fetching questions: {e}", None # 2. Load Cached Answers answers_payload = [] results_log = [] missing_answers = [] for item in questions_data: task_id = item.get("task_id") question_text = item.get("question") if not task_id or question_text is None: print(f"Skipping item with missing task_id or question: {item}") continue # Try to get cached answer cached_result = cache_manager.get_cached_answer(question_text) if cached_result and cached_result.get('cache_valid', False): submitted_answer = cached_result['answer'] answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer}) results_log.append({ "Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer, "Status": "Ready for Submission" }) else: missing_answers.append(task_id) results_log.append({ "Task ID": task_id, "Question": question_text, "Submitted Answer": "NO CACHED ANSWER", "Status": "Missing Answer" }) if missing_answers: status_message = ( f"Cannot submit: Missing cached answers for {len(missing_answers)} questions.\n" f"Missing task IDs: {missing_answers[:5]}{'...' if len(missing_answers) > 5 else ''}\n" f"Please run the questions first to generate and cache answers." ) print(status_message) results_df = pd.DataFrame(results_log) return status_message, results_df if not answers_payload: print("No valid cached answers found for submission.") return "No valid cached answers found for submission.", pd.DataFrame(results_log) # 3. Prepare Submission submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload} status_update = f"Submitting {len(answers_payload)} cached answers for user '{username}'..." print(status_update) # 4. Submit print(f"Submitting {len(answers_payload)} answers to: {submit_url}") try: response = requests.post(submit_url, json=submission_data, timeout=60) response.raise_for_status() result_data = response.json() final_status = ( f"Submission Successful!\n" f"User: {result_data.get('username')}\n" f"Overall Score: {result_data.get('score', 'N/A')}% " f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n" f"Message: {result_data.get('message', 'No message received.')}" ) print("Submission successful.") results_df = pd.DataFrame(results_log) return final_status, results_df except requests.exceptions.HTTPError as e: error_detail = f"Server responded with status {e.response.status_code}." try: error_json = e.response.json() error_detail += f" Detail: {error_json.get('detail', e.response.text)}" except requests.exceptions.JSONDecodeError: error_detail += f" Response: {e.response.text[:500]}" status_message = f"Submission Failed: {error_detail}" print(status_message) results_df = pd.DataFrame(results_log) return status_message, results_df except requests.exceptions.Timeout: status_message = "Submission Failed: The request timed out." print(status_message) results_df = pd.DataFrame(results_log) return status_message, results_df except requests.exceptions.RequestException as e: status_message = f"Submission Failed: Network error - {e}" print(status_message) results_df = pd.DataFrame(results_log) return status_message, results_df except Exception as e: status_message = f"An unexpected error occurred during submission: {e}" print(status_message) results_df = pd.DataFrame(results_log) return status_message, results_df def clear_cache(): """Clear all cached answers.""" cache_manager.clear_cache() return "Cache cleared successfully!", pd.DataFrame() # --- Build Gradio Interface using Blocks --- with gr.Blocks() as demo: gr.Markdown("# QA Agent Evaluation Runner") gr.Markdown( """ **Instructions:** 1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ... 2. Log in to your Hugging Face account using the button below. This uses your HF username for submission. 3. Click 'Run Questions' to fetch questions and run your agent (answers will be cached). 4. Click 'Submit Answers' to submit the cached answers and see your score. 5. Use 'Clear Cache' to remove all cached answers if needed. --- **Benefits of Separate Run/Submit:** - Answers are cached, so you can run questions once and submit multiple times - Faster submission since answers are pre-computed - Better error handling and recovery - Ability to review answers before submission """ ) gr.LoginButton() with gr.Row(): run_button = gr.Button("Run Questions", variant="primary") submit_button = gr.Button("Submit Answers", variant="secondary") clear_button = gr.Button("Clear Cache", variant="stop") status_output = gr.Textbox(label="Status / Result", lines=5, interactive=False) results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True) run_button.click( fn=run_questions, outputs=[status_output, results_table] ) submit_button.click( fn=submit_answers, outputs=[status_output, results_table] ) clear_button.click( fn=clear_cache, outputs=[status_output, results_table] ) if __name__ == "__main__": print("\n" + "-"*30 + " App Starting " + "-"*30) # Check for SPACE_HOST and SPACE_ID at startup for information space_host_startup = os.getenv("SPACE_HOST") space_id_startup = os.getenv("SPACE_ID") # Get SPACE_ID at startup if space_host_startup: print(f"✅ SPACE_HOST found: {space_host_startup}") print(f" Runtime URL should be: https://{space_host_startup}.hf.space") else: print("ℹ️ SPACE_HOST environment variable not found (running locally?).") if space_id_startup: # Print repo URLs if SPACE_ID is found print(f"✅ SPACE_ID found: {space_id_startup}") print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}") print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main") else: print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.") print("-"*(60 + len(" App Starting ")) + "\n") print("Launching Gradio Interface for QA Agent Evaluation...") demo.launch(debug=True, share=False)