Guilherme34's picture
Upload folder using huggingface_hub
aa15bce verified
"""Background scheduler that watches trigger definitions and executes them."""
from __future__ import annotations
import asyncio
from datetime import datetime, timezone
from typing import Optional, Set
from ..agents.execution_agent.batch_manager import ExecutionBatchManager
from ..agents.execution_agent.runtime import ExecutionResult
from ..logging_config import logger
from .triggers import TriggerRecord, get_trigger_service
UTC = timezone.utc
def _utc_now() -> datetime:
return datetime.now(UTC)
def _isoformat(dt: datetime) -> str:
return dt.astimezone(UTC).isoformat(timespec="seconds").replace("+00:00", "Z")
class TriggerScheduler:
"""Polls stored triggers and launches execution agents when due."""
def __init__(self, poll_interval_seconds: float = 10.0) -> None:
self._poll_interval = poll_interval_seconds
self._service = get_trigger_service()
self._task: Optional[asyncio.Task[None]] = None
self._running = False
self._in_flight: Set[int] = set()
self._lock = asyncio.Lock()
async def start(self) -> None:
async with self._lock:
if self._task and not self._task.done():
return
loop = asyncio.get_running_loop()
self._running = True
self._task = loop.create_task(self._run(), name="trigger-scheduler")
logger.info("Trigger scheduler started", extra={"interval": self._poll_interval})
async def stop(self) -> None:
async with self._lock:
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
logger.info("Trigger scheduler stopped")
async def _run(self) -> None:
try:
while self._running:
await self._poll_once()
await asyncio.sleep(self._poll_interval)
except asyncio.CancelledError: # pragma: no cover - shutdown path
raise
except Exception as exc: # pragma: no cover - defensive
logger.exception("Trigger scheduler loop crashed", extra={"error": str(exc)})
async def _poll_once(self) -> None:
now = _utc_now()
due_triggers = self._service.get_due_triggers(before=now)
if not due_triggers:
return
for trigger in due_triggers:
if trigger.id in self._in_flight:
continue
self._in_flight.add(trigger.id)
asyncio.create_task(self._execute_trigger(trigger), name=f"trigger-{trigger.id}")
async def _execute_trigger(self, trigger: TriggerRecord) -> None:
try:
fired_at = _utc_now()
instructions = self._format_instructions(trigger, fired_at)
logger.info(
"Dispatching trigger",
extra={
"trigger_id": trigger.id,
"agent": trigger.agent_name,
"scheduled_for": trigger.next_trigger,
},
)
execution_manager = ExecutionBatchManager()
result = await execution_manager.execute_agent(
trigger.agent_name,
instructions,
)
if result.success:
self._handle_success(trigger, fired_at)
else:
error_text = result.error or result.response
self._handle_failure(trigger, fired_at, error_text)
except Exception as exc: # pragma: no cover - defensive
self._handle_failure(trigger, _utc_now(), str(exc))
logger.exception(
"Trigger execution failed unexpectedly",
extra={"trigger_id": trigger.id, "agent": trigger.agent_name},
)
finally:
self._in_flight.discard(trigger.id)
def _handle_success(self, trigger: TriggerRecord, fired_at: datetime) -> None:
logger.info(
"Trigger completed",
extra={"trigger_id": trigger.id, "agent": trigger.agent_name},
)
self._service.schedule_next_occurrence(trigger, fired_at=fired_at)
def _handle_failure(self, trigger: TriggerRecord, fired_at: datetime, error: str) -> None:
logger.warning(
"Trigger execution failed",
extra={
"trigger_id": trigger.id,
"agent": trigger.agent_name,
"error": error,
},
)
self._service.record_failure(trigger, error)
if trigger.recurrence_rule:
self._service.schedule_next_occurrence(trigger, fired_at=fired_at)
else:
self._service.clear_next_fire(trigger.id, agent_name=trigger.agent_name)
def _format_instructions(self, trigger: TriggerRecord, fired_at: datetime) -> str:
scheduled_for = trigger.next_trigger or _isoformat(fired_at)
metadata_lines = [f"Trigger ID: {trigger.id}"]
if trigger.recurrence_rule:
metadata_lines.append(f"Recurrence: {trigger.recurrence_rule}")
if trigger.timezone:
metadata_lines.append(f"Timezone: {trigger.timezone}")
if trigger.start_time:
metadata_lines.append(f"Start Time (UTC): {trigger.start_time}")
metadata = "\n".join(f"- {line}" for line in metadata_lines)
return (
f"Trigger fired at {_isoformat(fired_at)} (UTC).\n"
f"Scheduled occurrence time: {scheduled_for}.\n\n"
f"Metadata:\n{metadata}\n\n"
f"Payload:\n{trigger.payload}"
)
_scheduler_instance: Optional[TriggerScheduler] = None
def get_trigger_scheduler() -> TriggerScheduler:
global _scheduler_instance
if _scheduler_instance is None:
_scheduler_instance = TriggerScheduler()
return _scheduler_instance
__all__ = ["TriggerScheduler", "get_trigger_scheduler"]