# BibGuard — src/utils/http.py
# Author: thinkwee — commit f58a6b2: "fix retry storm"
"""
Shared HTTP client: pooled session + auto-retry + optional on-disk cache.
All fetchers should go through `get_session()` instead of bare `requests.get`.
This gives them consistent retry/backoff on 429/5xx, polite-pool User-Agent,
and (when enabled) SQLite-backed response caching to skip re-querying the
same URL on re-runs.
The application-level circuit breaker (``is_open`` / ``record_failure``) is
the primary defense against bad networks: any source that fails twice gets
skipped for the rest of the run. urllib3's own retry is intentionally
narrow (5xx only, no connect/read retries) so the breaker can trip fast.
For deploys where you know certain sources won't work (e.g. HF Spaces
egress IPs are routinely blocked by DBLP and arxiv), set
``BIBGUARD_DISABLE_SOURCES=dblp,arxiv`` to permanently mark those breakers
as open at startup so we never even try them.
"""
from __future__ import annotations
import logging
import os
import threading
from pathlib import Path
from typing import Optional
import requests
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
logger = logging.getLogger(__name__)
# Global per-process state.
# _lock guards mutations of _settings and the (re)build/teardown of _session.
_lock = threading.Lock()
# Effective configuration; overwritten by configure() and read by
# user_agent() / _build_session(). Keys mirror configure()'s parameters.
_settings: dict = {
    "contact_email": "",          # appended to the User-Agent as a mailto when set
    "cache_enabled": True,        # use requests-cache when installed
    "cache_ttl_hours": 24,        # cache entry lifetime, converted to seconds at build time
    "retry_total": 5,             # upper bound; _build_session caps status retries at 2
    "retry_backoff_factor": 1.5,  # urllib3 exponential backoff factor
    "cache_dir": None,            # Path or None; None -> ~/.cache/bibguard
}
# Lazily-built shared Session; None means "rebuild on next get_session()".
_session: Optional[requests.Session] = None
def configure(
    contact_email: str = "",
    cache_enabled: bool = True,
    cache_ttl_hours: int = 24,
    retry_total: int = 5,
    retry_backoff_factor: float = 1.5,
    cache_dir: Optional[Path] = None,
) -> None:
    """Apply HTTP-layer settings; call once at startup before any fetcher runs.

    Also invalidates the shared session so the next get_session() call
    rebuilds it with the new settings.
    """
    global _session
    new_values = {
        "contact_email": contact_email or "",
        "cache_enabled": cache_enabled,
        "cache_ttl_hours": int(cache_ttl_hours),
        "retry_total": int(retry_total),
        "retry_backoff_factor": float(retry_backoff_factor),
        "cache_dir": cache_dir,
    }
    with _lock:
        _settings.update(new_values)
        # Drop the cached session; get_session() lazily rebuilds it.
        _session = None
def user_agent() -> str:
    """Return a polite User-Agent string; includes a mailto when an email is configured."""
    email = _settings.get("contact_email") or ""
    if not email:
        return "BibGuard/1.0 (+https://github.com/thinkwee/BibGuard)"
    return f"BibGuard/1.0 (+https://github.com/thinkwee/BibGuard; mailto:{email})"
def _build_session() -> requests.Session:
    """Construct a Session with retry and (optionally) SQLite-backed caching.

    Falls back to a plain ``requests.Session`` when requests-cache is not
    installed OR when the cache backend cannot be created (read-only home
    directory, unwritable/locked sqlite file — common on hostile deploys
    like HF Spaces). Caching is an optimization and must never crash startup.

    Returns:
        A mounted, header-configured Session (possibly a CachedSession).
    """
    cache_enabled = _settings["cache_enabled"]
    ttl = _settings["cache_ttl_hours"] * 3600
    session = None
    if cache_enabled:
        try:
            from requests_cache import CachedSession  # type: ignore

            cache_dir = _settings.get("cache_dir")
            if cache_dir is None:
                cache_dir = Path.home() / ".cache" / "bibguard"
            # Tolerate a str from config files; .mkdir below needs a Path.
            cache_dir = Path(cache_dir)
            cache_dir.mkdir(parents=True, exist_ok=True)
            session = CachedSession(
                cache_name=str(cache_dir / "http_cache"),
                backend="sqlite",
                expire_after=ttl,
                allowable_methods=("GET", "HEAD"),
                allowable_codes=(200, 203, 300, 301, 308),
                stale_if_error=True,
            )
            logger.debug("HTTP cache enabled: %s (ttl=%ss)", cache_dir, ttl)
        except ImportError:
            logger.info(
                "requests-cache not installed; running without HTTP cache. "
                "Install via `pip install requests-cache` for big speedups on re-runs."
            )
        except OSError as e:
            # mkdir or sqlite backend creation failed (read-only fs, perms,
            # locked db). Degrade gracefully instead of crashing startup.
            logger.warning("HTTP cache unavailable (%s); running without cache.", e)
    if session is None:
        session = requests.Session()
    # Retry policy is deliberately surgical:
    # - 429 NOT in status_forcelist: rate-limit means "back off", not "retry";
    #   retrying just blocks the thread while a parallel source could answer.
    # - connect=0, read=0: do NOT retry on ConnectionReset / ReadTimeout /
    #   ConnectError. On hostile-network deploys (e.g. HF Spaces' egress IPs
    #   are sometimes blocked by DBLP / arxiv export), these errors persist
    #   across retries — retries just multiply the wall-clock penalty
    #   before our application-level circuit breaker can trip the source.
    # - status retries are capped at min(retry_total, 2) for genuine 5xx,
    #   which are usually transient.
    # The application-level circuit breaker (below) is the source-of-truth
    # for "stop hitting this host"; urllib3's job is just one fast attempt.
    status_retries = min(int(_settings["retry_total"]), 2)
    retry = Retry(
        total=status_retries,
        connect=0,
        read=0,
        status=status_retries,
        backoff_factor=_settings["retry_backoff_factor"],
        status_forcelist=(500, 502, 503, 504),
        allowed_methods=("GET", "HEAD"),
        raise_on_status=False,
        respect_retry_after_header=False,
    )
    adapter = HTTPAdapter(max_retries=retry, pool_connections=20, pool_maxsize=20)
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    session.headers.update({"User-Agent": user_agent()})
    return session
