Spaces:
Paused
Paused
| """ | |
| RandomWeb — Polite Async HTTP Validator | |
| Validates discovered URLs with rate limiting, robots.txt compliance, | |
| clear user-agent identification, and timeout rules. | |
| """ | |
| import asyncio | |
| import logging | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Optional | |
| from urllib.parse import urlparse | |
| import aiohttp | |
| from aiolimiter import AsyncLimiter | |
| from protego import Protego | |
| from backend.config import ( | |
| USER_AGENT, | |
| REQUEST_TIMEOUT, | |
| VALIDATION_CONCURRENCY, | |
| PER_DOMAIN_RATE_LIMIT, | |
| CRAWL_DELAY_DEFAULT, | |
| RECHECK_INTERVAL_DAYS, | |
| ) | |
| from backend.db import get_client, extract_domain | |
logger = logging.getLogger("randomweb.validator")

# ─── Shared State ────────────────────────────────────────────
# Bounded queue of pending work items; enqueue_url() puts dicts of the form
# {"url": ..., "source": ...} here and run_validator() drains them.
_validation_queue: asyncio.Queue = asyncio.Queue(maxsize=50_000)
# Parsed robots.txt rules keyed by domain. A value of None records that the
# file could not be fetched, so the lookup is not repeated.
_robots_cache: dict[str, Optional[Protego]] = {}
# One rate limiter per domain, created lazily by _get_domain_limiter().
_domain_limiters: dict[str, AsyncLimiter] = {}
# Assigned when run_validator() starts.
# NOTE(review): nothing in this module acquires it — confirm whether it is
# used elsewhere or is dead state.
_semaphore: Optional[asyncio.Semaphore] = None
def get_validation_queue() -> asyncio.Queue:
    """Expose the module-wide queue of URLs awaiting validation."""
    queue = _validation_queue
    return queue
async def enqueue_url(url: str, source: str = "unknown"):
    """Queue *url* for validation without blocking; drop it when the queue is full."""
    # No await happens between the capacity check and the put, so this guard
    # is equivalent to catching asyncio.QueueFull from put_nowait().
    if _validation_queue.full():
        logger.warning("Validation queue full, dropping: %s", url)
        return
    entry = {"url": url, "source": source}
    _validation_queue.put_nowait(entry)
def _get_domain_limiter(domain: str) -> AsyncLimiter:
    """Return the rate limiter for *domain*, creating it on first use."""
    limiter = _domain_limiters.get(domain)
    if limiter is None:
        # Allow PER_DOMAIN_RATE_LIMIT acquisitions per 1.0-second window.
        limiter = AsyncLimiter(PER_DOMAIN_RATE_LIMIT, 1.0)
        _domain_limiters[domain] = limiter
    return limiter
async def _fetch_robots_txt(
    session: aiohttp.ClientSession, domain: str
) -> Optional[Protego]:
    """Fetch and parse ``robots.txt`` for *domain*, caching the outcome.

    Args:
        session: Shared aiohttp session used for the request.
        domain: Host name whose ``https://<domain>/robots.txt`` is fetched.

    Returns:
        The parsed rules, or ``None`` when the file could not be obtained
        (non-200 response or any request/parsing error). The result,
        including ``None``, is cached per domain, so each domain is
        fetched at most once per cache entry.
    """
    if domain in _robots_cache:
        return _robots_cache[domain]

    robots_url = f"https://{domain}/robots.txt"
    parser: Optional[Protego] = None
    try:
        async with session.get(
            robots_url,
            timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT),
            headers={"User-Agent": USER_AGENT},
            allow_redirects=True,
            # NOTE: certificate verification is disabled for this request.
            ssl=False,
        ) as resp:
            if resp.status == 200:
                # Tolerate stray bytes so one bad character does not throw
                # away an otherwise usable rule set.
                text = await resp.text(errors="replace")
                parser = Protego.parse(text)
            else:
                logger.debug(
                    "robots.txt for %s returned HTTP %s", domain, resp.status
                )
    except Exception as exc:
        # Best effort: a missing/unreachable robots.txt is treated as
        # "no rules", but the reason is logged instead of being swallowed.
        logger.debug("Could not fetch robots.txt for %s: %s", domain, exc)

    _robots_cache[domain] = parser
    return parser
