S-Dreamer committed on
Commit
e848759
·
verified ·
1 Parent(s): 141cb06

Create osint_core/validators.py

Browse files
Files changed (1) hide show
  1. osint_core/validators.py +402 -0
osint_core/validators.py ADDED
@@ -0,0 +1,402 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ osint_core.validators
3
+ =====================
4
+
5
+ Input validation and normalization for the Passive OSINT Control Panel.
6
+
7
+ Design goals:
8
+ - Treat all input as hostile.
9
+ - Normalize before hashing, enrichment, audit, or reporting.
10
+ - Return structured results so downstream modules do not guess intent.
11
+ - Reject ambiguous or dangerous inputs early.
12
+ - Avoid network calls. This module is pure validation/normalization.
13
+
14
+ Supported indicator types:
15
+ - domain
16
+ - username
17
+ - email
18
+ - ip
19
+ - url
20
+ """
21
+
22
+ from __future__ import annotations
23
+
24
+ import html
25
+ import ipaddress
26
+ import re
27
+ from dataclasses import dataclass
28
+ from enum import Enum
29
+ from typing import Literal
30
+ from urllib.parse import urlparse, urlunparse
31
+
32
+
33
# Closed set of indicator kinds carried by validation results. "unknown" is
# what validate_indicator() reports when validation fails.
IndicatorType = Literal[
    "domain",
    "username",
    "email",
    "ip",
    "url",
    "unknown",
]
34
+
35
+
36
class ValidationErrorCode(str, Enum):
    """Machine-readable reason codes attached to failed validations.

    Members also subclass ``str``, so each one compares equal to its
    literal value (for example ``ValidationErrorCode.TOO_LONG == "too_long"``).
    """

    # Problems with the raw input as a whole.
    EMPTY_INPUT = "empty_input"
    TOO_LONG = "too_long"
    CONTROL_CHARACTERS = "control_characters"

    # The requested (forced) indicator type is not recognized.
    INVALID_TYPE = "invalid_type"

    # The value does not have the shape of the indicator type it was checked as.
    INVALID_DOMAIN = "invalid_domain"
    INVALID_USERNAME = "invalid_username"
    INVALID_EMAIL = "invalid_email"
    INVALID_IP = "invalid_ip"
    INVALID_URL = "invalid_url"

    # Auto-detection could not match the value to any supported type.
    UNSUPPORTED_INDICATOR = "unsupported_indicator"

    # Policy rejections.
    BLOCKED_LOCAL_TARGET = "blocked_local_target"
    BLOCKED_DANGEROUS_PATTERN = "blocked_dangerous_pattern"
49
+
50
+
51
@dataclass(frozen=True)
class ValidationResult:
    """Immutable, structured outcome of validating a single indicator.

    On success ``ok`` is True and ``error`` / ``error_code`` keep their
    ``None`` defaults. On failure validate_indicator() fills both in and
    reports ``indicator_type="unknown"`` with an empty ``normalized`` value.
    """

    # True when the input was accepted.
    ok: bool
    # Detected or forced indicator kind.
    indicator_type: IndicatorType
    # Canonical form of the input.
    normalized: str
    # Length of the input as supplied by the caller.
    original_length: int
    # Human-readable, non-fatal notes about the validation.
    warnings: list[str]
    # Human-readable failure message, if any.
    error: str | None = None
    # Machine-readable failure reason, if any.
    error_code: ValidationErrorCode | None = None
60
+
61
+
62
# --- Length limits (in characters) -----------------------------------------
# NOTE(review): sanitize_raw_input() rejects anything longer than
# MAX_INPUT_LENGTH before the per-type validators run, so the larger e-mail
# and URL limits below only matter when those validators are called directly.
MAX_INPUT_LENGTH = 256
MAX_USERNAME_LENGTH = 64
MAX_EMAIL_LOCAL_LENGTH = 64
MAX_EMAIL_LENGTH = 320
MAX_DOMAIN_LENGTH = 253
MAX_URL_LENGTH = 2048

# --- Pre-compiled patterns --------------------------------------------------
# ASCII control characters (C0 range plus DEL).
CONTROL_CHARS_RE = re.compile(r"[\x00-\x1f\x7f]")

# Dotted host name: 1-253 characters overall, no leading hyphen, one or more
# labels of up to 63 characters, ending in an alphabetic TLD of 2-63 letters.
DOMAIN_RE = re.compile(r"^(?=.{1,253}$)(?!-)(?:[a-zA-Z0-9-]{1,63}\.)+[a-zA-Z]{2,63}$")

# 2-64 characters drawn from letters, digits, underscore, dot and hyphen.
USERNAME_RE = re.compile(r"^[a-zA-Z0-9_.-]{2,64}$")

# Local part of up to 64 permitted characters, "@", then a dotted host ending
# in an alphabetic TLD.
EMAIL_RE = re.compile(
    r"^[A-Za-z0-9.!#$%&'*+/=?^_`{|}~-]{1,64}@[A-Za-z0-9.-]{1,255}\.[A-Za-z]{2,63}$"
)
75
+
76
# Substrings rejected outright by check_dangerous_patterns(). Every pattern is
# compiled case-insensitively.
DANGEROUS_PATTERNS = [
    re.compile(source, re.IGNORECASE)
    for source in (
        # Path traversal, literal and percent-encoded.
        r"\.\./",
        r"%2e%2e",
        # Script injection and non-http URI schemes.
        r"<\s*script",
        r"javascript:",
        r"data:",
        r"file:",
        # Shell metacharacters and command substitution.
        r";",
        r"\|",
        r"&&",
        r"\$\(",
        r"`",
        # Brace-delimited expressions.
        r"\{.*\}",
    )
]
93
+
94
# Host names treated as local targets by the domain and URL validators.
LOCAL_HOSTNAMES = {"localhost", "ip6-localhost", "ip6-loopback"}

# Networks consulted by is_private_or_local_ip() in addition to the
# ipaddress module's own classification properties.
PRIVATE_NETS = [
    ipaddress.ip_network(cidr)
    for cidr in (
        # IPv4 private ranges, loopback and link-local.
        "10.0.0.0/8",
        "172.16.0.0/12",
        "192.168.0.0/16",
        "127.0.0.0/8",
        "169.254.0.0/16",
        # IPv6 loopback, unique-local and link-local.
        "::1/128",
        "fc00::/7",
        "fe80::/10",
    )
]
105
+
106
+
107
def validate_indicator(raw_value: str, forced_type: str = "Auto", allow_private_targets: bool = False) -> ValidationResult:
    """
    Validate and normalize a user-supplied OSINT indicator.

    Validation failures are reported through the returned result rather than
    raised: every ValidationException is converted into a failed
    ValidationResult. Other exception types are not intercepted.

    Parameters
    ----------
    raw_value:
        User input. ``None`` and non-string values are tolerated; non-strings
        are measured and validated via their ``str()`` form.
    forced_type:
        One of: Auto, Domain, Username, Email, IP, URL.
    allow_private_targets:
        Whether private/local network targets should be accepted.
        This should remain False for public Spaces.

    Returns
    -------
    ValidationResult
        Structured validation result. On failure ``indicator_type`` is
        "unknown", ``normalized`` is empty and ``error`` / ``error_code``
        describe the rejection.
    """
    # sanitize_raw_input() coerces the value with str(), so measure the same
    # text here. Calling len() directly on a non-string (e.g. an int) would
    # raise TypeError before any validation could report a structured error.
    original_length = 0 if raw_value is None else len(str(raw_value))
    warnings: list[str] = []

    try:
        cleaned = sanitize_raw_input(raw_value)
        check_dangerous_patterns(cleaned)
        forced = normalize_forced_type(forced_type)

        if forced != "auto":
            indicator_type, normalized = validate_as_type(cleaned, forced, allow_private_targets)
        else:
            indicator_type, normalized = classify_auto(cleaned, allow_private_targets)

        # Let callers know the value they get back differs from what was typed.
        if normalized != cleaned:
            warnings.append("Input was normalized before processing.")

        return ValidationResult(
            ok=True,
            indicator_type=indicator_type,
            normalized=normalized,
            original_length=original_length,
            warnings=warnings,
        )

    except ValidationException as exc:
        return ValidationResult(
            ok=False,
            indicator_type="unknown",
            normalized="",
            original_length=original_length,
            warnings=warnings,
            error=str(exc),
            error_code=exc.code,
        )
160
+
161
+
162
class ValidationException(ValueError):
    """Validation failure that carries a machine-readable error code.

    The human-readable message is available through ``str(exc)`` and the
    reason code through ``exc.code``.
    """

    def __init__(self, message: str, code: ValidationErrorCode):
        self.code = code
        super().__init__(message)
166
+
167
+
168
def sanitize_raw_input(raw_value: str) -> str:
    """
    Coerce raw input to text, trim it and apply the generic input checks.

    Parameters
    ----------
    raw_value:
        User input. Non-string values are converted with ``str()``.

    Returns
    -------
    str
        The input with surrounding whitespace removed.

    Raises
    ------
    ValidationException
        EMPTY_INPUT when the value is None or blank, CONTROL_CHARACTERS when
        it contains a character matched by CONTROL_CHARS_RE, TOO_LONG when it
        exceeds MAX_INPUT_LENGTH characters.
    """
    if raw_value is None:
        raise ValidationException("Input is required.", ValidationErrorCode.EMPTY_INPUT)

    value = str(raw_value).strip()

    if not value:
        raise ValidationException("Input is empty.", ValidationErrorCode.EMPTY_INPUT)

    if CONTROL_CHARS_RE.search(value):
        raise ValidationException(
            "Input contains control characters.",
            ValidationErrorCode.CONTROL_CHARACTERS,
        )

    if len(value) > MAX_INPUT_LENGTH:
        raise ValidationException(
            f"Input exceeds {MAX_INPUT_LENGTH} characters.",
            ValidationErrorCode.TOO_LONG,
        )

    # HTML entities are intentionally left exactly as typed. The previous
    # html.escape() -> html.unescape() round trip always reproduced its input
    # (escape() rewrites every "&" as "&amp;", which unescape() turns straight
    # back), so it never normalized anything. Actually decoding entities here
    # would corrupt legitimate values such as URL queries containing "&lt=" or
    # "&copy=", so the value is returned unchanged.
    return value
193
+
194
+
195
def check_dangerous_patterns(value: str) -> None:
    """
    Reject input that matches any entry in DANGEROUS_PATTERNS.

    Raises
    ------
    ValidationException
        BLOCKED_DANGEROUS_PATTERN when at least one pattern is found
        anywhere in ``value``.
    """
    if any(blocked.search(value) for blocked in DANGEROUS_PATTERNS):
        raise ValidationException(
            "Input contains a blocked pattern.",
            ValidationErrorCode.BLOCKED_DANGEROUS_PATTERN,
        )
202
+
203
+
204
def normalize_forced_type(forced_type: str) -> str:
    """
    Map a user-facing type label to its canonical lowercase name.

    Matching ignores case and surrounding whitespace. An empty or None
    label is treated as "Auto".

    Returns
    -------
    str
        One of: auto, domain, username, email, ip, url.

    Raises
    ------
    ValidationException
        INVALID_TYPE when the label is not a known name or alias.
    """
    key = (forced_type or "Auto").strip().lower()

    canonical = {
        "auto": "auto",
        "domain": "domain",
        "username": "username",
        "user": "username",
        "email": "email",
        "mail": "email",
        "ip": "ip",
        "ip address": "ip",
        "url": "url",
        "uri": "url",
    }.get(key)

    if canonical is None:
        raise ValidationException(
            f"Unsupported forced type: {forced_type}",
            ValidationErrorCode.INVALID_TYPE,
        )

    return canonical
227
+
228
+
229
def classify_auto(value: str, allow_private_targets: bool) -> tuple[IndicatorType, str]:
    """
    Detect the indicator type of ``value`` and validate it as that type.

    Detection order: URL, IP address, e-mail (contains "@"), domain
    (contains "."), then username.

    Returns
    -------
    tuple[IndicatorType, str]
        The detected type and the normalized value.

    Raises
    ------
    ValidationException
        Whatever the selected validator raises, or UNSUPPORTED_INDICATOR
        when the value matches none of the supported shapes.
    """
    # URL first, because URLs can contain domains/IPs.
    if looks_like_url(value):
        return validate_url(value, allow_private_targets)

    # IP before domain.
    try:
        return validate_ip(value, allow_private_targets)
    except ValidationException as exc:
        # Only "not an IP at all" should fall through to the other detectors.
        # A well-formed address rejected by policy must surface as blocked;
        # otherwise "10.0.0.1" would be reported as an invalid domain and
        # "::1" as an unsupported indicator.
        if exc.code == ValidationErrorCode.BLOCKED_LOCAL_TARGET:
            raise

    if "@" in value:
        return validate_email(value, allow_private_targets)

    if "." in value:
        return validate_domain(value, allow_private_targets)

    if USERNAME_RE.fullmatch(value):
        return validate_username(value, allow_private_targets)

    raise ValidationException(
        "Unsupported or malformed indicator.",
        ValidationErrorCode.UNSUPPORTED_INDICATOR,
    )
253
+
254
+
255
def validate_as_type(value: str, forced: str, allow_private_targets: bool) -> tuple[IndicatorType, str]:
    """
    Validate ``value`` with the validator for the canonical type ``forced``.

    Raises
    ------
    ValidationException
        INVALID_TYPE when ``forced`` is not one of domain, username, email,
        ip or url; otherwise whatever the selected validator raises.
    """
    dispatch = (
        ("domain", validate_domain),
        ("username", validate_username),
        ("email", validate_email),
        ("ip", validate_ip),
        ("url", validate_url),
    )

    for type_name, validator in dispatch:
        if forced == type_name:
            return validator(value, allow_private_targets)

    raise ValidationException("Unsupported indicator type.", ValidationErrorCode.INVALID_TYPE)
268
+
269
+
270
def validate_domain(value: str, allow_private_targets: bool = False) -> tuple[IndicatorType, str]:
    """
    Validate a domain name and return it lowercased without trailing dots.

    Parameters
    ----------
    value:
        Candidate domain name.
    allow_private_targets:
        When True, names listed in LOCAL_HOSTNAMES are accepted as-is.

    Returns
    -------
    tuple[IndicatorType, str]
        ``("domain", normalized_domain)``.

    Raises
    ------
    ValidationException
        BLOCKED_LOCAL_TARGET for a local host name when private targets are
        not allowed; INVALID_DOMAIN when the name is too long, does not match
        DOMAIN_RE, or has a label that starts or ends with a hyphen.
    """
    domain = value.strip().lower().rstrip(".")

    # Apply the local-target policy before the syntax check. The names in
    # LOCAL_HOSTNAMES contain no dot and therefore can never match DOMAIN_RE,
    # so checking them afterwards (as before) left this branch unreachable:
    # "localhost" was reported as INVALID_DOMAIN and could not be admitted
    # even with allow_private_targets=True.
    if domain in LOCAL_HOSTNAMES:
        if not allow_private_targets:
            raise ValidationException(
                "Local/private targets are blocked by policy.",
                ValidationErrorCode.BLOCKED_LOCAL_TARGET,
            )
        return "domain", domain

    if len(domain) > MAX_DOMAIN_LENGTH or not DOMAIN_RE.fullmatch(domain):
        raise ValidationException("Invalid domain.", ValidationErrorCode.INVALID_DOMAIN)

    # DOMAIN_RE only guards against a leading hyphen on the whole name, so
    # check each label individually.
    for label in domain.split("."):
        if label.startswith("-") or label.endswith("-"):
            raise ValidationException("Invalid domain label.", ValidationErrorCode.INVALID_DOMAIN)

    return "domain", domain
288
+
289
+
290
def validate_username(value: str, allow_private_targets: bool = False) -> tuple[IndicatorType, str]:
    """
    Validate a username and return it with surrounding whitespace removed.

    ``allow_private_targets`` is accepted so that all validators share the
    same call signature; it has no effect here.

    Raises
    ------
    ValidationException
        INVALID_USERNAME when the name is longer than MAX_USERNAME_LENGTH,
        does not match USERNAME_RE, or is "." or "..".
    """
    del allow_private_targets  # unused by design

    username = value.strip()

    too_long = len(username) > MAX_USERNAME_LENGTH
    if too_long or USERNAME_RE.fullmatch(username) is None or username in {".", ".."}:
        raise ValidationException("Invalid username.", ValidationErrorCode.INVALID_USERNAME)

    return "username", username
302
+
303
+
304
def validate_email(value: str, allow_private_targets: bool = False) -> tuple[IndicatorType, str]:
    """
    Validate an e-mail address and return it lowercased.

    The part after the last "@" is additionally checked and normalized by
    validate_domain().

    Raises
    ------
    ValidationException
        INVALID_EMAIL when the address is too long, does not match EMAIL_RE
        or has an over-long local part; otherwise whatever validate_domain()
        raises for the domain part.
    """
    email = value.strip().lower()

    if len(email) > MAX_EMAIL_LENGTH or EMAIL_RE.fullmatch(email) is None:
        raise ValidationException("Invalid email address.", ValidationErrorCode.INVALID_EMAIL)

    # EMAIL_RE guarantees exactly one "@", so the separator is always found.
    local_part, _, domain_part = email.rpartition("@")

    if len(local_part) > MAX_EMAIL_LOCAL_LENGTH:
        raise ValidationException("Invalid email local part.", ValidationErrorCode.INVALID_EMAIL)

    normalized_domain = validate_domain(domain_part, allow_private_targets)[1]
    return "email", local_part + "@" + normalized_domain
317
+
318
+
319
def validate_ip(value: str, allow_private_targets: bool = False) -> tuple[IndicatorType, str]:
    """
    Validate an IPv4/IPv6 address and return its canonical string form.

    Raises
    ------
    ValidationException
        INVALID_IP when the text is not an IP address; BLOCKED_LOCAL_TARGET
        when the address is classified as private/local by
        is_private_or_local_ip() and private targets are not allowed.
    """
    candidate = value.strip()

    try:
        ip = ipaddress.ip_address(candidate)
    except ValueError as exc:
        raise ValidationException("Invalid IP address.", ValidationErrorCode.INVALID_IP) from exc

    # Accept when policy allows private targets or the address is not one.
    if allow_private_targets or not is_private_or_local_ip(ip):
        return "ip", str(ip)

    raise ValidationException(
        "Local/private targets are blocked by policy.",
        ValidationErrorCode.BLOCKED_LOCAL_TARGET,
    )
332
+
333
+
334
def validate_url(value: str, allow_private_targets: bool = False) -> tuple[IndicatorType, str]:
    """
    Validate an http(s) URL and rebuild it in normalized form.

    The result keeps the lowercased scheme, the validated host (plus the
    port when one was given), the path and the query. The fragment, path
    parameters and any userinfo from the original URL are not carried over.

    Parameters
    ----------
    value:
        Candidate URL.
    allow_private_targets:
        Whether local host names and private/local IP hosts are accepted.

    Returns
    -------
    tuple[IndicatorType, str]
        ``("url", normalized_url)``.

    Raises
    ------
    ValidationException
        TOO_LONG when the URL exceeds MAX_URL_LENGTH; INVALID_URL when it
        cannot be parsed, is not http/https, or has no usable host or port;
        BLOCKED_LOCAL_TARGET when the host is local/private and not allowed;
        otherwise whatever validate_domain() raises for the host.
    """
    if len(value) > MAX_URL_LENGTH:
        raise ValidationException("URL is too long.", ValidationErrorCode.TOO_LONG)

    # urlparse() raises ValueError for some malformed hosts (e.g. an
    # unterminated "[" IPv6 literal). Convert it so callers that only handle
    # ValidationException, such as validate_indicator(), still see a
    # structured failure.
    try:
        parsed = urlparse(value.strip())
    except ValueError as exc:
        raise ValidationException("Invalid URL.", ValidationErrorCode.INVALID_URL) from exc

    if parsed.scheme.lower() not in {"http", "https"} or not parsed.netloc:
        raise ValidationException(
            "Invalid URL. Only http:// and https:// URLs are supported.",
            ValidationErrorCode.INVALID_URL,
        )

    hostname = parsed.hostname
    if not hostname:
        raise ValidationException("Invalid URL hostname.", ValidationErrorCode.INVALID_URL)

    hostname = hostname.lower().rstrip(".")

    if hostname in LOCAL_HOSTNAMES and not allow_private_targets:
        raise ValidationException(
            "Local/private targets are blocked by policy.",
            ValidationErrorCode.BLOCKED_LOCAL_TARGET,
        )

    # Validate hostname as IP or domain.
    try:
        _, normalized_host = validate_ip(hostname, allow_private_targets)
    except ValidationException as exc:
        # A real IP that is blocked by policy must be reported as blocked,
        # not retried as a domain (which would mask it as INVALID_DOMAIN).
        if exc.code == ValidationErrorCode.BLOCKED_LOCAL_TARGET:
            raise
        _, normalized_host = validate_domain(hostname, allow_private_targets)

    # Reading .port raises ValueError for a non-numeric or out-of-range port.
    try:
        port = parsed.port
    except ValueError as exc:
        raise ValidationException("Invalid URL port.", ValidationErrorCode.INVALID_URL) from exc

    # Only an IPv6 literal can contain ":" at this point; it has to be
    # bracketed again or the rebuilt netloc would be ambiguous with a port.
    if ":" in normalized_host:
        normalized_host = f"[{normalized_host}]"

    netloc = normalized_host if port is None else f"{normalized_host}:{port}"

    # Strip fragments. Fragments are client-side and not useful for passive OSINT hashing.
    normalized = urlunparse(
        (
            parsed.scheme.lower(),
            netloc,
            parsed.path or "",
            "",
            parsed.query or "",
            "",
        )
    )

    return "url", normalized
377
+
378
+
379
def looks_like_url(value: str) -> bool:
    """Return True when ``value`` starts with ``http://`` or ``https://`` (any letter case)."""
    return value.lower().startswith(("http://", "https://"))
