# HIBP2 / app.py
# (Hugging Face upload metadata: MB-IDK — "Upload 4 files" — commit 8c40d9f verified)
"""
╔══════════════════════════════════════════════════════════════════╗
║ HIBP PRO MONITOR — GOD TIER EDITION ║
║ Camoufox + Async + Smart Proxy Pool + Stealth Maximum ║
╚══════════════════════════════════════════════════════════════════╝
"""
from fastapi import FastAPI, Body, HTTPException, Header, Request
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import gradio as gr
import asyncio
import random
import time
import hmac
import hashlib
import logging
from typing import Optional, List, Dict, Any
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from contextlib import asynccontextmanager
import aiohttp
from camoufox.async_api import AsyncCamoufox
import uvicorn
import nest_asyncio
# Allow re-entrant event loops: uvicorn, gradio and the async scraper all
# share one loop in this process.
nest_asyncio.apply()
# ================================================================
# LOGGING — Clean & structured
# ================================================================
# Configure root logging once at import time: timestamped, aligned levels.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s │ %(levelname)-7s │ %(message)s",
    datefmt="%H:%M:%S"
)
log = logging.getLogger("hibp")  # module-wide logger
# ================================================================
# CONFIGURATION
# ================================================================
class Config:
    """Static application configuration; edit constants in place."""
    # --- API ---
    API_KEY: str = "CHANGE_ME_TO_A_STRONG_SECRET_KEY"  # shared secret for the X-Token header — change before deploying
    HOST: str = "0.0.0.0"
    PORT: int = 7860
    # --- Proxy ---
    PROXY_API_BASE: str = "https://voxxium-proxpy.hf.space"  # upstream proxy-list API
    PROXY_FETCH_LIMIT: int = 80  # max proxies fetched per refresh
    PROXY_TEST_TIMEOUT: float = 6.0  # seconds per proxy health probe
    PROXY_CACHE_TTL: int = 180  # seconds before the pool refreshes
    PROXY_MAX_CONCURRENT_TESTS: int = 50  # parallel proxy probes
    # --- Scraper ---
    MAX_RETRIES: int = 8  # attempts per email (including the final direct attempt)
    MAX_CONCURRENT_EMAILS: int = 3  # emails scraped in parallel
    PAGE_TIMEOUT: int = 35_000  # default page timeout, ms
    NAVIGATION_TIMEOUT: int = 45_000  # navigation timeout, ms
    # --- Rate Limiting ---
    RATE_LIMIT_WINDOW: int = 60  # sliding-window length, seconds
    RATE_LIMIT_MAX: int = 30  # max requests per window per client
# ================================================================
# RATE LIMITER — In-memory sliding window
# ================================================================
class RateLimiter:
    """In-memory sliding-window rate limiter keyed by an arbitrary string.

    Allows at most ``max_requests`` requests per key within the trailing
    ``window`` seconds. State is per-process only — NOTE(review): not
    shared across workers; confirm single-worker deployment.
    """

    def __init__(self, window: int, max_requests: int):
        self.window = window              # window length, seconds
        self.max_requests = max_requests  # quota per window
        # key -> timestamps (epoch seconds) of requests still in the window
        self._requests: Dict[str, List[float]] = {}

    def is_allowed(self, key: str) -> bool:
        """Return True and record the request if *key* is within quota.

        Denied requests are NOT recorded, so being throttled does not
        extend the caller's lockout.
        """
        now = time.time()
        cutoff = now - self.window
        # Prune timestamps that aged out of the window (single dict lookup).
        recent = [t for t in self._requests.get(key, []) if t > cutoff]
        if len(recent) >= self.max_requests:
            self._requests[key] = recent
            return False
        recent.append(now)
        self._requests[key] = recent
        return True

    def purge(self) -> None:
        """Drop keys with no activity in the current window.

        Without this, the table kept every key ever seen, so memory grew
        without bound across distinct client IPs; call periodically to
        keep the table bounded.
        """
        cutoff = time.time() - self.window
        self._requests = {
            key: stamps
            for key, stamps in self._requests.items()
            if any(t > cutoff for t in stamps)
        }
