Spaces:
Sleeping
Sleeping
"""
Input Validation Module

Comprehensive validation for URLs, inputs, and parameters.
Protects against malicious input and ensures data integrity.
"""
| from __future__ import annotations | |
| import re | |
| import logging | |
| from typing import Optional, Any | |
| from urllib.parse import urlparse, parse_qs, unquote | |
| from dataclasses import dataclass | |
logger = logging.getLogger(__name__)

# =============================================================================
# CONSTANTS
# =============================================================================

# Upper bounds applied by the validators in this module.
MAX_URL_LENGTH = 2048
MAX_QUERY_LENGTH = 500
MAX_TAG_LENGTH = 100
MAX_BATCH_SIZE = 20

# URL schemes that validate_url accepts.
VALID_SCHEMES = frozenset(("http", "https"))

# Domains that validate_medium_url accepts (their subdomains are accepted too).
MEDIUM_DOMAINS = frozenset((
    "medium.com",
    "towardsdatascience.com",
    "betterprogramming.pub",
    "levelup.gitconnected.com",
    "javascript.plainenglish.io",
    "python.plainenglish.io",
    "blog.devgenius.io",
    "uxdesign.cc",
    "itnext.io",
    "hackernoon.com",
    "freecodecamp.org",
))

# Fragments that make validate_url reject a URL; compiled once at import time
# and matched case-insensitively.
DANGEROUS_PATTERNS = tuple(
    re.compile(fragment, re.IGNORECASE)
    for fragment in ("javascript:", "data:", "vbscript:", "file://")
)
# =============================================================================
# VALIDATION RESULTS
# =============================================================================
@dataclass
class ValidationResult:
    """Result of a validation check.

    The ``@dataclass`` decorator generates the keyword-argument constructor
    that every validator in this module relies on
    (``ValidationResult(is_valid=..., error=...)``); without it those calls
    raise ``TypeError``.

    Attributes:
        is_valid: Whether the input passed validation.
        value: The accepted value as a string; None unless a validator sets it.
        error: Failure message; None unless a validator sets it.
        sanitized: Flag set by the validators; each validator decides when to
            raise it. Defaults to False.
    """

    is_valid: bool
    value: Optional[str] = None
    error: Optional[str] = None
    sanitized: bool = False
# =============================================================================
# URL VALIDATION
# =============================================================================
def validate_url(url: str) -> ValidationResult:
    """
    Validate a URL and return it with surrounding whitespace removed.

    Checks, in order:
    - Not empty (after stripping whitespace)
    - Within MAX_URL_LENGTH
    - No DANGEROUS_PATTERNS match anywhere in the URL
    - Parseable by urlparse
    - Scheme is one of VALID_SCHEMES (http/https)
    - Includes a host name

    Args:
        url: URL to validate

    Returns:
        ValidationResult with the stripped URL in ``value`` on success, or
        with ``error`` set on failure
    """
    # Strip before the emptiness check so whitespace-only input is reported
    # as missing rather than as having a bad scheme.
    url = url.strip() if url else ""
    if not url:
        return ValidationResult(
            is_valid=False,
            error="URL is required"
        )

    # Check length
    if len(url) > MAX_URL_LENGTH:
        return ValidationResult(
            is_valid=False,
            error=f"URL exceeds maximum length of {MAX_URL_LENGTH}"
        )

    # Check for dangerous patterns. The patterns are searched anywhere in the
    # string, so they also reject URLs that embed them in a path or query.
    for pattern in DANGEROUS_PATTERNS:
        if pattern.search(url):
            # Lazy %-formatting with %r keeps newlines and other control
            # characters from the untrusted URL out of the log stream.
            logger.warning("Dangerous URL pattern detected: %r", url[:50])
            return ValidationResult(
                is_valid=False,
                error="URL contains dangerous content"
            )

    # Parse URL; urlparse signals malformed input (for example unbalanced
    # IPv6 brackets) with ValueError.
    try:
        parsed = urlparse(url)
    except ValueError as e:
        return ValidationResult(
            is_valid=False,
            error=f"Invalid URL format: {e}"
        )

    # Check scheme
    if parsed.scheme.lower() not in VALID_SCHEMES:
        return ValidationResult(
            is_valid=False,
            error="URL must use http or https"
        )

    # Check the host. ``hostname`` excludes userinfo and port, so inputs such
    # as "http://:80" or "http://user@" are rejected even though their netloc
    # is non-empty.
    if not parsed.hostname:
        return ValidationResult(
            is_valid=False,
            error="URL must include a domain"
        )

    return ValidationResult(
        is_valid=True,
        value=url,
        sanitized=True
    )
