LiKenun's picture
Refactor #6
f0fe0fd
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime
from dependency_injector.resources import AsyncResource
from loguru import logger
from pydantic import ConfigDict
from pytz import timezone
from typing import Any, Self
from ctp_slack_bot.core import ApplicationComponentBase, Settings
class TaskService(ApplicationComponentBase):
"""
Service for running scheduled tasks.
"""
model_config = ConfigDict(frozen=True)
settings: Settings
_scheduler: AsyncIOScheduler
def model_post_init(self: Self, context: Any, /) -> None:
super().model_post_init(context)
self._scheduler = AsyncIOScheduler(timezone=timezone(self.settings.scheduler_timezone))
def _configure_jobs(self: Self) -> None:
# Example jobs (uncomment and implement as needed)
# self._scheduler.add_job(
# send_error_report,
# CronTrigger(hour=7, minute=0),
# id="daily_error_report",
# name="Daily Error Report",
# replace_existing=True,
# )
# self._scheduler.add_job(
# cleanup_old_transcripts,
# CronTrigger(day_of_week="sun", hour=1, minute=0),
# id="weekly_transcript_cleanup",
# name="Weekly Transcript Cleanup",
# replace_existing=True,
# )
pass
async def start(self: Self) -> None:
logger.info("Starting scheduler…")
self._scheduler.start()
async def stop(self: Self) -> None:
if self._scheduler.running:
self._scheduler.shutdown()
logger.info("Stopped scheduler.")
else:
logger.debug("The scheduler is not running. There is no scheduler to shut down.")
@property
def name(self: Self) -> str:
return "task_service"
class TaskServiceResource(AsyncResource):
async def init(self: Self, settings: Settings) -> TaskService:
return TaskService(settings=settings)
async def shutdown(self: Self, task_service: TaskService) -> None:
await task_service.stop()