# hsg_rag_eea/src/scraping/utils.py
import datetime
import difflib
import hashlib
import json
from collections import defaultdict
from email.utils import parsedate_to_datetime
from functools import lru_cache
from urllib.error import URLError
from urllib.robotparser import RobotFileParser

import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent

from src.scraping.types import FetchResult
from ..config import config
from ..const.page_priority import *
from ..utils.logging import get_logger
from ..utils.tools import call_with_exponential_backoff

logger = get_logger('scraper.utils')
ua = UserAgent()
@lru_cache
def _fuzzy_match(word, keyword, threshold=0.8):
    """
    Check whether `word` fuzzy-matches `keyword`: returns True when the difflib
    similarity ratio is at or above `threshold`. Results are memoised with
    `lru_cache`, so repeated (word, keyword) pairs are cheap.
    """
    return difflib.SequenceMatcher(None, word.lower(), keyword.lower()).ratio() >= threshold
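# Illustrative behaviour at the default 0.8 threshold (the real keywords come from
# ..const.page_priority; the words below are only examples):
#   _fuzzy_match("admissions", "admission")   # True  - ratio ~0.95
#   _fuzzy_match("commission", "admission")   # False - ratio ~0.74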
def detect_page_topic_and_priority(text: str) -> dict[str, str]:
    """
    Fuzzy-count PAGE_PRIORITY_KEYWORDS hits in `text` and return the dominant
    priority level together with its most frequent keyword (the topic).
    """
    result = {
        'priority': 'low',
        'topic': 'none',
    }
    if not text:
        return result
    words = text.lower().split()
    topic_counter = { prio: defaultdict(int) for prio in PAGE_PRIORITY_KEYWORDS.keys() }
    for word in words:
        for prio, kws in PAGE_PRIORITY_KEYWORDS.items():
            for kw in kws:
                if _fuzzy_match(word, kw):
                    topic_counter[prio][kw] += 1
    # Aggregate once, after counting, so the per-priority totals are not inflated
    # by re-summing the running counters inside the word loop.
    prio_counter = { prio: sum(counts.values()) for prio, counts in topic_counter.items() }
    if max(prio_counter.values()) == 0:
        return result
    top_prio = max(prio_counter.keys(), key=lambda k: prio_counter[k])
    top_topic = max(topic_counter[top_prio].keys(), key=lambda k: topic_counter[top_prio][k])
    result['priority'] = top_prio
    result['topic'] = top_topic
    return result
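# Illustrative call - the returned values depend entirely on the keyword sets defined
# in ..const.page_priority, so the output shown is only a plausible example:
#   detect_page_topic_and_priority(page_text)
#   # -> {'priority': 'high', 'topic': 'admission'}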
def detect_chunk_topic(text: str) -> str:
    """
    Return the CHUNK_TOPIC_KEYWORDS topic with the most fuzzy keyword hits in `text`,
    or 'none' when nothing matches.
    """
    if not text:
        return 'none'
    words = text.lower().split()
    topic_counter = { topic: 0 for topic in CHUNK_TOPIC_KEYWORDS.keys() }
    for word in words:
        for topic, kws in CHUNK_TOPIC_KEYWORDS.items():
            topic_counter[topic] += sum(1 for kw in kws if _fuzzy_match(word, kw))
if max(topic_counter.values()) == 0:
return 'none'
top_topic = max(topic_counter.keys(), key=lambda k: topic_counter[k])
return top_topic
def hash_html(html: str) -> str:
    """
    Hash only the visible text of a page (scripts and styles stripped), so that
    purely cosmetic markup changes do not change the hash.
    """
    soup = BeautifulSoup(html, "html.parser")
    for tag in soup(["script", "style"]):
        tag.decompose()
    text = soup.get_text()
    return hashlib.sha256(text.encode()).hexdigest()
def parse_isoformat(data: str | None) -> datetime.datetime | None:
    """
    Parse an HTTP (RFC 2822) or ISO 8601 date string; return None when the
    input is empty or matches neither format.
    """
if not data:
return None
try:
return parsedate_to_datetime(data)
except (TypeError, ValueError):
pass
try:
return datetime.datetime.fromisoformat(data)
except ValueError:
pass
return None
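# Both header-style and ISO timestamps parse (illustrative):
#   parse_isoformat("Tue, 01 Oct 2024 07:28:00 GMT")      # RFC 2822 / HTTP date
#   parse_isoformat("2024-10-01T07:28:00+00:00")          # ISO 8601
# Unparseable or empty strings yield None.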
def extract_last_modified(response, html) -> datetime.datetime | None:
    """
    Determine when a page was last modified, trying the HTTP header first,
    then <meta> tags, then JSON-LD (schema.org) metadata embedded in the page.
    """
    last_modified = response.headers.get("Last-Modified", None)
    soup = BeautifulSoup(html, "html.parser")
    if not last_modified:
        for attr, value in [("name", "last-modified"), ("property", "article:modified_time")]:
            tag = soup.find("meta", {attr: value})
            if tag:
                last_modified = tag.get("content")
                break
    if not last_modified:
        scripts = soup.find_all("script", {"type": "application/ld+json"})
        for script in scripts:
            try:
                data = json.loads(script.string)
            except (TypeError, ValueError):
                continue
            graph = data.get("@graph") if isinstance(data, dict) else None
            if not graph:
                continue
            for item in graph:
                if item.get("@type") in ["WebPage", "Article"] and item.get("dateModified"):
                    last_modified = item.get("dateModified")
                    break
            if last_modified:
                break
    return parse_isoformat(last_modified)
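# Lookup order, with illustrative markup for the fallbacks:
#   1. Last-Modified response header
#   2. <meta name="last-modified" ...> or <meta property="article:modified_time" content="...">
#   3. <script type="application/ld+json">{"@graph": [{"@type": "WebPage", "dateModified": "..."}]}</script>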
def fetch_head(url: str, etag: str | None = None) -> FetchResult:
    """
    Issue a HEAD request, optionally revalidating against a previously stored ETag.
    """
    try:
        headers = {"User-Agent": ua.chrome}
        if etag:
            headers["If-None-Match"] = etag
        response = requests.head(
            url,
            allow_redirects=True,
            timeout=15,
            headers=headers
        )
        if response.status_code == 304:
            return FetchResult(not_modified=True)
        if response.status_code >= 400:
            logger.warning(f"HTTP {response.status_code} for URL '{url}'")
            raise Exception(f"HTTP {response.status_code} for URL '{url}'")
        return FetchResult(
            final_url = response.url,
            last_modified = response.headers.get('Last-Modified'),
            etag = response.headers.get('ETag')
        )
    except Exception:
        logger.exception(f"Head fetch failed: {url}")
        raise
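# Illustrative revalidation flow; attribute access assumes FetchResult in
# src.scraping.types is a dataclass-style object with the fields used above:
#   head = fetch_head(url, etag=stored_etag)
#   if head.not_modified:   # server answered 304, no need to re-download
#       ...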
def fetch_url(url: str, etag: str | None = None) -> FetchResult:
    """
    Issue a GET request and return the page text together with change-detection
    metadata (ETag, Last-Modified, content hash).
    """
    try:
        headers = {"User-Agent": ua.chrome}
        if etag:
            headers["If-None-Match"] = etag
        response = requests.get(
            url,
            allow_redirects=True,
            timeout=15,
            headers=headers
        )
        if response.status_code == 304:
            return FetchResult(not_modified=True)
        if response.status_code >= 400:
            logger.warning(f"HTTP {response.status_code} for URL '{url}'")
            raise Exception(f"HTTP {response.status_code} for URL '{url}'")
        html = response.text
        etag = response.headers.get("ETag")
        last_modified = extract_last_modified(response, html)
        page_hash = hash_html(html)
        return FetchResult(
            text = html,
            final_url = response.url,
            page_hash = page_hash,
            last_modified = last_modified,
            etag = etag,
        )
    except Exception:
        logger.exception(f"Fetch failed: {url}")
        raise
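# Illustrative change check combining fetch_url and hash_html (the page_hash and
# not_modified attribute names again assume the FetchResult type in src.scraping.types):
#   result = fetch_url(url, etag=stored_etag)
#   unchanged = result.not_modified or result.page_hash == stored_hash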
def _robots_exist(robots_url: str) -> bool:
    try:
        logger.info(f"Checking if 'robots.txt' is accessible at '{robots_url}'...")
        response = requests.head(robots_url, allow_redirects=True, timeout=config.scraping.TIMEOUT)
        if response.status_code >= 400:
            logger.error(f"Cannot access 'robots.txt' - received status code {response.status_code}!")
            return False
        return True
    except requests.RequestException as e:
        raise requests.RequestException(f"An error occurred while requesting the URL '{robots_url}': {e}")
def parse_robots(base_url: str) -> RobotFileParser | None:
    robots_url = f"{base_url.rstrip('/')}/robots.txt"
# Check whether the robots.txt file is accessible from this url
response = call_with_exponential_backoff(_robots_exist, args=(robots_url,))
if not response['result']: return None
logger.info(f"File 'robots.txt' found for the target url '{base_url}'")
rp = RobotFileParser()
rp.set_url(robots_url)
# Parse existing robots.txt file into the parser
def fetch_robots():
try:
rp.read()
except URLError as e:
raise URLError(f"Failed to fetch the 'robots.txt': {e}")
response = call_with_exponential_backoff(fetch_robots)
if response['status'] == 'FAIL':
logger.error(f"Failed to fetch the 'robots.txt': {response['last_error']}")
return None
return rp
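# Rough end-to-end sketch, not part of the module's public surface: respect robots.txt,
# fetch one page, and report its detected priority. The URL is a placeholder and the
# attribute access on FetchResult assumes the dataclass-style type in src.scraping.types.
if __name__ == "__main__":
    base = "https://example.com"
    target = f"{base}/about"
    rp = parse_robots(base)
    if rp is not None and not rp.can_fetch(ua.chrome, target):
        logger.info(f"Skipping '{target}': disallowed by robots.txt")
    else:
        result = fetch_url(target)
        info = detect_page_topic_and_priority(result.text)
        logger.info(f"Fetched '{result.final_url}' (hash {result.page_hash[:8]}): "
                    f"priority={info['priority']}, topic={info['topic']}")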