import os
import requests
import numpy as np
import pandas as pd
from io import StringIO
from bs4 import BeautifulSoup
from langchain_core.tools import tool
from duckduckgo_search import DDGS
from tavily import TavilyClient
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from tools.utils import StructureAwareTextSplitter
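# Retrieval settings: TOP_K chunks are returned per query and MAX_RESULTS webpages
# are fetched per search; UNWANTED_TAGS are stripped from each page before parsing,
# and TAGS_TO_KEEP are the elements converted into structured content blocks.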
TOP_K = 5
MAX_RESULTS = 2
UNWANTED_TAGS = ['nav', 'header', 'footer', 'aside', 'form', 'script', 'style', 'button']
TAGS_TO_KEEP = ['h1', 'h2', 'h3', 'p', 'ul', 'ol', 'table', 'span']
def _format_table_to_string(table_html):
"""
Convert an HTML table to a markdown-style string representation.
Args:
table_html (str): HTML string of the table.
Returns:
        str | None: Table formatted as a markdown-style string, a placeholder
            message if parsing fails, or None if the table is empty.
"""
try:
df = pd.read_html(StringIO(table_html))[0]
    except Exception:
        return "[Table could not be parsed]"
if df.empty:
return None
table_str = "|"
# Put column headers
for col in df.columns:
table_str += f" {col} |"
table_str += "\n"
# Put rows
for _, row in df.iterrows():
table_str += "|"
for col, val in row.items():
table_str += f" {val} |"
table_str += "\n"
return table_str
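# Example for _format_table_to_string (illustrative): an HTML table such as
#   "<table><tr><th>Name</th><th>Age</th></tr><tr><td>Ada</td><td>36</td></tr></table>"
# comes back roughly as "| Name | Age |\n| Ada | 36 |\n".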
def _extract_list(tag, level=0):
"""
Recursively extract nested HTML lists (<ul> or <ol>) into a formatted text list.
Args:
tag (bs4.element.Tag): The <ul> or <ol> BeautifulSoup tag to extract.
level (int): The current nesting level (used for indentation and prefixing).
Returns:
list[str]: List of formatted strings representing the list items, preserving nesting.
"""
items = []
if tag.name not in ["ul", "ol"]:
return items
is_ordered = tag.name == "ol"
# Determine prefix style
if is_ordered:
# Use numbers for top-level, letters for nested
if level == 0:
item_prefix = lambda idx: f"{idx+1}."
else:
# a., b., c., ...
item_prefix = lambda idx: f"{chr(97+idx)}."
else:
item_prefix = lambda idx: "-"
    for idx, li in enumerate(tag.find_all("li", recursive=False)):
        # Detach nested lists first so their text is not flattened into this item's text
        nested_lists = [nested.extract() for nested in li.find_all(["ul", "ol"], recursive=False)]
        # Get the remaining text inside the li, flattening tags (including spans)
        text = li.get_text(" ", strip=True)
        if text:
            items.append(f"{' '*level}{item_prefix(idx)} {text}")
        # Recurse into the detached nested lists, indenting one extra level
        for nested in nested_lists:
            nested_items = _extract_list(nested, level + 1)
            items.extend([f"{' '*(level+1)}{line}" for line in nested_items])
    return items
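# Example for _extract_list (illustrative): "<ul><li>Fruit<ol><li>Apple</li></ol></li></ul>"
# yields ["- Fruit", "  a. Apple"]: unordered items get "-", nested ordered items get
# letter prefixes and one extra level of indentation.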
def _parse_structured_content(soup):
"""
Parse the main content of a BeautifulSoup HTML document into structured blocks.
Args:
soup (bs4.BeautifulSoup): Parsed HTML document.
Returns:
list[dict]: List of structured content blocks (headers, paragraphs, lists, tables).
"""
content = []
for tag in soup.find_all(TAGS_TO_KEEP):
if tag.name in ['h1', 'h2', 'h3']:
content.append({'type': 'header', 'level': tag.name, 'text': tag.get_text(strip=True)})
elif tag.name == 'p':
content.append({'type': 'paragraph', 'text': tag.get_text(strip=True)})
elif tag.name in ['ul', 'ol']:
if tag.find_parent(['ul', 'ol', 'table']) is None:
items = _extract_list(tag)
content.append({'type': 'list', 'items': items})
elif tag.name == 'table':
content.append({'type': 'table', 'html': str(tag)})
elif tag.name == 'span':
            # Only include spans that are non-empty and not nested inside a list, table, or paragraph
if (tag.find_parent(['ul', 'ol', 'table', 'p']) is None) and tag.get_text(strip=True):
content.append({'type': 'span', 'text': tag.get_text(strip=True)})
return content
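# Example for _parse_structured_content (illustrative): a soup built from
# "<h2>Results</h2><p>See table.</p>" yields
# [{'type': 'header', 'level': 'h2', 'text': 'Results'},
#  {'type': 'paragraph', 'text': 'See table.'}].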
@tool
def web_search(query: str) -> str:
"""
    Perform a web search using DuckDuckGo or Tavily (selected via the USE_DDGS
    environment variable).
    This tool acts as a live-data RAG (Retrieval-Augmented Generation) tool.
    It is useful for retrieving relevant information or domain knowledge
    in a specific area, such as mathematics, science, games, etc.
Args:
query (str): The search query.
Returns:
        str: Concatenated string of the most relevant chunks.
"""
USE_DDGS = os.getenv("USE_DDGS", "false").lower() == "true"
# ----- STEP 1: Find the most relevant webpages
if USE_DDGS:
results = DDGS(timeout=30).text(query, max_results=MAX_RESULTS)
urls = [r['href'] for r in results if 'href' in r]
else:
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")
tavily_client = TavilyClient(api_key=TAVILY_API_KEY)
response = tavily_client.search(query, max_results=MAX_RESULTS)
urls = [r['url'] for r in response['results'] if 'url' in r]
all_chunks = []
    for i, url in enumerate(urls):
        try:
            response = requests.get(url, timeout=30)
            html = response.text
        except Exception as e:
            return f"Error fetching URL {url}: {str(e)}"
        # Output the html content to a file for debugging
        os.makedirs("test_output", exist_ok=True)
        with open(f"test_output/{i}_web_search.txt", "w", encoding="utf-8") as f:
            f.write(html)
# ----- STEP 2: Parse and clean the HTML content
soup = BeautifulSoup(html, "html.parser")
# Remove unwanted tags before parsing structured content
for tag in soup.find_all(UNWANTED_TAGS):
tag.decompose()
structured_content = _parse_structured_content(soup)
        # ----- STEP 3: Format tables to string representation
        formatted_content = []
        for item in structured_content:
            if item['type'] == 'table':
                table_str = _format_table_to_string(item['html'])
                if not table_str:
                    # Skip empty tables
                    continue
                item['text'] = table_str
            formatted_content.append(item)
        structured_content = formatted_content
# ----- STEP 4: Split structured content into chunks
splitter = StructureAwareTextSplitter(chunk_size=500, chunk_overlap=50)
documents = splitter.split_documents(structured_content)
all_chunks.extend([
f"\n\n----- CHUNK {i} (url: {url})-----\n\n" + doc.page_content
for i, doc in enumerate(documents)
])
# ----- STEP 5: Make embeddings
model = SentenceTransformer("all-MiniLM-L6-v2") # Small & fast
embeddings = model.encode(all_chunks)
embedded_query = model.encode(query)
# ----- STEP 6: Calculate cosine similarity
# Reshape query for pairwise comparison
embedded_query = np.array(embedded_query).reshape(1, -1)
embeddings = np.array(embeddings)
# Compute cosine similarities
similarities = cosine_similarity(embedded_query, embeddings)[0] # Shape: (n_chunks,)
    # Get the TOP_K most similar chunks (argsort is ascending, so take the last
    # TOP_K indices and reverse for descending similarity)
    top_indices = similarities.argsort()[-TOP_K:][::-1]
    # Output the top chunks to a file for debugging
    with open("test_output/top_chunks.txt", "w", encoding="utf-8") as f:
        for idx in top_indices:
            f.write(all_chunks[idx])
    return "".join([all_chunks[idx] for idx in top_indices])