async def _can_fetch(
    session: aiohttp.ClientSession, url: str
) -> tuple[bool, float]:
    """
    Decide whether robots.txt permits fetching *url*.
    Returns (allowed, crawl_delay). When no robots.txt rules are available
    the URL is allowed and CRAWL_DELAY_DEFAULT is used; the default also
    applies when the rules declare no crawl delay for our user agent.
    """
    rules = await _fetch_robots_txt(session, extract_domain(url))
    if rules is None:
        return True, CRAWL_DELAY_DEFAULT

    permitted = rules.can_fetch(url, USER_AGENT)
    declared_delay = rules.crawl_delay(USER_AGENT)
    effective_delay = (
        CRAWL_DELAY_DEFAULT if declared_delay is None else declared_delay
    )
    return permitted, effective_delay
# HEAD statuses that mean "this method is not supported here" rather than
# anything about the resource itself; such URLs are re-probed with GET.
_HEAD_UNSUPPORTED_STATUSES = frozenset({405, 501})


async def _probe_status(
    session: aiohttp.ClientSession, url: str
) -> Optional[int]:
    """Return the final HTTP status for *url* (HEAD, then GET), or None if unreachable."""
    request_kwargs = {
        "timeout": aiohttp.ClientTimeout(total=REQUEST_TIMEOUT),
        "headers": {"User-Agent": USER_AGENT},
        "allow_redirects": True,
        # NOTE: certificate verification is disabled for these requests.
        "ssl": False,
    }

    head_status: Optional[int] = None
    try:
        # Try HEAD first (lighter).
        async with session.head(url, **request_kwargs) as resp:
            head_status = resp.status
    except Exception as exc:
        logger.debug("HEAD failed for %s, falling back to GET: %s", url, exc)
    else:
        if head_status not in _HEAD_UNSUPPORTED_STATUSES:
            return head_status

    try:
        # Fallback to GET: HEAD either raised or is not supported.
        async with session.get(url, **request_kwargs) as resp:
            return resp.status
    except Exception as e:
        logger.debug("Validation failed for %s: %s", url, e)
        # Keep whatever HEAD told us (None if HEAD raised as well).
        return head_status


async def validate_url(
    session: aiohttp.ClientSession,
    url: str,
    source: str = "unknown",
) -> Optional[dict]:
    """
    Validate a single URL and describe the outcome as a record dict.

    Steps:
    1. Wait for the per-domain rate limiter.
    2. Check robots.txt and honour its crawl delay.
    3. Send a HEAD request, falling back to GET when HEAD raises or the
       server answers 405/501 (HEAD not supported).
    4. Build the result record.

    Args:
        session: Shared aiohttp session used for all requests.
        url: The URL to validate.
        source: Label of where the URL was discovered; stored in the record.

    Returns:
        ``None`` only when robots.txt disallows the URL. Otherwise a dict
        with keys ``url``, ``domain``, ``source``, ``status``, ``is_active``,
        ``last_checked`` and ``next_check``. A URL that could not be reached
        still yields a record, with ``status`` None and ``is_active`` False.
    """
    domain = extract_domain(url)
    limiter = _get_domain_limiter(domain)

    # Rate limit per domain
    async with limiter:
        # Check robots.txt
        allowed, delay = await _can_fetch(session, url)
        if not allowed:
            logger.debug("Blocked by robots.txt: %s", url)
            return None

        # Respect crawl delay
        if delay > 0:
            await asyncio.sleep(delay)

        # Timestamp is taken just before the request is issued.
        now = datetime.now(timezone.utc).isoformat()
        status_code = await _probe_status(session, url)

    # Only a final 200 (after redirects) counts as active.
    is_active = status_code == 200
    next_check = (
        (datetime.now(timezone.utc) + timedelta(days=RECHECK_INTERVAL_DAYS)).isoformat()
        if is_active
        else None
    )
    return {
        "url": url,
        "domain": domain,
        "source": source,
        "status": status_code,
        "is_active": is_active,
        "last_checked": now,
        "next_check": next_check,
    }
