| import os |
| import gradio as gr |
| import requests |
| import pandas as pd |
| import re |
| from urllib.parse import urlparse |
| from typing import TypedDict, List, Optional, Annotated, Tuple, Union, Literal |
| from langgraph.graph import StateGraph, END |
| from langchain_google_genai import ChatGoogleGenerativeAI |
| from langchain_core.messages import HumanMessage, SystemMessage, AIMessage, ToolMessage, BaseMessage |
| from langgraph.graph.message import add_messages |
| from langchain_core.tools import tool |
| from langchain_community.tools import WikipediaQueryRun |
| from langchain_community.utilities import WikipediaAPIWrapper |
| from langchain_tavily import TavilySearch |
| from pydantic import BaseModel, Field |
| from langgraph.prebuilt import ToolNode |
| from langchain_core.prompts import ChatPromptTemplate |
| import operator |
|
|
| |
# Base URL of the scoring service; the code below appends /questions, /submit
# and /files/<task_id> to it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# Root folder (under the current working directory at import time) that holds
# one sub-directory per task for downloaded files.
TEMP_DIR_BASE = os.path.join(os.getcwd(), "temp_agent_files")
|
|
| |
def get_task_temp_dir(task_id: str) -> str:
    """Return the working directory for ``task_id``, creating it when absent.

    The directory sits directly under ``TEMP_DIR_BASE`` and is named after the
    task id. Calling this again for the same task reuses the existing folder.
    """
    path_for_task = os.path.join(TEMP_DIR_BASE, task_id)
    # exist_ok=True keeps repeated calls for the same task from raising.
    os.makedirs(path_for_task, exist_ok=True)
    return path_for_task
|
|
# Recognised URL shapes:
#   youtube.com/watch?v=ID            (also when ``v`` is not the first query parameter)
#   youtube.com/embed/ID, youtube.com/shorts/ID, youtube.com/live/ID
#   youtu.be/ID
_YOUTUBE_ID_RE = re.compile(
    r'(?:youtube\.com/(?:watch\?(?:[^#\s]*?&)?v=|embed/|shorts/|live/)'
    r'|youtu\.be/)'
    r'([a-zA-Z0-9_-]+)'
)


def extract_youtube_id(url: str) -> Optional[str]:
    """Extract the YouTube video ID from a URL.

    Args:
        url: Any string that may contain a YouTube link.

    Returns:
        The video ID, or ``None`` when ``url`` is empty or contains no
        recognisable YouTube link.
    """
    # Empty / None input cannot contain an ID; avoid a TypeError from re.search.
    if not url:
        return None
    match = _YOUTUBE_ID_RE.search(url)
    return match.group(1) if match else None
|
|
| |
@tool
def analyze_youtube_video(url: str, question: str) -> str:
    """
    Analyze a YouTube video using Gemini.

    Args:
        url: The YouTube video URL
        question: Specific question about the video content

    Returns:
        Analysis of the video based on the provided question.
    """
    # NOTE: only the URL text and the question are placed in the prompt below;
    # no video data is fetched or attached by this function.
    try:
        parsed_url = urlparse(url)
        # The error message promises http/https, so enforce exactly that.
        if parsed_url.scheme not in ("http", "https") or not parsed_url.netloc:
            return "Please provide a valid video URL with http:// or https:// prefix."

        # Validate the host itself rather than searching the whole URL, so a
        # link such as https://example.com/?next=youtube.com is rejected.
        host = (parsed_url.hostname or "").lower()
        is_youtube_host = any(
            host == domain or host.endswith("." + domain)
            for domain in ("youtube.com", "youtu.be")
        )
        if not is_youtube_host:
            return "Only YouTube videos are supported at this time."

        api_key = os.environ.get("GOOGLE_API_KEY")
        if not api_key:
            return "Unable to perform analysis: Google API key not set. Get it from https://aistudio.google.com/"

        llm = ChatGoogleGenerativeAI(
            model="gemini-2.5-flash",
            google_api_key=api_key,
            temperature=0,
            max_output_tokens=4096
        )

        prompt = f"""You are analyzing a YouTube video at URL: {url}

Question about the video: {question}

Based on what you know about this video (if it's a known video) or general knowledge,
provide a helpful analysis. If you cannot access the video directly, provide
reasonable information based on the video title/URL if it's recognizable.

Analysis:"""

        response = llm.invoke(prompt)
        return f"## YouTube Video Analysis (URL: {url})\n\n{response.content}"

    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        print(f"Error in analyze_youtube_video: {type(e).__name__}: {e}")
        return f"Error analyzing video at {url}: {str(e)}"
