Spaces:
Runtime error
Runtime error
from apscheduler.schedulers.asyncio import AsyncIOScheduler | |
from apscheduler.triggers.cron import CronTrigger | |
from datetime import datetime | |
from dependency_injector.resources import AsyncResource | |
from loguru import logger | |
from pydantic import ConfigDict | |
from pytz import timezone | |
from typing import Any, Self | |
from ctp_slack_bot.core import ApplicationComponentBase, Settings | |
class TaskService(ApplicationComponentBase): | |
""" | |
Service for running scheduled tasks. | |
""" | |
model_config = ConfigDict(frozen=True) | |
settings: Settings | |
_scheduler: AsyncIOScheduler | |
def model_post_init(self: Self, context: Any, /) -> None: | |
super().model_post_init(context) | |
self._scheduler = AsyncIOScheduler(timezone=timezone(self.settings.scheduler_timezone)) | |
def _configure_jobs(self: Self) -> None: | |
# Example jobs (uncomment and implement as needed) | |
# self._scheduler.add_job( | |
# send_error_report, | |
# CronTrigger(hour=7, minute=0), | |
# id="daily_error_report", | |
# name="Daily Error Report", | |
# replace_existing=True, | |
# ) | |
# self._scheduler.add_job( | |
# cleanup_old_transcripts, | |
# CronTrigger(day_of_week="sun", hour=1, minute=0), | |
# id="weekly_transcript_cleanup", | |
# name="Weekly Transcript Cleanup", | |
# replace_existing=True, | |
# ) | |
pass | |
async def start(self: Self) -> None: | |
logger.info("Starting scheduler…") | |
self._scheduler.start() | |
async def stop(self: Self) -> None: | |
if self._scheduler.running: | |
self._scheduler.shutdown() | |
logger.info("Stopped scheduler.") | |
else: | |
logger.debug("The scheduler is not running. There is no scheduler to shut down.") | |
def name(self: Self) -> str: | |
return "task_service" | |
class TaskServiceResource(AsyncResource): | |
async def init(self: Self, settings: Settings) -> TaskService: | |
return TaskService(settings=settings) | |
async def shutdown(self: Self, task_service: TaskService) -> None: | |
await task_service.stop() | |