"""Gradio evaluation runner for a LangGraph question-answering agent.

The module builds a small LangGraph state machine (agent -> retrieve /
web_search -> generate, with a validation router after ``generate``), wraps
it in ``MyAgent``, and exposes a Gradio UI that fetches the course
questions, runs the agent on each of them and submits the answers.
"""

import inspect
import operator
import os
import re
import warnings
from typing import Annotated, List, TypedDict

import gradio as gr
import pandas as pd
import requests
from langchain_core.messages import (
    AIMessage,
    BaseMessage,
    HumanMessage,
    SystemMessage,
    ToolMessage,
)
from langchain_openai import ChatOpenAI
from langgraph.graph import END, StateGraph

from web_search_tool import duckduckgo_search as web_search
from retriever import EnhancedGAIAInfoRetrieverTool

# os.system("chmod +x ./setup.sh")
# os.system("./setup.sh")

warnings.filterwarnings("ignore", message="This package.*duckduckgo_search.*")

# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

# Upper bound on how often each tool kind may be (re)tried for one question.
# Without a bound the generate -> validate -> tool loop can spin until the
# graph's recursion limit is hit.
MAX_TOOL_ATTEMPTS = 2

# Tool names used for the synthetic (fallback) tool calls created in this file.
# RETRIEVAL_TOOL_NAME is only a label for bookkeeping; it is not necessarily
# the name the retriever tool registers with the LLM.
SEARCH_TOOL_NAME = "duckduckgo_search"
RETRIEVAL_TOOL_NAME = "gaia_retriever"

# Substrings that mark an answer as unsatisfactory.
ERROR_MARKERS = ("error", "failed", "don't know")

SYSTEM_PROMPT = """You are a factual assistant. Rules:
1. First try to use GAIA retriever for structured data
2. If GAIA fails or doesn't have answer, use web search
3. Never guess - use tools when unsure
4. For questions about images/videos, use web search"""

_ANSWER_PREFIX_RE = re.compile(
    r"^(FINAL ANSWER|Answer|The answer is)[:\s]*", flags=re.I
)


class AgentState(TypedDict):
    """State carried through the graph."""

    # ``operator.add`` is the reducer: whatever a node returns under
    # "messages" is *appended* to the existing list, so nodes must return
    # only the messages they created.
    messages: Annotated[List[BaseMessage], operator.add]
    # You might also want to store retrieved documents explicitly if needed later
    retrieved_docs: List[str]


llm = ChatOpenAI(
    model="gpt-3.5-turbo",
    api_key=os.getenv("OPENAI_API_KEY"),
)

gaia_tool = EnhancedGAIAInfoRetrieverTool()
tools = [gaia_tool, web_search]

# Bind the tools to the LLM
llm_with_tools = llm.bind_tools(tools)


# --- Small shared helpers ---

def _message_text(message: BaseMessage) -> str:
    """Return the message content as a string (content may be non-str)."""
    content = message.content
    return content if isinstance(content, str) else str(content)


def _tool_kind(name: str | None) -> str | None:
    """Classify a tool name as ``"retrieve"``, ``"web_search"`` or ``None``.

    Retrieval takes precedence, mirroring the order used when routing.
    The returned strings are the graph node names.
    """
    lowered = (name or "").lower()
    if "retriev" in lowered or "gaia" in lowered:
        return "retrieve"
    if "search" in lowered or "duckduckgo" in lowered:
        return "web_search"
    return None


def _pending_tool_calls(message: BaseMessage) -> list:
    """Return the tool calls attached to *message* (empty list if none)."""
    return list(getattr(message, "tool_calls", None) or [])


def _original_question(messages: List[BaseMessage]) -> str:
    """Return the text of the first human message, or ``""`` if there is none."""
    return next(
        (_message_text(m) for m in messages if isinstance(m, HumanMessage)), ""
    )


def _count_tool_attempts(messages: List[BaseMessage]) -> tuple[int, int]:
    """Count ``(retrieval, search)`` tool results recorded in *messages*.

    Tool results are recognised by the ``name`` the tool nodes in this file
    put on each ``ToolMessage``, not by searching the tool output text.
    """
    retrieval = search = 0
    for message in messages:
        if not isinstance(message, ToolMessage):
            continue
        kind = _tool_kind(message.name)
        if kind == "retrieve":
            retrieval += 1
        elif kind == "web_search":
            search += 1
    return retrieval, search


def _clean_answer(text: str) -> str:
    """Strip answer prefixes and keep only the first line of *text*."""
    cleaned = _ANSWER_PREFIX_RE.sub("", text.strip())
    cleaned = cleaned.split("\n")[0].strip()
    for prefix in ("FINAL ANSWER:", "Answer:", "The answer is:"):
        if prefix in cleaned:
            cleaned = cleaned.split(prefix)[-1].strip()
    return cleaned.strip()


