Spaces:
Running
Running
"""
Typed execution errors and results.

Provides explicit exception types (instead of plain error strings),
structured per-step and aggregate result models, and an error-handling
policy that maps each error type to a handling action.
"""
| import builtins | |
| from datetime import UTC, datetime | |
| from enum import Enum | |
| from typing import Any | |
| from pydantic import BaseModel, ConfigDict | |
# Public API of this module.
# NOTE: ``TimeoutError`` here is this module's own class (a subclass of
# ``builtins.TimeoutError``), so ``from <this module> import *`` shadows the
# builtin name in the importing namespace.
__all__ = [
    "AgentNotFoundError",
    "BudgetExceededError",
    "ErrorAction",
    "ErrorPolicy",
    "ExecutionError",
    "ExecutionMetrics",
    "RetryExhaustedError",
    "StepExecutionResult",
    "TimeoutError",
    "ValidationError",
]
| class ExecutionError(Exception): | |
| """Base execution error for a step/agent with metadata.""" | |
| def __init__( | |
| self, | |
| message: str, | |
| agent_id: str | None = None, | |
| step_index: int | None = None, | |
| cause: Exception | None = None, | |
| recoverable: bool = True, | |
| ): | |
| super().__init__(message) | |
| self.message = message | |
| self.agent_id = agent_id | |
| self.step_index = step_index | |
| self.cause = cause | |
| self.recoverable = recoverable | |
| self.timestamp = datetime.now(UTC) | |
| def to_dict(self) -> dict[str, Any]: | |
| """Serialize the error to a dictionary for logging or response.""" | |
| return { | |
| "type": self.__class__.__name__, | |
| "message": self.message, | |
| "agent_id": self.agent_id, | |
| "step_index": self.step_index, | |
| "recoverable": self.recoverable, | |
| "timestamp": self.timestamp.isoformat(), | |
| "cause": str(self.cause) if self.cause else None, | |
| } | |
class TimeoutError(ExecutionError, builtins.TimeoutError):  # noqa: A001
    """Raised when an agent does not finish within its time limit.

    Inherits from both ``ExecutionError`` and the builtin ``TimeoutError`` so
    handlers for either type will catch it. Always flagged recoverable.

    Args:
        agent_id: Identifier of the agent that timed out.
        timeout_seconds: The limit that was exceeded, in seconds.
        step_index: Index of the step involved, if known.
    """

    def __init__(
        self,
        agent_id: str,
        timeout_seconds: float,
        step_index: int | None = None,
    ):
        text = f"Agent '{agent_id}' timed out after {timeout_seconds}s"
        super().__init__(
            text,
            recoverable=True,
            step_index=step_index,
            agent_id=agent_id,
        )
        # Keep the configured limit so callers can report it.
        self.timeout_seconds = timeout_seconds
class RetryExhaustedError(ExecutionError):
    """Raised once every retry attempt for an agent has failed.

    The final underlying error (if any) is kept both as ``last_error`` and as
    the base-class ``cause``. Always flagged non-recoverable.

    Args:
        agent_id: Identifier of the agent that kept failing.
        attempts: Number of attempts that were made.
        last_error: Exception from the final attempt, if available.
        step_index: Index of the step involved, if known.
    """

    def __init__(
        self,
        agent_id: str,
        attempts: int,
        last_error: Exception | None = None,
        step_index: int | None = None,
    ):
        summary = f"Agent '{agent_id}' failed after {attempts} attempts"
        super().__init__(
            summary,
            recoverable=False,
            cause=last_error,
            step_index=step_index,
            agent_id=agent_id,
        )
        self.attempts = attempts
        self.last_error = last_error
class BudgetExceededError(ExecutionError):
    """Raised when usage of a named budget goes over its limit.

    Always flagged non-recoverable.

    Args:
        budget_type: Name of the budget; capitalized in the message.
        limit: Configured maximum for the budget.
        used: Amount consumed when the error was raised.
        agent_id: Identifier of the agent involved, if known.
    """

    def __init__(
        self,
        budget_type: str,
        limit: float,
        used: float,
        agent_id: str | None = None,
    ):
        label = budget_type.capitalize()
        super().__init__(
            f"{label} budget exceeded: {used}/{limit}",
            recoverable=False,
            agent_id=agent_id,
        )
        self.budget_type = budget_type
        self.limit = limit
        self.used = used
class AgentNotFoundError(ExecutionError):
    """Raised when a referenced agent is missing from the graph.

    Always flagged non-recoverable.

    Args:
        agent_id: Identifier that could not be resolved.
    """

    def __init__(self, agent_id: str):
        text = f"Agent '{agent_id}' not found in graph"
        super().__init__(text, recoverable=False, agent_id=agent_id)
class ValidationError(ExecutionError):
    """Raised when input data or parameters fail validation.

    Always flagged non-recoverable.

    Args:
        message: Description of the validation failure.
        field: Name of the offending field, if known.
        value: The offending value, if provided.
    """

    def __init__(
        self,
        message: str,
        field: str | None = None,
        value: Any = None,
    ):
        super().__init__(message, recoverable=False)
        self.field, self.value = field, value
class ErrorAction(str, Enum):
    """Handling action that an ``ErrorPolicy`` can select for an error.

    Because the enum mixes in ``str``, each member compares equal to its
    lowercase string value (e.g. ``ErrorAction.SKIP == "skip"``). This enum
    only names the actions. Carrying them out is the job of whatever code
    consumes an ``ErrorPolicy``.
    """

    SKIP = "skip"
    RETRY = "retry"
    PRUNE = "prune"
    FALLBACK = "fallback"
    ROLLBACK = "rollback"
    ABORT = "abort"
