Spaces:
Running
Running
File size: 2,688 Bytes
5301c48 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
import logging
from dataclasses import dataclass, field
from os import environ, makedirs, path
from typing import Dict, Optional
from uuid import UUID, uuid4

import posthog

from starfish.data_factory.constants import APP_DATA_DIR
class TelemetryConfig:
    """Handles telemetry configuration and distinct ID management."""

    @staticmethod
    def _read_stored_id(config_path: str) -> Optional[UUID]:
        """Return the UUID stored at *config_path*, or ``None`` if unusable.

        A missing or unreadable file (``OSError``) and empty, undecodable or
        malformed content (``ValueError``) are all reported as ``None`` so the
        caller can mint a replacement instead of failing.
        """
        try:
            with open(config_path, encoding="utf-8") as f:
                return UUID(f.read().strip())
        except (OSError, ValueError):
            return None

    @staticmethod
    def generate_client_id(config_dir: Optional[str] = None) -> UUID:
        """Generate or retrieve a unique client identifier.

        The identifier is kept as text in a ``.starfish_config`` file. If the
        file holds a valid UUID it is returned; otherwise a new random UUID is
        created and written to the file, replacing any invalid content.

        This function never raises because of the file system: when the
        identifier cannot be persisted, the freshly generated UUID is still
        returned, it just will not survive to the next call.

        Args:
            config_dir: Directory holding the ``.starfish_config`` file. A
                leading ``~`` is expanded. Defaults to ``APP_DATA_DIR``.

        Returns:
            The stored identifier, or a newly generated one.
        """
        base_dir = APP_DATA_DIR if config_dir is None else config_dir
        config_path = path.join(path.expanduser(base_dir), ".starfish_config")

        stored_id = TelemetryConfig._read_stored_id(config_path)
        if stored_id is not None:
            return stored_id

        new_id = uuid4()
        try:
            makedirs(path.dirname(config_path), exist_ok=True)
            with open(config_path, "w", encoding="utf-8") as f:
                f.write(str(new_id))
        except OSError:
            # Telemetry is best-effort: an unwritable location must not break
            # the caller, so fall back to an ID that is not persisted.
            logging.getLogger(__name__).debug(
                "Could not persist telemetry client id to %s",
                config_path,
                exc_info=True,
            )
        return new_id
def _default_client_id() -> str:
    """Return the client identifier from ``TelemetryConfig`` as a string."""
    return str(TelemetryConfig.generate_client_id())


@dataclass
class Event:
    """A single telemetry event.

    Attributes:
        name: Event name.
        data: Properties attached to the event.
        client_id: Identifier of the sender. When omitted it is obtained from
            ``TelemetryConfig.generate_client_id()`` each time an event is
            constructed.
    """

    name: str
    data: Dict
    client_id: str = field(default_factory=_default_client_id)
@dataclass
class AnalyticsConfig:
    """Settings consumed by the analytics service.

    Attributes:
        api_key: Project API key handed to the analytics client.
        active: When false, events are not sent.
        verbose: Value assigned to the analytics client's debug switch.
        endpoint: Host URL for the analytics client; when ``None`` or empty,
            the client's own host setting is left untouched.
    """

    # Required: there is no default key.
    api_key: str
    # Sending is on unless explicitly turned off.
    active: bool = True
    # Debug output is off unless explicitly turned on.
    verbose: bool = False
    # No host override by default.
    endpoint: Optional[str] = None
class AnalyticsService:
    """Service for sending analytics data.

    Thin wrapper around the ``posthog`` module: construction copies the given
    settings onto ``posthog``'s module-level attributes, and
    :meth:`send_event` forwards events to ``posthog.capture``.
    """

    def __init__(self, settings: "AnalyticsConfig"):
        """Initialize the analytics service with the given configuration.

        Args:
            settings (AnalyticsConfig): Configuration object containing API key,
                activation status, verbosity, and optional custom endpoint.
        """
        self.settings = settings
        self._setup_client()

    def _setup_client(self) -> None:
        """Configure the analytics client.

        The values are written to attributes of the ``posthog`` module itself,
        so they are shared by everything in the process that uses that module.
        """
        posthog.project_api_key = self.settings.api_key
        posthog.debug = self.settings.verbose
        # NOTE(review): presumably keeps PostHog's GeoIP enrichment enabled;
        # confirm against the posthog client documentation.
        posthog.disable_geoip = False
        # Only override the host when one is configured (None/"" keeps the
        # client's existing value).
        if self.settings.endpoint:
            posthog.host = self.settings.endpoint

    def send_event(self, event: "Event") -> None:
        """Transmit an analytics event.

        Does nothing when the service is inactive. Sending is best-effort:
        any failure is logged at debug level and never propagated, so
        telemetry cannot break the calling code.

        Args:
            event: The event to send; its ``client_id``, ``name`` and ``data``
                are passed on as distinct ID, event name and properties.
        """
        if not self.settings.active:
            return

        try:
            posthog.capture(
                distinct_id=event.client_id,
                event=event.name,
                properties=event.data,
            )
        except Exception:
            # Deliberately broad: this is the outermost telemetry boundary.
            # Record the failure instead of discarding it silently.
            logging.getLogger(__name__).debug(
                "Failed to send telemetry event", exc_info=True
            )
def _env_flag(name: str, default: str) -> bool:
    """Return whether environment variable *name* holds a truthy value.

    The value (or *default* when the variable is unset) is lower-cased and
    counts as true only if it is one of ``"true"``, ``"1"`` or ``"t"``.
    """
    return environ.get(name, default).lower() in ("true", "1", "t")


# Shared analytics service, configured from the environment at import time:
#   TELEMETRY_ENABLED - turns sending on or off (on when unset)
#   DEBUG_MODE        - turns client debug output on (off when unset)
#   POSTHOG_HOST      - overrides the PostHog host URL
analytics = AnalyticsService(
    AnalyticsConfig(
        api_key="phc_egjc8W2y5TEVEk54fAZjMcpnD56sOMuoZUUGiH50riA",
        active=_env_flag("TELEMETRY_ENABLED", "true"),
        verbose=_env_flag("DEBUG_MODE", "false"),
        endpoint=environ.get("POSTHOG_HOST", "https://us.i.posthog.com"),
    )
)
|