# --- Graph nodes ---

def agent_node(state: AgentState) -> dict:
    """Ask the tool-enabled LLM for the next step.

    The system prompt is prepended only to the list sent to the LLM when the
    history consists of a single human message. Only the LLM response is
    returned, because the "messages" reducer appends to the stored history.
    """
    history = list(state["messages"])  # copy: never mutate graph state
    if len(history) == 1 and isinstance(history[0], HumanMessage):
        history.insert(0, SystemMessage(content=SYSTEM_PROMPT))
    response = llm_with_tools.invoke(history)
    return {"messages": [response]}


def retrieve_node(state: AgentState) -> dict:
    """Run the GAIA retriever for the pending retrieval tool calls.

    If the last message carries no retrieval tool call, the original question
    is used as the query so that a retrieval attempt is always recorded.
    When the retriever raises, an ``AIMessage`` requesting a web search with
    the same query is returned instead of a tool result.
    """
    messages = state["messages"]
    calls = [
        call
        for call in _pending_tool_calls(messages[-1])
        if _tool_kind(call["name"]) == "retrieve"
    ]
    if not calls:
        calls = [{
            "name": RETRIEVAL_TOOL_NAME,
            "args": {"query": _original_question(messages)},
            "id": "fallback_retrieval",
        }]

    new_messages: List[BaseMessage] = []
    for call in calls:
        query = call["args"].get("query", "")
        try:
            output = gaia_tool.invoke(query)
        except Exception as e:
            new_messages.append(AIMessage(
                content=f"Retrieval failed: {str(e)}. Trying web search...",
                tool_calls=[{
                    "name": SEARCH_TOOL_NAME,
                    "args": {"query": query or _original_question(messages)},
                    "id": "fallback_search",
                }],
            ))
            # Stop here so the fallback request stays the last message.
            break
        new_messages.append(ToolMessage(
            content=str(output),
            tool_call_id=call["id"],
            name=call["name"],
        ))
    return {"messages": new_messages}


def web_search_node(state: AgentState) -> dict:
    """Run the web search tool for the pending search tool calls.

    If the last message carries no search tool call, the original question is
    searched instead, so every visit to this node records one attempt.
    Dict results are reduced to the snippets of the first three entries of
    ``"web_results"``. Search errors are reported as tool results.
    """
    messages = state["messages"]
    search_results = list(state.get("retrieved_docs") or [])
    calls = [
        call
        for call in _pending_tool_calls(messages[-1])
        if _tool_kind(call["name"]) == "web_search"
    ]
    if not calls:
        calls = [{
            "name": SEARCH_TOOL_NAME,
            "args": {"query": _original_question(messages)},
            "id": "fallback_search",
        }]

    new_messages: List[BaseMessage] = []
    for call in calls:
        try:
            args = call["args"]
            query = args.get("query") or args.get("q") or ""
            output = web_search.invoke(query)
            if isinstance(output, dict):
                formatted = "\n".join(
                    r["snippet"] for r in output.get("web_results", [])[:3]
                )
                output = formatted or str(output)
            text = str(output)
            search_results.append(text)
            new_messages.append(ToolMessage(
                content=text,
                tool_call_id=call["id"],
                name=call["name"],
            ))
        except Exception as e:
            new_messages.append(ToolMessage(
                content=f"Web search error: {e}",
                tool_call_id=call["id"],
                name=call["name"],
            ))
    return {"messages": new_messages, "retrieved_docs": search_results}


def generate_node(state: AgentState) -> dict:
    """Generates final answer from retrieved context and processed data.

    * Tool result last: extract an answer from it and return it as a new
      ``AIMessage``.
    * Empty or "don't know" message last: return an ``AIMessage`` that
      requests a web search for the original question.
    * Otherwise the last message already is the answer and nothing is added.
    """
    messages = state["messages"]
    last_message = messages[-1]

    # If we have a tool message, process it to extract answer
    if isinstance(last_message, ToolMessage):
        content = _message_text(last_message)
        if "Answer:" in content:
            # GAIA format
            answer = content.split("Answer:")[-1].strip()
        elif "title" in content and "snippet" in content:
            # Looks like raw web results: let the LLM summarize them
            try:
                response = llm.invoke([
                    SystemMessage(content="Extract the most relevant answer from these search results."),
                    HumanMessage(content=content),
                ])
                answer = response.content
            except Exception:
                answer = content.split("\n")[0]  # Fallback to first result
        else:
            answer = content  # Default to raw content
        return {"messages": [AIMessage(content=answer)]}

    # If we have no answer, try web search as fallback
    text = _message_text(last_message)
    if not text.strip() or "don't know" in text.lower():
        return {
            "messages": [
                AIMessage(
                    content="Couldn't find answer, trying web search...",
                    tool_calls=[{
                        "name": SEARCH_TOOL_NAME,
                        "args": {"query": _original_question(messages)},
                        "id": "fallback_search",
                    }],
                )
            ]
        }

    # The last message is kept as the answer; returning it again would
    # append a duplicate because of the additive reducer.
    return {"messages": []}


