MB-IDK committed on
Commit
81e15e9
·
verified ·
1 Parent(s): b288f9f

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +172 -0
app.py ADDED
@@ -0,0 +1,172 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ """
3
+ HaveIBeenPwned API - Hugging Face Spaces
4
+ """
5
+
6
+ import urllib.parse
7
+ from datetime import datetime
8
+
9
+ import cloudscraper
10
+ from fastapi import FastAPI, HTTPException
11
+ from fastapi.middleware.cors import CORSMiddleware
12
+
13
+ # =============================================================================
14
+ # CONFIG
15
+ # =============================================================================
16
+
17
# Root of the HaveIBeenPwned site; request URLs and the Referer are built on it.
HIBP_BASE_URL = "https://haveibeenpwned.com"

# Desktop Firefox identity string sent as the User-Agent header.
_FIREFOX_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) "
    "Gecko/20100101 Firefox/148.0"
)

# Headers applied to every outgoing request of the scraper session.
DEFAULT_HEADERS = {
    "Accept": "*/*",
    "Accept-Language": "fr,fr-FR;q=0.9,en-US;q=0.8,en;q=0.7",
    "DNT": "1",
    "Referer": f"{HIBP_BASE_URL}/",
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
    "Sec-GPC": "1",
    "User-Agent": _FIREFOX_USER_AGENT,
}
33
+
34
+ # =============================================================================
35
+ # CLIENT
36
+ # =============================================================================
37
+
38
class HIBPClient:
    """Client for the HaveIBeenPwned ``/unifiedsearch`` endpoint.

    All requests go through one ``cloudscraper`` session that is configured
    as desktop Firefox on Windows and carries ``DEFAULT_HEADERS``.
    """

    def __init__(self):
        self.scraper = cloudscraper.create_scraper(
            browser={"browser": "firefox", "platform": "windows", "desktop": True},
            delay=5,
        )
        self.scraper.headers.update(DEFAULT_HEADERS)

    def search_email(self, email: str) -> dict:
        """Look up *email* and return a result envelope; never raises.

        Args:
            email: Address to search for. It is percent-encoded in full
                (no safe characters) before being placed in the URL path.

        Returns:
            A dict with the keys ``email``, ``timestamp`` (UTC, ISO 8601 with
            a trailing ``Z``), ``status_code``, ``data`` and ``error``.
            ``data`` holds the decoded JSON body on a 200 response, a
            ``{"Breaches": None, "Pastes": None}`` placeholder on 404, and
            ``None`` otherwise. ``error`` is a human-readable message, or
            ``None`` on success.
        """
        # Local import: the module only imports the ``datetime`` class.
        from datetime import timezone

        encoded = urllib.parse.quote(email, safe="")
        url = f"{HIBP_BASE_URL}/unifiedsearch/{encoded}"

        # ``datetime.utcnow()`` is deprecated; build an aware UTC timestamp
        # and render it in the same ``...Z`` form as before.
        now = datetime.now(timezone.utc)
        result = {
            "email": email,
            "timestamp": now.isoformat().replace("+00:00", "Z"),
            "status_code": None,
            "data": None,
            "error": None,
        }

        try:
            resp = self.scraper.get(url, timeout=30)
            result["status_code"] = resp.status_code

            if resp.status_code == 200:
                try:
                    result["data"] = resp.json()
                except ValueError:
                    # A 200 whose body is not JSON is a bad upstream payload,
                    # not a local failure: report it as a gateway error.
                    result["error"] = "Upstream returned a non-JSON response."
                    result["status_code"] = 502
            elif resp.status_code == 404:
                # A 404 is reported as "no breach found": callers get an
                # empty payload plus an explanatory message.
                result["data"] = {"Breaches": None, "Pastes": None}
                result["error"] = "Email not found in any known breach."
            elif resp.status_code == 429:
                retry = resp.headers.get("Retry-After", "unknown")
                result["error"] = f"Rate limited. Retry after {retry}s."
            elif resp.status_code == 403:
                result["error"] = "Blocked by Cloudflare."
            else:
                result["error"] = f"Unexpected status: {resp.status_code}"

        except cloudscraper.exceptions.CloudflareChallengeError as e:
            result["error"] = f"Cloudflare challenge failed: {e}"
            result["status_code"] = 503
        except Exception as e:
            # Deliberate catch-all: this method reports failures through the
            # envelope instead of raising.
            result["error"] = f"Request failed: {e}"
            result["status_code"] = 500

        return result

    def parse_breaches(self, data: dict) -> list:
        """Reduce a search payload to a list of compact breach summaries.

        Args:
            data: Decoded search payload. A missing or null ``"Breaches"``
                entry, or a ``None`` payload, yields an empty list.

        Returns:
            One dict per breach with the keys ``name``, ``title``,
            ``domain``, ``breach_date``, ``pwn_count``, ``data_classes``,
            ``is_verified``, ``is_stealer_log`` and ``logo``. Missing fields
            are ``None``, except ``data_classes`` which is always a list.
        """
        entries = (data or {}).get("Breaches") or []
        return [
            {
                "name": b.get("Name"),
                "title": b.get("Title"),
                "domain": b.get("Domain"),
                "breach_date": b.get("BreachDate"),
                "pwn_count": b.get("PwnCount"),
                # ``or []`` also covers an explicit null from upstream,
                # which a ``.get`` default would let through as None.
                "data_classes": b.get("DataClasses") or [],
                "is_verified": b.get("IsVerified"),
                "is_stealer_log": b.get("IsStealerLog"),
                "logo": b.get("LogoPath"),
            }
            for b in entries
        ]