def get_session() -> requests.Session:
    """Return the shared, configured Session. Thread-safe (double-checked lock)."""
    global _session
    if _session is not None:
        return _session
    with _lock:
        # Re-check under the lock: another thread may have built it already.
        if _session is None:
            _session = _build_session()
        return _session
def reset_for_tests() -> None:
    """Discard the shared session so the next get_session() rebuilds it.

    Intended for tests that need a fresh Session after reconfiguring.
    """
    global _session
    with _lock:
        _session = None
# ---------------------------------------------------------------------------
# Circuit breaker: trip a source after N consecutive failures so the rest of
# the run skips it instead of paying its rate-limit/timeout penalty per entry.
# ---------------------------------------------------------------------------
# Maps source name -> {"failures": int, "open": bool[, "disabled": True]}.
# "disabled" marks sources pre-opened via BIBGUARD_DISABLE_SOURCES.
# All reads and writes go through _breakers_lock.
_breakers: dict[str, dict] = {}
_breakers_lock = threading.Lock()
def is_open(source: str) -> bool:
    """Return True when `source`'s breaker has tripped and it should be skipped."""
    with _breakers_lock:
        state = _breakers.get(source)
        if state is None:
            return False
        return bool(state.get("open"))
def record_failure(source: str, threshold: int = 2) -> bool:
    """Note a failure for `source`; trip the breaker once `threshold` is reached.

    The default of 2 is intentionally aggressive: with urllib3 retries on
    connection/read errors disabled (see ``_build_session``), each failure
    completes in 1-3 seconds. Two quick fails ≈ 4-6 s wasted before the
    source is shut off for the rest of the run, which is far cheaper than
    the alternative of paying the timeout-per-entry on bad networks (HF
    Spaces' egress IP being blocked by DBLP, e.g.).

    Returns True if the breaker is now (or was already) open.
    """
    with _breakers_lock:
        state = _breakers.setdefault(source, {"failures": 0, "open": False})
        state["failures"] += 1
        if state["failures"] >= threshold and not state["open"]:
            # Log only on the transition so repeated failures stay quiet.
            logger.warning(
                "Circuit breaker tripped for %s after %d failures; "
                "skipping for the rest of this run.",
                source, state["failures"],
            )
            state["open"] = True
        return state["open"]
def record_success(source: str) -> None:
    """On success, zero the failure count and close the breaker for `source`."""
    with _breakers_lock:
        state = _breakers.get(source)
        if state is None:
            return
        state["failures"] = 0
        state["open"] = False
def reset_breakers() -> None:
    """Clear all breaker state (called at the start of a fresh run).

    After clearing, sources listed in ``BIBGUARD_DISABLE_SOURCES`` (comma- or
    space-separated, case-insensitive) are immediately re-marked as open so
    the run never even attempts them. Useful on hostile-network deploys.
    """
    raw = os.environ.get("BIBGUARD_DISABLE_SOURCES", "")
    # Normalize separators to whitespace; split() drops empty tokens, so no
    # per-token strip/empty check is needed.
    disabled = [token.lower() for token in raw.replace(",", " ").split()]
    # Clear and re-disable in ONE critical section: the original released the
    # lock between clear() and the re-marking loop, so a concurrent is_open()
    # could briefly observe a pre-disabled source as closed and hit it.
    with _breakers_lock:
        _breakers.clear()
        for name in disabled:
            _breakers[name] = {"failures": 9999, "open": True, "disabled": True}
    # Log outside the lock; logging can block on handlers.
    for name in disabled:
        logger.info("Source %r pre-disabled via BIBGUARD_DISABLE_SOURCES", name)