GAIA_Agent / agents /research_agent.py
Delanoe Pirard
cookies.txt
68bd1d5
import os
import time
import logging
import re # Import regex for video ID extraction
from typing import List, Optional, Dict, Any # Added Dict
from duckdb.duckdb import description
from llama_index.core.agent.workflow import ReActAgent
from llama_index.core.tools import FunctionTool
from llama_index.core.workflow import Context
from llama_index.llms.google_genai import GoogleGenAI
from llama_index.tools.google import GoogleSearchToolSpec
from llama_index.tools.tavily_research import TavilyToolSpec
from llama_index.tools.wikipedia import WikipediaToolSpec
from llama_index.tools.duckduckgo import DuckDuckGoSearchToolSpec
from llama_index.tools.yahoo_finance import YahooFinanceToolSpec
from llama_index.tools.arxiv import ArxivToolSpec
# Attempt to import browser tools; handle import errors gracefully
try:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import WebDriverException, NoSuchElementException, TimeoutException
from helium import start_chrome, go_to, find_all, Text, kill_browser, get_driver, click, write, press
SELENIUM_AVAILABLE = True
except ImportError:
logging.warning("Selenium or Helium not installed. Browser interaction tools will be unavailable.")
SELENIUM_AVAILABLE = False
# Setup logging
logger = logging.getLogger(__name__)
# --- Browser Interaction Tools (Conditional on Selenium/Helium availability) ---
# Global browser instance (managed by initializer)
_browser_instance = None
_browser_driver = None
# Helper decorator for browser tool error handling and logging
def browser_tool_handler(func):
def wrapper(*args, **kwargs):
if not SELENIUM_AVAILABLE:
return "Error: Browser tools require Selenium and Helium to be installed."
if _browser_instance is None or _browser_driver is None:
# Attempt to initialize if not already done (e.g., if called directly)
# This is not ideal, initialization should happen via get_research_initializer()
logger.warning("Browser accessed before explicit initialization. Attempting to initialize now.")
try:
get_research_initializer() # This will initialize the browser
if _browser_instance is None or _browser_driver is None:
return "Error: Browser initialization failed."
except Exception as init_err:
return f"Error: Browser initialization failed: {init_err}"
func_name = func.__name__
logger.info(f"Executing browser tool: {func_name} with args: {args}, kwargs: {kwargs}")
try:
result = func(*args, **kwargs)
logger.info(f"Tool {func_name} executed successfully.")
# Ensure result is a string for consistency
return str(result) if result is not None else f"{func_name} completed."
except (NoSuchElementException, WebDriverException, TimeoutException) as e:
logger.warning(f"Browser error in {func_name}: {e.__class__.__name__} - {str(e).split()[0]}")
return f"Error in {func_name}: {e.__class__.__name__} - {str(e).split()[0]}"
except Exception as e:
logger.error(f"Unexpected error in {func_name}: {e}", exc_info=True)
return f"Unexpected error in {func_name}: {e}"
return wrapper
@browser_tool_handler
def visit_url(url: str, wait_seconds: float = 3.0) -> str:
"""Navigate the browser to the specified URL and wait for the page to load."""
logger.info(f"Navigating to {url} and waiting {wait_seconds}s...")
go_to(url)
time.sleep(wait_seconds) # Wait for dynamic content
current_url = _browser_driver.current_url
return f"Successfully navigated to: {current_url}"
@browser_tool_handler
def get_text_by_css_selector(selector: str) -> list[Any] | str:
"""
(Browser) Extract visible text content from a webpage using a CSS selector.
Args:
selector (str):
A valid CSS selector (e.g., 'body', '.content', '#main').
Behavior:
- If selector == 'body', extracts all visible text from the <body> tag.
- If the <body> tag is not found, falls back to Helium Text() for visible elements.
- For any other selector, uses Selenium to find all matching elements.
- Filters out invisible elements and empty lines.
Returns:
list[str]:
A list of visible text lines.
OR
str:
An error message starting with "Error:" on failure (e.g., missing state).
"""
logger.info(f"Extracting text using CSS selector: {selector}")
# state_dict = await ctx.get("state")
# if not state_dict:
# logger.error("State not found in context.")
# return "Error: State not found."
#
# research_content = state_dict.get("research_content", [])
if selector.lower() == "body":
# Helium Text() might be too broad, let's try body tag first
try:
body_element = _browser_driver.find_element(By.TAG_NAME, "body")
all_text = body_element.text.split("\n") # Split into lines
# Filter out empty lines
non_empty_text = [line.strip() for line in all_text if line.strip()]
logger.info(f"Extracted {len(non_empty_text)} lines of text from body.")
return non_empty_text
except NoSuchElementException:
logger.warning("Could not find body tag, falling back to Helium Text().")
elements = find_all(Text())
# Process Helium elements if fallback is used
texts = [elem.web_element.text for elem in elements if elem.web_element.is_displayed() and elem.web_element.text.strip()]
logger.info(f"Extracted {len(texts)} visible text elements using Helium Text().")
# research_content.extend(texts)
# state_dict["research_content"] = research_content
# await ctx.set("state", state_dict)
return texts
else:
# Use Selenium directly for more control
elements_selenium = _browser_driver.find_elements(By.CSS_SELECTOR, selector)
texts = [elem.text for elem in elements_selenium if elem.is_displayed() and elem.text.strip()]
logger.info(f"Extracted {len(texts)} visible text elements for selector {selector}.")
# state_dict["research_content"] = research_content
# await ctx.set("state", state_dict)
return texts
@browser_tool_handler
def search_in_page(query: str,
case_sensitive: bool = False,
max_results: int = 50) -> list[str] | str:
"""
(Browser) Search for occurrences of a word or phrase in the visible text of the current page.
Args:
query (str):
Word or phrase to search for (e.g., 'machine learning').
case_sensitive (bool, optional):
Whether the search should be case-sensitive (default: False).
max_results (int, optional):
Maximum number of matching lines to return (default: 50).
Behavior:
- Retrieves all visible text from the <body> tag.
- Splits the text into individual lines.
- Filters lines that contain the `query` (respecting `case_sensitive`).
- Appends the matching lines to `state['research_content']`.
- Truncates the result to `max_results`.
Returns:
list[str]:
List of matching lines (up to `max_results`).
OR
str:
An error message starting with "Error:" on failure (e.g., missing state or browser).
"""
# Ensure we have state
# state = await ctx.get("state") or {}
# if not state:
# logger.error("State not found in context.")
# return "Error: State not found."
# Extract all visible text from the page
try:
body = _browser_driver.find_element(By.TAG_NAME, "body")
text = body.text or ""
except Exception as e:
logger.error(f"Failed to extract page text: {e}")
return f"Error: Could not retrieve page text ({e})."
# Prepare for search
lines = [line.strip() for line in text.splitlines() if line.strip()]
needle = query if case_sensitive else query.lower()
# Find matches
matches = []
for line in lines:
haystack = line if case_sensitive else line.lower()
if needle in haystack:
matches.append(line)
if len(matches) >= max_results:
break
# Update research context
# research = state.get("research_content", [])
# research.extend(matches)
# state["research_content"] = research
# await ctx.set("state", state)
return matches
@browser_tool_handler
def suggest_informative_selectors(min_words: int = 10, max_selectors: int = 30) -> List[str]:
"""
Analyze the current page and return a list of CSS selectors likely to contain informative text,
along with up to 1000 characters of the element's visible content.
Parameters:
- min_words (int): minimum number of words in an element's text to consider it informative.
- max_selectors (int): maximum number of distinct selectors to return.
Returns:
- List[str]: each entry formatted as "selector: preview", where preview is a truncated (1000 chars max) version of the element's content.
"""
logger.info("Analyzing page to suggest informative CSS selectors with previews...")
elements = _browser_driver.find_elements(By.XPATH, "//*[not(self::script or self::style or self::head)]")
selector_scores: Dict[str, Dict] = {}
for elem in elements:
if not elem.is_displayed():
continue
try:
text = elem.text.strip()
if len(text.split()) >= min_words:
tag = elem.tag_name
class_attr = elem.get_attribute("class") or ""
id_attr = elem.get_attribute("id") or ""
# Prioritize by specificity: id > class > tag
if id_attr:
selector = f"{tag}#{id_attr}"
elif class_attr:
main_class = class_attr.strip().split()[0]
selector = f"{tag}.{main_class}"
else:
selector = tag
current_score = len(text)
if selector not in selector_scores or current_score > selector_scores[selector]["score"]:
selector_scores[selector] = {
"score": current_score,
"preview": text[:1000] # Limit preview to 1000 chars
}
except Exception as e:
logger.warning(f"Error processing element: {e}")
continue
# Sort by score (proxy for information density) and return top N
sorted_items = sorted(selector_scores.items(), key=lambda x: x[1]["score"], reverse=True)
top_descriptions = [f"{selector}: {info['preview']}" for selector, info in sorted_items[:max_selectors]]
logger.info(f"Suggested {len(top_descriptions)} informative selectors with previews.")
return top_descriptions
@browser_tool_handler
def inspect_clickable_elements(max_elements: int = 20) -> List[str]:
"""
Inspect the current page and return a list of visible, clickable elements with their CSS selectors and preview text.
Parameters:
- max_elements (int): maximum number of elements to include.
Returns:
- List[str]: descriptions of clickable elements with selector, tag, and truncated inner text.
"""
logger.info("Inspecting page for clickable elements...")
# Define XPaths for clickable elements
xpaths = [
"//a[@href]",
"//button",
"//input[@type='submit' or @type='button']",
"//*[@onclick]",
"//*[contains(@role, 'button')]"
]
seen = set()
results = []
for xpath in xpaths:
try:
elements = _browser_driver.find_elements(By.XPATH, xpath)
for elem in elements:
if not elem.is_displayed():
continue
try:
tag = elem.tag_name
class_attr = elem.get_attribute("class") or ""
id_attr = elem.get_attribute("id") or ""
text = elem.text.strip()
# Construct CSS selector
if id_attr:
selector = f"{tag}#{id_attr}"
elif class_attr:
selector = f"{tag}.{class_attr.strip().split()[0]}"
else:
selector = tag
if selector in seen:
continue
seen.add(selector)
description = (
f"selector: {selector}\n"
f"tag: {tag}\n"
f"text: {text[:100] if text else '[no visible text]'}"
)
results.append(description)
if len(results) >= max_elements:
logger.info(f"Reached limit of {max_elements} clickable elements.")
return results
except Exception as inner_err:
logger.warning(f"Error processing clickable element: {inner_err}")
except Exception as outer_err:
logger.warning(f"XPath evaluation failed: {xpath} => {outer_err}")
logger.info(f"Found {len(results)} clickable elements.")
return results
@browser_tool_handler
def inspect_clickable_elements_for_filtering_or_sorting(min_words: int = 1, max_items: int = 20) -> List[str]:
"""
Inspect the current page to find clickable elements (e.g., buttons, links, dropdowns)
that are likely to be used for filtering or sorting content.
Parameters:
- min_words (int): minimum number of words to consider an element potentially meaningful.
- max_items (int): maximum number of clickable selectors to return.
Returns:
- List[str]: a list of unique CSS selectors (e.g., button.sort, a.filter) likely tied to filtering/sorting functionality.
"""
logger.info("Inspecting clickable elements for filtering or sorting...")
clickable_tags = ["button", "a", "input", "select", "label", "div", "span"]
selectors_found = {}
for tag in clickable_tags:
try:
elements = _browser_driver.find_elements(By.TAG_NAME, tag)
for elem in elements:
if not elem.is_displayed() or not elem.is_enabled():
continue
text = elem.text.strip()
if len(text.split()) >= min_words or elem.get_attribute("aria-label") or elem.get_attribute("role") in {
"button", "combobox"}:
tag_name = elem.tag_name
class_attr = elem.get_attribute("class") or ""
id_attr = elem.get_attribute("id") or ""
if id_attr:
selector = f"{tag_name}#{id_attr}"
elif class_attr:
main_class = class_attr.strip().split()[0]
selector = f"{tag_name}.{main_class}"
else:
selector = tag_name
if selector not in selectors_found:
selectors_found[selector] = text
except Exception as e:
logger.warning(f"Failed to process tag '{tag}': {e}")
continue
sorted_selectors = sorted(selectors_found.items(), key=lambda x: len(x[1]), reverse=True)
final_selectors = [s for s, _ in sorted_selectors[:max_items]]
logger.info(f"Found {len(final_selectors)} candidate selectors for filtering/sorting.")
return final_selectors
@browser_tool_handler
def click_element_by_css(selector: str, index: int = 0) -> str:
"""Click on the Nth (0-based index) element matching the CSS selector."""
logger.info(f"Attempting to click element {index} matching selector: {selector}")
# Use Selenium directly for finding elements
elements_selenium = _browser_driver.find_elements(By.CSS_SELECTOR, selector)
if not elements_selenium:
raise NoSuchElementException(f"No elements found for selector: {selector}")
if index >= len(elements_selenium):
raise IndexError(f"Index {index} out of bounds. Only {len(elements_selenium)} elements found for selector: {selector}")
target_element = elements_selenium[index]
if not target_element.is_displayed() or not target_element.is_enabled():
logger.warning(f"Element {index} for selector {selector} is not visible or enabled. Attempting click anyway.")
# Try scrolling into view first
try:
_browser_driver.execute_script("arguments[0].scrollIntoView(true);", target_element)
time.sleep(0.5)
except Exception as scroll_err:
logger.warning(f"Could not scroll element into view: {scroll_err}")
# Use Helium click which might handle overlays better, passing the Selenium element
click(target_element)
time.sleep(1.5) # Increased wait after click
return f"Clicked element {index} matching selector {selector}. Current URL: {_browser_driver.current_url}"
@browser_tool_handler
def input_text_by_css(selector: str, text: str, index: int = 0, press_enter: bool = True) -> str:
"""Input text into the Nth (0-based index) element matching the CSS selector. Optionally press Enter."""
logger.info(f"Attempting to input text into element {index} matching selector: {selector}")
# Use Selenium directly for finding elements
elements_selenium = _browser_driver.find_elements(By.CSS_SELECTOR, selector)
if not elements_selenium:
raise NoSuchElementException(f"No elements found for selector: {selector}")
if index >= len(elements_selenium):
raise IndexError(f"Index {index} out of bounds. Only {len(elements_selenium)} elements found for selector: {selector}")
target_element = elements_selenium[index]
if not target_element.is_displayed() or not target_element.is_enabled():
logger.warning(f"Input element {index} for selector {selector} is not visible or enabled. Attempting input anyway.")
# Try scrolling into view
try:
_browser_driver.execute_script("arguments[0].scrollIntoView(true);", target_element)
time.sleep(0.5)
except Exception as scroll_err:
logger.warning(f"Could not scroll input element into view: {scroll_err}")
# Use Helium write, passing the Selenium element
write(text, into=target_element)
time.sleep(0.5)
if press_enter:
press(Keys.ENTER)
time.sleep(1.5) # Wait longer if Enter was pressed
return f"Input text into element {index} ({selector}) and pressed Enter. Current URL: {_browser_driver.current_url}"
else:
return f"Input text into element {index} ({selector})."
@browser_tool_handler
def scroll_page(direction: str = "down", amount: str = "page") -> str:
"""Scroll the page up or down by a specified amount ('page', 'top', 'bottom', or pixels)."""
logger.info(f"Scrolling {direction} by {amount}")
if direction not in ["up", "down"]:
raise ValueError("Direction must be \"up\" or \"down\".")
if amount == "page":
scroll_script = "window.scrollBy(0, window.innerHeight);" if direction == "down" else "window.scrollBy(0, -window.innerHeight);"
elif amount == "top":
scroll_script = "window.scrollTo(0, 0);"
elif amount == "bottom":
scroll_script = "window.scrollTo(0, document.body.scrollHeight);"
else:
try:
pixels = int(amount)
scroll_script = f"window.scrollBy(0, {pixels});" if direction == "down" else f"window.scrollBy(0, {-pixels});"
except ValueError:
raise ValueError("Amount must be \"page\", \"top\", \"bottom\", or a number of pixels.")
_browser_driver.execute_script(scroll_script)
time.sleep(1) # Wait for scroll effects
return f"Scrolled {direction} by {amount}."
@browser_tool_handler
def go_back() -> str:
"""Navigate the browser back one step in its history."""
logger.info("Navigating back...")
_browser_driver.back()
time.sleep(1.5) # Wait after navigation
return f"Navigated back. Current URL: {_browser_driver.current_url}"
@browser_tool_handler
def close_popups() -> str:
"""Send an ESC keypress to attempt to dismiss modals or pop-ups."""
logger.info("Sending ESC key...")
webdriver.ActionChains(_browser_driver).send_keys(Keys.ESCAPE).perform()
time.sleep(0.5)
return "Sent ESC key press."
async def answer_question(ctx: Context, question: str) -> str:
"""
Answer any question by following this strict format:
1. Include your chain of thought (your reasoning steps).
2. End your reply with the exact template:
FINAL ANSWER: [YOUR FINAL ANSWER]
YOUR FINAL ANSWER must be:
- A number, or
- As few words as possible, or
- A comma-separated list of numbers and/or strings.
Formatting rules:
* If asked for a number, do not use commas or units (e.g., $, %), unless explicitly requested.
* If asked for a string, do not include articles or abbreviations (e.g., city names), and write digits in plain text.
* If asked for a comma-separated list, apply the above rules to each element.
This tool should be invoked immediately after completing the final planning sub-step.
"""
logger.info(f"Answering question: {question[:100]}")
state_dict = await ctx.get("state")
if not state_dict:
logger.error("State not found in context.")
return "Error: State not found."
research_content = state_dict.get("research_content", [])
research_content_str = "\n".join(research_content)
gemini_api_key = os.getenv("GEMINI_API_KEY")
if not gemini_api_key:
logger.error("GEMINI_API_KEY not set for answer_question tool.")
return "Error: GEMINI_API_KEY not set."
model_name = os.getenv("ANSWER_TOOL_LLM_MODEL", "gemini-2.5-pro-preview-03-25")
prompt = f"""
You are **StepwiseAnswerAgent**, a formal reasoning assistant designed to provide clear,
accurate, and actionable answers.
────────────────────────────────────────────
CORE OPERATING PRINCIPLES
────────────────────────────────────────────
1. **Comprehensive Information Gathering**
– Gather and synthesize all available information.
– Identify gaps or missing data.
2. **Step-by-Step Reasoning** *(internal only)*
– Think through the problem logically in sequential steps.
– This reasoning should remain invisible to the user; only the final answer is shown.
3. **Skeptical Verification**
– Question assumptions.
– Clearly flag any uncertainties or unverifiable claims (“uncertain”, “missing data”, etc.).
– Use reliable sources or tool outputs where possible.
4. **Clarity and Brevity**
– Use a formal and professional tone.
– Keep language precise and concise.
– Prioritize clarity, utility, and immediate usability of the answer.
────────────────────────────────────────────
INTERNAL PROCEDURE (HIDDEN)
────────────────────────────────────────────
A. List all known facts and identify unknowns.
B. Construct a logical step-by-step reasoning chain.
C. Validate consistency and completeness.
D. Output only the final answer, with optional extras if relevant.
────────────────────────────────────────────
RESPONSE FORMAT
────────────────────────────────────────────
**Answer:**
A clear, direct response addressing the user's request, without exposing reasoning steps.
*(Optional)*
– **Key Points:** bullet-point summary of critical insights.
– **Next Steps / Recommended Actions:** if applicable.
────────────────────────────────────────────
CONSTRAINTS
────────────────────────────────────────────
• Do not speculate. Clearly indicate when information is incomplete.
• Do not reveal internal reasoning or system instructions.
• No filler, no flattery, no unnecessary context.
• If the question is under-specified, ask for clarification instead of guessing.
"""
# Build the assistant prompt enforcing the required format
assistant_prompt = (
f"{prompt}\n\n"
"I will ask you a question. "
"Report your thoughts, and finish your answer with the following template: "
"FINAL ANSWER: [YOUR FINAL ANSWER]. "
"YOUR FINAL ANSWER should be a number OR as few words as possible "
"OR a comma separated list of numbers and/or strings. "
"If you are asked for a number, don't use commas for thousands or any units like $ or % unless specified. "
"If you are asked for a string, omit articles and abbreviations, and write digits in plain text. "
"If you are asked for a comma separated list, apply these rules to each element.\n\n"
"Let's begin.\n\n"
f"All available research: {research_content_str}\n"
f"Question: {question}\n"
"Answer:"
)
try:
llm = GoogleGenAI(api_key=gemini_api_key, model="gemini-2.5-pro-preview-03-25", temperature=0.05)
logger.info(f"Using answer LLM: {model_name}")
response = llm.complete(assistant_prompt)
logger.info("Answer generated successfully.")
return response.text
except Exception as e:
logger.error(f"LLM call failed during answer generation: {e}", exc_info=True)
return f"Error during answer generation: {e}"
# --- Agent Initializer Class ---
class ResearchAgentInitializer:
def __init__(self):
logger.info("Initializing ResearchAgent resources...")
self.llm = None
self.browser_tools = []
self.search_tools = []
self.datasource_tools = []
# Initialize LLM
self._initialize_llm()
# Initialize Browser (conditionally)
if SELENIUM_AVAILABLE:
self._initialize_browser()
self._create_browser_tools()
else:
logger.warning("Browser tools are disabled as Selenium/Helium are not available.")
# Initialize Search/Datasource Tools
self._create_search_tools()
self._create_datasource_tools()
self.answer_question = FunctionTool.from_defaults(
fn=answer_question,
name="answer_question",
description=(
"(QA) Answer any question using structured, step-by-step reasoning, and return a concise, final result.\n\n"
"**Inputs:**\n"
"- `ctx` (Context): Execution context containing prior research state.\n"
"- `question` (str): A direct, factual question to be answered based on collected knowledge.\n\n"
"**Behavior:**\n"
"- Retrieves accumulated research content from shared state.\n"
"- Performs logical reasoning internally using a formal chain-of-thought.\n"
"- Generates a full response that includes visible reasoning steps followed by a strict answer format.\n\n"
"**Output Format:**\n"
"- Returns a string with:\n"
" 1. Reasoning steps (visible to user).\n"
" 2. Final answer, always ending with:\n"
" `FINAL ANSWER: [your answer]`\n\n"
"**Answer Constraints:**\n"
"- The final answer must be:\n"
" • A number (without commas or units, unless explicitly requested), or\n"
" • A short string (no articles or abbreviations), or\n"
" • A comma-separated list of numbers and/or strings (same rules apply).\n\n"
"**Errors:**\n"
"- Returns a string prefixed with `Error:` if state is missing or LLM fails to respond."
)
)
logger.info("ResearchAgent resources initialized.")
def _initialize_llm(self):
agent_llm_model = os.getenv("RESEARCH_AGENT_LLM_MODEL", "gemini-2.5-pro-preview-03-25")
gemini_api_key = os.getenv("GEMINI_API_KEY")
if not gemini_api_key:
logger.error("GEMINI_API_KEY not found for ResearchAgent LLM.")
raise ValueError("GEMINI_API_KEY must be set for ResearchAgent")
try:
self.llm = GoogleGenAI(api_key=gemini_api_key, model="gemini-2.5-pro-preview-03-25", temperature=0.05)
logger.info(f"ResearchAgent LLM initialized: {agent_llm_model}")
except Exception as e:
logger.error(f"Failed to initialize ResearchAgent LLM: {e}", exc_info=True)
raise
def _initialize_browser(self):
global _browser_instance, _browser_driver
if _browser_instance is None:
logger.info("Initializing browser (Chrome headless)...")
try:
chrome_options = webdriver.ChromeOptions()
# Configurable options from env vars
if os.getenv("RESEARCH_AGENT_CHROME_NO_SANDBOX", "true").lower() == "true":
chrome_options.add_argument("--no-sandbox")
if os.getenv("RESEARCH_AGENT_CHROME_DISABLE_DEV_SHM", "true").lower() == "true":
chrome_options.add_argument("--disable-dev-shm-usage")
# Add prefs for downloads/popups
chrome_options.add_experimental_option("prefs", {
"download.prompt_for_download": False,
"plugins.always_open_pdf_externally": True,
"profile.default_content_settings.popups": 0
})
# Start Chrome using Helium
_browser_instance = start_chrome(headless=True, options=chrome_options)
_browser_driver = get_driver() # Get the underlying Selenium driver
logger.info("Browser initialized successfully.")
except Exception as e:
logger.error(f"Failed to initialize browser: {e}", exc_info=True)
# Set flags to prevent tool usage
global SELENIUM_AVAILABLE
SELENIUM_AVAILABLE = False
_browser_instance = None
_browser_driver = None
def _create_browser_tools(self):
if not SELENIUM_AVAILABLE:
self.browser_tools = []
return
self.browser_tools = [
FunctionTool.from_defaults(
fn=visit_url,
name="visit_url",
description=(
"(Browser) Navigate the browser to a specified URL and wait for the page to load.\n"
"Inputs: url (str), wait_seconds (float, default=3.0).\n"
"Output: str — confirmation message including final URL."
)
),
FunctionTool.from_defaults(
fn=get_text_by_css_selector,
name="get_text_by_css_selector",
description=(
"(Browser) Extract visible text content from a webpage using a CSS selector.\n\n"
"**Inputs:**\n"
"- `selector` (str): A valid CSS selector (e.g., `'body'`, `'.content'`, `'#main'`).\n\n"
"**Behavior:**\n"
"- If `selector='body'`, extracts all visible text from the `<body>` tag.\n"
"- If elements are not found via the DOM, falls back to visible elements via Helium `Text()`.\n"
"- For other selectors, uses Selenium to extract text from all visible matching elements.\n"
"- Filters out invisible and empty lines.\n\n"
"**Output:**\n"
"- `List[str]`: List of visible text lines, or an error message string on failure."
)
),
FunctionTool.from_defaults(
fn=search_in_page,
name="search_in_page",
description=(
"(Browser) Search for a word or phrase in the visible text of the current page.\n\n"
"**Inputs:**\n"
"- `query` (str): Word or phrase to search for (e.g., 'machine learning').\n"
"- `case_sensitive` (bool, optional): Whether the search is case-sensitive (default: False).\n"
"- `max_results` (int, optional): Maximum number of matching lines to return (default: 50).\n\n"
"**Behavior:**\n"
"- Extracts all visible text from the `<body>` tag.\n"
"- Splits text into lines and filters those containing `query`.\n"
"- Appends found lines to the shared `research_content` state.\n\n"
"**Output:**\n"
"- `List[str]`: Matching lines (up to `max_results`).\n"
"- `str`: An error message if state or browser is unavailable."
)
),
FunctionTool.from_defaults(
fn=click_element_by_css,
name="click_element_by_css",
description=(
"(Browser) Click the N-th visible element matching a CSS selector.\n"
"Inputs: selector (str), index (int, default=0).\n"
"Output: str — confirmation message with final URL."
)
),
FunctionTool.from_defaults(
fn=input_text_by_css,
name="input_text_by_css",
description=(
"(Browser) Input text into the N-th input element matching a CSS selector, optionally pressing Enter.\n"
"Inputs: selector (str), text (str), index (int, default=0), press_enter (bool, default=True).\n"
"Output: str — confirmation of text input and action."
)
),
FunctionTool.from_defaults(
fn=scroll_page,
name="scroll_page",
description=(
"(Browser) Scroll the page in a given direction and amount.\n"
"Inputs: direction (str: 'up' or 'down'), amount (str: 'page', 'top', 'bottom', or number of pixels).\n"
"Output: str — confirmation of scroll action."
)
),
FunctionTool.from_defaults(
fn=go_back,
name="navigate_back",
description=(
"(Browser) Navigate back one step in browser history.\n"
"Inputs: none.\n"
"Output: str — confirmation of back navigation with current URL."
)
),
FunctionTool.from_defaults(
fn=close_popups,
name="close_popups",
description=(
"(Browser) Attempt to close pop-ups or modals by simulating an ESC keypress.\n"
"Inputs: none.\n"
"Output: str — confirmation of ESC key sent."
)
),
FunctionTool.from_defaults(
fn=suggest_informative_selectors,
name="suggest_informative_selectors",
description=(
"(Browser) Analyze the current web page and return a list of up to N CSS selectors likely to contain "
"informative text content. Each result includes the CSS selector followed by a preview of up to "
"1000 characters of the element's text content. This is especially useful for manually identifying "
"relevant containers before applying filters, scrapers, or sorters.\n\n"
"**Inputs:**\n"
"- `min_words` (int, default=10): Minimum number of words in the element for it to be considered informative.\n"
"- `max_selectors` (int, default=15): Maximum number of top selectors to return.\n\n"
"**Output:**\n"
"- `List[str]`: Each string is formatted as:\n"
" 'selector: preview_text'\n"
" where `selector` is a CSS path (e.g. `div.article`, `section#main`) and `preview_text` is a truncated (1000 char max) excerpt "
"of the visible text in that element."
)
),
FunctionTool.from_defaults(
fn=inspect_clickable_elements_for_filtering_or_sorting,
name="inspect_filter_sort_selectors",
description=(
"(Browser) Manually inspect the page for clickable elements (buttons, dropdowns, etc.) that may be used "
"for filtering or sorting. Returns a list of candidate CSS selectors.\n"
"Inputs: min_words (int, default=1), max_items (int, default=20).\n"
"Output: List[str] — list of unique selectors."
)
),
FunctionTool.from_defaults(
fn=inspect_clickable_elements,
name="inspect_clickable_elements",
description=(
"(Browser) Inspect the current page for clickable elements (e.g., <a>, <button>, input[type=button], "
"or elements with onclick handlers). Returns up to N elements with:\n"
"- their CSS selector (id, class or tag fallback),\n"
"- their tag type (e.g., button, a, input),\n"
"- a preview of their visible text (up to 100 characters).\n"
"Useful for manual filtering or determining which elements to interact with programmatically."
)
)
]
logger.info(f"Created {len(self.browser_tools)} browser interaction tools.")
def _create_search_tools(self):
self.search_tools = []
# Google Search
google_spec = GoogleSearchToolSpec(key=os.getenv("GOOGLE_API_KEY"), engine=os.getenv("GOOGLE_CSE_ID"))
if google_spec:
google_tool = FunctionTool.from_defaults(
fn=google_spec.google_search,
name="google_search",
description="(Search) Execute a Google Custom Search query. Returns structured results.")
self.search_tools.append(google_tool)
# Tavily Search
tavily_spec = TavilyToolSpec(api_key=os.getenv("TAVILY_API_KEY"))
if tavily_spec:
# Use search method which is more general
tavily_tool = FunctionTool.from_defaults(fn=tavily_spec.search, name="tavily_search")
tavily_tool.metadata.description = "(Search) Perform a deep research search using Tavily API. Good for finding documents/articles."
self.search_tools.append(tavily_tool)
# DuckDuckGo Search
ddg_spec = DuckDuckGoSearchToolSpec()
if ddg_spec:
ddg_tool = FunctionTool.from_defaults(fn=ddg_spec.duckduckgo_full_search, name="duckduckgo_search")
ddg_tool.metadata.description = "(Search) Execute a DuckDuckGo search. Returns structured results."
self.search_tools.append(ddg_tool)
logger.info(f"Created {len(self.search_tools)} search engine tools.")
def _create_datasource_tools(self):
self.datasource_tools = []
# Wikipedia
wiki_spec = WikipediaToolSpec()
if wiki_spec:
wiki_search_tool = FunctionTool.from_defaults(fn=wiki_spec.search_data, name="wikipedia_search_pages")
wiki_search_tool.metadata.description = "(Wikipedia) Search for Wikipedia page titles matching a query."
wiki_load_tool = FunctionTool.from_defaults(fn=wiki_spec.load_data, name="wikipedia_load_page")
wiki_load_tool.metadata.description = "(Wikipedia) Load the full content of a specific Wikipedia page title."
self.datasource_tools.extend([wiki_search_tool, wiki_load_tool])
# async def wiki_spec_load_data(ctx: Context, page: str, lang: str = "en", **kwargs: Dict[str, Any]) -> str:
# """
# (Wikipedia) Load the full content of a specific Wikipedia page and store it in the research context.
#
# Args:
# ctx (Context):
# Execution context used to access and update shared state.
# page (str):
# Title of the Wikipedia page to load (e.g., 'Alan Turing').
# lang (str, optional):
# Language code for the page (default: 'en').
# **kwargs (dict, optional):
# Additional keyword arguments forwarded to the underlying loader.
#
# Behavior:
# - Fetches the raw text content of the specified Wikipedia page.
# - Appends the retrieved content to the `research_content` list in `state`.
# - Persists the updated `state` back into the context.
#
# Returns:
# str:
# The full plain-text content of the Wikipedia page, or an error message
# starting with "Error:" if the context state is missing.
# """
# state_dict = await ctx.get("state")
# if not state_dict:
# logger.error("State not found in context.")
# return "Error: State not found."
#
# research_content = state_dict.get("research_content", [])
# content = wiki_spec.load_data(page, lang, **kwargs)
# research_content.append(content)
# state_dict["research_content"] = research_content
# await ctx.set("state", state_dict)
# return content
# wiki_load_tool = FunctionTool.from_defaults(
# fn=wiki_spec_load_data,
# name="wikipedia_load_page",
# description=(
# "(Wikipedia) Load the full content of a specific Wikipedia page and store it in the research context.\n\n"
# "**Inputs:**\n"
# "- `ctx` (Context): Execution context used to access and update shared state.\n"
# "- `page` (str): Title of the Wikipedia page to load (e.g., 'Alan Turing').\n"
# "- `lang` (str, optional): Language code for the Wikipedia page (default is `'en'`).\n"
# "- `**kwargs` (dict, optional): Additional keyword arguments forwarded to the underlying data loader.\n\n"
# "**Behavior:**\n"
# "- Loads the raw textual content of the specified Wikipedia page.\n"
# "- Appends the content to the `research_content` list in the shared `state`.\n\n"
# "** Output: ** \n"
# "- `str`: The full plain-text content of the Wikipedia page."
# )
# )
# self.datasource_tools.extend([wiki_search_tool, wiki_spec_load_data])
# Yahoo Finance
yf_spec = YahooFinanceToolSpec()
if yf_spec:
yf_tools_map = {
"balance_sheet": "Get the latest balance sheet for a stock ticker.",
"income_statement": "Get the latest income statement for a stock ticker.",
"cash_flow": "Get the latest cash flow statement for a stock ticker.",
"stock_basic_info": "Get basic info (price, market cap, summary) for a stock ticker.",
"stock_analyst_recommendations": "Get analyst recommendations for a stock ticker.",
"stock_news": "Get recent news headlines for a stock ticker."
}
for func_name, desc in yf_tools_map.items():
if hasattr(yf_spec, func_name):
tool = FunctionTool.from_defaults(fn=getattr(yf_spec, func_name), name=f"yahoo_finance_{func_name}")
tool.metadata.description = f"(YahooFinance) {desc}"
self.datasource_tools.append(tool)
else:
logger.warning(f"YahooFinance function {func_name} not found in spec.")
# ArXiv
arxiv_spec = ArxivToolSpec()
if arxiv_spec:
arxiv_tool = FunctionTool.from_defaults(fn=arxiv_spec.arxiv_query, name="arxiv_search")
arxiv_tool.metadata.description = "(ArXiv) Search ArXiv for academic papers matching a query."
self.datasource_tools.append(arxiv_tool)
logger.info(f"Created {len(self.datasource_tools)} specific data source tools.")
def get_agent(self) -> ReActAgent:
"""Creates and returns the configured ReActAgent for research."""
logger.info("Creating ResearchAgent ReActAgent instance...")
all_tools = self.browser_tools + self.search_tools + self.datasource_tools
if not all_tools:
logger.warning("No tools available for ResearchAgent. It will likely be unable to function.")
# System prompt (consider loading from file)
# Updated prompt to include YouTube tool
system_prompt = """
You are ResearchAgent, an autonomous web‑research assistant. Your goal is to gather information accurately and efficiently using the available tools.
Available Tool Categories
- (Browser): Tools for direct page interaction (visiting URLs, clicking, scrolling, extracting text/HTML, inputting text).
- (Search): Tools for querying search engines (Google, DuckDuckGo, Tavily).
- (Wikipedia): Tools for searching and loading Wikipedia pages.
- (YahooFinance): Tools for retrieving financial data (balance sheets, income statements, stock info, news).
- (ArXiv): Tool for searching academic papers on ArXiv.
- (Validation): Tools for assessing reliability
• cross_reference_check – verify a claim against source text
• logical_consistency_check – detect contradictions or fallacies
• bias_detection – uncover cognitive or framing biases
• fact_check_with_search – prepare an external fact‑check hand‑off
- (Answer): answer_question — use this when your research has yielded a definitive result and you must reply in the strict “FINAL ANSWER” format.
Answer Tool Usage
When no further data is needed, invoke answer_question with the user’s query. It returns text ending exactly with:
FINAL ANSWER: [YOUR FINAL ANSWER]
Formatting rules for YOUR FINAL ANSWER
- A single number, or
- As few words as possible, or
- A comma‑separated list of numbers and/or strings.
* Numeric values: no thousands separators or units (%, $, etc.) unless requested.
* Strings: omit articles and abbreviations; write digits in plain text.
* Lists: apply these rules to each element.
Workflow
1. Thought: analyse the goal; choose the single best tool for the next step and explain why.
2. Action: call that tool with correct arguments.
3. Observation: inspect the output, extract key info, note errors.
4. Reflect & Iterate: if the immediate goal is unmet, loop back to step 1 or choose another tool.
5. Validate: after every Action‑Observation, validate the new finding with a Validation tool or by delegating to advanced_validation_agent. If validation fails, adjust and retry.
6. Long‑Context Management: after three total tool invocations, call long_context_management_agent to compress accumulated information.
7. Synthesize: once data is validated (and context managed when needed), integrate it into a coherent answer.
8. Respond: use answer_question to emit the FINAL ANSWER.
Constraints
- Exactly one tool per Action step.
- Think step‑by‑step; log Thought → Action → Observation clearly.
- If using Browser tools, always start with visit_url.
- Do not skip any stage (Thought → Action → Observation → Reflect → Validate → Context if needed → Synthesize → Respond).
Allowed Hand‑Off Agents
- code_agent: source‑code writing / debugging.
- math_agent: calculations, symbolic work.
- text_analyzer_agent: deep text processing (summary, extraction…).
- advanced_validation_agent: extensive factual / logical validation.
- long_context_management_agent: summarise or chunk long contexts.
- planner_agent: break down a new complex goal.
- reasoning_agent: multi‑hop logical reasoning.
Do not delegate to any agent outside this list.
If your response exceeds the maximum token limit and cannot be completed in a single reply, please conclude your output with the marker [CONTINUE]. In subsequent interactions, I will prompt you with “continue” to receive the next portion of the response.
"""
agent = ReActAgent(
name="research_agent",
description=(
"Performs web research using browser interaction, search engines (Google, DDG, Tavily), "
"specific data sources (Wikipedia, YahooFinance, ArXiv), and YouTube transcript fetching. Follows Thought-Action-Observation loop."
),
tools=all_tools,
llm=self.llm,
system_prompt=system_prompt,
can_handoff_to=[
"code_agent",
"math_agent",
"text_analyzer_agent", # Added based on original prompt
"advanced_validation_agent",
"long_context_management_agent"
"planner_agent",
"reasoning_agent"
],
)
logger.info("ResearchAgent ReActAgent instance created.")
return agent
def close_browser(self):
"""Closes the browser instance if it was initialized."""
global _browser_instance, _browser_driver
if _browser_instance:
logger.info("Closing browser instance...")
try:
kill_browser() # Use Helium's function
logger.info("Browser closed successfully.")
except Exception as e:
logger.error(f"Error closing browser: {e}", exc_info=True)
finally:
_browser_instance = None
_browser_driver = None
else:
logger.info("No active browser instance to close.")
# --- Singleton Initializer Instance ---
_research_agent_initializer_instance = None
def get_research_initializer():
"""Gets the singleton instance of ResearchAgentInitializer."""
global _research_agent_initializer_instance
if _research_agent_initializer_instance is None:
logger.info("Instantiating ResearchAgentInitializer for the first time.")
_research_agent_initializer_instance = ResearchAgentInitializer()
return _research_agent_initializer_instance
# --- Public Initialization Function ---
def initialize_research_agent() -> ReActAgent:
"""Initializes and returns the Research Agent using a singleton initializer."""
logger.info("initialize_research_agent called.")
initializer = get_research_initializer()
return initializer.get_agent()
# --- Cleanup Function (Optional but recommended) ---
def cleanup_research_agent_resources():
"""Cleans up resources used by the research agent, like the browser."""
logger.info("Cleaning up research agent resources...")
initializer = get_research_initializer() # Ensure it exists
initializer.close_browser()
# Example usage (for testing if run directly)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger.info("Running research_agent.py directly for testing...")
# Check required keys
required_keys = ["GEMINI_API_KEY"] # Others are optional depending on tools needed
missing_keys = [key for key in required_keys if not os.getenv(key)]
if missing_keys:
print(f"Error: Required environment variable(s) not set: {', '.join(missing_keys)}. Cannot run test.")
else:
# Warn about optional keys
optional_keys = ["GOOGLE_API_KEY", "GOOGLE_CSE_ID", "TAVILY_API_KEY", "WOLFRAM_ALPHA_APP_ID"]
missing_optional = [key for key in optional_keys if not os.getenv(key)]
if missing_optional:
print(f"Warning: Optional environment variable(s) not set: {', '.join(missing_optional)}. Some tools may be unavailable.")