Spaces:
Sleeping
Sleeping
File size: 6,506 Bytes
fd18d93 8fe992b beab53a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
from typing import Any, Optional
from smolagents.tools import Tool
import duckduckgo_search
class DuckDuckGoSearchTool(Tool):
name = "web_search"
description = "Performs a duckduckgo web search based on your query (think a Google search) then returns the top search results."
inputs = {'query': {'type': 'string', 'description': 'The search query to perform.'}}
output_type = "string"
def __init__(self, max_results=10, **kwargs):
super().__init__()
self.max_results = max_results
try:
from duckduckgo_search import DDGS
except ImportError as e:
raise ImportError(
"You must install package `duckduckgo_search` to run this tool: for instance run `pip install duckduckgo-search`."
) from e
self.ddgs = DDGS(**kwargs)
def forward(self, query: str) -> str:
results = self.ddgs.text(query, max_results=self.max_results)
if len(results) == 0:
raise Exception("No results found! Try a less restrictive/shorter query.")
postprocessed_results = [f"[{result['title']}]({result['href']})\n{result['body']}" for result in results]
return "## Search Results\n\n" + "\n\n".join(postprocessed_results)
class GoogleSearchTool(Tool):
name = "web_search"
description = """Performs a google web search for your query then returns a string of the top search results."""
inputs = {
"query": {"type": "string", "description": "The search query to perform."},
"filter_year": {
"type": "integer",
"description": "Optionally restrict results to a certain year",
"nullable": True,
},
}
output_type = "string"
def __init__(self, provider: str = "serpapi"):
super().__init__()
import os
self.provider = provider
if provider == "serpapi":
self.organic_key = "organic_results"
api_key_env_name = "SERPAPI_API_KEY"
else:
self.organic_key = "organic"
api_key_env_name = "SERPER_API_KEY"
self.api_key = os.getenv(api_key_env_name)
if self.api_key is None:
raise ValueError(f"Missing API key. Make sure you have '{api_key_env_name}' in your env variables.")
def forward(self, query: str, filter_year: Optional[int] = None) -> str:
import requests
if self.provider == "serpapi":
params = {
"q": query,
"api_key": self.api_key,
"engine": "google",
"google_domain": "google.com",
}
base_url = "https://serpapi.com/search.json"
else:
params = {
"q": query,
"api_key": self.api_key,
}
base_url = "https://google.serper.dev/search"
if filter_year is not None:
params["tbs"] = f"cdr:1,cd_min:01/01/{filter_year},cd_max:12/31/{filter_year}"
response = requests.get(base_url, params=params)
if response.status_code == 200:
results = response.json()
else:
raise ValueError(response.json())
if self.organic_key not in results.keys():
if filter_year is not None:
raise Exception(
f"No results found for query: '{query}' with filtering on year={filter_year}. Use a less restrictive query or do not filter on year."
)
else:
raise Exception(f"No results found for query: '{query}'. Use a less restrictive query.")
if len(results[self.organic_key]) == 0:
year_filter_message = f" with filter year={filter_year}" if filter_year is not None else ""
return f"No results found for '{query}'{year_filter_message}. Try with a more general query, or remove the year filter."
web_snippets = []
if self.organic_key in results:
for idx, page in enumerate(results[self.organic_key]):
date_published = ""
if "date" in page:
date_published = "\nDate published: " + page["date"]
source = ""
if "source" in page:
source = "\nSource: " + page["source"]
snippet = ""
if "snippet" in page:
snippet = "\n" + page["snippet"]
redacted_version = f"{idx}. [{page['title']}]({page['link']}){date_published}{source}\n{snippet}"
web_snippets.append(redacted_version)
return "## Search Results\n" + "\n\n".join(web_snippets)
class VisitWebpageTool(Tool):
name = "visit_webpage"
description = (
"Visits a webpage at the given url and reads its content as a markdown string. Use this to browse webpages."
)
inputs = {
"url": {
"type": "string",
"description": "The url of the webpage to visit.",
}
}
output_type = "string"
def __init__(self, max_output_length: int = 40000):
super().__init__()
self.max_output_length = max_output_length
def forward(self, url: str) -> str:
try:
import re
import requests
from markdownify import markdownify
from requests.exceptions import RequestException
from smolagents.utils import truncate_content
except ImportError as e:
raise ImportError(
"You must install packages `markdownify` and `requests` to run this tool: for instance run `pip install markdownify requests`."
) from e
try:
# Send a GET request to the URL with a 20-second timeout
response = requests.get(url, timeout=20)
response.raise_for_status() # Raise an exception for bad status codes
# Convert the HTML content to Markdown
markdown_content = markdownify(response.text).strip()
# Remove multiple line breaks
markdown_content = re.sub(r"\n{3,}", "\n\n", markdown_content)
return truncate_content(markdown_content, self.max_output_length)
except requests.exceptions.Timeout:
return "The request timed out. Please try again later or check the URL."
except RequestException as e:
return f"Error fetching the webpage: {str(e)}"
except Exception as e:
return f"An unexpected error occurred: {str(e)}"
|