def validate_medium_url(url: str) -> ValidationResult:
    """
    Validate that URL is a Medium article URL.

    The URL must first pass validate_url. Its host (optionally prefixed with
    "www.") must then equal an entry of MEDIUM_DOMAINS or be a subdomain of
    one.

    Args:
        url: URL to validate

    Returns:
        ValidationResult carrying the URL as accepted by validate_url on
        success, or with ``error`` set on failure
    """
    # First do basic URL validation
    result = validate_url(url)
    if not result.is_valid:
        return result

    # Work with the value validate_url accepted (whitespace stripped), not
    # the raw argument.
    clean_url = result.value or url

    # ``hostname`` is lower-cased and excludes userinfo and port, so
    # "medium.com:443" matches and "user@host" forms cannot confuse the check.
    host = urlparse(clean_url).hostname or ""

    # Drop only a leading "www." label. Removing the substring anywhere would
    # turn "medium.www.com" into "medium.com" and wrongly accept it.
    if host.startswith("www."):
        host = host[len("www."):]

    # Check if it's a known Medium domain or a subdomain of one
    is_medium = host in MEDIUM_DOMAINS or any(
        host.endswith(f".{md}") for md in MEDIUM_DOMAINS
    )
    if not is_medium:
        return ValidationResult(
            is_valid=False,
            error="URL is not a recognized Medium domain"
        )

    return ValidationResult(
        is_valid=True,
        value=clean_url,
        sanitized=True
    )
def validate_batch_urls(urls: list[str]) -> tuple[list[str], list[dict[str, str]]]:
    """
    Run validate_url over a batch and split the outcome.

    An empty batch, or one larger than MAX_BATCH_SIZE, is rejected as a whole
    with a single error entry whose "url" is the empty string.

    Args:
        urls: List of URLs to validate

    Returns:
        Tuple of (valid_urls, errors); each error is a dict with the keys
        "url" and "error"
    """
    if not urls:
        return [], [{"url": "", "error": "No URLs provided"}]

    if len(urls) > MAX_BATCH_SIZE:
        too_many = f"Batch size exceeds maximum of {MAX_BATCH_SIZE}"
        return [], [{"url": "", "error": too_many}]

    accepted = []
    rejected = []
    for candidate in urls:
        outcome = validate_url(candidate)
        if not outcome.is_valid:
            rejected.append(
                {"url": candidate, "error": outcome.error or "Invalid URL"}
            )
            continue
        # Prefer the value returned by the validator over the raw input.
        accepted.append(outcome.value or candidate)

    return accepted, rejected
# =============================================================================
# QUERY VALIDATION
# =============================================================================
def validate_search_query(query: str) -> ValidationResult:
    """
    Validate a search query.

    The query is stripped, checked against MAX_QUERY_LENGTH, and has the
    characters ``<``, ``>``, ``"`` and ``'`` removed. The minimum length of
    2 is enforced on the sanitized text, so a query made up only of removed
    characters is rejected instead of being accepted with an empty value.

    Args:
        query: Search query to validate

    Returns:
        ValidationResult whose ``value`` is the sanitized query; ``sanitized``
        is True when sanitizing changed the stripped input
    """
    if not query:
        return ValidationResult(
            is_valid=False,
            error="Search query is required"
        )

    query = query.strip()

    if len(query) > MAX_QUERY_LENGTH:
        return ValidationResult(
            is_valid=False,
            error=f"Query exceeds maximum length of {MAX_QUERY_LENGTH}"
        )

    # Remove potential injection characters, then trim whitespace that the
    # removal may have exposed at either end.
    cleaned = re.sub(r'[<>"\']', '', query).strip()

    # Measure what will actually be returned, not the raw input.
    if len(cleaned) < 2:
        return ValidationResult(
            is_valid=False,
            error="Query must be at least 2 characters"
        )

    return ValidationResult(
        is_valid=True,
        value=cleaned,
        sanitized=cleaned != query
    )
