# snikhilesh's picture
# Upload folder using huggingface_hub
# 996387b verified
"""
Security Module - HIPAA/GDPR Compliance Features
Implements authentication, authorization, audit logging, and encryption
"""
import hashlib
import ipaddress
import json
import logging
import os
import secrets
from datetime import datetime, timedelta, timezone
from functools import wraps
from typing import Dict, List, Any, Optional

import jwt
from fastapi import HTTPException, Request, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
logger = logging.getLogger(__name__)

# Security configuration
#
# The JWT signing key is read from the JWT_SECRET_KEY environment variable so
# that issued tokens stay valid across restarts and can be verified by every
# worker process. When the variable is unset or empty, a random per-process
# key is generated instead (demo behaviour): tokens signed by one process are
# then rejected by any other process and by the same service after a restart.
SECRET_KEY = os.environ.get("JWT_SECRET_KEY") or secrets.token_urlsafe(32)
ALGORITHM = "HS256"  # HMAC-SHA256 symmetric signature
ACCESS_TOKEN_EXPIRE_MINUTES = 30  # lifetime of an issued access token
class AuditLogger:
    """
    HIPAA-compliant audit logging
    Tracks all access to PHI (Protected Health Information)

    Entries are serialized to JSON and emitted through the module logger with
    an ``AUDIT:`` prefix. ``audit_log_path`` is stored on the instance, but
    this class does not open or write that file itself.
    """

    def __init__(self):
        self.audit_log_path = "logs/audit.log"
        logger.info("Audit Logger initialized")

    def log_access(
        self,
        user_id: str,
        action: str,
        resource: str,
        ip_address: str,
        status: str,
        details: Optional[Dict[str, Any]] = None
    ):
        """Log access to medical data.

        Best-effort: any failure while building or emitting the entry is
        logged and swallowed so that auditing never breaks the request.

        Args:
            user_id: Identifier of the acting user.
            action: Name of the action performed (e.g. ``"API_ACCESS"``).
            resource: Resource that was touched (path, ``document:<id>``, ...).
            ip_address: Client address; it is masked before being logged.
            status: Outcome label stored verbatim in the entry.
            details: Optional extra fields; defaults to an empty dict.
        """
        try:
            audit_entry = {
                # Timezone-aware UTC timestamp (ISO 8601 with +00:00 offset).
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "user_id": user_id,
                "action": action,
                "resource": resource,
                "ip_address": self._anonymize_ip(ip_address),
                "status": status,
                "details": details or {}
            }
            # Emit through the module logger. default=str is deliberate: a
            # non-JSON-serializable value in `details` must not cause the
            # whole audit entry to be lost.
            logger.info("AUDIT: %s", json.dumps(audit_entry, default=str))
            # In production, also store in database for long-term retention
        except Exception as e:
            logger.error(f"Audit logging failed: {str(e)}")

    def _anonymize_ip(self, ip_address: str) -> str:
        """Anonymize IP address for GDPR compliance.

        The address is parsed first so that compressed IPv6 forms (``::1``)
        and IPv4-mapped IPv6 addresses (``::ffff:a.b.c.d``) cannot slip
        through unmasked.

        Returns:
            ``a.b.c.xxx`` for IPv4 (last octet masked), the first four 16-bit
            groups followed by ``:xxxx`` for IPv6 (low 64 bits masked), or
            ``"unknown"`` when the value is not a valid IP address.
        """
        try:
            parsed = ipaddress.ip_address(str(ip_address).strip())
        except ValueError:
            # Placeholders or malformed input: never echo the raw value.
            return "unknown"

        # Dual-stack servers report IPv4 clients as ::ffff:a.b.c.d.
        if parsed.version == 6 and parsed.ipv4_mapped is not None:
            parsed = parsed.ipv4_mapped

        if parsed.version == 4:
            octets = str(parsed).split('.')
            return '.'.join(octets[:3]) + '.xxx'

        # IPv6: keep only the upper 64 bits, rendered as four hex groups.
        upper_half = int(parsed) >> 64
        groups = [
            format((upper_half >> shift) & 0xFFFF, 'x')
            for shift in (48, 32, 16, 0)
        ]
        return ':'.join(groups) + ':xxxx'

    def log_phi_access(
        self,
        user_id: str,
        document_id: str,
        action: str,
        ip_address: str
    ):
        """Specific logging for PHI access.

        Records a ``PHI_<action>`` entry against ``document:<document_id>``
        with status ``SUCCESS`` and ``phi_accessed`` set in the details.
        """
        self.log_access(
            user_id=user_id,
            action=f"PHI_{action}",
            resource=f"document:{document_id}",
            ip_address=ip_address,
            status="SUCCESS",
            details={"phi_accessed": True}
        )
