web-search-api / documents /webpage_content_extractor.py
Hansimov's picture
:gem: [Feature] New BatchWebpageContentExtractor: Extract webpage content from multiple html_paths concurrently
1db460d
raw
history blame
4.8 kB
import concurrent.futures
import re
from pathlib import Path
from pprint import pprint
from bs4 import BeautifulSoup
from tiktoken import get_encoding as tiktoken_get_encoding
from utils.logger import logger
from markdownify import markdownify
from networks.network_configs import IGNORE_TAGS, IGNORE_CLASSES
from termcolor import colored
class WebpageContentExtractor:
def __init__(self):
self.tokenizer = tiktoken_get_encoding("cl100k_base")
def count_tokens(self, text):
tokens = self.tokenizer.encode(text)
token_count = len(tokens)
return token_count
def html_to_markdown(self, html_str, ignore_links=True):
if ignore_links:
markdown_str = markdownify(html_str, strip="a")
else:
markdown_str = markdownify(html_str)
markdown_str = re.sub(r"\n{3,}", "\n\n", markdown_str)
self.markdown_token_count = self.count_tokens(markdown_str)
logger.mesg(f'- Tokens: {colored(self.markdown_token_count,"light_green")}')
self.markdown_str = markdown_str
return self.markdown_str
def remove_elements_from_html(self, html_str):
soup = BeautifulSoup(html_str, "html.parser")
ignore_classes_with_parentheses = [f"({word})" for word in IGNORE_CLASSES]
ignore_classes_pattern = f'{"|".join(ignore_classes_with_parentheses)}'
removed_element_counts = 0
for element in soup.find_all():
class_str = ""
id_str = ""
try:
class_attr = element.get("class", [])
if class_attr:
class_str = " ".join(list(class_attr))
if id_str:
class_str = f"{class_str} {id_str}"
except:
pass
try:
id_str = element.get("id", "")
except:
pass
if (
(not element.text.strip())
or (element.name in IGNORE_TAGS)
or (re.search(ignore_classes_pattern, class_str, flags=re.IGNORECASE))
or (re.search(ignore_classes_pattern, id_str, flags=re.IGNORECASE))
):
element.decompose()
removed_element_counts += 1
logger.mesg(
f"- Elements: "
f'{colored(len(soup.find_all()),"light_green")} / {colored(removed_element_counts,"light_red")}'
)
html_str = str(soup)
self.html_str = html_str
return self.html_str
def extract(self, html_path):
logger.note(f"Extracting content from: {html_path}")
if not Path(html_path).exists():
logger.warn(f"File not found: {html_path}")
return ""
with open(html_path, "r", encoding="utf-8") as rf:
html_str = rf.read()
html_str = self.remove_elements_from_html(html_str)
markdown_str = self.html_to_markdown(html_str)
return markdown_str
class BatchWebpageContentExtractor:
def __init__(self) -> None:
self.html_path_and_extracted_content_list = []
self.done_count = 0
def extract_single_html(self, html_path):
webpage_content_extractor = WebpageContentExtractor()
extracted_content = webpage_content_extractor.extract(html_path)
self.html_path_and_extracted_content_list.append(
{"html_path": html_path, "extracted_content": extracted_content}
)
self.done_count += 1
logger.success(
f"> [{self.done_count}/{self.total_count}] Extracted: {html_path}"
)
def extract(self, html_paths):
self.html_path = html_paths
self.total_count = len(self.html_path)
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [
executor.submit(self.extract_single_html, html_path)
for html_path in self.html_path
]
for idx, future in enumerate(concurrent.futures.as_completed(futures)):
result = future.result()
return self.html_path_and_extracted_content_list
if __name__ == "__main__":
html_root = Path(__file__).parents[1] / "files" / "urls" / "python tutorials"
html_paths = [
html_root / html_filename
for html_filename in [
"docs.python.org_zh-cn_3_tutorial_interpreter.html",
"stackoverflow.com_questions_295135_turn-a-string-into-a-valid-filename.html",
"www.liaoxuefeng.com_wiki_1016959663602400_1017495723838528.html",
]
]
batch_webpage_content_extractor = BatchWebpageContentExtractor()
html_path_and_extracted_content_list = batch_webpage_content_extractor.extract(
html_paths
)
# pprint(html_path_and_extracted_content_list)