def validate_tag(tag: str) -> ValidationResult:
    """
    Validate a Medium tag.

    The tag is stripped and lower-cased, then normalized: every character
    outside ``a-z``, ``0-9`` and ``-`` becomes a hyphen, runs of hyphens
    collapse to one, and leading/trailing hyphens are dropped. Normalization
    is applied to every tag, so hyphen-only input such as "-" is rejected
    and "--a--" yields "a", matching how tags with other characters are
    treated.

    Args:
        tag: Tag to validate

    Returns:
        ValidationResult whose ``value`` is the normalized tag; ``sanitized``
        is True when normalization changed the stripped, lower-cased input
    """
    if not tag:
        return ValidationResult(
            is_valid=False,
            error="Tag is required"
        )

    tag = tag.strip().lower()

    if len(tag) > MAX_TAG_LENGTH:
        return ValidationResult(
            is_valid=False,
            error=f"Tag exceeds maximum length of {MAX_TAG_LENGTH}"
        )

    # Tags should be alphanumeric with hyphens
    normalized = re.sub(r'[^a-z0-9-]', '-', tag)
    normalized = re.sub(r'-+', '-', normalized).strip('-')

    # Nothing usable is left (empty, or only separators/disallowed characters)
    if not normalized:
        return ValidationResult(
            is_valid=False,
            error="Tag must contain only letters, numbers, and hyphens"
        )

    return ValidationResult(
        is_valid=True,
        value=normalized,
        sanitized=normalized != tag
    )
# =============================================================================
# NUMERIC VALIDATION
# =============================================================================
def validate_positive_int(
    value: Any,
    name: str = "value",
    min_val: int = 1,
    max_val: int = 100,
) -> ValidationResult:
    """
    Validate an integer within inclusive bounds.

    Args:
        value: Value to validate; anything ``int()`` accepts, except floats
            with a fractional part, infinities and NaN
        name: Parameter name for error messages
        min_val: Minimum allowed value (inclusive)
        max_val: Maximum allowed value (inclusive)

    Returns:
        ValidationResult whose ``value`` is the integer rendered as a string
        on success, or with ``error`` set on failure
    """
    not_an_integer = ValidationResult(
        is_valid=False,
        error=f"{name} must be an integer"
    )

    # int() would silently truncate 3.7 to 3; reject such floats instead.
    # is_integer() is also False for inf and nan.
    if isinstance(value, float) and not value.is_integer():
        return not_an_integer

    try:
        int_value = int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers infinite values of non-float numeric types.
        return not_an_integer

    if int_value < min_val:
        return ValidationResult(
            is_valid=False,
            error=f"{name} must be at least {min_val}"
        )

    if int_value > max_val:
        return ValidationResult(
            is_valid=False,
            error=f"{name} must be at most {max_val}"
        )

    return ValidationResult(
        is_valid=True,
        value=str(int_value)
    )
# =============================================================================
# POST ID VALIDATION
# =============================================================================
def validate_post_id(post_id: str) -> ValidationResult:
    """
    Validate a Medium post ID.

    After stripping whitespace, the ID must be 8-16 characters long and
    consist only of hexadecimal digits (case-insensitive).

    Args:
        post_id: Post ID to validate

    Returns:
        ValidationResult whose ``value`` is the lower-cased post ID on
        success, or with ``error`` set on failure
    """
    if not post_id:
        return ValidationResult(is_valid=False, error="Post ID is required")

    trimmed = post_id.strip()

    # Length is measured before lower-casing.
    if not (8 <= len(trimmed) <= 16):
        return ValidationResult(
            is_valid=False,
            error="Post ID must be 8-16 characters"
        )

    lowered = trimmed.lower()
    if re.match(r'^[a-f0-9]+$', lowered) is None:
        return ValidationResult(
            is_valid=False,
            error="Post ID must be hexadecimal"
        )

    return ValidationResult(is_valid=True, value=lowered)