|
|
@tool
def analyze_text_content(content: str, question: str) -> str:
    """
    Analyze text content using Gemini.

    Args:
        content: The text content to analyze
        question: Specific question about the content

    Returns:
        Analysis of the text based on the question.
    """
    # The docstring above is kept verbatim: the @tool decorator uses it as the
    # tool description.
    try:
        google_key = os.environ.get("GOOGLE_API_KEY")
        if not google_key:
            return "Unable to perform analysis: Google API key not set."

        # Only the first 8000 characters of the content are sent to the model.
        analysis_prompt = f"""Analyze the following content and answer the question.

Content: {content[:8000]}

Question: {question}

Provide a concise, accurate answer based ONLY on the content above.
If the content doesn't contain the answer, say "Information not found in the provided content."

Answer:"""

        analysis_model = ChatGoogleGenerativeAI(
            model="gemini-2.5-flash",
            google_api_key=google_key,
            temperature=0,
            max_output_tokens=4096
        )
        reply = analysis_model.invoke(analysis_prompt)
        return reply.content

    except Exception as exc:
        # Any failure is reported as text so the tool call itself never raises.
        return f"Error analyzing text: {str(exc)}"
|
|
@tool
def direct_reasoning(question: str, context: str = "") -> str:
    """
    Use Gemini's reasoning capabilities to answer a question.

    Args:
        question: The question to answer
        context: Optional context to help answer

    Returns:
        The reasoned answer
    """
    # The docstring above is kept verbatim: the @tool decorator uses it as the
    # tool description.
    try:
        google_key = os.environ.get("GOOGLE_API_KEY")
        if not google_key:
            return "Google API key not set."

        reasoning_prompt = f"""Answer the following question with ONLY the exact answer, nothing else.
No explanations, no "FINAL ANSWER", just the answer.

{context}
Question: {question}

Answer:"""

        reasoning_model = ChatGoogleGenerativeAI(
            model="gemini-2.5-flash",
            google_api_key=google_key,
            temperature=0,
            max_output_tokens=4096
        )
        reply = reasoning_model.invoke(reasoning_prompt)
        # Surrounding whitespace is trimmed from the model's text.
        return reply.content.strip()
    except Exception as exc:
        return f"Error: {str(exc)}"
|
|
| |
# State dictionary threaded through every node of the plan-and-execute graph.
class TaskState(TypedDict):
    # Identifier of the task; also used to build the file download URL and the
    # per-task temp directory.
    task_id: str
    # The user question the agent has to answer.
    question: str
    # Name of an attached file, if the task has one.
    file_name: Optional[str]
    # Base URL of the service that files are downloaded from.
    api_url: str
    # Local path of the downloaded file (download_file yields "" on failure).
    file_path: Optional[str]
    # Directory created for this task's files.
    temp_dir: Optional[str]
    # Remaining plan steps; the first entry is the next one to execute.
    plan: List[str]
    # (task, thought) pairs; node updates are concatenated via operator.add.
    past_steps: Annotated[List[Tuple[str, str]], operator.add]
    # Final answer; route_after_replan ends the graph once this is non-empty.
    response: str
    # Conversation history; node updates are merged with add_messages.
    messages: Annotated[list[BaseMessage], add_messages]
    # Plan step currently being executed ("" when none).
    current_task: str
|
|
| |
def setup_tavily_search():
    """Build the Tavily web-search tool.

    Raises:
        ValueError: when the TAVILY_API_KEY environment variable is missing or
            empty. Any other construction error is logged and re-raised as-is.
    """
    try:
        if not os.environ.get("TAVILY_API_KEY"):
            raise ValueError("Tavily API key not found. Set TAVILY_API_KEY environment variable.")
        print("Using Tavily for web search")
        search_tool = TavilySearch(max_results=10)
    except Exception as exc:
        # Log for visibility, then let the caller see the original exception.
        print(f"Error setting up Tavily: {exc}")
        raise
    return search_tool