# ================================================================
# PROXY POOL — Async, cached, sorted by latency
# ================================================================
@dataclass
class ProxyPool:
    """Async proxy pool: fetched from a remote API, latency-tested, cached.

    Working proxies are kept sorted by measured latency (fastest first)
    and the pool is refreshed at most once per ``Config.PROXY_CACHE_TTL``
    seconds; concurrent refreshers are serialized by an asyncio lock.
    """
    _proxies: List[Dict[str, Any]] = field(default_factory=list)  # [{"url": ..., "latency": ...}, ...]
    _last_refresh: float = 0.0  # epoch seconds of the last successful refresh
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)  # guards refresh()

    @property
    def is_stale(self) -> bool:
        """True when the cached pool is older than the configured TTL."""
        return time.time() - self._last_refresh > Config.PROXY_CACHE_TTL

    @property
    def urls(self) -> List[str]:
        """Proxy URLs only, in latency order (fastest first)."""
        return [p["url"] for p in self._proxies]

    async def _test_single(
        self, session: aiohttp.ClientSession, proxy_url: str
    ) -> Optional[Dict]:
        """Probe one proxy via aiohttp; return {"url", "latency"} or None."""
        try:
            start = time.monotonic()
            async with session.get(
                "https://api.ipify.org?format=json",
                proxy=proxy_url,
                timeout=aiohttp.ClientTimeout(total=Config.PROXY_TEST_TIMEOUT),
                ssl=False,  # tolerate proxies with broken/intercepted TLS
            ) as resp:
                if resp.status == 200:
                    latency = round(time.monotonic() - start, 3)
                    return {"url": proxy_url, "latency": latency}
        except Exception:
            # Any network/timeout error simply disqualifies this proxy.
            return None

    async def refresh(self) -> List[str]:
        """Fetch the proxy list and re-test all entries concurrently.

        Returns the (possibly cached) list of working proxy URLs; on any
        failure the previous cache is returned unchanged.
        """
        async with self._lock:
            # Another coroutine may have refreshed while we waited for the lock.
            if not self.is_stale and self._proxies:
                return self.urls
            log.info("🔄 Proxy pool refresh started...")
            try:
                async with aiohttp.ClientSession() as session:
                    # 1) Fetch the raw proxy list.
                    async with session.get(
                        f"{Config.PROXY_API_BASE}/all",
                        params={
                            "protocol": "http",
                            "verified": "true",
                            "limit": Config.PROXY_FETCH_LIMIT,
                        },
                        timeout=aiohttp.ClientTimeout(total=15),
                    ) as resp:
                        data = await resp.json()
                    # Accept both a bare list and a {"proxies": [...]} wrapper.
                    items = data if isinstance(data, list) else data.get("proxies", [])
                    raw = []
                    for p in items:
                        url = p.get("proxy_url") or p.get("proxy") or p.get("url")
                        if url:
                            raw.append(url)
                    log.info(f" Fetched {len(raw)} raw proxies, testing...")
                    # 2) Test concurrently, bounded by a semaphore.
                    sem = asyncio.Semaphore(Config.PROXY_MAX_CONCURRENT_TESTS)
                    async def _bounded_test(proxy_url: str):
                        async with sem:
                            return await self._test_single(session, proxy_url)
                    results = await asyncio.gather(
                        *[_bounded_test(p) for p in raw],
                        return_exceptions=True,
                    )
                    # isinstance(r, dict) already excludes None and exceptions.
                    working = [r for r in results if isinstance(r, dict)]
                    working.sort(key=lambda x: x["latency"])
                    self._proxies = working
                    self._last_refresh = time.time()
                    log.info(f" ✅ {len(working)} working proxies (best: {working[0]['latency']}s)" if working else " ⚠️ No working proxies found")
                    return self.urls
            except Exception as e:
                log.error(f" ❌ Proxy refresh failed: {e}")
                return self.urls  # fall back to the existing cache
# ================================================================
# HUMAN BEHAVIOR SIMULATION
# ================================================================
async def human_type(page, selector: str, text: str):
    """Type *text* into *selector* with a human-like click and cadence.

    Clicks at a random offset inside the element's bounding box, waits a
    beat, then types character by character with per-key delay jitter and
    occasional "thinking" micro-pauses.

    Raises whatever the underlying locator raises if the element never
    becomes visible within 15 s.
    """
    locator = page.locator(selector).first
    await locator.wait_for(state="visible", timeout=15_000)
    # Click with a small random offset instead of dead-center.
    box = await locator.bounding_box()
    if box:
        x = box["x"] + box["width"] * random.uniform(0.2, 0.8)
        y = box["y"] + box["height"] * random.uniform(0.3, 0.7)
        await page.mouse.click(x, y)
    else:
        # No bounding box available — fall back to a plain element click.
        await locator.click()
    await asyncio.sleep(random.uniform(0.3, 0.8))
    for char in text:  # index from enumerate() was unused — plain iteration
        # Per-key delay jitter (ms). NOTE: delay is uniform per key; the
        # old comment claiming position-dependent speed was inaccurate.
        base_delay = random.uniform(45, 140)
        # Occasional micro-pause, like a human pausing to think.
        if random.random() < 0.08:
            await asyncio.sleep(random.uniform(0.2, 0.6))
        await locator.type(char, delay=base_delay)
