jackonthemike's picture
Initial commit: InnSight scraper backend with Playwright
d77abf8
"""
GDPR compliance utilities.
This module provides data handling procedures required for
GDPR compliance including data export, deletion, and consent management.
Usage:
from gdpr import gdpr_service, ConsentType
# Export user data
export_data = await gdpr_service.generate_user_export(user_id)
# Handle deletion request
await gdpr_service.request_deletion(user_id, reason="Account closure")
"""
import asyncio
import functools
import json
import logging
import os
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Optional
class ConsentType(str, Enum):
    """Kinds of consent a user can grant or revoke.

    Members are also ``str`` instances, so each one compares equal to its
    string value.
    """

    # Required for the service itself
    ESSENTIAL = "essential"
    # Marketing communications
    MARKETING = "marketing"
    # Usage analytics
    ANALYTICS = "analytics"
    # Sharing with third parties
    THIRD_PARTY = "third_party"
    # Personalized experience
    PERSONALIZATION = "personalization"
class DataCategory(str, Enum):
    """Buckets that personal data is grouped into.

    Members are also ``str`` instances, so each one compares equal to its
    string value.
    """

    # Name, email, etc.
    IDENTITY = "identity"
    # Address, phone
    CONTACT = "contact"
    # Login history, tokens
    AUTHENTICATION = "authentication"
    # Usage data, actions
    ACTIVITY = "activity"
    # Settings, preferences
    PREFERENCES = "preferences"
    # Payment info
    FINANCIAL = "financial"
    # Data the service generated about the user
    GENERATED = "generated"
class DeletionStatus(str, Enum):
    """States a data deletion request can be in.

    Members are also ``str`` instances, so each one compares equal to its
    string value.
    """

    PENDING = "pending"
    APPROVED = "approved"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    REJECTED = "rejected"
class ExportStatus(str, Enum):
    """States a data export request can be in.

    Members are also ``str`` instances, so each one compares equal to its
    string value.
    """

    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    EXPIRED = "expired"
@dataclass
class Consent:
    """A single consent decision recorded for one user and one consent type."""

    # Who the decision belongs to
    user_id: int
    # Which kind of consent the decision is about
    consent_type: ConsentType
    # True when consent is given, False when it is withheld
    granted: bool
    # Timestamp attached to the decision, if any
    granted_at: Optional[datetime] = None
    # Moment after which the consent should no longer count, if any
    expires_at: Optional[datetime] = None
    # Address the decision was submitted from, if known
    ip_address: Optional[str] = None
    # Version label stored with the record
    version: str = "1.0"
@dataclass
class DeletionRequest:
    """A user's request to have their data deleted."""

    # Identifier of this request
    id: int
    # User whose data should be deleted
    user_id: int
    # Free-text reason supplied with the request, if any
    reason: Optional[str] = None
    # Current state of the request; new requests start as PENDING
    status: DeletionStatus = DeletionStatus.PENDING
    # When the request was created. Annotated Optional because the default
    # is None (the previous bare ``datetime`` annotation contradicted it).
    created_at: Optional[datetime] = None
    # When the request was processed, if it has been
    processed_at: Optional[datetime] = None
    # Identifier of whoever processed the request, if it has been processed
    processed_by: Optional[int] = None
@dataclass
class ExportRequest:
    """A user's request for an export of their data."""

    # Identifier of this request
    id: int
    # User whose data should be exported
    user_id: int
    # Current state of the request; new requests start as PENDING
    status: ExportStatus = ExportStatus.PENDING
    # When the request was created. Annotated Optional because the default
    # is None (the previous bare ``datetime`` annotation contradicted it).
    created_at: Optional[datetime] = None
    # When the export finished, if it has
    completed_at: Optional[datetime] = None
    # Location of the generated export file, if one exists
    file_path: Optional[str] = None
    # When the export stops being available, if set
    expires_at: Optional[datetime] = None