|
|
| |
def get_llm():
    """Create a Gemini chat model configured from the environment.

    Raises:
        ValueError: when the GOOGLE_API_KEY environment variable is missing or empty.
    """
    google_key = os.environ.get("GOOGLE_API_KEY")
    if google_key:
        return ChatGoogleGenerativeAI(
            model="gemini-2.5-flash",
            google_api_key=google_key,
            temperature=0,
            max_output_tokens=4096
        )
    raise ValueError("GOOGLE_API_KEY environment variable not set. Get it from https://aistudio.google.com/")
|
|
# Shared chat model, created at import time; get_llm() raises ValueError when
# GOOGLE_API_KEY is not set, so importing this module requires the key.
llm = get_llm()


# Search tools, also created at import time; setup_tavily_search() raises when
# TAVILY_API_KEY is not set.
web_search = setup_tavily_search()
wikipedia_api = WikipediaAPIWrapper(top_k_results=8, use_https=True)
wikipedia_search = WikipediaQueryRun(api_wrapper=wikipedia_api)


# Every tool offered to the executor model and described in the planner prompts.
tools = [
    analyze_youtube_video,
    analyze_text_content,
    direct_reasoning,
    web_search,
    wikipedia_search
]


# Graph node built from the same tool list; registered as "tool_node" in the graph.
tool_node = ToolNode(tools)
|
|
| |
# Structured-output schema requested from the planner chain; plan_step reads
# its `steps` field. The docstring and Field descriptions are left untouched
# because they are part of the model definition passed to with_structured_output.
class Plan(BaseModel):
    """Plan to follow in future"""
    thought: str = Field(description="The reasoning process behind generating this plan.")
    steps: List[str] = Field(description="Different steps to follow, in sorted order.")
|
|
# Final-answer variant of the replanner's action; replan_step reads its
# `response` field when the replanner decides the objective is met.
class Response(BaseModel):
    """Response to user."""
    response: str
|
|
# Structured-output schema requested from the replanner chain; replan_step
# branches on whether `action` is a Response (finish) or a Plan (continue).
class Act(BaseModel):
    """Action to perform."""
    thought: str = Field(description="The reasoning process behind choosing this action (Plan or Response).")
    action: Union[Response, Plan] = Field(description="Action to perform. Response for final answer, Plan for more steps.")
|
|
| |
def get_tools_description(tool_list: Optional[list] = None) -> str:
    """Generate a formatted string describing all available tools.

    Args:
        tool_list: Objects to describe. Defaults to the module-level ``tools``
            list when omitted, which is the original behaviour.

    Returns:
        One markdown bullet per tool, formatted as "- `name`: summary" and
        joined with newlines. The summary is the first non-blank line of the
        tool's ``description`` (or of its ``__doc__`` when it has no
        ``description`` attribute). Returns "" for an empty list.
    """
    entries = tools if tool_list is None else tool_list
    bullet_lines = []
    # The loop variable is deliberately not called `tool`, which would shadow
    # the imported decorator of that name.
    for entry in entries:
        name = getattr(entry, "name", str(entry))
        description = getattr(entry, "description", getattr(entry, "__doc__", None))
        summary = ""
        if description:
            # Skip leading blank lines so a text that starts with a newline
            # (typical for triple-quoted docstrings) still yields a summary.
            summary = next(
                (line.strip() for line in description.split('\n') if line.strip()),
                "",
            )
        if not summary:
            summary = "No description available"
        bullet_lines.append(f"- `{name}`: {summary}")
    return "\n".join(bullet_lines)
|
|
# Bullet list of tool names/summaries, computed once at import time and baked
# into both planner prompts below.
tools_desc = get_tools_description()


# Prompt for the initial planning call. The system text is an f-string, so
# {tools_desc} is substituted immediately; the "placeholder" entry is filled
# with the message list supplied under "initial_user_message" at invoke time.
planner_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            f"""For the given objective, devise a simple step-by-step plan.
Also provide a detailed thought process explaining how you arrived at the plan.
**Plan Requirements:**
* **Simplicity:** Keep the plan as straightforward as possible.
* **Task Types:** Each step must be EITHER:
* A task requiring a specific tool from the available list.
* A reasoning step for the LLM to perform internally (e.g., summarizing information, comparing results).
* **Tool Usage:** If a step uses a tool, clearly state the tool name and what it should do.
* **Conciseness:** Avoid superfluous steps. The result of the final step should be the final answer.
**Available Tools:**
{tools_desc}
Output your thought process and the plan steps.
""",
        ),
        ("placeholder", "{initial_user_message}"),
    ]
)