async def human_delay(min_s: float = 1.0, max_s: float = 3.0):
    """Sleep for a random, human-looking interval in [min_s, max_s] seconds."""
    pause = random.uniform(min_s, max_s)
    await asyncio.sleep(pause)
# ================================================================
# BREACH EXTRACTOR
# ================================================================
async def extract_breaches(page) -> List[Dict]:
    """Robustly extract breach entries from the HIBP results timeline.

    Returns a list of dicts with keys: name, date, description and
    compromised (list of leaked data classes). An item that fails to
    parse is skipped instead of aborting the whole extraction; a missing
    timeline yields an empty list.
    """
    breaches = []
    try:
        log.info(" 📋 Waiting for breach timeline...")
        await page.wait_for_selector(".timeline-item", timeout=20_000)
        await asyncio.sleep(1.5)  # let the entry animation finish rendering
        items = await page.locator(".timeline-item").all()
        for item in items:
            try:
                # Start all four field reads, then await them together;
                # return_exceptions=True turns per-field failures into
                # exception values instead of raising.
                name_p = item.locator(".timeline-title h5").inner_text()
                date_p = item.locator(".timeline-date-text").all_inner_texts()
                desc_p = item.locator(".timeline-content p").first.inner_text(timeout=5_000)
                comp_p = item.locator(".timeline-details-list li").all_inner_texts()
                name, date_texts, desc, comp = await asyncio.gather(
                    name_p, date_p, desc_p, comp_p,
                    return_exceptions=True
                )
                # isinstance guards downgrade failed fields to safe defaults.
                breaches.append({
                    "name": name.strip() if isinstance(name, str) else "Unknown",
                    "date": " ".join(date_texts).strip() if isinstance(date_texts, list) else "",
                    "description": desc.strip() if isinstance(desc, str) else "",
                    "compromised": [
                        x.strip() for x in (comp if isinstance(comp, list) else []) if x.strip()
                    ],
                })
            except Exception as e:
                log.debug(f" ⚠️ Item extraction error: {e}")
                continue
        log.info(f" ✅ {len(breaches)} breaches extracted")
    except Exception as e:
        log.warning(f" ❌ Timeline extraction failed: {e}")
    return breaches