def validate_answer_node(state: AgentState) -> dict:
    """Validate if answer is satisfactory and return next action.

    Returns ``{"action": ...}`` where the action is ``"acceptable"``,
    ``"needs_retrieval"`` or ``"needs_web_search"``. Each tool kind is
    retried at most ``MAX_TOOL_ATTEMPTS`` times, which bounds the loop.
    """
    messages = state["messages"]
    last_message = messages[-1]
    retrieval_attempts, search_attempts = _count_tool_attempts(messages)

    # A message that still requests a tool is not a final answer: serve the
    # request first, as long as the budget for that tool is not used up.
    for call in _pending_tool_calls(last_message):
        kind = _tool_kind(call["name"])
        if kind == "retrieve" and retrieval_attempts < MAX_TOOL_ATTEMPTS:
            return {"action": "needs_retrieval"}
        if kind == "web_search" and search_attempts < MAX_TOOL_ATTEMPTS:
            return {"action": "needs_web_search"}

    # If we have an AIMessage with content, accept it unless it contains
    # error markers and another search is still allowed.
    text = _message_text(last_message)
    if isinstance(last_message, AIMessage) and text.strip():
        lowered = text.lower()
        has_marker = any(marker in lowered for marker in ERROR_MARKERS)
        if has_marker and search_attempts < MAX_TOOL_ATTEMPTS:
            return {"action": "needs_web_search"}
        return {"action": "acceptable"}

    # If we've tried both methods and still no answer, give up
    if retrieval_attempts >= 1 and search_attempts >= 1:
        return {"action": "acceptable"}  # Accept whatever we have

    # Default to trying retrieval first, then search
    if retrieval_attempts == 0:
        return {"action": "needs_retrieval"}
    return {"action": "needs_web_search"}


def should_continue(state: AgentState) -> str:
    """Route from the agent node to ``generate``, ``retrieve`` or ``web_search``."""
    last_message = state["messages"][-1]

    # If we have a good answer already, generate it
    if isinstance(last_message, AIMessage) and _message_text(last_message).strip():
        return "generate"

    # If last message is a tool message, generate response
    if isinstance(last_message, ToolMessage):
        return "generate"

    # If last message has tool calls, route appropriately
    for call in _pending_tool_calls(last_message):
        kind = _tool_kind(call["name"])
        if kind is not None:
            return kind

    # Default to retrieval first
    return "retrieve"


def should_end(state: AgentState) -> str:
    """Return ``"end"`` when the last message has content, else ``"continue"``."""
    if state["messages"] and state["messages"][-1].content:
        return "end"
    return "continue"


# Create the graph
graph = StateGraph(AgentState)

# Add nodes.
# validate_answer_node is intentionally not registered as a node: it returns
# an "action" key that is not part of AgentState and no edge leads to it; it
# is used only as the router after "generate".
graph.add_node("agent", agent_node)
graph.add_node("retrieve", retrieve_node)
graph.add_node("web_search", web_search_node)
graph.add_node("generate", generate_node)

# Set entry point
graph.set_entry_point("agent")

# Add conditional edges from agent
graph.add_conditional_edges(
    "agent",
    should_continue,
    {
        "retrieve": "retrieve",
        "web_search": "web_search",
        "generate": "generate",
    },
)

# Connect tool nodes to generate
graph.add_edge("retrieve", "generate")
graph.add_edge("web_search", "generate")

# After generate: finish, or retry with one of the tools
graph.add_conditional_edges(
    "generate",
    lambda state: validate_answer_node(state)["action"],
    {
        "acceptable": END,
        "needs_web_search": "web_search",
        "needs_retrieval": "retrieve",
    },
)

# Compile the graph
runnable = graph.compile()


class MyAgent:
    """Callable wrapper that runs the compiled graph on one question."""

    def __init__(self):
        self.runnable = runnable

    def __call__(self, question: str) -> str:
        """Answer *question* and return the cleaned first line of the answer."""
        # Handle empty or nonsense questions
        if question.strip() in ("", "?"):
            return "[NO QUESTION PROVIDED]"

        # NOTE(review): originally labelled "reversed text questions", but the
        # condition only matches space-free palindromes longer than 10
        # characters, for which the reversed text equals the input. Kept
        # unchanged; confirm the intended behavior.
        if " " not in question and len(question) > 10 and question == question[::-1]:
            return question[::-1]

        result = self.runnable.invoke({
            "messages": [HumanMessage(content=question)],
            "retrieved_docs": [],
        })
        return _clean_answer(_message_text(result["messages"][-1]))