# Planner chain: prompt piped into the shared model, constrained to the Plan schema.
planner = planner_prompt | llm.with_structured_output(Plan)
|
|
| |
# Prompt for the replanning call. This is an f-string: {tools_desc} is
# substituted immediately, while the doubled braces ({{question}}, {{plan_str}},
# {{past_steps_str}}, {{current_task}}, {{latest_result}}) survive as template
# variables that replan_step supplies at invoke time.
replanner_prompt = ChatPromptTemplate.from_template(
    f"""You are a replanner. Your goal is to refine the plan to achieve the objective, or decide if the objective is met.
**Objective:**
{{question}}
**Original Plan (remaining steps):**
{{plan_str}}
**History (Executed Steps and Thoughts):**
{{past_steps_str}}
**Most Recent Step Executed:** '{{current_task}}'
**Direct Result of Last Step:**
{{latest_result}}
**Your Task:**
Analyze the **History (Executed Steps and Thoughts)** and the **Direct Result of Last Step** carefully.
* If the last step successfully moved towards the objective, continue the plan or refine it.
* If the last step failed, resulted in an error, or the **History** suggests the current approach is not working, you MUST revise the plan to try a different approach.
Based on this analysis, decide the next course of action (Respond or Revise Plan).
**Action Options:**
1. **Respond (Response action):** If the objective is met and you have the final answer, provide it.
2. **Revise Plan (Plan action):** If more steps are needed, provide a new, simple plan containing only the remaining steps.
**Available Tools:**
{tools_desc}
Output your thought process and the chosen action (Plan or Response).
"""
)


# Replanner chain: prompt piped into the shared model, constrained to the Act schema.
replanner = replanner_prompt | llm.with_structured_output(Act)
|
|
| |
def plan_step(state: TaskState):
    """Ask the planner chain for an initial list of steps.

    The current message history is passed as ``initial_user_message``; the
    returned update carries the new plan and an empty message list.
    """
    planner_input = {"initial_user_message": state["messages"]}
    generated = planner.invoke(planner_input)
    return {"plan": generated.steps, "messages": []}
|
|
def prepare_next_step(state: TaskState):
    """Pop the next plan step and queue an instruction message for the executor.

    Returns a state update with the remaining plan, the step now being worked
    on, and the message history extended by one HumanMessage for that step.
    """
    steps = state["plan"]
    if steps:
        current_task, remaining_plan = steps[0], steps[1:]
    else:
        # An empty plan still produces a message, with an empty task text.
        current_task, remaining_plan = "", []

    original_question = state["question"]
    instruction = f"""Original User Question: {original_question}
Current Task: {current_task}
Based *only* on the 'Current Task' description above, decide if a tool needs to be called.
If you call an analysis tool, pass the necessary arguments.
If no tool is needed for the Current Task, explain the reasoning or result based on the task description.
"""

    history = state.get("messages", [])
    return {
        "plan": remaining_plan,
        "current_task": current_task,
        "messages": history + [HumanMessage(content=instruction)],
    }
|
|
def executor_llm_call(state: TaskState):
    """Run the tool-enabled model over the message history and return its reply."""
    tool_aware_model = llm.bind_tools(tools)
    reply = tool_aware_model.invoke(state["messages"])
    return {"messages": [reply]}
