| from __future__ import annotations |
| from functools import wraps |
| from typing import TYPE_CHECKING, Any, Callable, TypeVar |
|
|
| from aiogram.types import CallbackQuery, Message |
|
|
| from bot.analytics.amplitude import AmplitudeTelegramLogger |
| from bot.analytics.types import AbstractAnalyticsLogger, BaseEvent, EventProperties, EventType, UserProperties |
| from bot.core.config import settings |
| from bot.utils.singleton import SingletonMeta |
|
|
| if TYPE_CHECKING: |
| from collections.abc import Awaitable |
|
|
# Result type produced by awaiting a decorated handler; used in the
# ``Awaitable[_Func]`` annotations of ``AnalyticsService.track_event``.
_Func = TypeVar("_Func")
|
|
|
|
class AnalyticsService(metaclass=SingletonMeta):
    """Send analytics events for bot updates through an optional logger.

    When no logger is configured every tracking operation is a no-op and
    decorated handlers run exactly as if they were not decorated.
    """

    def __init__(self, logger: AbstractAnalyticsLogger | None) -> None:
        """Store the logger used for all events; ``None`` disables tracking."""
        self.logger = logger

    async def _track_error(self, user_id: int, error_text: str) -> None:
        """Log an ``"Error"`` event for ``user_id`` with ``error_text`` as its text.

        Does nothing when no logger is configured.
        """
        if not self.logger:
            return

        await self.logger.log_event(
            BaseEvent(
                user_id=user_id,
                event_type="Error",
                event_properties=EventProperties(text=error_text),
            ),
        )

    def track_event(
        self,
        event_name: EventType,
    ) -> Callable[[Callable[..., Awaitable[_Func]]], Callable[..., Awaitable[_Func]]]:
        """Return a decorator that logs ``event_name`` before the handler runs.

        The decorated handler must take the update (``Message`` or
        ``CallbackQuery``) as its first positional argument. For every update
        that carries a sender, one event is logged with the sender's profile
        fields and the chat id, chat type, text and command of the update.
        If the handler raises, an ``"Error"`` event with ``str(exception)`` is
        logged and the exception is re-raised unchanged.

        Updates without a sender, and all updates when no logger is
        configured, are passed straight to the handler without tracking.

        Args:
            event_name: Event type recorded for each tracked update.

        Returns:
            A decorator that wraps an async handler and preserves its result.
        """

        def decorator(
            handler: Callable[..., Awaitable[_Func]],
        ) -> Callable[..., Awaitable[_Func]]:
            @wraps(handler)
            async def wrapper(update: Message | CallbackQuery, *args: Any, **kwargs: Any) -> _Func:
                if not self.logger:
                    return await handler(update, *args, **kwargs)

                sender = update.from_user if isinstance(update, (Message, CallbackQuery)) else None
                if not sender:
                    # Nothing to attribute the event to. Analytics must never
                    # suppress the handler, so run it untracked instead of
                    # dropping the update.
                    return await handler(update, *args, **kwargs)

                user_id = sender.id

                # Bound up front so every name exists whichever branch runs.
                chat_id: int | None = None
                chat_type: str | None = None
                text: str | None = None
                command: str | None = None
                if isinstance(update, Message):
                    chat_id = update.chat.id
                    chat_type = update.chat.type
                    text = update.text
                    # The full message text is recorded as the command when it
                    # starts with a slash.
                    if text and text.startswith("/"):
                        command = text
                else:
                    # CallbackQuery: the originating message may be missing.
                    origin = update.message
                    if origin:
                        chat_id = origin.chat.id
                        chat_type = origin.chat.type
                    text = update.data

                await self.logger.log_event(
                    BaseEvent(
                        user_id=user_id,
                        event_type=event_name,
                        user_properties=UserProperties(
                            first_name=sender.first_name,
                            last_name=sender.last_name,
                            username=sender.username,
                            url=sender.url,
                        ),
                        event_properties=EventProperties(
                            chat_id=chat_id,
                            chat_type=chat_type,
                            text=text,
                            command=command,
                        ),
                        language=sender.language_code,
                    ),
                )

                try:
                    # Keyword arguments are forwarded so handlers that are
                    # called with keywords keep working when decorated.
                    return await handler(update, *args, **kwargs)
                except Exception as e:
                    await self._track_error(user_id, str(e))
                    raise

            return wrapper

        return decorator
|
|
|
|
# Analytics is optional: an Amplitude logger is created only when an API key
# is configured; otherwise the service receives ``None`` as its logger.
logger = None
if settings.AMPLITUDE_API_KEY:
    logger = AmplitudeTelegramLogger(api_token=settings.AMPLITUDE_API_KEY)

# Module-level service instance built from the logger chosen above.
analytics = AnalyticsService(logger)
|
|