nsx_agent / tools /web_search.py
ningshixian's picture
Update tools/web_search.py
beab53a verified
from typing import Any, Optional
from smolagents.tools import Tool
import duckduckgo_search
class DuckDuckGoSearchTool(Tool):
name = "web_search"
description = "Performs a duckduckgo web search based on your query (think a Google search) then returns the top search results."
inputs = {'query': {'type': 'string', 'description': 'The search query to perform.'}}
output_type = "string"
def __init__(self, max_results=10, **kwargs):
super().__init__()
self.max_results = max_results
try:
from duckduckgo_search import DDGS
except ImportError as e:
raise ImportError(
"You must install package `duckduckgo_search` to run this tool: for instance run `pip install duckduckgo-search`."
) from e
self.ddgs = DDGS(**kwargs)
def forward(self, query: str) -> str:
results = self.ddgs.text(query, max_results=self.max_results)
if len(results) == 0:
raise Exception("No results found! Try a less restrictive/shorter query.")
postprocessed_results = [f"[{result['title']}]({result['href']})\n{result['body']}" for result in results]
return "## Search Results\n\n" + "\n\n".join(postprocessed_results)
class GoogleSearchTool(Tool):
name = "web_search"
description = """Performs a google web search for your query then returns a string of the top search results."""
inputs = {
"query": {"type": "string", "description": "The search query to perform."},
"filter_year": {
"type": "integer",
"description": "Optionally restrict results to a certain year",
"nullable": True,
},
}
output_type = "string"
def __init__(self, provider: str = "serpapi"):
super().__init__()
import os
self.provider = provider
if provider == "serpapi":
self.organic_key = "organic_results"
api_key_env_name = "SERPAPI_API_KEY"
else:
self.organic_key = "organic"
api_key_env_name = "SERPER_API_KEY"
self.api_key = os.getenv(api_key_env_name)
if self.api_key is None:
raise ValueError(f"Missing API key. Make sure you have '{api_key_env_name}' in your env variables.")
def forward(self, query: str, filter_year: Optional[int] = None) -> str:
import requests
if self.provider == "serpapi":
params = {
"q": query,
"api_key": self.api_key,
"engine": "google",
"google_domain": "google.com",
}
base_url = "https://serpapi.com/search.json"
else:
params = {
"q": query,
"api_key": self.api_key,
}
base_url = "https://google.serper.dev/search"
if filter_year is not None:
params["tbs"] = f"cdr:1,cd_min:01/01/{filter_year},cd_max:12/31/{filter_year}"
response = requests.get(base_url, params=params)
if response.status_code == 200:
results = response.json()
else:
raise ValueError(response.json())
if self.organic_key not in results.keys():
if filter_year is not None:
raise Exception(
f"No results found for query: '{query}' with filtering on year={filter_year}. Use a less restrictive query or do not filter on year."
)
else:
raise Exception(f"No results found for query: '{query}'. Use a less restrictive query.")
if len(results[self.organic_key]) == 0:
year_filter_message = f" with filter year={filter_year}" if filter_year is not None else ""
return f"No results found for '{query}'{year_filter_message}. Try with a more general query, or remove the year filter."
web_snippets = []
if self.organic_key in results:
for idx, page in enumerate(results[self.organic_key]):
date_published = ""
if "date" in page:
date_published = "\nDate published: " + page["date"]
source = ""
if "source" in page:
source = "\nSource: " + page["source"]
snippet = ""
if "snippet" in page:
snippet = "\n" + page["snippet"]
redacted_version = f"{idx}. [{page['title']}]({page['link']}){date_published}{source}\n{snippet}"
web_snippets.append(redacted_version)
return "## Search Results\n" + "\n\n".join(web_snippets)
class VisitWebpageTool(Tool):
name = "visit_webpage"
description = (
"Visits a webpage at the given url and reads its content as a markdown string. Use this to browse webpages."
)
inputs = {
"url": {
"type": "string",
"description": "The url of the webpage to visit.",
}
}
output_type = "string"
def __init__(self, max_output_length: int = 40000):
super().__init__()
self.max_output_length = max_output_length
def forward(self, url: str) -> str:
try:
import re
import requests
from markdownify import markdownify
from requests.exceptions import RequestException
from smolagents.utils import truncate_content
except ImportError as e:
raise ImportError(
"You must install packages `markdownify` and `requests` to run this tool: for instance run `pip install markdownify requests`."
) from e
try:
# Send a GET request to the URL with a 20-second timeout
response = requests.get(url, timeout=20)
response.raise_for_status() # Raise an exception for bad status codes
# Convert the HTML content to Markdown
markdown_content = markdownify(response.text).strip()
# Remove multiple line breaks
markdown_content = re.sub(r"\n{3,}", "\n\n", markdown_content)
return truncate_content(markdown_content, self.max_output_length)
except requests.exceptions.Timeout:
return "The request timed out. Please try again later or check the URL."
except RequestException as e:
return f"Error fetching the webpage: {str(e)}"
except Exception as e:
return f"An unexpected error occurred: {str(e)}"