import asyncio
import logging
import warnings
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, Iterator, List, Optional, Union, cast

import aiohttp
import requests
from langchain.docstore.document import Document
from langchain.document_loaders.base import BaseLoader

logger = logging.getLogger(__name__)

default_header_template = {
    "User-Agent": "",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*"
    ";q=0.8",
    "Accept-Language": "en-US,en;q=0.5",
    "Referer": "https://www.google.com/",
    "DNT": "1",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
}


def _build_metadata(soup: Any, url: str) -> dict:
    """Build metadata from BeautifulSoup output."""
    metadata = {"source": url}
    if title := soup.find("title"):
        metadata["title"] = title.get_text()
    if description := soup.find("meta", attrs={"name": "description"}):
        metadata["description"] = description.get("content", "No description found.")
    if html := soup.find("html"):
        metadata["language"] = html.get("lang", "No language found.")
    return metadata
class AsyncHtmlLoader(BaseLoader):
    """Load `HTML` asynchronously."""

    def __init__(
        self,
        web_path: Union[str, List[str]],
        header_template: Optional[dict] = None,
        verify_ssl: Optional[bool] = True,
        proxies: Optional[dict] = None,
        autoset_encoding: bool = True,
        encoding: Optional[str] = None,
        default_parser: str = "html.parser",
        requests_per_second: int = 2,
        requests_kwargs: Optional[Dict[str, Any]] = None,
        raise_for_status: bool = False,
        ignore_load_errors: bool = False,
    ):
        """Initialize with a webpage path or list of paths.

        Args:
            web_path: A single URL or a list of URLs to load.
            header_template: HTTP headers to use; the User-Agent is filled in
                via ``fake_useragent`` if left empty.
            verify_ssl: Whether to verify SSL certificates.
            proxies: Optional proxies for the underlying ``requests`` session.
            autoset_encoding: If no ``encoding`` is given, set the response
                encoding from ``apparent_encoding``.
            encoding: Force a specific response encoding.
            default_parser: Default BeautifulSoup parser to use.
            requests_per_second: Maximum number of concurrent requests.
            requests_kwargs: Extra keyword arguments passed to ``Session.get``.
            raise_for_status: Raise an exception on HTTP error status codes.
            ignore_load_errors: Warn and skip a URL on fetch errors instead of
                raising.
        """
        # TODO: Deprecate web_path in favor of web_paths, and remove this
        # left like this because there are a number of loaders that expect single
        # urls
        if isinstance(web_path, str):
            self.web_paths = [web_path]
        elif isinstance(web_path, List):
            self.web_paths = web_path

        headers = header_template or default_header_template
        if not headers.get("User-Agent"):
            try:
                from fake_useragent import UserAgent

                headers["User-Agent"] = UserAgent().random
            except ImportError:
                logger.info(
                    "fake_useragent not found, using default user agent. "
                    "To get a realistic header for requests, "
                    "`pip install fake_useragent`."
                )

        self.session = requests.Session()
        self.session.headers = dict(headers)
        self.session.verify = verify_ssl
        if proxies:
            self.session.proxies.update(proxies)

        self.requests_per_second = requests_per_second
        self.default_parser = default_parser
        self.requests_kwargs = requests_kwargs or {}
        self.raise_for_status = raise_for_status
        self.autoset_encoding = autoset_encoding
        self.encoding = encoding
        self.ignore_load_errors = ignore_load_errors

    def _fetch_valid_connection_docs(self, url: str) -> Any:
        # When ignore_load_errors is set, swallow fetch exceptions (e.g.
        # connection errors) and return None so the URL is skipped.
        if self.ignore_load_errors:
            try:
                return self.session.get(url, **self.requests_kwargs)
            except Exception as e:
                warnings.warn(str(e))
                return None

        return self.session.get(url, **self.requests_kwargs)
    @staticmethod
    def _check_parser(parser: str) -> None:
        """Check that parser is valid for bs4."""
        valid_parsers = ["html.parser", "lxml", "xml", "lxml-xml", "html5lib"]
        if parser not in valid_parsers:
            raise ValueError(
                "`parser` must be one of " + ", ".join(valid_parsers) + "."
            )
    def _scrape(
        self,
        url: str,
        parser: Union[str, None] = None,
        bs_kwargs: Optional[dict] = None,
    ) -> Any:
        from bs4 import BeautifulSoup

        if parser is None:
            if url.endswith(".xml"):
                parser = "xml"
            else:
                parser = self.default_parser

        self._check_parser(parser)

        html_doc = self._fetch_valid_connection_docs(url)
        # The fetch may have failed (or been skipped via ignore_load_errors);
        # return None so the caller can skip this URL.
        if not getattr(html_doc, "ok", False):
            return None

        if self.raise_for_status:
            html_doc.raise_for_status()

        if self.encoding is not None:
            html_doc.encoding = self.encoding
        elif self.autoset_encoding:
            html_doc.encoding = html_doc.apparent_encoding

        return BeautifulSoup(html_doc.text, parser, **(bs_kwargs or {}))
    async def _fetch(
        self, url: str, retries: int = 3, cooldown: int = 2, backoff: float = 1.5
    ) -> str:
        async with aiohttp.ClientSession() as session:
            for i in range(retries):
                try:
                    async with session.get(
                        url,
                        headers=self.session.headers,
                        ssl=None if self.session.verify else False,
                    ) as response:
                        try:
                            text = await response.text()
                        except UnicodeDecodeError:
                            logger.error(f"Failed to decode content from {url}")
                            text = ""
                        return text
                except aiohttp.ClientConnectionError as e:
                    if i == retries - 1 and self.ignore_load_errors:
                        logger.warning(f"Error fetching {url} after {retries} retries.")
                        return ""
                    elif i == retries - 1:
                        raise
                    else:
                        logger.warning(
                            f"Error fetching {url} with attempt "
                            f"{i + 1}/{retries}: {e}. Retrying..."
                        )
                        await asyncio.sleep(cooldown * backoff**i)
        raise ValueError("retry count exceeded")
    async def _fetch_with_rate_limit(
        self, url: str, semaphore: asyncio.Semaphore
    ) -> str:
        async with semaphore:
            return await self._fetch(url)

    async def fetch_all(self, urls: List[str]) -> Any:
        """Fetch all urls concurrently with rate limiting."""
        semaphore = asyncio.Semaphore(self.requests_per_second)
        tasks = []
        for url in urls:
            task = asyncio.ensure_future(self._fetch_with_rate_limit(url, semaphore))
            tasks.append(task)
        try:
            from tqdm.asyncio import tqdm_asyncio

            return await tqdm_asyncio.gather(
                *tasks, desc="Fetching pages", ascii=True, mininterval=1
            )
        except ImportError:
            warnings.warn("For better logging of progress, `pip install tqdm`")
            return await asyncio.gather(*tasks)
    def lazy_load(self) -> Iterator[Document]:
        """Lazy load text from the url(s) in web_path."""
        for doc in self.load():
            yield doc

    def load(self) -> List[Document]:
        """Load text from the url(s) in web_path."""
        try:
            # Raises RuntimeError if there is no current event loop.
            asyncio.get_running_loop()
            # If there is a current event loop, we need to run the async code
            # in a separate loop, in a separate thread.
            with ThreadPoolExecutor(max_workers=1) as executor:
                future = executor.submit(asyncio.run, self.fetch_all(self.web_paths))
                results = future.result()
        except RuntimeError:
            results = asyncio.run(self.fetch_all(self.web_paths))
        docs = []
        for i, text in enumerate(cast(List[str], results)):
            soup = self._scrape(self.web_paths[i])
            if not soup:
                continue
            metadata = _build_metadata(soup, self.web_paths[i])
            docs.append(Document(page_content=text, metadata=metadata))
        return docs
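

# A minimal usage sketch (not part of the original loader module), assuming
# network access and that `requests`, `aiohttp`, and `beautifulsoup4` are
# installed. The URL below is only an illustrative placeholder; any reachable
# HTML page works the same way.
if __name__ == "__main__":
    loader = AsyncHtmlLoader(["https://example.com"])
    example_docs = loader.load()
    for doc in example_docs:
        # Each Document carries the raw page text plus metadata built by
        # _build_metadata (source URL, title, description, language).
        print(doc.metadata.get("title", "untitled"), "-", doc.metadata["source"])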