# ================================================================
# CORE SCRAPER ENGINE — Camoufox powered
# ================================================================
async def check_single_email(
    email: str,
    proxy_pool: ProxyPool,
    use_proxy: bool = True,
) -> Dict:
    """Check one email address against haveibeenpwned.com with Camoufox.

    Makes up to ``Config.MAX_RETRIES`` attempts, rotating through random
    proxies from *proxy_pool* and always finishing with one direct
    (proxy-less) attempt.

    Returns a dict with keys: email, pwned (True/False, or None when all
    attempts failed), breach_count/breaches, checked_at (UTC ISO string);
    on total failure an "error" key replaces the breach fields.
    """
    log.info(f"🔍 Checking: {email}")
    proxies = []
    if use_proxy:
        proxy_urls = await proxy_pool.refresh()
        # Reserve one retry slot for the final direct attempt.
        proxies = random.sample(proxy_urls, min(Config.MAX_RETRIES - 1, len(proxy_urls)))
    # Always end with a direct (no-proxy) attempt.
    attempts = proxies + [None]
    for attempt_num, proxy_url in enumerate(attempts, 1):
        proxy_label = proxy_url or "DIRECT"
        log.info(f" [{attempt_num}/{len(attempts)}] via {proxy_label}")
        # (dead `browser = None` removed — the async context manager below
        # owns browser lifetime and cleanup on every path)
        try:
            # Camoufox launch — maximum anti-detection settings.
            launch_kwargs = {
                "headless": True,
                "humanize": True,       # built-in natural mouse movement
                "block_images": True,   # saves bandwidth, speeds up loads
                "block_webrtc": True,   # prevents real-IP WebRTC leaks
                "os": ["windows", "macos"],  # realistic fingerprint (linux is rare in real traffic)
                "i_know_what_im_doing": True,
            }
            if proxy_url:
                launch_kwargs["proxy"] = {"server": proxy_url}
            async with AsyncCamoufox(**launch_kwargs) as browser:
                page = await browser.new_page()
                # Timeouts
                page.set_default_timeout(Config.PAGE_TIMEOUT)
                page.set_default_navigation_timeout(Config.NAVIGATION_TIMEOUT)
                # Block heavy static resources for speed.
                await page.route(
                    "**/*",
                    lambda route: (
                        route.abort()
                        if route.request.resource_type in ("image", "media", "font", "stylesheet")
                        else route.continue_()
                    ),
                )
                # Navigate to the HIBP front page.
                await page.goto(
                    "https://haveibeenpwned.com/",
                    wait_until="domcontentloaded",
                )
                await human_delay(2.0, 4.5)
                # Human-like email entry.
                await human_type(page, 'input[type="email"], #emailInput', email)
                await human_delay(0.5, 1.5)
                # Submit the form.
                btn = page.locator('#checkButton, button[type="submit"]').first
                await btn.click()
                # Wait for either the "good" or "bad" result panel to show.
                await page.wait_for_selector(
                    '#email-result-good:not(.d-none), #email-result-bad:not(.d-none)',
                    timeout=40_000,
                )
                is_safe = await page.locator('#email-result-good:not(.d-none)').count() > 0
                if is_safe:
                    log.info(f" ✅ SAFE — {email}")
                    return {
                        "email": email,
                        "pwned": False,
                        "breach_count": 0,
                        "breaches": [],
                        "checked_at": datetime.utcnow().isoformat(),
                    }
                else:
                    breaches = await extract_breaches(page)
                    log.info(f" 🔴 PWNED — {email} ({len(breaches)} breaches)")
                    return {
                        "email": email,
                        "pwned": True,
                        "breach_count": len(breaches),
                        "breaches": breaches,
                        "checked_at": datetime.utcnow().isoformat(),
                    }
        except Exception as e:
            # Proxy/network/selector failure: log briefly, rotate to next.
            log.warning(f" ⚠️ Attempt {attempt_num} failed: {str(e)[:120]}")
            continue
    # Every attempt (all proxies + direct) failed.
    log.error(f" 💀 ALL ATTEMPTS FAILED for {email}")
    return {
        "email": email,
        "pwned": None,
        "error": "All attempts failed",
        "checked_at": datetime.utcnow().isoformat(),
    }