class SecurityManager:
    """
    Manages authentication, authorization, and encryption

    Issues and verifies JWT access tokens signed with the module-level
    ``SECRET_KEY``/``ALGORITHM``, exposes a FastAPI dependency that resolves
    the current user, and offers small helpers for pseudonymizing
    identifiers and sanitizing responses.
    """

    def __init__(self):
        self.audit_logger = AuditLogger()
        self.security_bearer = HTTPBearer(auto_error=False)
        logger.info("Security Manager initialized")

    def create_access_token(self, user_id: str, email: str) -> str:
        """Create JWT access token.

        The token carries ``sub`` (user id), ``email``, ``iat`` and an
        ``exp`` claim ``ACCESS_TOKEN_EXPIRE_MINUTES`` after issuance.
        """
        # One timezone-aware timestamp so iat and exp are derived from the
        # same instant.
        issued_at = datetime.now(timezone.utc)
        payload = {
            "sub": user_id,
            "email": email,
            "exp": issued_at + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
            "iat": issued_at
        }
        token = jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
        return token

    def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
        """Verify and decode JWT token.

        Returns:
            The decoded claims, or ``None`` when the token is expired,
            malformed, or fails signature/claim validation.
        """
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            return payload
        except jwt.ExpiredSignatureError:
            logger.warning("Token expired")
            return None
        except jwt.InvalidTokenError as e:
            # InvalidTokenError is PyJWT's base class for decode/validation
            # failures (the package has no `JWTError` attribute).
            logger.warning(f"Token verification failed: {str(e)}")
            return None

    async def get_current_user(
        self,
        request: Request,
        credentials: Optional[HTTPAuthorizationCredentials] = Depends(HTTPBearer(auto_error=False))
    ) -> Dict[str, Any]:
        """
        FastAPI dependency for protected routes
        Validates JWT token and returns user info

        Returns:
            A dict with ``user_id``, ``email`` and ``is_anonymous``.

        Raises:
            HTTPException: 401 when credentials are supplied but the token
                is invalid or expired.
        """
        client_ip = request.client.host if request.client else "unknown"

        # For development/demo, allow anonymous access but log it
        if not credentials:
            logger.warning("Anonymous access - should be restricted in production")
            anonymous_user = {
                "user_id": "anonymous",
                "email": "anonymous@demo.local",
                "is_anonymous": True
            }
            # Log anonymous access
            self.audit_logger.log_access(
                user_id="anonymous",
                action="API_ACCESS",
                resource=request.url.path,
                ip_address=client_ip,
                status="WARNING_ANONYMOUS"
            )
            return anonymous_user

        # Verify token
        token = credentials.credentials
        payload = self.verify_token(token)
        if not payload:
            raise HTTPException(
                status_code=401,
                detail="Invalid or expired authentication token"
            )

        user_info = {
            "user_id": payload.get("sub"),
            "email": payload.get("email"),
            "is_anonymous": False
        }
        # Log authenticated access
        self.audit_logger.log_access(
            user_id=user_info["user_id"],
            action="API_ACCESS",
            resource=request.url.path,
            ip_address=client_ip,
            status="SUCCESS"
        )
        return user_info

    def hash_phi_identifier(self, identifier: str) -> str:
        """
        Hash PHI identifiers for pseudonymization
        Required for GDPR compliance

        Returns the hex SHA-256 digest of the UTF-8 encoded identifier.
        NOTE(review): the hash is unsalted and unkeyed, so identical inputs
        always map to the same digest - confirm this is acceptable for
        low-entropy identifiers.
        """
        return hashlib.sha256(identifier.encode()).hexdigest()

    def sanitize_response(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Remove or redact sensitive information from API responses

        If ``data`` has an ``"error"`` key its value is replaced with a
        generic message. The dict is modified in place and also returned.
        """
        # In production, implement comprehensive PII/PHI redaction
        # For now, basic sanitization
        if "error" in data:
            # Don't expose internal error details
            data["error"] = "An error occurred during processing"
        return data
class DataEncryption:
    """
    Handles encryption of data at rest and in transit
    Required for HIPAA/GDPR compliance

    NOTE: ``encrypt_data`` and ``decrypt_data`` are placeholders that return
    their input unchanged; only ``secure_delete`` performs real work.
    """

    def __init__(self):
        # In production, use proper key management (e.g., AWS KMS, Azure Key Vault)
        self.encryption_key = self._load_or_generate_key()
        logger.info("Data Encryption initialized")

    def _load_or_generate_key(self) -> bytes:
        """Return a freshly generated random 32-byte key.

        In production this should load the key from a secure key management
        system; the demo implementation generates a new one on every call.
        """
        return secrets.token_bytes(32)

    def encrypt_data(self, data: bytes) -> bytes:
        """
        Encrypt sensitive data using AES-256

        Placeholder: logs a warning and returns ``data`` unchanged.
        """
        # In production, implement proper AES-256 encryption
        # For now, return as-is (encryption would require cryptography library)
        logger.warning("Encryption not fully implemented - add cryptography library")
        return data

    def decrypt_data(self, encrypted_data: bytes) -> bytes:
        """Decrypt data.

        Placeholder: logs a warning and returns ``encrypted_data`` unchanged.
        """
        logger.warning("Decryption not fully implemented - add cryptography library")
        return encrypted_data

    def _overwrite_with_random(self, file_path: str, chunk_size: int = 1024 * 1024) -> None:
        """Overwrite the existing bytes of ``file_path`` with random data.

        The file is opened in ``r+b`` mode so it is not truncated first; the
        random data is written over the current contents, in chunks of at
        most ``chunk_size`` bytes so large files are not held in memory, and
        then flushed and fsync'ed before returning.
        """
        import os

        remaining = os.path.getsize(file_path)
        with open(file_path, 'r+b') as f:
            while remaining > 0:
                step = min(chunk_size, remaining)
                f.write(secrets.token_bytes(step))
                remaining -= step
            # Push the overwrite to disk before the caller unlinks the file.
            f.flush()
            os.fsync(f.fileno())

    def secure_delete(self, file_path: str):
        """
        Securely delete files containing PHI
        HIPAA requires secure deletion

        Overwrites the file with random bytes once, then removes it. A
        missing file is ignored. Best-effort: failures are logged, not
        raised.
        NOTE(review): a single in-place overwrite may not erase data on
        storage that remaps writes (e.g. SSDs, copy-on-write filesystems) -
        confirm against the deployment's storage.
        """
        import os
        try:
            # In production, overwrite file multiple times before deletion
            if os.path.exists(file_path):
                self._overwrite_with_random(file_path)
                # Delete file
                os.remove(file_path)
                logger.info(f"Securely deleted file: {file_path}")
        except Exception as e:
            logger.error(f"Secure deletion failed: {str(e)}")
class ComplianceValidator:
    """
    Validates compliance with HIPAA and GDPR requirements

    Holds a fixed table of safeguard flags and derives a summary report
    from it.
    """

    def __init__(self):
        # Safeguard name -> whether it is currently considered implemented.
        self.required_features = dict(
            encryption_at_rest=False,  # Would be True in production
            encryption_in_transit=True,  # HTTPS enforced
            access_logging=True,
            user_authentication=True,  # Available but not enforced in demo
            data_retention_policy=False,  # Would implement in production
            right_to_erasure=False,  # GDPR - would implement in production
            consent_management=False,  # Would implement in production
        )

    def check_compliance(self) -> Dict[str, Any]:
        """Summarize the safeguard table.

        Returns a dict with the ``implemented/total`` score, the percentage
        rounded to one decimal, the flag table itself, a status
        (``COMPLIANT`` only when every flag is set, otherwise ``DEMO_MODE``)
        and the list of recommendations.
        """
        flags = self.required_features
        total = len(flags)
        enabled = len([flag for flag in flags.values() if flag])

        if enabled < total:
            status = "DEMO_MODE"
        else:
            status = "COMPLIANT"

        report = {}
        report["compliance_score"] = f"{enabled}/{total}"
        report["percentage"] = round((enabled / total) * 100, 1)
        report["features"] = flags
        report["status"] = status
        report["recommendations"] = self._get_recommendations()
        return report

    def _get_recommendations(self) -> List[str]:
        """Return one "Implement <Feature Name>" line per unset flag."""
        return [
            f"Implement {name.replace('_', ' ').title()}"
            for name, is_on in self.required_features.items()
            if not is_on
        ]
# Module-wide SecurityManager, created on first request
_security_manager = None


def get_security_manager() -> SecurityManager:
    """Return the shared SecurityManager, constructing it on the first call."""
    global _security_manager
    if _security_manager is not None:
        return _security_manager
    _security_manager = SecurityManager()
    return _security_manager
# Decorator for protected routes
def require_auth(func):
    """Decorator to protect endpoints with authentication.

    Demo behaviour: the wrapper only logs a warning naming the endpoint and
    then calls it; no credentials are checked. The returned wrapper is always
    a coroutine function. ``func`` may be either ``async def`` or a plain
    function - its result is awaited only when it is awaitable.
    """
    import inspect

    @wraps(func)
    async def wrapper(*args, **kwargs):
        # In production, enforce authentication
        # For demo, log warning and allow access
        logger.warning(f"Protected endpoint accessed: {func.__name__}")
        result = func(*args, **kwargs)
        # Synchronous endpoints return a plain value; awaiting it would
        # raise TypeError.
        if inspect.isawaitable(result):
            result = await result
        return result

    return wrapper