class GDPRService:
    """
    GDPR compliance service.

    Handles data subject rights:
    - Right of access (data export)
    - Right to erasure (deletion)
    - Right to data portability
    - Consent management

    Consents and requests are kept in in-memory dictionaries on the
    instance. An optional database object (see ``set_database``) is used
    for identity export and for deleting the user record.
    """

    # Logger used to report database failures that used to be swallowed
    # silently.
    _log = logging.getLogger(__name__)

    def __init__(self):
        # Consents keyed by (user_id, consent type value)
        self._consents: dict[tuple[int, str], Consent] = {}
        # Requests keyed by request id
        self._deletion_requests: dict[int, DeletionRequest] = {}
        self._export_requests: dict[int, ExportRequest] = {}
        # Guards the counter and the dictionaries when they are written
        self._lock = asyncio.Lock()
        self._db = None
        # Shared id counter for both export and deletion requests
        self._request_counter = 0

    def set_database(self, db) -> None:
        """Set database for persistent storage"""
        self._db = db

    # ============= Consent Management =============

    async def get_consent(
        self,
        user_id: int,
        consent_type: ConsentType
    ) -> Optional[Consent]:
        """Return the stored consent record for the user and type, or None."""
        return self._consents.get((user_id, consent_type.value))

    async def get_all_consents(self, user_id: int) -> dict[str, Consent]:
        """Return every stored consent of the user, keyed by consent type value."""
        return {
            ctype: consent
            for (uid, ctype), consent in self._consents.items()
            if uid == user_id
        }

    async def update_consent(
        self,
        user_id: int,
        consent_type: ConsentType,
        granted: bool,
        ip_address: Optional[str] = None
    ) -> Consent:
        """
        Store a new consent decision, replacing any previous one.

        Args:
            user_id: User the decision belongs to.
            consent_type: Kind of consent being updated.
            granted: True to grant, False to revoke.
            ip_address: Address the decision came from, if known.

        Returns:
            The newly stored Consent record.

        Raises:
            ValueError: If an attempt is made to revoke essential consent.
        """
        # Essential consent cannot be revoked
        if consent_type == ConsentType.ESSENTIAL and not granted:
            raise ValueError("Essential consent cannot be revoked")

        consent = Consent(
            user_id=user_id,
            consent_type=consent_type,
            granted=granted,
            granted_at=datetime.utcnow(),
            ip_address=ip_address
        )
        async with self._lock:
            self._consents[(user_id, consent_type.value)] = consent
        return consent

    async def has_consent(
        self,
        user_id: int,
        consent_type: ConsentType
    ) -> bool:
        """
        Check if user has granted specific consent.

        Returns False when no record exists or when the record has an
        expiry time that is already in the past.
        """
        consent = await self.get_consent(user_id, consent_type)
        if consent is None:
            return False
        # Check if expired
        if consent.expires_at and consent.expires_at < datetime.utcnow():
            return False
        return consent.granted

    # ============= Data Export (Right of Access) =============

    async def request_data_export(self, user_id: int) -> ExportRequest:
        """
        Request export of all user data.

        GDPR Article 15 & 20: Right of access and data portability

        Returns:
            The newly registered ExportRequest in PENDING state.
        """
        async with self._lock:
            self._request_counter += 1
            request = ExportRequest(
                id=self._request_counter,
                user_id=user_id,
                status=ExportStatus.PENDING,
                created_at=datetime.utcnow()
            )
            self._export_requests[request.id] = request
        return request

    async def generate_user_export(self, user_id: int) -> dict:
        """
        Generate complete data export for user.

        Returns:
            Dictionary with ``export_info`` metadata, the exported data
            under ``categories`` (one entry per data category) and the
            user's stored consents under ``consents``.
        """
        export_data = {
            "export_info": {
                "generated_at": datetime.utcnow().isoformat(),
                "user_id": user_id,
                "format_version": "1.0"
            },
            "categories": {}
        }

        # (category, description, exporter) in the order they are exported
        sections = (
            (DataCategory.IDENTITY, "Basic identity information",
             self._export_identity_data),
            (DataCategory.AUTHENTICATION, "Login and authentication history",
             self._export_auth_data),
            (DataCategory.ACTIVITY, "Usage and activity history",
             self._export_activity_data),
            (DataCategory.PREFERENCES, "User preferences and settings",
             self._export_preferences_data),
        )
        for category, description, exporter in sections:
            export_data["categories"][category.value] = {
                "description": description,
                "data": await exporter(user_id)
            }

        # Consents
        consents = await self.get_all_consents(user_id)
        export_data["consents"] = {
            ctype: {
                "granted": consent.granted,
                "granted_at": (
                    consent.granted_at.isoformat()
                    if consent.granted_at else None
                )
            }
            for ctype, consent in consents.items()
        }
        return export_data

    async def _export_identity_data(self, user_id: int) -> dict:
        """
        Export identity-related data.

        Reads the user from the database when one is configured. Lookup
        is best-effort: a database error is logged and the "not found"
        note is returned instead of failing the whole export.
        """
        if self._db is not None:
            try:
                user = await self._db.get_user_by_id(user_id)
                if user:
                    return {
                        "email": user.get("email"),
                        "full_name": user.get("full_name"),
                        "created_at": user.get("created_at")
                    }
            except Exception:
                # Keep the export going, but leave a trace of the failure
                # rather than hiding it.
                self._log.exception(
                    "Failed to load identity data for user %s", user_id
                )
        return {"note": "No identity data found"}

    async def _export_auth_data(self, user_id: int) -> dict:
        """Export authentication history"""
        # NOTE(review): these values are hard-coded placeholders, not read
        # from any storage; `last_login` is simply the current time.
        return {
            "note": "Authentication history",
            "has_2fa_enabled": True,
            "last_login": datetime.utcnow().isoformat()
        }

    async def _export_activity_data(self, user_id: int) -> dict:
        """Export activity/usage data"""
        # NOTE(review): placeholder result; nothing is read for user_id.
        return {
            "note": "Activity data from audit logs",
            "actions_recorded": 0
        }

    async def _export_preferences_data(self, user_id: int) -> dict:
        """Export user preferences"""
        # NOTE(review): placeholder result; nothing is read for user_id.
        return {
            "note": "User preferences and settings"
        }

    # ============= Data Deletion (Right to Erasure) =============

    async def request_deletion(
        self,
        user_id: int,
        reason: Optional[str] = None
    ) -> DeletionRequest:
        """
        Request deletion of user data.

        GDPR Article 17: Right to erasure

        Returns:
            The newly registered DeletionRequest in PENDING state.
        """
        async with self._lock:
            self._request_counter += 1
            request = DeletionRequest(
                id=self._request_counter,
                user_id=user_id,
                reason=reason,
                status=DeletionStatus.PENDING,
                created_at=datetime.utcnow()
            )
            self._deletion_requests[request.id] = request
        return request

    async def process_deletion(
        self,
        request_id: int,
        admin_id: int,
        approve: bool
    ) -> DeletionRequest:
        """
        Process a deletion request (admin only).

        Args:
            request_id: Id of the deletion request to process.
            admin_id: Id of the admin taking the decision.
            approve: True to carry out the deletion, False to reject it.

        Returns:
            The updated request. On approval the status is COMPLETED only
            if the deletion actually succeeded; if deleting failed the
            status is left at APPROVED so the request is not reported as
            done and can be processed again.

        Raises:
            ValueError: If no request with ``request_id`` exists.
        """
        request = self._deletion_requests.get(request_id)
        if request is None:
            raise ValueError(f"Deletion request {request_id} not found")

        if approve:
            request.status = DeletionStatus.IN_PROGRESS
            try:
                await self._delete_user_data(request.user_id)
            except Exception:
                # The failure is already logged by _delete_user_data.
                # Never claim COMPLETED for an erasure that did not happen.
                request.status = DeletionStatus.APPROVED
            else:
                request.status = DeletionStatus.COMPLETED
        else:
            request.status = DeletionStatus.REJECTED

        request.processed_at = datetime.utcnow()
        request.processed_by = admin_id
        return request

    async def _delete_user_data(self, user_id: int) -> None:
        """
        Delete all user data.

        Removes the user's in-memory consents, then deletes the user from
        the database when one is configured.

        Raises:
            Exception: Whatever the database raised while deleting the
                user; the error is logged and propagated so the caller
                does not treat a failed erasure as successful.
        """
        # Delete consents (collect keys first; the dict is mutated below)
        stale_keys = [key for key in self._consents if key[0] == user_id]
        for key in stale_keys:
            del self._consents[key]

        # Delete from database if available
        if self._db is not None:
            try:
                await self._db.delete_user(user_id)
            except Exception:
                self._log.exception(
                    "Failed to delete user %s from the database", user_id
                )
                raise

    # ============= Data Retention =============

    async def get_retention_policy(self) -> dict:
        """Get data retention policies (a static description per data type)."""
        return {
            "account_data": {
                "retention_period": "Until account deletion",
                "legal_basis": "Contract performance"
            },
            "audit_logs": {
                "retention_period": "7 years",
                "legal_basis": "Legal obligation"
            },
            "analytics_data": {
                "retention_period": "2 years",
                "legal_basis": "Legitimate interest (with consent)"
            },
            "marketing_data": {
                "retention_period": "3 years after last interaction",
                "legal_basis": "Consent"
            }
        }

    # ============= Privacy Information =============

    def get_data_inventory(self) -> dict:
        """Get inventory of personal data collected (a static description)."""
        return {
            "identity": {
                "fields": ["email", "full_name"],
                "purpose": "Account identification",
                "legal_basis": "Contract performance",
                "retention": "Until account deletion"
            },
            "authentication": {
                "fields": ["password_hash", "totp_secret", "session_tokens"],
                "purpose": "Account security",
                "legal_basis": "Contract performance",
                "retention": "Until account deletion"
            },
            "usage_analytics": {
                "fields": ["page_views", "feature_usage", "timestamps"],
                "purpose": "Service improvement",
                "legal_basis": "Legitimate interest",
                "retention": "2 years"
            },
            "technical": {
                "fields": ["ip_address", "user_agent", "device_info"],
                "purpose": "Security and troubleshooting",
                "legal_basis": "Legitimate interest",
                "retention": "90 days"
            }
        }

    def get_third_parties(self) -> list[dict]:
        """Get list of third parties with data access (a static description)."""
        return [
            {
                "name": "Neon",
                "purpose": "Database hosting",
                "data_shared": ["All application data"],
                "location": "USA/EU",
                "gdpr_compliant": True
            },
            {
                "name": "Vercel",
                "purpose": "Application hosting",
                "data_shared": ["Request logs", "IP addresses"],
                "location": "Global",
                "gdpr_compliant": True
            },
            {
                "name": "NVIDIA",
                "purpose": "AI chat functionality",
                "data_shared": ["Chat messages"],
                "location": "USA",
                "gdpr_compliant": True
            }
        ]

    @property
    def stats(self) -> dict:
        """Get GDPR service statistics (counts over the in-memory stores)."""
        return {
            "total_consents": len(self._consents),
            "pending_deletions": sum(
                1 for r in self._deletion_requests.values()
                if r.status == DeletionStatus.PENDING
            ),
            "pending_exports": sum(
                1 for r in self._export_requests.values()
                if r.status == ExportStatus.PENDING
            )
        }
