"""
src/utils/rate_limiter.py

Domain-Specific Rate Limiter for Concurrent Web Scraping

Provides thread-safe rate limiting to prevent anti-bot detection when
multiple agents scrape the same domains concurrently.

Usage:
    from src.utils.rate_limiter import RateLimiter, get_rate_limiter

    # Global singleton
    limiter = get_rate_limiter()

    # Acquire before making request
    with limiter.acquire("twitter"):
        # Make request to Twitter
        pass
"""

import threading
import time
import logging
from typing import Dict, Optional
from contextlib import contextmanager
from collections import defaultdict

logger = logging.getLogger("Roger.rate_limiter")


class RateLimiter:
    """
    Thread-safe rate limiter with domain-specific limits.

    Implements a sliding-window limiter (per-domain request timestamps over
    the last 60 seconds) combined with a concurrency semaphore and a minimum
    inter-request delay, with configurable:
    - Requests per minute per domain ("rpm")
    - Maximum concurrent requests per domain ("max_concurrent")
    - Minimum delay between requests to the same domain ("min_delay")
    """

    DEFAULT_LIMITS = {
        "twitter": {"rpm": 15, "max_concurrent": 2, "min_delay": 2.0},
        "facebook": {"rpm": 10, "max_concurrent": 2, "min_delay": 3.0},
        "linkedin": {"rpm": 10, "max_concurrent": 1, "min_delay": 5.0},
        "instagram": {"rpm": 10, "max_concurrent": 2, "min_delay": 3.0},
        "reddit": {"rpm": 30, "max_concurrent": 3, "min_delay": 1.0},
        "news": {"rpm": 60, "max_concurrent": 5, "min_delay": 0.5},
        "government": {"rpm": 30, "max_concurrent": 3, "min_delay": 1.0},
        "default": {"rpm": 30, "max_concurrent": 3, "min_delay": 1.0},
    }

    def __init__(self, custom_limits: Optional[Dict] = None):
        """
        Initialize rate limiter with optional custom limits.

        Args:
            custom_limits: Optional dict to override default limits per domain
        """
        self._limits = {**self.DEFAULT_LIMITS}
        if custom_limits:
            self._limits.update(custom_limits)

        # Per-domain concurrency semaphores, created lazily in _get_semaphore()
        self._semaphores: Dict[str, threading.Semaphore] = {}

        # Timestamp of the most recent request per domain (0.0 = never)
        self._last_request: Dict[str, float] = defaultdict(float)

        # Timestamps of requests made within the last 60 seconds, per domain
        self._request_counts: Dict[str, list] = defaultdict(list)

        # Guards the bookkeeping structures above
        self._lock = threading.Lock()

        logger.info(
            f"[RateLimiter] Initialized with {len(self._limits)} domain configurations"
        )

    def _get_domain_config(self, domain: str) -> Dict:
        """Get configuration for a domain, falling back to default."""
        return self._limits.get(domain.lower(), self._limits["default"])

    def _get_semaphore(self, domain: str) -> threading.Semaphore:
        """Get or create semaphore for a domain."""
        domain = domain.lower()
        with self._lock:
            if domain not in self._semaphores:
                config = self._get_domain_config(domain)
                self._semaphores[domain] = threading.Semaphore(config["max_concurrent"])
            return self._semaphores[domain]

    def _wait_for_rate_limit(self, domain: str) -> None:
        """Wait if necessary to respect rate limits.

        Note: sleeps happen while holding the shared lock, so a long wait for
        one domain also delays bookkeeping for other domains.
        """
        domain = domain.lower()
        config = self._get_domain_config(domain)

        with self._lock:
            now = time.time()

            # Enforce the minimum delay since the last request to this domain
            last = self._last_request[domain]
            if last > 0:
                elapsed = now - last
                min_delay = config["min_delay"]
                if elapsed < min_delay:
                    wait_time = min_delay - elapsed
                    logger.debug(
                        f"[RateLimiter] {domain}: waiting {wait_time:.2f}s for min_delay"
                    )
                    time.sleep(wait_time)
                    # Refresh so the window checks below use the current time
                    now = time.time()

            # Drop timestamps that have fallen out of the 60-second window
            self._request_counts[domain] = [
                ts for ts in self._request_counts[domain] if now - ts < 60
            ]

            # If the window is full, wait until the oldest request expires
            rpm_limit = config["rpm"]
            if len(self._request_counts[domain]) >= rpm_limit:
                oldest = self._request_counts[domain][0]
                wait_time = 60 - (now - oldest) + 0.1
                if wait_time > 0:
                    logger.warning(
                        f"[RateLimiter] {domain}: RPM limit ({rpm_limit}) reached, waiting {wait_time:.2f}s"
                    )
                    time.sleep(wait_time)

            # Record this request
            self._last_request[domain] = time.time()
            self._request_counts[domain].append(time.time())

    @contextmanager
    def acquire(self, domain: str):
        """
        Context manager to acquire a rate-limit slot for a domain.

        Usage:
            with limiter.acquire("twitter"):
                # Make request
                pass
        """
        domain = domain.lower()
        semaphore = self._get_semaphore(domain)

        logger.debug(f"[RateLimiter] {domain}: acquiring slot...")
        semaphore.acquire()

        try:
            self._wait_for_rate_limit(domain)
            logger.debug(f"[RateLimiter] {domain}: slot acquired")
            yield
        finally:
            semaphore.release()
            logger.debug(f"[RateLimiter] {domain}: slot released")

    def get_stats(self) -> Dict:
        """Get current rate limiter statistics."""
        with self._lock:
            now = time.time()
            stats = {}
            for domain in self._request_counts:
                recent = [ts for ts in self._request_counts[domain] if now - ts < 60]
                stats[domain] = {
                    "requests_last_minute": len(recent),
                    "last_request_ago": (
                        now - self._last_request[domain]
                        if self._last_request[domain]
                        else None
                    ),
                }
            return stats


_rate_limiter: Optional[RateLimiter] = None
_rate_limiter_lock = threading.Lock()


def get_rate_limiter() -> RateLimiter:
    """Get the global rate limiter singleton."""
    global _rate_limiter

    with _rate_limiter_lock:
        if _rate_limiter is None:
            _rate_limiter = RateLimiter()
        return _rate_limiter


def reset_rate_limiter() -> None:
    """Reset the global rate limiter (useful for testing)."""
    global _rate_limiter

    with _rate_limiter_lock:
        _rate_limiter = None
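

# Illustrative only: in a test suite, reset_rate_limiter() keeps tests isolated
# from one another. A minimal pytest-style sketch (pytest is an assumption here,
# not a project requirement):
#
#     import pytest
#
#     @pytest.fixture(autouse=True)
#     def fresh_rate_limiter():
#         reset_rate_limiter()
#         yield
#         reset_rate_limiter()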
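

# Minimal runnable sketch of concurrent use, tying together custom_limits,
# acquire(), and get_stats(). The "example" domain, its limits, and the thread
# count are illustrative assumptions, not part of the library itself.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)

    # Override one domain's limits via the custom_limits hook
    demo_limiter = RateLimiter(
        custom_limits={"example": {"rpm": 30, "max_concurrent": 2, "min_delay": 0.2}}
    )

    def worker(worker_id: int) -> None:
        # Each worker makes two rate-limited "requests" to the same domain
        for _ in range(2):
            with demo_limiter.acquire("example"):
                print(f"worker {worker_id} holds a slot")
                time.sleep(0.1)  # stand-in for the real HTTP call

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print(demo_limiter.get_stats())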