|
|
def replan_step(state: TaskState):
    """Feed the last step's outcome to the replanner and apply its decision.

    When the replanner answers with a ``Response``, a fresh model call reduces
    it to the bare final answer and the update carries ``response``. Otherwise
    the update carries the revised ``plan``. Both paths record the finished
    task together with the replanner's thought in ``past_steps``.
    """
    finished_task = state["current_task"]
    history = state["messages"]

    # The content of the newest message is treated as the result of the step.
    if not history:
        latest_result = "(No message found for task result)"
    else:
        newest = history[-1]
        if isinstance(newest, (AIMessage, ToolMessage)):
            latest_result = newest.content
        else:
            latest_result = str(newest)

    history_lines = [
        f"Step: {task}\nThought: {thought}"
        for task, thought in state.get("past_steps", [])
    ]
    numbered_plan = [
        f"{position}. {step}"
        for position, step in enumerate(state.get("plan", []), start=1)
    ]

    output = replanner.invoke(
        {
            "question": state["question"],
            "plan_str": "\n".join(numbered_plan),
            "past_steps_str": "\n".join(history_lines),
            "current_task": finished_task,
            "latest_result": latest_result,
        }
    )

    step_record = [(finished_task, output.thought)]

    if not isinstance(output.action, Response):
        # More work to do: adopt the revised plan and keep the conversation.
        return {
            "plan": output.action.steps,
            "past_steps": step_record,
            "messages": state["messages"],
            "current_task": "",
        }

    print(f"Replanner provided a final response: {output.action.response}")
    final_answer_prompt = f"""The user's original question was: {state['question']}
The result determined by the plan is: {output.action.response}
Based on this result, output ONLY the final formatted answer itself, and nothing else.
Keep the answer concise and exact."""

    # A separately constructed model instance formats the final answer.
    formatter = get_llm()
    extracted_response = formatter.invoke(final_answer_prompt).content.strip()

    return {
        "response": extracted_response,
        "past_steps": step_record,
        "messages": [],
        "current_task": "",
    }
|
|
| |
def route_after_executor_call(state: TaskState) -> Literal["tool_node", "replan_step"]:
    """Pick the node that follows the executor call.

    Returns "tool_node" when the newest message is an AIMessage that carries
    tool calls; in every other case (including an empty history) "replan_step".
    """
    history = state["messages"]
    newest = history[-1] if history else None
    wants_tool = isinstance(newest, AIMessage) and bool(newest.tool_calls)
    return "tool_node" if wants_tool else "replan_step"
|
|
def route_after_replan(state: TaskState) -> Literal["prepare_next_step", END]:
    """Decide whether the graph finishes or executes another plan step.

    A non-empty ``response`` ends the run; otherwise a non-empty ``plan``
    continues it. With neither, a warning is printed and the run ends.
    """
    if state.get("response"):
        return END
    if state.get("plan"):
        return "prepare_next_step"
    print("Warning: Replanner finished without response or new plan.")
    return END
|
|
| |
def download_file(task_id: str, file_name: str, api_url: str = DEFAULT_API_URL) -> str:
    """Download the file attached to a task into the task's temp directory.

    Args:
        task_id: Task identifier; selects both the remote file
            (``{api_url}/files/{task_id}``) and the local directory.
        file_name: Name to store the file under. Only its final path component
            is used, so a name containing directories cannot write outside the
            task directory.
        api_url: Base URL of the service that serves the files.

    Returns:
        The local path of the downloaded file, or "" on any failure
        (invalid name, HTTP error, timeout, write error).
    """
    temp_dir = get_task_temp_dir(task_id)
    file_url = f"{api_url}/files/{task_id}"

    try:
        # file_name comes from a remote service: keep only the last path
        # component (handling both separators) to prevent path traversal.
        safe_name = os.path.basename(str(file_name).replace("\\", "/"))
        if safe_name in ("", ".", ".."):
            raise ValueError(f"Invalid file name: {file_name!r}")
        file_path = os.path.join(temp_dir, safe_name)

        # A timeout keeps a stalled server from hanging the whole run; the
        # context manager releases the streamed connection in every case.
        with requests.get(file_url, stream=True, timeout=60) as response:
            response.raise_for_status()
            with open(file_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)
        print(f"File downloaded successfully to {file_path}")
        return file_path
    except Exception as e:
        # Best-effort download: report and signal failure with "".
        print(f"Error downloading file: {str(e)}")
        return ""