# Global GDPR service instance, created once at import time and shared by
# everything that imports this module (consent_required below checks
# consent through it).
gdpr_service = GDPRService()
# Helper functions for API integration
def consent_required(consent_type: ConsentType):
    """
    Decorator to require consent for an endpoint.

    The wrapped coroutine is only called when the identified user has
    granted ``consent_type``; otherwise a FastAPI ``HTTPException`` with
    status 403 is raised. The user is taken from the ``user`` keyword
    argument, falling back to the first positional argument that has an
    ``id`` attribute.

    Usage:
        @app.get("/analytics")
        @consent_required(ConsentType.ANALYTICS)
        async def get_analytics(user: User = Depends(get_current_user)):
            ...
    """
    def decorator(func):
        # wraps() keeps the endpoint's name, docstring and signature
        # visible on the wrapper; without it, anything that inspects the
        # function (such as a web framework resolving parameters) only
        # sees ``*args, **kwargs``.
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Extract user from kwargs, else from positional arguments
            user = kwargs.get('user')
            if not user:
                user = next(
                    (arg for arg in args if hasattr(arg, 'id')),
                    user
                )

            # NOTE(review): when no user can be identified the consent
            # check is skipped and the endpoint runs anyway (fail-open).
            # Confirm this is intended for every decorated endpoint.
            if user:
                has_consent = await gdpr_service.has_consent(
                    user.id, consent_type
                )
                if not has_consent:
                    # Imported lazily so the module loads without FastAPI
                    from fastapi import HTTPException
                    raise HTTPException(
                        status_code=403,
                        detail=f"Consent required: {consent_type.value}"
                    )
            return await func(*args, **kwargs)
        return wrapper
    return decorator