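"""Core scraping module: crawls a target domain's sitemap, discovers and fetches pages,
cleans and chunks their content, and persists per-URL timestamps and chunk metadata so
later sessions can run incrementally."""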
import os, shutil, json
from datetime import datetime
from collections import Counter, defaultdict
from urllib.parse import urlsplit
from urllib.robotparser import RobotFileParser
from usp.objects.sitemap import InvalidSitemap
from usp.tree import sitemap_tree_for_homepage
from src.notification.notification_center import NotificationCenter
from .utils import *
from .types import *
from .html_processor import HTMLProcessor
from .content_cleaner import ContentCleaner
from .url_normalizer import UrlNormalizer
from ..utils.lang import detect_language
from ..utils.logging import get_logger
from ..utils.tools import call_with_exponential_backoff
from ..config import config

logger = get_logger('scraper.core')
incupd_logger = get_logger('scraper.incremental_updates')
class Scraper:
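    """Scrapes a target domain end to end: analyzes robots.txt and sitemaps, fetches and
    cleans pages, tags and chunks their content, and tracks per-URL timestamps so
    unchanged pages can be skipped in later, incremental sessions."""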
    def __init__(self, scrape_all: bool = True) -> None:
        self._scrape_all = scrape_all
        self._path = config.paths
        self._processor: HTMLProcessor = HTMLProcessor()
        self._normalizer: UrlNormalizer = UrlNormalizer()
        self._content_cleaner: ContentCleaner = ContentCleaner(self._scrape_all)
        self._notif_center: NotificationCenter = NotificationCenter()
        self._make_directories()
        self._url_temp_timestamps: dict[str, UrlTimestamps] = {}
        self._url_timestamps: dict[str, UrlTimestamps] = self._load_data(self._path.SCRAPING_OUTPUT, 'url_timestamps')
        self._url_priorities: dict[str, list[str]] = self._load_data(self._path.URLS_OUTPUT, 'url_priorities')
        logger.info('Successfully initialized the scraper')
        if scrape_all:
            logger.info("Initialized with SCRAPE_ALL=True. Timestamps and priorities will be ignored for this scraping session")
    def _make_directories(self) -> None:
        os.makedirs(self._path.URLS_OUTPUT, exist_ok=True)
        os.makedirs(self._path.CHUNKS_OUTPUT, exist_ok=True)
        os.makedirs(self._path.TEMP_CHUNKS_OUTPUT, exist_ok=True)
        os.makedirs(self._path.SCRAPING_OUTPUT, exist_ok=True)
        os.makedirs(self._path.RAW_HTML_OUTPUT, exist_ok=True)
        os.makedirs(self._path.RAW_TEXT_OUTPUT, exist_ok=True)
        os.makedirs(self._path.METADATA_OUTPUT, exist_ok=True)
        os.makedirs(self._path.EXTRACTED_TEXT_OUTPUT, exist_ok=True)
    def scrape_target(self, target_url: str) -> dict[str, list[ChunkMetadata]]:
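        """Run the full scraping pipeline for `target_url` and return the final chunks
        grouped by language. Intermediate results (sitemap URLs, discoveries, tags,
        priorities, chunk metadata) are persisted to the configured output directories."""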
        # Step 1: Analyze the target URL for availability, robots and sitemap
        analyzed_domain = self._analyze_domain(target_url)
        if not analyzed_domain:
            logger.error(f"Failed to scrape target URL {target_url}")
            return {}
        sitemap_urls = analyzed_domain.urls
        self._save_results(self._path.URLS_OUTPUT, 'sitemap_urls', sitemap_urls, target_url)
        # Step 2: Validate and scrape URLs listed in the sitemap
        analyzed_sitemap = self._analyze_sitemap(analyzed_domain)
        documents = analyzed_sitemap.documents
        logger.info(f"Indexed {len(sitemap_urls)} sitemap URLs for target URL {target_url}")
        logger.info(f"Scraped {len(documents)} unique URLs (others were either redirects or blacklisted)")
        # Step 3: Analyze discovered URLs and search for the new ones
        discovered_urls = analyzed_sitemap.discovered_urls
        logger.info(f"Discovered {len(discovered_urls)} new URLs during sitemap analysis")
        analyzed_discoveries = self._analyze_discoveries(discovered_urls, sitemap_urls, analyzed_domain)
        discovered_urls = analyzed_discoveries.discovered_urls
        self._save_results(self._path.URLS_OUTPUT, 'discovered_urls', discovered_urls, target_url)
        documents.extend(analyzed_discoveries.documents)
        logger.info(f"Indexed {len(discovered_urls)} new URLs for target URL {target_url}")
        # Step 4: Load temp chunks first so resume works even when there are no new documents.
        temp_filename = self._get_temp_chunks_filename(target_url)
        temp_merged_chunks = self._load_data(self._path.TEMP_CHUNKS_OUTPUT, temp_filename)
        if not documents and not temp_merged_chunks:
            logger.info(f"No new content was scraped from the target URL {target_url}")
            return {}
        tagged_documents = []
        # Step 5: Analyze the converted URLs
        if documents:
            self._content_cleaner.perform_content_analysis(target_url, self._normalizer.url_to_filename(target_url))
            analyzed_documents = self._analyze_url_documents(documents)
            self._save_results(self._path.URLS_OUTPUT, 'url_tags', analyzed_documents.url_tags)
            self._save_results(self._path.URLS_OUTPUT, 'url_priorities', analyzed_documents.url_priorities)
            tagged_documents = analyzed_documents.tagged_documents
        # Step 6: Collect and save chunks
        chunk_metadatas = self._collect_chunks(tagged_documents, target_url, temp_merged_chunks)
        self._save_results(self._path.METADATA_OUTPUT, 'raw_chunk_metadata', chunk_metadatas['raw'], target_url)
        self._save_results(self._path.METADATA_OUTPUT, 'merged_chunk_metadata', chunk_metadatas['merged'], target_url)
        self._save_results(self._path.METADATA_OUTPUT, 'deleted_chunk_metadata', chunk_metadatas['deleted'], target_url)
        logger.info(f"Collected {len(chunk_metadatas['merged'])} chunks from target URL {target_url}")
        logger.info(f"Scraping finished for target URL '{target_url}'")
        return chunk_metadatas['final']
    def _analyze_domain(self, target_url: str) -> DomainAnalysisReport | None:
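        """Check that the target URL is reachable, parse its robots.txt and sitemaps, and
        return a DomainAnalysisReport with the allowed sitemap URLs, page data, and crawl
        delay; returns None if any of these steps fails."""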
        if not target_url:
            logger.warning('The target URL string is empty!')
            return None
        # Step 1: Test whether the target URL is even accessible before initializing the scraping procedure
        response = call_with_exponential_backoff(fetch_url, args=(target_url,))
        if response['status'] == 'FAIL':
            logger.error(f"Inaccessible target URL '{target_url}': {response['last_error']}")
            return None
        if not response['result']:
            logger.warning(f"Inaccessible target URL '{target_url}': Received client/server error!")
            return None
        # Step 2: Fetch and parse robots
        logger.info(f"Fetching 'robots.txt' for the target URL '{target_url}'...")
        robots_parser: RobotFileParser = parse_robots(target_url)
        if not robots_parser:
            logger.warning(
                f"Could not fetch the 'robots.txt' file for the target URL '{target_url}'! " +
                "(Are you sure the scraping begins from root?)"
            )
            return None
        logger.info(f"Parsed the 'robots.txt' file for target URL '{target_url}'")
        delay = robots_parser.crawl_delay('scraper')
        target_domain = urlsplit(target_url).netloc
        # Step 3: Fetch and parse sitemaps
        logger.info(f"Fetching sitemaps for target URL {target_url}...")
        sitemap_tree = sitemap_tree_for_homepage(target_url)
        if isinstance(sitemap_tree, InvalidSitemap):
            logger.error(f"Cannot fetch sitemap for target URL '{target_url}': Invalid sitemap structure!")
            return None
        page_data = []
        page_urls = set()
        for page in sitemap_tree.all_pages():
            page_url = page.url
            if not robots_parser.can_fetch('scraper', page_url) or page_url in page_urls:
                continue
            page_urls.add(page_url)
            page_data.append(PageData(page_url, page.last_modified))
        logger.info(f'Loaded sitemaps with {len(page_data)} pages')
        return DomainAnalysisReport(
            target = target_domain,
            urls = list(page_urls),
            pages = page_data,
            delay = delay,
        )
    def _analyze_sitemap(self, domain: DomainAnalysisReport) -> UrlAnalysisReport:
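        """Validate and scrape every sitemap page, collecting processed documents and
        newly discovered URLs. Sends a notification when more than 10% of the sitemap
        pages are rejected."""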
        documents = []
        visited_urls = set()
        discovered_urls = set()
        rejected_urls = []
        sitemap_pages = domain.pages
        logger.info('Starting validation and scraping for sitemap URLs...')
        for page in sitemap_pages:
            result = self._scrape_page(page.url, domain.delay, visited_urls, last_modified=page.last_modified)
            visited_urls.add(page.url)
            if result.status != ScrapingStatus.OK:
                if result.status == ScrapingStatus.REJECTED:
                    rejected_urls.append(page.url)
                continue
            final_url = result.final_url
            documents.append(result.document)
            visited_urls.add(final_url)
            self._store_timestamps(final_url, result.timestamps, temp=True)
            new_urls = self._normalizer.filter_discovered_urls(result.discovered_urls, visited_urls, domain.target)
            discovered_urls |= new_urls
        if len(rejected_urls) > len(sitemap_pages) * 0.1:
            rejection_rate = len(rejected_urls) / len(sitemap_pages)
            logger.warning(f"Rejection rate is {rejection_rate:.0%}")
            self._notif_center.send_notification(
                subject = "⚠ WARNING: Scraping rejection rate is >10%!",
                body = f"Rejection rate: {int(rejection_rate*100)}%\n" +
                       f"Failed to scrape the following URLs for target domain {domain.target}:\n" +
                       "\n".join([f"\t- {url}" for url in rejected_urls]),
                channel = "slack",
            )
        discovered_urls = [url for url in discovered_urls if url not in visited_urls]
        return UrlAnalysisReport(
            documents = documents,
            discovered_urls = discovered_urls,
        )
    def _analyze_discoveries(
        self,
        discovered_urls: list,
        sitemap_urls: list,
        domain: DomainAnalysisReport
    ) -> UrlAnalysisReport:
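        """Scrape URLs discovered outside the sitemap, following links found on each page
        (bounded by the discovery depth checked in _scrape_page) and returning the
        resulting documents together with the full list of discoveries."""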
        if len(discovered_urls) == 0:
            return UrlAnalysisReport([], [])
        documents = []
        discoveries = discovered_urls.copy()
        visited_urls = set(sitemap_urls.copy())
        discovered_urls = [{'url': url, 'depth': 0} for url in discovered_urls]
        logger.info("Starting validation and scraping for discovered URLs...")
        while discovered_urls:
            discovered_url = discovered_urls.pop()
            url = discovered_url['url']
            result = self._scrape_page(url, domain.delay, visited_urls, discovery_depth=discovered_url['depth'])
            visited_urls.add(url)
            if result.status != ScrapingStatus.OK:
                continue
            final_url = result.final_url
            documents.append(result.document)
            visited_urls.add(final_url)
            discoveries.append(final_url)
            self._store_timestamps(final_url, result.timestamps, temp=True)
            for new_url in self._normalizer.filter_discovered_urls(result.discovered_urls, visited_urls, domain.target):
                discovered_urls.append({'url': new_url, 'depth': result.discovery_depth})
        return UrlAnalysisReport(
            documents = documents,
            discovered_urls = discoveries,
        )
    def _analyze_url_documents(self, documents: list) -> DocumentAnalysisReport:
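        """Clean each scraped document, save its extracted text, and tag it with topic,
        priority, language, and program. Returns the URL tags, the priority buckets, and
        the tagged documents ready for chunking."""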
        url_tags = {}
        url_priorities = defaultdict(list)
        tagged_documents = []
        logger.info(f"Analyzing scraped contents of {len(documents)} pages...")
        for document in documents:
            url = document.name
            self._content_cleaner.clean_document(document)
            extracted_text = self._processor.convert_to_txt(document)
            if extracted_text.strip() == '':
                logger.warning(f'No text extracted from {url}. Skipping ...')
                continue
            url_filename = self._normalizer.url_to_filename(url)
            extracted_text_file_path = os.path.join(self._path.EXTRACTED_TEXT_OUTPUT, url_filename + '.txt')
            with open(extracted_text_file_path, 'w', encoding="utf-8") as f:
                f.write(extracted_text)
            logger.info(f"Saved extracted text for URL '{url}' under '{extracted_text_file_path}'")
            language = detect_language(extracted_text)
            tp_result = detect_page_topic_and_priority(extracted_text)
            programs = self._processor.strategies_processor.apply_strategy(
                strategy_name='programs',
                arguments={'document_content': extracted_text},
            )
            program = programs[0] if programs else 'no program'
            tags = UrlTags(
                topic = tp_result['topic'],
                priority = tp_result['priority'],
                language = language,
                program = program,
            )
            url_tags[url] = tags
            url_priorities[tp_result['priority']].append(url)
            tagged_documents.append(TaggedDocument(document, DocumentTags(program, language)))
        return DocumentAnalysisReport(
            url_tags = url_tags,
            url_priorities = url_priorities,
            tagged_documents = tagged_documents,
        )
    def _collect_chunks(
        self,
        tagged_documents: list[TaggedDocument],
        target_url: str,
        temp_chunks: dict[str, list[ChunkMetadata]] | None = None,
    ) -> dict[str, list[ChunkMetadata]]:
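        """Chunk each tagged document, merge chunks by topic, store them in the temp chunk
        file for resumability, and return raw, merged, deleted, and final (per-language)
        chunk metadata."""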
        raw_chunks = []
        deleted_chunks = []
        merged_chunks, final_chunks = self._read_temp_chunks(temp_chunks or {}, tagged_documents)
        program_counter = self._build_program_counter_from_merged_chunks(merged_chunks)
        if merged_chunks:
            incupd_logger.info(f"Restored {len(merged_chunks)} chunks from temp")
        for entry in tagged_documents:
            document = entry.document
            program = entry.tags.program
            language = entry.tags.language
            url = document.name
            url_filename = self._normalizer.url_to_filename(url)
            program_counter[program] += 1
            doc_chunks_dir_path = os.path.join(config.paths.CHUNKS_OUTPUT, url_filename)
            if os.path.exists(doc_chunks_dir_path):
                shutil.rmtree(doc_chunks_dir_path)
            os.makedirs(doc_chunks_dir_path)
            mergible_chunks_metadatas = []
            raw_chunk_count = 0
            for i, chunk in enumerate(self._processor.chunk(document), start=1):
                raw_chunk_count = i
                chunk_file_path = os.path.join(doc_chunks_dir_path, f"chunk_{i}.txt")
                with open(chunk_file_path, 'w', encoding="utf-8") as f:
                    f.write(chunk['text'])
                chunk_topic = detect_chunk_topic(chunk['text'])
                chunk_metadata = ChunkMetadata(
                    chunk_id = f"{program.lower()}_{program_counter[program]:03d}_{i:02d}",
                    text = chunk['text'],
                    source_url = url,
                    program = program,
                    language = language,
                    topic = chunk_topic,
                    last_scraped = datetime.now(),
                    page_title = self._processor.extract_title(document),
                    section_heading = chunk['title'],
                    token_size = chunk['size'],
                )
                raw_chunks.append(chunk_metadata)
                if chunk_topic == 'none':
                    deleted_chunks.append(chunk_metadata)
                else:
                    mergible_chunks_metadatas.append(chunk_metadata)
            logger.info(f"Collected {raw_chunk_count} raw chunks and saved under '{doc_chunks_dir_path}'")
            merged_chunk_metadatas = self._processor.merge_chunks_by_topic(mergible_chunks_metadatas)
            merged_chunks.extend(merged_chunk_metadatas)
            self._store_temp_chunks(target_url, url, merged_chunk_metadatas)
            logger.info(f"Merged {raw_chunk_count} raw chunks into {len(merged_chunk_metadatas)} chunks by topic")
            prepared_chunks = self._processor.prepare_chunks(url, self._processor.convert_to_txt(document), merged_chunk_metadatas)
            for lang in final_chunks.keys():
                if lang in prepared_chunks.keys():
                    final_chunks[lang].extend(prepared_chunks[lang])
        return {
            'raw': raw_chunks,
            'merged': merged_chunks,
            'deleted': deleted_chunks,
            'final': final_chunks,
        }
    def _read_temp_chunks(
        self,
        temp_chunks: dict[str, list[ChunkMetadata]],
        tagged_documents: list[TaggedDocument]
    ) -> tuple[list[ChunkMetadata], dict[str, list[ChunkMetadata]]]:
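        """Restore chunks persisted in a previous session, dropping entries for URLs that
        were re-scraped in this run. Returns the restored chunk metadata and the prepared
        chunks grouped by language."""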
        loaded_temp_chunks = temp_chunks.copy()
        prepared_temp_chunks = {lang: [] for lang in config.get('AVAILABLE_LANGUAGES', ['en', 'de'])}
        for url in [entry.document.name for entry in tagged_documents]:
            if url in temp_chunks.keys():
                incupd_logger.info(f"Deleted stored temp data for URL {url} as it was newly scraped")
                del loaded_temp_chunks[url]
        restored_temp_chunks = []
        for url, chunks in loaded_temp_chunks.items():
            url_filename = self._normalizer.url_to_filename(url)
            extracted_text_path = os.path.join(self._path.EXTRACTED_TEXT_OUTPUT, url_filename + '.txt')
            if not os.path.exists(extracted_text_path):
                incupd_logger.warning(f"Cannot restore chunks for URL {url}: Failed to locate previously extracted contents!")
                incupd_logger.warning("This URL will have to be rescraped in the next session")
                continue
            with open(extracted_text_path, 'r', encoding='utf-8') as f:
                url_text = f.read()
            prepared_chunks = self._processor.prepare_chunks(url, url_text, chunks)
            for lang in prepared_temp_chunks.keys():
                if lang in prepared_chunks.keys():
                    prepared_temp_chunks[lang].extend(prepared_chunks[lang])
            restored_temp_chunks.extend(chunks)
            incupd_logger.info(f"Restored {len(chunks)} chunks for URL {url} from temp")
        return restored_temp_chunks, prepared_temp_chunks
    def _store_temp_chunks(self, target_url: str, url: str, chunks: list[ChunkMetadata]) -> None:
        self._url_timestamps[url] = self._url_temp_timestamps[url]
        temp_chunks = {url: chunks}
        self._save_results(self._path.TEMP_CHUNKS_OUTPUT, self._get_temp_chunks_filename(target_url), temp_chunks)
        self._save_results(self._path.SCRAPING_OUTPUT, 'url_timestamps', self._url_timestamps)
        incupd_logger.info(f"Stored {len(chunks)} chunks in temp for URL {url}")
    def _build_program_counter_from_merged_chunks(self, merged_chunks: list[ChunkMetadata]) -> Counter:
        counter = Counter()
        seen = set()
        for chunk in merged_chunks:
            key = (chunk.program, chunk.source_url)
            if key not in seen:
                counter[chunk.program] += 1
                seen.add(key)
        return counter
    def _is_url_modified(
        self,
        url: str,
        new_last_modified: datetime | None = None,
        new_page_hash: str | None = None
    ) -> bool:
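        """Return True if the URL should be re-scraped: it is unknown, its Last-Modified
        timestamp is newer than the stored one, or its page hash changed. Defaults to
        True when neither signal is available."""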
        if url not in self._url_timestamps.keys():
            return True
        stored = self._url_timestamps[url]
        if stored.last_modified and new_last_modified:
            return stored.last_modified < new_last_modified
        if new_page_hash and stored.page_hash:
            return new_page_hash != stored.page_hash
        return True
    def _store_timestamps(self, url: str, timestamps: UrlTimestamps, temp=False) -> None:
        if temp:
            self._url_temp_timestamps[url] = timestamps
        else:
            self._url_timestamps[url] = timestamps

    def _get_temp_chunks_filename(self, target_url: str) -> str:
        return self._normalizer.url_to_filename(target_url) + '_merged_chunks'

    def delete_temp_merged_chunks(self, target_url: str) -> None:
        temp_path = os.path.join(
            self._path.TEMP_CHUNKS_OUTPUT,
            self._get_temp_chunks_filename(target_url) + '.json'
        )
        if os.path.exists(temp_path):
            os.remove(temp_path)
            incupd_logger.info(f"Deleted temp merged chunks file '{temp_path}'")

    def _get_etag(self, url: str) -> str | None:
        if url not in self._url_timestamps.keys():
            return None
        return self._url_timestamps[url].etag
    def _is_fetch_valid(self, url: str, visited_urls: set[str], fetch_result: FetchResult) -> ScrapingStatus:
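        """Classify a fetch result: REJECTED if the fetch failed, NO_UPDATES if the page
        is unmodified, VISITED if a redirect leads to an already processed URL, otherwise
        OK."""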
        if not fetch_result:
            logger.warning(f"Cannot fetch {url}! Skipping...")
            return ScrapingStatus.REJECTED
        if fetch_result.not_modified:
            logger.info("No updates on the page, skipping...")
            return ScrapingStatus.NO_UPDATES
        final_url = fetch_result.final_url
        if final_url != url:
            logger.info(f"Redirect detected: '{url}' --> '{final_url}'")
            if final_url in visited_urls:
                logger.info(f"'{final_url}' was already visited, skipping...")
                return ScrapingStatus.VISITED
            logger.info(f"Continuing with URL '{final_url}'...")
        last_modified = fetch_result.last_modified
        page_hash = fetch_result.page_hash
        if not self._scrape_all and not self._is_url_modified(final_url, new_last_modified=last_modified, new_page_hash=page_hash):
            logger.info(f"URL {final_url} was not modified since last scraping session, skipping...")
            return ScrapingStatus.NO_UPDATES
        return ScrapingStatus.OK
    def _is_url_prioritized(self, url) -> bool:
        if url not in self._url_timestamps.keys():
            return True
        for prio, urls in self._url_priorities.items():
            if url in urls:
                return self._is_scraping_scheduled(url, prio)
        return True
    def _is_scraping_scheduled(self, url, prio) -> bool:
        saved_timestamp = self._url_timestamps[url].last_scraped
        if not saved_timestamp:
            return True
        time_difference = datetime.now() - saved_timestamp
        for interval_prio, interval in config.scraping.INTERVALS.items():
            if prio == interval_prio:
                return time_difference.days >= interval
        return True
    def _scrape_page(
        self,
        url: str,
        crawl_delay: float,
        visited_urls: set[str],
        discovery_depth: int = 0,
        last_modified: datetime | None = None
    ) -> ScrapingResult:
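        """Fetch, clean, and process a single page. Returns a ScrapingResult whose status
        explains skips (blacklisted, already visited, unmodified, rejected) and which, on
        success, carries the processed document, discovered URLs, and timestamps."""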
        if not url:
            return ScrapingResult(status=ScrapingStatus.REJECTED)
        if self._normalizer.is_url_blacklisted(url):
            logger.info(f"URL {url} is blacklisted by scraper, skipping...")
            return ScrapingResult(status=ScrapingStatus.BLACKLISTED)
        if url in visited_urls:
            logger.info(f'URL {url} was already analyzed via redirect, skipping...')
            return ScrapingResult(status=ScrapingStatus.VISITED)
        if not self._scrape_all and last_modified and not self._is_url_modified(url, new_last_modified=last_modified):
            logger.info(f"URL '{url}' was not modified since last scraping session, skipping...")
            self._url_timestamps[url].last_modified = last_modified
            return ScrapingResult(status=ScrapingStatus.NO_UPDATES)
        if not self._scrape_all and not self._is_url_prioritized(url):
            logger.info(f"URL {url} is not prioritized, skipping")
            return ScrapingResult(status=ScrapingStatus.NO_UPDATES)
        logger.info(f"Fetching head for URL '{url}'...")
        etag = self._get_etag(url)
        response = call_with_exponential_backoff(fetch_head, args=(url, etag), delay=crawl_delay)
        if response['status'] == 'FAIL':
            logger.warning(f"Failed to fetch head for URL {url}: {response['last_error']}! Skipping...")
            return ScrapingResult(status=ScrapingStatus.REJECTED)
        fetch_result = response['result']
        validation = self._is_fetch_valid(url, visited_urls, fetch_result)
        if validation != ScrapingStatus.OK:
            return ScrapingResult(status=validation)
        response = call_with_exponential_backoff(fetch_url, args=(url, etag), delay=crawl_delay)
        if response['status'] == 'FAIL':
            logger.warning(f"Failed to fetch URL {url}: {response['last_error']}! Skipping...")
            return ScrapingResult(status=ScrapingStatus.REJECTED)
        fetch_result = response['result']
        validation = self._is_fetch_valid(url, visited_urls, fetch_result)
        if validation != ScrapingStatus.OK:
            return ScrapingResult(status=validation)
        if not fetch_result.last_modified:
            logger.warning("No information about URL last modification date exists!")
        timestamps = UrlTimestamps(
            last_modified = fetch_result.last_modified,
            last_scraped = datetime.now(),
            etag = fetch_result.etag,
            page_hash = fetch_result.page_hash,
        )
        raw_html = fetch_result.text
        final_url = fetch_result.final_url
        url_filename = self._normalizer.url_to_filename(final_url)
        raw_html_file_path = os.path.join(config.paths.RAW_HTML_OUTPUT, url_filename + '.html')
        with open(raw_html_file_path, 'w', encoding="utf-8") as f:
            f.write(raw_html)
        logger.info(f"Saved fetched HTML under '{raw_html_file_path}'")
        logger.info(f"Cleaning URL {final_url} from mobile data...")
        cleaned_html = self._content_cleaner.clean_mobile_content(raw_html)
        logger.info(f"Processing URL {final_url}...")
        document = self._processor.process(final_url, cleaned_html)
        if not document:
            logger.warning(f"Failed to process URL '{final_url}'! Skipping...")
            return ScrapingResult(status=ScrapingStatus.REJECTED)
        discovered_urls = self._content_cleaner.extract_urls(document) if discovery_depth <= 3 else []
        self._content_cleaner.collect_repetitive_content(document)
        raw_text = self._processor.convert_to_txt(document)
        raw_text_file_path = os.path.join(config.paths.RAW_TEXT_OUTPUT, url_filename + '.txt')
        with open(raw_text_file_path, 'w', encoding="utf-8") as f:
            f.write(raw_text)
        logger.info(f"Saved raw text for URL '{final_url}' under '{raw_text_file_path}'")
        return ScrapingResult(
            document = document,
            discovered_urls = discovered_urls,
            final_url = final_url,
            timestamps = timestamps,
            discovery_depth = discovery_depth + 1,
            status = ScrapingStatus.OK,
        )
    def _save_results(self, path: str, filename: str, results, target_url: str | None = None) -> None:
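        """Merge `results` into the JSON file `<path>/<filename>.json`, using a
        filename-specific merge strategy for tags, timestamps, priorities, and temp chunk
        files."""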
        results_path = os.path.join(path, filename + '.json')
        results_dict = {}
        if os.path.exists(results_path):
            try:
                with open(results_path, 'r', encoding='utf-8') as f:
                    results_dict = json.load(f)
            except Exception:
                logger.warning(f"Failed to load existing {results_path}, will overwrite")
        match filename:
            case 'url_tags':
                results_dict |= {url: dataclass_to_dict(tags) for url, tags in results.items()}
            case 'url_timestamps':
                for url, ts in results.items():
                    results_dict[url] = dataclass_to_dict(ts)
            case 'url_priorities':
                for prio, urls in results.items():
                    prev = set(results_dict.get(prio, []))
                    results_dict[prio] = list(prev.union(urls))
            case _ if filename.endswith('_merged_chunks'):
                for url, chunks in results.items():
                    results_dict[url] = [dataclass_to_dict(chunk) for chunk in chunks]
            case _:
                results = [dataclass_to_dict(r) for r in results]
                if target_url:
                    results_dict[target_url] = results
                else:
                    results_dict = results
        try:
            with open(results_path, 'w', encoding='utf-8') as f:
                json.dump(
                    results_dict,
                    f,
                    indent=4,
                    default=lambda o: o.isoformat() if isinstance(o, datetime) else None,
                )
        except Exception as e:
            logger.error(f"Failed to store results '{filename}'")
            raise e
        logger.debug(f"Stored results in file {results_path}")
    def _load_data(self, path: str, filename: str):
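        """Load a JSON results file and convert known structures back into dataclasses.
        Returns an empty defaultdict when the file is missing or unreadable."""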
        datapath = os.path.join(path, filename + '.json')
        if not os.path.exists(datapath):
            logger.warning(f"Failed to locate file {datapath}; new data will be recorded")
            return defaultdict(dict)
        try:
            with open(datapath, 'r', encoding='utf-8') as f:
                loaded_data = json.load(f)
            match filename:
                case 'url_timestamps':
                    for url, ts_dict in loaded_data.items():
                        loaded_data[url] = dict_to_dataclass(ts_dict, UrlTimestamps)
                    incupd_logger.debug(f"Loaded {len(loaded_data)} URL timestamps")
                    return loaded_data
                case _ if filename.endswith('_merged_chunks'):
                    for url, chunk_metadata in loaded_data.items():
                        loaded_data[url] = [dict_to_dataclass(chunk, ChunkMetadata) for chunk in chunk_metadata]
                    incupd_logger.debug(f"Loaded {len(loaded_data)} temp merged chunks")
                    return loaded_data
                case _:
                    incupd_logger.info(f"Loaded data '{filename}'")
                    return loaded_data
        except Exception as e:
            logger.error(f"Failed trying to load data '{filename}': {e}")
            logger.info("New data will be recorded")
            return defaultdict(dict)