# ================================================================
# APPLICATION SETUP
# ================================================================
# Module-level singletons shared by the API endpoints and the Gradio UI.
proxy_pool = ProxyPool()
rate_limiter = RateLimiter(Config.RATE_LIMIT_WINDOW, Config.RATE_LIMIT_MAX)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: pre-warm the proxy pool before serving."""
    log.info("🚀 HIBP Pro Monitor — GOD TIER EDITION starting...")
    await proxy_pool.refresh()  # first refresh is the slowest; do it up front
    yield
    log.info("👋 Shutting down...")
app = FastAPI(
    title="HIBP Pro Monitor — GOD TIER",
    version="2.0.0",
    lifespan=lifespan,
)
# NOTE(review): wildcard CORS origins are permissive — acceptable for a
# demo Space, tighten before production use.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["POST", "GET"],
    allow_headers=["*"],
)
# ================================================================
# AUTH — Timing-safe token comparison
# ================================================================
def verify_token(token: Optional[str]) -> bool:
    """Timing-safe check of the supplied token against Config.API_KEY.

    A missing or empty token is rejected outright without touching the
    comparison, so no timing information leaks for absent credentials.
    """
    if token is None or token == "":
        return False
    return hmac.compare_digest(token, Config.API_KEY)
# ================================================================
# API ENDPOINTS
# ================================================================
@app.post("/check")
async def api_check(
    request: Request,
    payload: dict = Body(...),
    x_token: str = Header(None),
):
    """Check up to 20 emails. Body: {"emails": [...], "use_proxy": bool}.

    Requires a valid X-Token header (403 otherwise) and is rate-limited
    per client IP (429). Returns 207 Multi-Status when some emails could
    not be checked at all.
    """
    # -- Auth (timing-safe token comparison) --
    if not verify_token(x_token):
        log.warning(f"🚫 Auth failed from {request.client.host}")
        raise HTTPException(status_code=403, detail="Invalid or missing token")
    # -- Rate limiting: sliding window keyed by client IP --
    client_ip = request.client.host
    if not rate_limiter.is_allowed(client_ip):
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    # -- Validation --
    emails = payload.get("emails", [])
    use_proxy = payload.get("use_proxy", True)
    if not emails:
        raise HTTPException(status_code=400, detail="No emails provided")
    if len(emails) > 20:
        raise HTTPException(status_code=400, detail="Max 20 emails per request")
    # -- Bounded-concurrency fan-out over the requested emails --
    sem = asyncio.Semaphore(Config.MAX_CONCURRENT_EMAILS)
    async def _bounded_check(email: str):
        async with sem:
            return await check_single_email(email, proxy_pool, use_proxy)
    results = await asyncio.gather(
        *[_bounded_check(e.strip()) for e in emails if e.strip()],
    )
    # pwned is None => every attempt for that email failed.
    failed = [r for r in results if r.get("pwned") is None]
    return JSONResponse(
        content={
            "results": results,
            "total": len(results),
            "failed": len(failed),
            "timestamp": datetime.utcnow().isoformat(),
        },
        status_code=200 if not failed else 207,  # 207 = Multi-Status
    )
@app.get("/health")
async def health():
    """Unauthenticated liveness probe exposing proxy-pool statistics."""
    pool_age = round(time.time() - proxy_pool._last_refresh, 1)
    report = {
        "status": "operational",
        "proxy_pool_size": len(proxy_pool._proxies),
        "proxy_pool_age": pool_age,
        "uptime": "ok",
    }
    return report
@app.post("/proxies/refresh")
async def force_proxy_refresh(x_token: str = Header(None)):
    """Authenticated endpoint that forces an immediate proxy-pool rebuild."""
    if not verify_token(x_token):
        raise HTTPException(status_code=403, detail="Unauthorized")
    # Mark the cache as expired so refresh() cannot short-circuit.
    proxy_pool._last_refresh = 0
    refreshed_urls = await proxy_pool.refresh()
    return {"refreshed": len(refreshed_urls)}
# ================================================================
# GRADIO UI
# ================================================================
async def gradio_check(txt: str, use_proxies: bool) -> str:
    """Run breach checks for the UI and render one status line per email."""
    if not txt.strip():
        return "⚠️ Enter at least one email."
    emails = [e.strip() for e in txt.splitlines() if e.strip()]
    report: List[str] = []
    for email in emails[:10]:  # UI cap: at most 10 addresses per run
        result = await check_single_email(email, proxy_pool, use_proxies)
        pwned = result.get("pwned")
        if pwned is None:
            report.append(f"⚫ {email} — ERROR: {result.get('error')}")
        elif pwned:
            breach_names = ", ".join(b["name"] for b in result["breaches"][:5])
            report.append(
                f"🔴 {email} — PWNED ({result['breach_count']} breaches: {breach_names})"
            )
        else:
            report.append(f"🟢 {email} — SAFE ✅")
    return "\n".join(report)
# Declarative Gradio UI, mounted onto the FastAPI app below.
with gr.Blocks(title="HIBP Pro Monitor", theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        """
        # 🛡️ HIBP Pro Monitor — GOD TIER
        **Camoufox-powered** breach detection with stealth anti-fingerprinting.
        """
    )
    with gr.Row():
        with gr.Column(scale=2):
            # Input side: one email per line, proxy toggle, submit button.
            emails_input = gr.Textbox(
                lines=6,
                label="📧 Emails (one per line)",
                placeholder="john@example.com\njane@test.com",
            )
            proxy_toggle = gr.Checkbox(label="🌐 Use Proxy Rotation", value=True)
            check_btn = gr.Button("🔍 Check Breaches", variant="primary", size="lg")
        with gr.Column(scale=3):
            # Output side: read-only text report, one line per email.
            output = gr.Textbox(
                label="📊 Results",
                lines=10,
                interactive=False,
            )
    # Wire the button to the async checker.
    check_btn.click(gradio_check, [emails_input, proxy_toggle], output)
# Serve the UI at "/" on the same FastAPI app as the JSON API.
app = gr.mount_gradio_app(app, demo, path="/")
# ================================================================
# ENTRYPOINT
# ================================================================
if __name__ == "__main__":
    # Run the combined JSON API + Gradio UI under uvicorn.
    uvicorn.run(
        app,
        host=Config.HOST,
        port=Config.PORT,
        log_level="info",
        access_log=False,  # we emit our own structured request logging
    )