import os
import requests
import numpy as np
import pandas as pd
from io import StringIO
from bs4 import BeautifulSoup
from langchain_core.tools import tool
from duckduckgo_search import DDGS
from tavily import TavilyClient
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from tools.utils import StructureAwareTextSplitter

TOP_K = 5
MAX_RESULTS = 2
UNWANTED_TAGS = ['nav', 'header', 'footer', 'aside', 'form', 'script', 'style', 'button']
TAGS_TO_KEEP = ['h1', 'h2', 'h3', 'p', 'ul', 'ol', 'table', 'span']


def _format_table_to_string(table_html):
    """
    Convert an HTML table to a markdown-style string representation.

    Args:
        table_html (str): HTML string of the table.

    Returns:
        str: Table formatted as a markdown-style string, a placeholder message
            if parsing fails, or None if the table is empty.
    """
    try:
        df = pd.read_html(StringIO(table_html))[0]
    except Exception:
        return "[Table could not be parsed]"

    if df.empty:
        return None
    table_str = "|"
    # Put column headers
    for col in df.columns:
        table_str += f" {col} |"
    table_str += "\n"

    # Put rows
    for _, row in df.iterrows():
        table_str += "|"
        for col, val in row.items():
            table_str += f" {val} |"
        table_str += "\n"

    return table_str
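
# Illustrative sketch (comment only, not executed): for a minimal table such as
#   "<table><tr><th>Name</th><th>Score</th></tr><tr><td>Alice</td><td>1</td></tr></table>"
# _format_table_to_string would return roughly:
#   "| Name | Score |\n| Alice | 1 |\n"
# assuming pandas.read_html can parse the snippet (it needs an HTML parser
# backend such as lxml or html5lib installed).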


def _extract_list(tag, level=0):
    """
    Recursively extract nested HTML lists (<ul> or <ol>) into a formatted text list.

    Args:
        tag (bs4.element.Tag): The <ul> or <ol> BeautifulSoup tag to extract.
        level (int): The current nesting level (used for indentation and prefixing).

    Returns:
        list[str]: List of formatted strings representing the list items, preserving nesting.
    """
    items = []
    if tag.name not in ["ul", "ol"]:
        return items

    is_ordered = tag.name == "ol"
    # Determine prefix style
    if is_ordered:
        # Use numbers for top-level, letters for nested
        if level == 0:
            item_prefix = lambda idx: f"{idx+1}."
        else:
            # a., b., c., ...
            item_prefix = lambda idx: f"{chr(97+idx)}."
    else:
        item_prefix = lambda idx: "-"
    for idx, li in enumerate(tag.find_all("li", recursive=False)):
        # Detach nested lists first so their text is not duplicated in this item's text
        nested_lists = li.find_all(["ul", "ol"], recursive=False)
        for nested in nested_lists:
            nested.extract()
        # Get the item's own text, flattening inline tags (including spans)
        text = li.get_text(" ", strip=True)
        items.append(f"{' '*level}{item_prefix(idx)} {text}")
        # Recurse into the detached nested lists, indenting one level deeper
        for nested in nested_lists:
            nested_items = _extract_list(nested, level+1)
            items.extend([f"{' '*(level+1)}{line}" for line in nested_items])
    return items
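
# Illustrative sketch (comment only): for nested markup such as
#   <ul><li>Fruits<ul><li>Apple</li><li>Pear</li></ul></li><li>Veggies</li></ul>
# _extract_list would produce roughly:
#   ['- Fruits', '  - Apple', '  - Pear', '- Veggies']
# i.e. one string per item, with nested items indented under their parent.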


def _parse_structured_content(soup):
    """
    Parse the main content of a BeautifulSoup HTML document into structured blocks.

    Args:
        soup (bs4.BeautifulSoup): Parsed HTML document.

    Returns:
        list[dict]: List of structured content blocks (headers, paragraphs, lists, tables).
    """
    content = []
    for tag in soup.find_all(TAGS_TO_KEEP):
        if tag.name in ['h1', 'h2', 'h3']:
            content.append({'type': 'header', 'level': tag.name, 'text': tag.get_text(strip=True)})
        elif tag.name == 'p':
            content.append({'type': 'paragraph', 'text': tag.get_text(strip=True)})
        elif tag.name in ['ul', 'ol']:
            if tag.find_parent(['ul', 'ol', 'table']) is None:
                items = _extract_list(tag)
                content.append({'type': 'list', 'items': items})
        elif tag.name == 'table':
            content.append({'type': 'table', 'html': str(tag)})
        elif tag.name == 'span':
            # Only include spans that are non-empty and not nested inside an
            # already-captured element (list, table or paragraph)
            if (tag.find_parent(['ul', 'ol', 'table', 'p']) is None) and tag.get_text(strip=True):
                content.append({'type': 'span', 'text': tag.get_text(strip=True)})
    return content
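
# Illustrative sketch (comment only): for a page body like
#   <h2>Results</h2><p>Summary text.</p><ul><li>Item A</li></ul>
# _parse_structured_content would return roughly:
#   [{'type': 'header', 'level': 'h2', 'text': 'Results'},
#    {'type': 'paragraph', 'text': 'Summary text.'},
#    {'type': 'list', 'items': ['- Item A']}]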


@tool
def web_search(query: str) -> str:
    """
    Perform a web search using DuckDuckGo or Tavily.

    This tool acts as a live-data RAG (Retrieval-Augmented Generation) tool.
    It is useful for retrieving relevant information or obtaining domain knowledge
    in a specific area, such as mathematics, science, games, etc.

    Args:
        query (str): The search query.

    Returns:
        chunks (str): Concatenated string of the most relevant chunks.
    """
    USE_DDGS = os.getenv("USE_DDGS", "false").lower() == "true"

    # ----- STEP 1: Find the most relevant webpages
    if USE_DDGS:
        results = DDGS(timeout=30).text(query, max_results=MAX_RESULTS)
        urls = [r['href'] for r in results if 'href' in r]
    else:
        TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")
        tavily_client = TavilyClient(api_key=TAVILY_API_KEY)
        response = tavily_client.search(query, max_results=MAX_RESULTS)
        urls = [r['url'] for r in response['results'] if 'url' in r]
    all_chunks = []
    for url_idx, url in enumerate(urls):
        try:
            response = requests.get(url, timeout=30)
            html = response.text
        except Exception as e:
            # Skip pages that cannot be fetched instead of aborting the whole search
            print(f"Error fetching URL {url}: {str(e)}")
            continue

        # Output the html content to a file for debugging
        os.makedirs("test_output", exist_ok=True)
        with open(f"test_output/{url_idx}_web_search.txt", "w", encoding="utf-8") as f:
            f.write(html)
        # ----- STEP 2: Parse and clean the HTML content
        soup = BeautifulSoup(html, "html.parser")
        # Remove unwanted tags before parsing structured content
        for tag in soup.find_all(UNWANTED_TAGS):
            tag.decompose()
        structured_content = _parse_structured_content(soup)
        # ----- STEP 3: Format tables to string representation
        # Build a filtered list instead of removing items from the list being
        # iterated, which would skip elements
        formatted_content = []
        for item in structured_content:
            if item['type'] == 'table':
                table_str = _format_table_to_string(item['html'])
                if not table_str:
                    # Skip empty tables
                    continue
                item['text'] = table_str
            formatted_content.append(item)

        # ----- STEP 4: Split structured content into chunks
        splitter = StructureAwareTextSplitter(chunk_size=500, chunk_overlap=50)
        documents = splitter.split_documents(formatted_content)
        all_chunks.extend([
            f"\n\n----- CHUNK {i} (url: {url})-----\n\n" + doc.page_content
            for i, doc in enumerate(documents)
        ])
    # ----- STEP 5: Make embeddings
    model = SentenceTransformer("all-MiniLM-L6-v2")  # Small & fast
    embeddings = model.encode(all_chunks)
    embedded_query = model.encode(query)

    # ----- STEP 6: Calculate cosine similarity
    # Reshape query for pairwise comparison
    embedded_query = np.array(embedded_query).reshape(1, -1)
    embeddings = np.array(embeddings)
    # Compute cosine similarities
    similarities = cosine_similarity(embedded_query, embeddings)[0]  # Shape: (n_chunks,)
    # Get most similar chunks
    top_indices = similarities.argsort()[-TOP_K:][::-1]
    # Output the top chunks to a file for debugging
    with open("test_output/top_chunks.txt", "w", encoding="utf-8") as f:
        for idx in top_indices:
            f.write(all_chunks[idx])

    return "".join([all_chunks[idx] for idx in top_indices])