|
|
def process_file(state: TaskState):
    """Entry node for tasks with an attachment.

    Downloads the file when both a task id and a file name are present, then
    returns the fully initialised state. The first message is the question
    followed by a note saying where the file was saved or that the download
    failed.
    """
    task_id = state.get("task_id", "")
    file_name = state.get("file_name", "")
    api_url = state.get("api_url", DEFAULT_API_URL)
    question = state.get("question", "")

    update = {
        "question": question,
        "task_id": task_id,
        "file_name": file_name,
        "api_url": api_url,
    }

    first_message_text = question
    if task_id and file_name:
        # temp_dir / file_path are only part of the update on this branch.
        update["temp_dir"] = get_task_temp_dir(task_id)
        local_path = download_file(task_id, file_name, api_url)
        update["file_path"] = local_path
        if local_path:
            first_message_text += f"\n\n(Note: File downloaded to: {local_path})"
        else:
            first_message_text += f"\n\n(Note: Failed to download file '{file_name}')"

    update.update(
        {
            "messages": [HumanMessage(content=first_message_text)],
            "plan": [],
            "past_steps": [],
            "response": "",
            "current_task": "",
        }
    )
    return update
|
|
def process_input(state: TaskState) -> TaskState:
    """Entry node for tasks without an attachment.

    Returns the fully initialised state: all file-related fields are None, the
    bookkeeping fields are empty, and the only message is the question itself.
    """
    question = state.get("question", "")
    no_file_fields = {"file_name": None, "file_path": None, "temp_dir": None}
    empty_progress = {"plan": [], "past_steps": [], "response": "", "current_task": ""}
    return {
        "question": question,
        "task_id": state.get("task_id", ""),
        "api_url": state.get("api_url", DEFAULT_API_URL),
        "messages": [HumanMessage(content=question)],
        **no_file_fields,
        **empty_progress,
    }
|
|
def should_process_file(state: TaskState) -> Literal["process_file", "process_input"]:
    """Choose the graph entry node: "process_file" only when both a task id and
    a file name are present (truthy), otherwise "process_input"."""
    has_attachment = bool(state.get("task_id", "")) and bool(state.get("file_name", ""))
    return "process_file" if has_attachment else "process_input"
|
|
| |
def create_plan_execute_task_flow():
    """Assemble and compile the plan-and-execute StateGraph.

    Returns:
        A ``(compiled_app, graph)`` tuple: the runnable produced by
        ``graph.compile()`` and the StateGraph it was built from.
    """
    graph = StateGraph(TaskState)

    # Node name -> callable, registered in this order.
    node_table = (
        ("process_input", process_input),
        ("process_file", process_file),
        ("planner", plan_step),
        ("prepare_next_step", prepare_next_step),
        ("executor_llm_call", executor_llm_call),
        ("tool_node", tool_node),
        ("replan_step", replan_step),
    )
    for node_name, node_callable in node_table:
        graph.add_node(node_name, node_callable)

    # Entry: tasks with an attachment go through process_file first.
    graph.set_conditional_entry_point(
        should_process_file,
        {"process_file": "process_file", "process_input": "process_input"}
    )

    # Unconditional hops up to the executor call.
    for source, target in (
        ("process_input", "planner"),
        ("process_file", "planner"),
        ("planner", "prepare_next_step"),
        ("prepare_next_step", "executor_llm_call"),
    ):
        graph.add_edge(source, target)

    # After the executor: run the requested tool, or go straight to replanning.
    graph.add_conditional_edges(
        "executor_llm_call",
        route_after_executor_call,
        {"tool_node": "tool_node", "replan_step": "replan_step"}
    )
    graph.add_edge("tool_node", "replan_step")

    # After replanning: loop for another step or finish.
    graph.add_conditional_edges(
        "replan_step",
        route_after_replan,
        {"prepare_next_step": "prepare_next_step", END: END}
    )

    app = graph.compile()
    print("Plan-and-execute task graph compiled.")
    return app, graph
