import concurrent.futures
import re
import threading

from pathlib import Path
from pprint import pprint

from bs4 import BeautifulSoup
from markdownify import markdownify
from termcolor import colored
from tiktoken import get_encoding as tiktoken_get_encoding

from networks.network_configs import IGNORE_CLASSES, IGNORE_TAGS
from utils.logger import logger


class WebpageContentExtractor:
    def __init__(self):
        self.tokenizer = tiktoken_get_encoding("cl100k_base")

    def count_tokens(self, text):
        return len(self.tokenizer.encode(text))

    def html_to_markdown(self, html_str, ignore_links=True):
        # Convert the cleaned HTML to Markdown; optionally strip <a> tags
        # so link noise does not pollute the output or the token count.
        if ignore_links:
            markdown_str = markdownify(html_str, strip=["a"])
        else:
            markdown_str = markdownify(html_str)
        # Collapse runs of three or more newlines into a single blank line.
        markdown_str = re.sub(r"\n{3,}", "\n\n", markdown_str)

        self.markdown_token_count = self.count_tokens(markdown_str)
        logger.mesg(f'- Tokens: {colored(self.markdown_token_count, "light_green")}')
        self.markdown_str = markdown_str
        return self.markdown_str

    def remove_elements_from_html(self, html_str):
        # Remove elements that are empty, have a blacklisted tag name, or
        # whose class/id matches any of the ignored class patterns.
        soup = BeautifulSoup(html_str, "html.parser")
        ignore_classes_pattern = "|".join(f"({word})" for word in IGNORE_CLASSES)
        removed_element_counts = 0
        for element in soup.find_all():
            class_attr = element.get("class", []) or []
            class_str = " ".join(class_attr)
            id_str = element.get("id", "") or ""
            if (
                (not element.text.strip())
                or (element.name in IGNORE_TAGS)
                or re.search(ignore_classes_pattern, class_str, flags=re.IGNORECASE)
                or re.search(ignore_classes_pattern, id_str, flags=re.IGNORECASE)
            ):
                element.decompose()
                removed_element_counts += 1

        logger.mesg(
            f"- Elements: "
            f'{colored(len(soup.find_all()), "light_green")}'
            f' / {colored(removed_element_counts, "light_red")}'
        )

        self.html_str = str(soup)
        return self.html_str

    def extract(self, html_path):
        logger.note(f"Extracting content from: {html_path}")

        if not Path(html_path).exists():
            logger.warn(f"File not found: {html_path}")
            return ""

        # Try encodings in order; a wrong guess raises UnicodeDecodeError
        # and the next candidate is tried. latin-1 accepts any byte
        # sequence, so it serves as the last resort.
        encodings = ["utf-8", "latin-1"]
        for encoding in encodings:
            try:
                with open(html_path, "r", encoding=encoding) as rf:
                    html_str = rf.read()
                break
            except UnicodeDecodeError:
                pass
        else:
            logger.warn(f"No matching encodings: {html_path}")
            return ""

        html_str = self.remove_elements_from_html(html_str)
        markdown_str = self.html_to_markdown(html_str)
        return markdown_str


class BatchWebpageContentExtractor:
    def __init__(self) -> None:
        self.html_path_and_extracted_content_list = []
        self.done_count = 0
        # Guards state shared across worker threads: list.append is atomic
        # in CPython, but `done_count += 1` is not.
        self._lock = threading.Lock()

    def extract_single_html(self, html_path):
        webpage_content_extractor = WebpageContentExtractor()
        extracted_content = webpage_content_extractor.extract(html_path)
        with self._lock:
            self.html_path_and_extracted_content_list.append(
                {"html_path": html_path, "extracted_content": extracted_content}
            )
            self.done_count += 1
            logger.success(
                f"> [{self.done_count}/{self.total_count}] Extracted: {html_path}"
            )

    def extract(self, html_paths):
        self.html_paths = html_paths
        self.total_count = len(self.html_paths)
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(self.extract_single_html, html_path)
                for html_path in self.html_paths
            ]
            # Results are appended in completion order, not input order;
            # calling result() re-raises any exception from a worker.
            for future in concurrent.futures.as_completed(futures):
                future.result()
        return self.html_path_and_extracted_content_list


if __name__ == "__main__":
    html_root = Path(__file__).parents[1] / "files" / "urls" / "python tutorials"
    html_paths = [
        html_root / html_filename
        for html_filename in [
            "docs.python.org_zh-cn_3_tutorial_interpreter.html",
            "stackoverflow.com_questions_295135_turn-a-string-into-a-valid-filename.html",
            "www.liaoxuefeng.com_wiki_1016959663602400_1017495723838528.html",
        ]
    ]
    batch_webpage_content_extractor = BatchWebpageContentExtractor()
    html_path_and_extracted_content_list = batch_webpage_content_extractor.extract(
        html_paths
    )
    # pprint(html_path_and_extracted_content_list)