# super_agent/tools/web_search.py
import os
import requests
import numpy as np
import pandas as pd
from io import StringIO
from bs4 import BeautifulSoup
from langchain_core.tools import tool
from duckduckgo_search import DDGS
from tavily import TavilyClient
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from tools.utils import StructureAwareTextSplitter
TOP_K = 5
MAX_RESULTS = 2
UNWANTED_TAGS = ['nav', 'header', 'footer', 'aside', 'form', 'script', 'style', 'button']
TAGS_TO_KEEP = ['h1', 'h2', 'h3', 'p', 'ul', 'ol', 'table', 'span']
def _format_table_to_string(table_html):
"""
Convert an HTML table to a markdown-style string representation.
Args:
table_html (str): HTML string of the table.
Returns:
        str or None: Table formatted as a markdown-style string, None if the table
            is empty, or a placeholder message if parsing fails.
"""
try:
df = pd.read_html(StringIO(table_html))[0]
    except Exception:
        return "[Table could not be parsed]"
if df.empty:
return None
table_str = "|"
# Put column headers
for col in df.columns:
table_str += f" {col} |"
table_str += "\n"
# Put rows
for _, row in df.iterrows():
table_str += "|"
for col, val in row.items():
table_str += f" {val} |"
table_str += "\n"
return table_str
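
# Example (illustrative): for a simple table such as
#   "<table><tr><th>Name</th><th>Score</th></tr><tr><td>Alice</td><td>10</td></tr></table>"
# _format_table_to_string is expected to return roughly:
#   | Name | Score |
#   | Alice | 10 |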
def _extract_list(tag, level=0):
"""
Recursively extract nested HTML lists (<ul> or <ol>) into a formatted text list.
Args:
tag (bs4.element.Tag): The <ul> or <ol> BeautifulSoup tag to extract.
level (int): The current nesting level (used for indentation and prefixing).
Returns:
list[str]: List of formatted strings representing the list items, preserving nesting.
"""
items = []
if tag.name not in ["ul", "ol"]:
return items
is_ordered = tag.name == "ol"
# Determine prefix style
if is_ordered:
# Use numbers for top-level, letters for nested
if level == 0:
item_prefix = lambda idx: f"{idx+1}."
else:
# a., b., c., ...
item_prefix = lambda idx: f"{chr(97+idx)}."
else:
item_prefix = lambda idx: "-"
    for idx, li in enumerate(tag.find_all("li", recursive=False)):
        # Detach nested lists first so their text is not duplicated in this item's text
        nested_lists = [nested.extract() for nested in li.find_all(["ul", "ol"], recursive=False)]
        # Get all remaining text inside the li, flattening tags (including spans)
        text = li.get_text(" ", strip=True)
        if text:
            items.append(f"{' '*level}{item_prefix(idx)} {text}")
        # Recurse into the detached nested lists, indenting one extra level
        for nested in nested_lists:
            nested_items = _extract_list(nested, level + 1)
            items.extend(f"{' '*(level+1)}{line}" for line in nested_items)
return items
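
# Example (illustrative): for "<ol><li>First<ul><li>Sub item</li></ul></li><li>Second</li></ol>"
# _extract_list on the <ol> tag is expected to return roughly:
#   ["1. First", "  - Sub item", "2. Second"]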
def _parse_structured_content(soup):
"""
Parse the main content of a BeautifulSoup HTML document into structured blocks.
Args:
soup (bs4.BeautifulSoup): Parsed HTML document.
Returns:
list[dict]: List of structured content blocks (headers, paragraphs, lists, tables).
"""
content = []
for tag in soup.find_all(TAGS_TO_KEEP):
if tag.name in ['h1', 'h2', 'h3']:
content.append({'type': 'header', 'level': tag.name, 'text': tag.get_text(strip=True)})
elif tag.name == 'p':
content.append({'type': 'paragraph', 'text': tag.get_text(strip=True)})
elif tag.name in ['ul', 'ol']:
if tag.find_parent(['ul', 'ol', 'table']) is None:
items = _extract_list(tag)
content.append({'type': 'list', 'items': items})
elif tag.name == 'table':
content.append({'type': 'table', 'html': str(tag)})
elif tag.name == 'span':
            # Only include spans that are non-empty and not nested inside other kept elements
if (tag.find_parent(['ul', 'ol', 'table', 'p']) is None) and tag.get_text(strip=True):
content.append({'type': 'span', 'text': tag.get_text(strip=True)})
return content
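
# Example (illustrative): for a fragment such as "<h2>Intro</h2><p>Hello world.</p>"
# _parse_structured_content is expected to return blocks like:
#   [{'type': 'header', 'level': 'h2', 'text': 'Intro'},
#    {'type': 'paragraph', 'text': 'Hello world.'}]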
@tool
def web_search(query: str) -> str:
"""
    Perform a web search using DuckDuckGo or Tavily.

    This tool acts as a live-data RAG (Retrieval-Augmented Generation) source.
    It is useful for retrieving relevant information or obtaining domain knowledge
    in a specific area, such as mathematics, science, games, etc.
Args:
query (str): The search query.
Returns:
        str: Concatenated string of the most relevant chunks.
"""
USE_DDGS = os.getenv("USE_DDGS", "false").lower() == "true"
# ----- STEP 1: Find the most relevant webpages
if USE_DDGS:
results = DDGS(timeout=30).text(query, max_results=MAX_RESULTS)
urls = [r['href'] for r in results if 'href' in r]
else:
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")
tavily_client = TavilyClient(api_key=TAVILY_API_KEY)
response = tavily_client.search(query, max_results=MAX_RESULTS)
urls = [r['url'] for r in response['results'] if 'url' in r]
all_chunks = []
    for idx, url in enumerate(urls):
        try:
            response = requests.get(url, timeout=30)
            html = response.text
        except Exception as e:
            # Skip pages that fail to download instead of aborting the whole search
            print(f"Error fetching URL {url}: {str(e)}")
            continue
        # Output the html content to a file for debugging
        os.makedirs("test_output", exist_ok=True)
        with open(f"test_output/{idx}_web_search.txt", "w", encoding="utf-8") as f:
            f.write(html)
# ----- STEP 2: Parse and clean the HTML content
soup = BeautifulSoup(html, "html.parser")
# Remove unwanted tags before parsing structured content
for tag in soup.find_all(UNWANTED_TAGS):
tag.decompose()
structured_content = _parse_structured_content(soup)
# ----- STEP 3: Format tables to string representation
        # Iterate over a copy so removals do not skip elements
        for item in list(structured_content):
            if item['type'] == 'table':
                table_str = _format_table_to_string(item['html'])
                if table_str:
                    item['text'] = table_str
                else:
                    # Skip empty or unparseable tables
                    structured_content.remove(item)
# ----- STEP 4: Split structured content into chunks
splitter = StructureAwareTextSplitter(chunk_size=500, chunk_overlap=50)
documents = splitter.split_documents(structured_content)
all_chunks.extend([
f"\n\n----- CHUNK {i} (url: {url})-----\n\n" + doc.page_content
for i, doc in enumerate(documents)
])
    if not all_chunks:
        return "No relevant content could be retrieved for this query."
    # ----- STEP 5: Embed the chunks and the query
    model = SentenceTransformer("all-MiniLM-L6-v2")  # Small & fast
embeddings = model.encode(all_chunks)
embedded_query = model.encode(query)
# ----- STEP 6: Calculate cosine similarity
# Reshape query for pairwise comparison
embedded_query = np.array(embedded_query).reshape(1, -1)
embeddings = np.array(embeddings)
# Compute cosine similarities
similarities = cosine_similarity(embedded_query, embeddings)[0] # Shape: (n_chunks,)
# Get most similar chunks
top_indices = similarities.argsort()[-TOP_K:][::-1]
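    # Example: np.argsort([0.2, 0.9, 0.5]) gives [0, 2, 1]; keeping the last TOP_K entries
    # and reversing them orders the chunk indices from most to least similar.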
    # Write the top chunks to a file for debugging
    with open("test_output/top_chunks.txt", "w", encoding="utf-8") as f:
        for idx in top_indices:
            f.write(all_chunks[idx])
return "".join([all_chunks[idx] for idx in top_indices])