382
+
383
+
384
def is_private_or_local_ip(ip: ipaddress.IPv4Address | ipaddress.IPv6Address) -> bool:
    """
    Return True when ``ip`` should be treated as a local/private target.

    An address qualifies when the ipaddress module classifies it as private,
    loopback, link-local, multicast, reserved or not globally reachable, or
    when it falls inside one of the networks in PRIVATE_NETS.
    """
    return (
        ip.is_private
        or ip.is_loopback
        or ip.is_link_local
        or ip.is_multicast
        or ip.is_reserved
        # The shared address space 100.64.0.0/10 (carrier-grade NAT) is not
        # reported by is_private and is not listed in PRIVATE_NETS, yet it is
        # not publicly routable; is_global is False for it.
        or not ip.is_global
        or any(ip in net for net in PRIVATE_NETS)
    )
393
+
394
+
395
def assert_valid_or_raise(raw_value: str, forced_type: str = "Auto", allow_private_targets: bool = False) -> tuple[IndicatorType, str]:
    """
    Exception-raising wrapper around validate_indicator().

    Returns
    -------
    tuple[IndicatorType, str]
        The indicator type and normalized value of a successful validation.

    Raises
    ------
    ValidationException
        When validation fails. The message and code come from the failed
        result, falling back to "Validation failed." and
        UNSUPPORTED_INDICATOR when the result does not provide them.
    """
    outcome = validate_indicator(raw_value, forced_type, allow_private_targets)

    if outcome.ok:
        return outcome.indicator_type, outcome.normalized

    message = outcome.error or "Validation failed."
    code = outcome.error_code or ValidationErrorCode.UNSUPPORTED_INDICATOR
    raise ValidationException(message, code)