99
+
100
+
101
+ # =============================================================================
102
+ # FASTAPI APP
103
+ # =============================================================================
104
+
105
# Service metadata passed to the FastAPI application object.
_APP_METADATA = {
    "title": "HaveIBeenPwned Proxy API",
    "description": "HIBP search with Cloudflare bypass via cloudscraper",
    "version": "1.0.0",
}

# Fully open CORS policy: any origin, any method, any request header.
_CORS_POLICY = {
    "allow_origins": ["*"],
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}

app = FastAPI(**_APP_METADATA)
app.add_middleware(CORSMiddleware, **_CORS_POLICY)

# One client (and therefore one scraper session) shared by all handlers.
client = HIBPClient()
119
+
120
+
121
@app.get("/")
def index():
    """Return the service name, version and a short description of each route."""
    routes = {
        "GET /": "This page",
        "GET /search/{email}": "Raw HIBP search",
        "GET /breaches/{email}": "Parsed breach summaries",
        "GET /health": "Health check",
    }
    overview = {
        "service": "HaveIBeenPwned Proxy API",
        "version": "1.0.0",
        "endpoints": routes,
    }
    return overview
133
+
134
+
135
@app.get("/health")
def health():
    """Liveness probe: return ``{"status": "ok"}`` plus the current UTC time.

    The timestamp is ISO 8601 with a trailing ``Z``.
    """
    # Local import: the module only imports the ``datetime`` class.
    from datetime import timezone

    # ``datetime.utcnow()`` is deprecated; an aware UTC datetime rendered
    # with "+00:00" swapped for "Z" gives the same output format.
    now = datetime.now(timezone.utc)
    return {"status": "ok", "timestamp": now.isoformat().replace("+00:00", "Z")}
138
+
139
+
140
@app.get("/search/{email:path}")
def search(email: str):
    """Return the raw search envelope for *email*.

    Raises:
        HTTPException: 400 when *email* contains no ``@``.
    """
    has_at_sign = "@" in email
    if not has_at_sign:
        raise HTTPException(status_code=400, detail="Invalid email")

    return client.search_email(email)
147
+
148
+
149
@app.get("/breaches/{email:path}")
def breaches(email: str):
    """Return a parsed breach and paste summary for *email*.

    Returns:
        A dict with ``email``, ``timestamp``, ``total_breaches``,
        ``total_pastes``, ``breaches``, ``pastes`` and ``is_pwned``.

    Raises:
        HTTPException: 400 when *email* contains no ``@``; otherwise the
            lookup's error status (always >= 400) when no usable payload
            was obtained.
    """
    if "@" not in email:
        raise HTTPException(status_code=400, detail="Invalid email")

    result = client.search_email(email)
    data = result["data"]

    # Test the payload's type, not its truthiness: an empty object from a
    # successful lookup is a valid "nothing found" answer, and a non-dict
    # payload must not reach ``.get``.
    if isinstance(data, dict):
        parsed = client.parse_breaches(data)
        pastes = data.get("Pastes") or []
        return {
            "email": email,
            "timestamp": result["timestamp"],
            "total_breaches": len(parsed),
            "total_pastes": len(pastes),
            "breaches": parsed,
            "pastes": pastes,
            "is_pwned": len(parsed) > 0 or len(pastes) > 0,
        }

    status = result["status_code"] or 500
    if status < 400:
        # Upstream reported success but gave no usable payload; never raise
        # an "error" that carries a success status code.
        status = 502
    raise HTTPException(
        status_code=status,
        detail=result["error"] or "Unexpected response from upstream.",
    )