# Taskflow-App/src/utils/metrics.py @ 34e27fb
from prometheus_client import Counter, Histogram, Gauge
import time
# --- Task event publishing -------------------------------------------------
# All three metrics carry a single ``event_type`` label so that each kind of
# task event gets its own time series.

# Number of task events published.
event_published_counter = Counter(
    name='task_events_published_total',
    documentation='Total number of task events published',
    labelnames=['event_type'],
)

# Distribution of the time spent publishing an event, in seconds.
event_publish_duration = Histogram(
    name='task_event_publish_duration_seconds',
    documentation='Time spent publishing task events',
    labelnames=['event_type'],
)

# Number of publish attempts that ended in an error.
event_publish_errors = Counter(
    name='task_event_publish_errors_total',
    documentation='Total number of task event publishing errors',
    labelnames=['event_type'],
)
# --- Circuit breaker -------------------------------------------------------
# Both metrics are labelled by ``breaker_name`` so several breakers can be
# tracked independently.

# Current breaker state as a number; the encoding is spelled out in the
# help text below.
circuit_breaker_state = Gauge(
    name='circuit_breaker_state',
    documentation='Current state of the circuit breaker (0=closed, 1=open, 2=half-open)',
    labelnames=['breaker_name'],
)

# Number of failures recorded against a breaker.
circuit_breaker_failures = Counter(
    name='circuit_breaker_failures_total',
    documentation='Total number of circuit breaker failures',
    labelnames=['breaker_name'],
)
# --- Rate limiting (for T048) ----------------------------------------------
# Both counters are labelled by ``endpoint``.

# Number of requests that reached the rate limiter.
rate_limiter_requests = Counter(
    name='rate_limiter_requests_total',
    documentation='Total number of requests to rate limiter',
    labelnames=['endpoint'],
)

# Number of requests the rate limiter turned away.
rate_limiter_rejections = Counter(
    name='rate_limiter_rejections_total',
    documentation='Total number of requests rejected by rate limiter',
    labelnames=['endpoint'],
)
def increment_event_published(event_type: str):
    """Record one more published task event.

    Args:
        event_type: Value used for the ``event_type`` label of
            ``event_published_counter``.
    """
    published_for_type = event_published_counter.labels(event_type=event_type)
    published_for_type.inc()
def observe_event_publish_duration(event_type: str, duration: float):
    """Add one publish-duration sample to the histogram.

    Args:
        event_type: Value used for the ``event_type`` label of
            ``event_publish_duration``.
        duration: Observed duration; the metric name indicates the unit
            is seconds.
    """
    duration_for_type = event_publish_duration.labels(event_type=event_type)
    duration_for_type.observe(duration)
def increment_event_publish_error(event_type: str):
    """Record one more failed attempt to publish a task event.

    Args:
        event_type: Value used for the ``event_type`` label of
            ``event_publish_errors``.
    """
    errors_for_type = event_publish_errors.labels(event_type=event_type)
    errors_for_type.inc()
def set_circuit_breaker_state(breaker_name: str, state_value: int):
    """Publish the current state of a named circuit breaker.

    Args:
        breaker_name: Value used for the ``breaker_name`` label of
            ``circuit_breaker_state``.
        state_value: Numeric state code written to the gauge as-is. The
            gauge's help text documents 0=closed, 1=open, 2=half-open;
            the value is not validated here.
    """
    state_for_breaker = circuit_breaker_state.labels(breaker_name=breaker_name)
    state_for_breaker.set(state_value)
def increment_circuit_breaker_failure(breaker_name: str):
    """Record one more failure for a named circuit breaker.

    Args:
        breaker_name: Value used for the ``breaker_name`` label of
            ``circuit_breaker_failures``.
    """
    failures_for_breaker = circuit_breaker_failures.labels(
        breaker_name=breaker_name
    )
    failures_for_breaker.inc()
def increment_rate_limiter_request(endpoint: str):
    """Record one more request seen by the rate limiter.

    Args:
        endpoint: Value used for the ``endpoint`` label of
            ``rate_limiter_requests``.
    """
    requests_for_endpoint = rate_limiter_requests.labels(endpoint=endpoint)
    requests_for_endpoint.inc()
def increment_rate_limiter_rejection(endpoint: str):
    """Record one more request rejected by the rate limiter.

    Args:
        endpoint: Value used for the ``endpoint`` label of
            ``rate_limiter_rejections``.
    """
    rejections_for_endpoint = rate_limiter_rejections.labels(endpoint=endpoint)
    rejections_for_endpoint.inc()