async def _process_batch(
    session: aiohttp.ClientSession,
    batch: list[dict],
) -> list[dict]:
    """Validate a batch of URLs concurrently.

    Args:
        session: Shared aiohttp session passed to every validation.
        batch: Queue items; each must have a ``"url"`` key and may have a
            ``"source"`` key (defaults to ``"unknown"``).

    Returns:
        The record dicts produced by ``validate_url``. URLs that yielded
        ``None`` are omitted; validations that raised are logged and omitted.
    """
    outcomes = await asyncio.gather(
        *(
            validate_url(session, item["url"], item.get("source", "unknown"))
            for item in batch
        ),
        return_exceptions=True,
    )

    records = []
    for outcome in outcomes:
        if isinstance(outcome, dict):
            records.append(outcome)
        elif isinstance(outcome, BaseException):
            # gather(return_exceptions=True) can hand back BaseException
            # subclasses such as CancelledError; log those too, with the
            # traceback attached, instead of dropping them silently.
            logger.error(
                "Validation task error: %s", outcome, exc_info=outcome
            )
    return records
def _upsert_records(records: list[dict]) -> None:
    """Blocking bulk upsert of validation records into the ``websites`` table."""
    get_client().table("websites").upsert(
        records, on_conflict="url"
    ).execute()


async def run_validator(batch_size: int = 50):
    """
    Main validation loop. Continuously drains the validation queue,
    validates URLs in batches, and upserts results to Supabase.

    Args:
        batch_size: Maximum number of queue items pulled into one batch.

    The loop never returns on its own. Errors inside an iteration are logged
    and followed by a 5-second pause; task cancellation propagates normally.
    """
    global _semaphore
    _semaphore = asyncio.Semaphore(VALIDATION_CONCURRENCY)
    logger.info("Validation worker started")

    connector = aiohttp.TCPConnector(
        limit=VALIDATION_CONCURRENCY,
        ttl_dns_cache=300,
        force_close=False,
    )
    async with aiohttp.ClientSession(connector=connector) as session:
        while True:
            try:
                # Wait for at least one item
                try:
                    first = await asyncio.wait_for(
                        _validation_queue.get(), timeout=5.0
                    )
                except asyncio.TimeoutError:
                    await asyncio.sleep(1)
                    continue

                # Drain up to batch size
                batch = [first]
                while len(batch) < batch_size:
                    try:
                        batch.append(_validation_queue.get_nowait())
                    except asyncio.QueueEmpty:
                        break

                # Keep one item per URL (the last one queued). Duplicates
                # would be fetched twice, and a bulk upsert that touches
                # the same conflict key twice is rejected by the database.
                batch = list({item["url"]: item for item in batch}.values())

                logger.info("Validating batch of %d URLs", len(batch))
                records = await _process_batch(session, batch)
                if not records:
                    continue

                # Bulk upsert to Supabase. The client call is synchronous,
                # so run it in a worker thread to keep the event loop free.
                try:
                    await asyncio.to_thread(_upsert_records, records)
                except Exception as e:
                    logger.error("Bulk upsert failed: %s", e, exc_info=True)
                else:
                    active = sum(1 for r in records if r["is_active"])
                    logger.info(
                        "Upserted %d records (%d active)",
                        len(records), active,
                    )
            except Exception as e:
                logger.error("Validator loop error: %s", e, exc_info=True)
                await asyncio.sleep(5)