| | """ |
| | Supabase structured logger with per-request user context. |
| | |
| | Usage: |
| | from supabase_logger import init_logger, log_event, set_user_context, get_user_context, set_request_source |
| | |
| | # At startup |
| | init_logger(supabase_client) |
| | |
| | # In middleware / auth |
| | set_user_context({"user_id": "...", "email": "..."}) |
| | set_request_source({"source_domain": "dlpo-mbok-dev.hf.space"}) |
| | |
| | # Anywhere |
| | log_event("user", "click_step01", session_id="abc123", metadata={"force": False}) |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import threading |
| | import traceback |
| | from contextvars import ContextVar |
| | from datetime import datetime, timezone |
| | from typing import Any, Dict, Optional |
| |
|
| | from supabase import Client |
| |
|
# Supabase client used for inserts; stays None until init_logger() is called.
_supabase: Optional[Client] = None

# User info bound to the current context via set_user_context(); None if unset.
_current_user: ContextVar[Optional[Dict[str, Any]]] = ContextVar("current_user", default=None)

# Request-origin info bound to the current context via set_request_source().
_request_source: ContextVar[Optional[Dict[str, str]]] = ContextVar("request_source", default=None)

# log_event() drops any event whose resolved source_domain is listed here.
_BLOCKED_SOURCE_DOMAINS = {"proxy.spaces.internal.huggingface.tech"}
| |
|
| |
|
def init_logger(client: Client) -> None:
    """
    Register the Supabase client used for log inserts.

    The client is stored in the module-level ``_supabase`` variable,
    replacing any client registered earlier.
    """
    global _supabase
    _supabase = client
| |
|
| |
|
def set_user_context(user: Optional[Dict[str, Any]]) -> None:
    """
    Bind ``user`` as the user of the current context.

    Passing ``None`` stores ``None``, which is also the value seen when
    nothing has been set.
    """
    _current_user.set(user)
| |
|
| |
|
def get_user_context() -> Optional[Dict[str, Any]]:
    """Return the user bound to the current context, or ``None`` if unset."""
    current = _current_user.get()
    return current
| |
|
| |
|
def set_request_source(source: Optional[Dict[str, str]]) -> None:
    """
    Set the origin information of the current request.

    Intended to be called from the FastAPI middleware.
    """
    _request_source.set(source)
| |
|
| |
|
def log_event(
    event_type: str,
    message: str,
    *,
    level: str = "INFO",
    metadata: Optional[Dict[str, Any]] = None,
    user_override: Optional[Dict[str, Any]] = None,
    session_id: Optional[str] = None,
    source: Optional[Dict[str, str]] = None,
) -> None:
    """
    Insert a structured log row into Supabase ``activity_logs``.

    Parameters:
        event_type: Stored in the ``event_type`` column.
        message: Stored in the ``message`` column.
        level: Stored in the ``level`` column (default ``"INFO"``).
        metadata: Extra key/values stored in the ``metadata`` column. The
            caller's dict is copied, never mutated. The ``source_domain``
            and ``source_channel`` keys are always stripped from the copy.
        user_override: Used instead of the context user when truthy; its
            ``user_id`` and ``email`` keys fill the user columns.
        session_id: Stored in the ``session_id`` column.
        source: Explicit request-origin dict (see priority below).

    Source resolution priority:
      1. Explicit ``source`` argument
      2. ContextVar set by middleware (``set_request_source``)
      3. ``source_domain`` key inside ``metadata``

    If the resolved ``source_domain`` is an internal proxy domain the event is
    silently dropped (not persisted). The event is also dropped when no
    client has been registered through ``init_logger``.

    The insert runs on a daemon thread. If the insert fails, or the thread
    cannot be started, the error and the row are printed to stdout so the
    main request is never blocked by a logging error.
    """
    # Context values are read here, on the caller's side, before the work is
    # handed to the background thread.
    user = user_override or _current_user.get()

    # Copy so that stripping keys below never touches the caller's dict.
    clean_meta: Dict[str, Any] = dict(metadata) if metadata else {}

    context_source = _request_source.get()
    resolved_source: Dict[str, Any] = {}
    if source:
        resolved_source = source
    elif context_source:
        resolved_source = context_source
    else:
        meta_domain = clean_meta.get("source_domain")
        if meta_domain:
            resolved_source = {"source_domain": meta_domain}

    # The source fields have their own column; keep them out of the metadata.
    clean_meta.pop("source_domain", None)
    clean_meta.pop("source_channel", None)

    source_domain = resolved_source.get("source_domain")

    # The isinstance guard keeps an unhashable value (e.g. a list that came in
    # through metadata) from raising TypeError on the set lookup.
    if isinstance(source_domain, str) and source_domain in _BLOCKED_SOURCE_DOMAINS:
        return

    client = _supabase
    if client is None:
        # Logger not initialised: nothing can be persisted, so do not spawn
        # a thread that would only do nothing.
        return

    row = {
        "event_type": event_type,
        "level": level,
        "message": message,
        "user_id": user.get("user_id") if user else None,
        "user_email": user.get("email") if user else None,
        "session_id": session_id,
        "source_domain": source_domain,
        "metadata": clean_meta,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }

    def _insert() -> None:
        # Top-level boundary of the worker thread: report and swallow any
        # failure so it never surfaces anywhere else.
        try:
            client.table("activity_logs").insert(row).execute()
        except Exception:
            print(f"[SUPABASE_LOG_ERROR] insert failed: {traceback.format_exc()}")
            print(f"[SUPABASE_LOG_ERROR] row: {row}")

    try:
        threading.Thread(target=_insert, daemon=True).start()
    except RuntimeError:
        # start() raises RuntimeError when no new thread can be created
        # (thread limit reached, interpreter shutting down). Fall back to
        # stdout instead of failing the request.
        print(f"[SUPABASE_LOG_ERROR] could not start insert thread: {traceback.format_exc()}")
        print(f"[SUPABASE_LOG_ERROR] row: {row}")
| |
|