|
|
| |
class LangGraphAgent:
    """Callable wrapper that runs one task item through the compiled graph."""

    def __init__(self):
        print("LangGraphAgent initialized with Plan-and-Execute flow.")
        # Only the compiled app is kept; the uncompiled graph is discarded.
        self.app_executor, _ = create_plan_execute_task_flow()

    def __call__(self, item: dict) -> str:
        """Answer a single task.

        Args:
            item: Task dictionary; reads the keys "task_id", "question" and
                (optionally) "file_name".

        Returns:
            The agent's answer, or a string starting with "Error:" when the
            question is missing, no answer was produced, or processing raised.
        """
        task_id = item.get("task_id")
        question = item.get("question")
        file_name = item.get("file_name", None)

        # Validate before touching `question`: slicing None for the log line
        # below would raise TypeError instead of returning the error message.
        if not question:
            return "Error: Missing question in task item."

        print(f"Agent received task {task_id}: {question[:50]}... (File: {file_name})")

        try:
            initial_state = {
                "task_id": task_id,
                "question": question,
                "file_name": file_name if file_name else None,
                "api_url": DEFAULT_API_URL
            }

            print(f"Invoking agent for task {task_id}...")
            result = self.app_executor.invoke(initial_state)

            # The entry nodes initialise "response" to "", so a plain
            # .get(key, default) would never fall back; treat empty as missing.
            answer = result.get("response") or "Error: No final response generated."

            if not isinstance(answer, str):
                answer = str(answer)

            print(f"Agent returning answer for task {task_id}: {answer[:50]}...")
            return answer

        except Exception as e:
            # Boundary for a single task: log the traceback, return an error string.
            print(f"Error processing task {task_id}: {e}")
            import traceback
            traceback.print_exc()
            return f"Error: {str(e)}"
|
|
| |
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Fetch every question, run the agent on each, and submit the answers.

    Args:
        profile: The logged-in Hugging Face profile, or None when nobody is
            logged in.

    Returns:
        A ``(status_text, results)`` pair. ``results`` is a DataFrame of the
        per-task log, or None when the run stopped before any task was tried.
    """
    space_id = os.getenv("SPACE_ID")

    # Guard: nothing can be submitted without a logged-in user.
    if not profile:
        return "Please Login to Hugging Face with the button.", None

    username = profile.username
    print(f"User logged in: {username}")

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    try:
        agent = LangGraphAgent()
    except Exception as init_error:
        print(f"Error instantiating agent: {init_error}")
        return f"Error initializing agent: {init_error}", None

    # Link to the Space's source tree, sent along with the submission.
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"

    # --- Fetch the questions -------------------------------------------------
    try:
        questions_response = requests.get(questions_url, timeout=15)
        questions_response.raise_for_status()
        questions_data = questions_response.json()
        if not questions_data:
            return "Fetched questions list is empty.", None
        print(f"Fetched {len(questions_data)} questions.")
    except Exception as fetch_error:
        return f"Error fetching questions: {fetch_error}", None

    # --- Run the agent on each question --------------------------------------
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")

    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        # Items without an id or without a question are skipped silently.
        if not task_id or question_text is None:
            continue
        try:
            submitted_answer = agent(item)
        except Exception as run_error:
            # Failed tasks are logged but not included in the submission.
            print(f"Error on task {task_id}: {run_error}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"ERROR: {run_error}"})
            continue
        answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
        results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})

    if not answers_payload:
        return "No answers produced.", pd.DataFrame(results_log)

    # --- Submit ---------------------------------------------------------------
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}

    try:
        submit_response = requests.post(submit_url, json=submission_data, timeout=120)
        submit_response.raise_for_status()
        result_data = submit_response.json()
        final_status = (
            f"✅ Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)"
        )
    except Exception as submit_error:
        return f"Submission failed: {submit_error}", pd.DataFrame(results_log)
    return final_status, pd.DataFrame(results_log)
|
|
| |
# Gradio UI: a login button, one action button, and two outputs (status text
# and the per-task results table) filled by run_and_submit_all.
with gr.Blocks() as demo:
    gr.Markdown("# 🦾 GAIA Agent Evaluator - Gemini Edition")
    # The model name below is corrected to match the "gemini-2.5-flash" model
    # that the code actually instantiates.
    gr.Markdown(
        """
        **Instructions:**
        1. Login to Hugging Face
        2. Click 'Run Evaluation & Submit'
        3. Wait for the agent to process all questions

        **Model:** Gemini 2.5 Flash (gratuit, excellent pour le raisonnement)
        """
    )

    gr.LoginButton()
    run_button = gr.Button("🚀 Run Evaluation & Submit", variant="primary")
    status_output = gr.Textbox(label="Status", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Results", wrap=True)

    # No explicit inputs: the click handler's only parameter is the OAuth
    # profile declared in run_and_submit_all's signature.
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )


if __name__ == "__main__":
    demo.launch()