def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs MyAgent on them, submits all answers,
    and displays the results.

    Returns a ``(status_text, results)`` pair where ``results`` is a
    DataFrame of the questions and submitted answers, or ``None`` when the
    run stopped before any question was processed.
    """
    # --- Determine HF Space Runtime URL and Repo URL ---
    space_id = os.getenv("SPACE_ID")  # Get the SPACE_ID for sending link to the code

    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate Agent ( modify this part to create your agent)
    try:
        agent = MyAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    # In the case of an app running as a Hugging Face space, this link points
    # toward your codebase (useful for others so please keep it public)
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    # JSONDecodeError derives from RequestException, so it must be handled
    # first or this branch can never run.
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run your Agent with better error handling
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")

    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            continue

        try:
            # Get clean answer without any prefixes
            submitted_answer = agent(question_text).strip()
            # Ensure we don't submit empty answers
            if not submitted_answer:
                submitted_answer = "[NO ANSWER PROVIDED]"
        except Exception as e:
            # A failing question is reported as its answer, not fatal.
            submitted_answer = f"AGENT ERROR: {str(e)}"

        answers_payload.append({
            "task_id": task_id,
            "submitted_answer": submitted_answer,
        })
        results_log.append({
            "Task ID": task_id,
            "Question": question_text,
            "Submitted Answer": submitted_answer,
        })

    # 4. Prepare Submission
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status, pd.DataFrame(results_log)
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"

    # Every failure path reports its status together with the answers so far.
    print(status_message)
    return status_message, pd.DataFrame(results_log)


# --- Build Gradio Interface using Blocks ---
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")
    gr.Markdown(
        """
        **Instructions:**

        1.  Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ...
        2.  Log in to your Hugging Face account using the button below. This uses your HF username for submission.
        3.  Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.

        ---
        **Disclaimers:**
        Once clicking on the "submit button, it can take quite some time ( this is the time for the agent to go through all the questions).
        This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance for the delay process of the submit button, a solution could be to cache the answers and submit in a seperate action or even to answer the questions in async.
        """
    )

    gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers")

    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    # Removed max_rows=10 from DataFrame constructor
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table],
    )


# TODO: for testing only; remove before submission.
def test_with_real_questions(agent=None, limit=5):
    """Print the agent's answers to the first *limit* course questions.

    Args:
        agent: Callable taking a question string; a new ``MyAgent`` is
            created when omitted.
        limit: Number of questions to try.
    """
    api_url = f"{DEFAULT_API_URL}/questions"
    try:
        if agent is None:
            agent = MyAgent()
        response = requests.get(api_url, timeout=15)
        response.raise_for_status()
        questions = response.json()

        for item in questions[:limit]:
            answer = agent(item['question'])
            print(f"Task ID: {item['task_id']}")
            print(f"Q: {item['question']}")
            print(f"A: {answer}\n")
    except Exception as e:
        print(f"Error fetching test questions: {e}")


if __name__ == "__main__":
    # Quick test
    agent = MyAgent()
    test_answer = agent("What is the capital of France?")
    print(test_answer)  # Should return just "Paris" (no prefixes)

    test_questions = [
        "What is the population of Tokyo as of 2023?",
        "Who won the Nobel Prize in Physics in 2022?",
        "What is the chemical formula for aspirin?",
        "How many countries are members of the European Union?",
        "What is the tallest mountain in Africa?",
        "Who wrote the novel '1984'?",
        "What is the distance between Earth and Mars at their closest approach?",
        "When was the first iPhone released?",
        "What is the currency of Switzerland?",
        "Name the current CEO of Microsoft",
    ]

    for question in test_questions:
        answer = agent(question)
        print(f"Q: {question}\nA: {answer}\n")

    print("************************************************************************")
    test_with_real_questions(agent)
    print("************************************************************************")

    print("\n" + "-"*30 + " App Starting " + "-"*30)
    # Check for SPACE_HOST and SPACE_ID at startup for information
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")  # Get SPACE_ID at startup

    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f" Runtime URL should be: https://{space_host_startup}.hf.space")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")

    if space_id_startup:  # Print repo URLs if SPACE_ID is found
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")

    print("-"*(60 + len(" App Starting ")) + "\n")

    print("Launching Gradio Interface for Basic Agent Evaluation...")
    demo.launch(debug=True, share=False)