| class ErrorPolicy(BaseModel): | |
| """Policy that maps error types to handling actions.""" | |
| on_timeout: ErrorAction = ErrorAction.RETRY | |
| on_retry_exhausted: ErrorAction = ErrorAction.PRUNE | |
| on_budget_exceeded: ErrorAction = ErrorAction.ABORT | |
| on_agent_not_found: ErrorAction = ErrorAction.SKIP | |
| on_validation_error: ErrorAction = ErrorAction.ABORT | |
| on_unknown_error: ErrorAction = ErrorAction.SKIP | |
| max_skipped_agents: int = 5 | |
| abort_on_critical_path: bool = True | |
| def get_action(self, error: "ExecutionError") -> ErrorAction: | |
| """Return the action corresponding to the error type.""" | |
| if isinstance(error, TimeoutError): | |
| return self.on_timeout | |
| if isinstance(error, RetryExhaustedError): | |
| return self.on_retry_exhausted | |
| if isinstance(error, BudgetExceededError): | |
| return self.on_budget_exceeded | |
| if isinstance(error, AgentNotFoundError): | |
| return self.on_agent_not_found | |
| if isinstance(error, ValidationError): | |
| return self.on_validation_error | |
| return self.on_unknown_error | |
class ExecutionMetrics(BaseModel):
    """Aggregated metrics for tokens, requests, agents, and timing.

    Token and request counters are accumulated through ``add_request``. The
    agent counters and ``start_time``/``end_time`` are not touched by any
    method here and must be set by the caller.
    """

    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_tokens: int = 0
    start_time: datetime | None = None
    end_time: datetime | None = None
    latency_ms: float = 0.0
    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    retried_requests: int = 0
    total_agents: int = 0
    executed_agents: int = 0
    skipped_agents: int = 0
    failed_agents: int = 0

    def success_rate(self) -> float:
        """Fraction of requests that succeeded, or 0.0 when none were made."""
        if self.total_requests == 0:
            return 0.0
        return self.successful_requests / self.total_requests

    def duration_seconds(self) -> float:
        """Seconds from ``start_time`` to ``end_time``; 0.0 if either is unset."""
        if self.start_time is not None and self.end_time is not None:
            return (self.end_time - self.start_time).total_seconds()
        return 0.0

    def add_request(
        self,
        prompt_tokens: int,
        completion_tokens: int,
        success: bool,
        latency_ms: float,
        retried: bool = False,
    ) -> None:
        """Accumulate the metrics of a single request or step.

        Args:
            prompt_tokens: Prompt tokens consumed by the request.
            completion_tokens: Completion tokens produced by the request.
            success: Whether the request succeeded (selects which counter
                is incremented).
            latency_ms: Latency of the request, added to the running total.
            retried: Whether the request was a retry.
        """
        self.prompt_tokens += prompt_tokens
        self.completion_tokens += completion_tokens
        self.total_tokens += prompt_tokens + completion_tokens
        self.total_requests += 1
        self.latency_ms += latency_ms
        if success:
            self.successful_requests += 1
        else:
            self.failed_requests += 1
        if retried:
            self.retried_requests += 1

    def to_dict(self) -> dict[str, Any]:
        """Serialize metrics to a nested dictionary of plain numbers.

        Returns:
            A dict with ``tokens``, ``requests``, ``agents`` and ``timing``
            sections. ``avg_latency_ms`` divides by at least 1 to avoid a
            zero division when no requests were recorded.
        """
        return {
            "tokens": {
                "prompt": self.prompt_tokens,
                "completion": self.completion_tokens,
                "total": self.total_tokens,
            },
            "requests": {
                "total": self.total_requests,
                "successful": self.successful_requests,
                "failed": self.failed_requests,
                "retried": self.retried_requests,
                # success_rate is a method, not a property. Call it so the
                # dict holds a float rather than a (non-serializable) bound
                # method.
                "success_rate": self.success_rate(),
            },
            "agents": {
                "total": self.total_agents,
                "executed": self.executed_agents,
                "skipped": self.skipped_agents,
                "failed": self.failed_agents,
            },
            "timing": {
                # Likewise a method: call it to get the float value.
                "duration_seconds": self.duration_seconds(),
                "total_latency_ms": self.latency_ms,
                "avg_latency_ms": self.latency_ms / max(1, self.total_requests),
            },
        }
class StepExecutionResult(BaseModel):
    """Outcome of executing a single step/agent, with metrics and status flags.

    ``arbitrary_types_allowed`` is enabled because ``error`` holds an
    ``ExecutionError``, a plain exception class rather than a pydantic type.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    agent_id: str
    success: bool
    response: str | None = None
    error: "ExecutionError | None" = None
    prompt_tokens: int = 0
    completion_tokens: int = 0
    latency_ms: float = 0.0
    attempts: int = 1
    quality_score: float = 1.0
    skipped: bool = False
    fallback_used: bool = False

    def tokens_used(self) -> int:
        """Sum of prompt and completion tokens consumed by this step."""
        prompt = self.prompt_tokens
        completion = self.completion_tokens
        return prompt + completion

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict summary of this step result.

        Only the length of the response text is included, not the text
        itself. The error, if any, is serialized via its own ``to_dict``.
        """
        response_length = len(self.response) if self.response else 0
        error_payload = self.error.to_dict() if self.error else None
        metrics = {
            "prompt_tokens": self.prompt_tokens,
            "completion_tokens": self.completion_tokens,
            "latency_ms": self.latency_ms,
            "attempts": self.attempts,
            "quality_score": self.quality_score,
        }
        status = {
            "skipped": self.skipped,
            "fallback_used": self.fallback_used,
        }
        return {
            "agent_id": self.agent_id,
            "success": self.success,
            "response_length": response_length,
            "error": error_payload,
            "metrics": metrics,
            "status": status,
        }