# File: mona/utils/automation/scheduled_tasks.py
# Repository page header: "mrradix's picture" / "Upload 48 files" / commit 8e4018d (verified)
import calendar
import json
import threading
import time
import uuid
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Callable, Union

import schedule

from utils.logging import setup_logger
from utils.error_handling import handle_exceptions, AutomationError
from utils.storage import load_data, save_data
# Module-level logger, obtained from the project's setup_logger helper using
# this module's name; used by TaskManager for all status and error messages.
logger = setup_logger(__name__)
class ScheduledTask:
    """Scheduled task for time-based triggers.

    Holds what to run (``task_type`` + ``task_config``), when to run it
    (``schedule_type`` + ``schedule_config``) and run bookkeeping
    (``last_run``, ``next_run``, ``run_count``, ``status``, ``last_result``).
    All timestamps are ISO-8601 strings produced from naive
    ``datetime.now()`` values.
    """

    # Weekday names accepted by the 'weekly' schedule, mapped to
    # datetime.weekday() numbers (0 = Monday, 6 = Sunday).
    _WEEKDAYS = {
        "monday": 0, "tuesday": 1, "wednesday": 2, "thursday": 3,
        "friday": 4, "saturday": 5, "sunday": 6,
    }

    def __init__(self, name: str, task_type: str, task_config: Dict[str, Any],
                 description: Optional[str] = None):
        """Initialize a scheduled task.

        The task starts enabled, with no schedule and status "pending".

        Args:
            name: Task name
            task_type: Type of task (e.g., 'notification', 'data_update', 'api_call')
            task_config: Task configuration
            description: Task description (optional)
        """
        self.id = str(uuid.uuid4())
        self.name = name
        self.description = description or ""
        self.task_type = task_type
        self.task_config = task_config
        self.schedule_type = None
        self.schedule_config = {}
        self.enabled = True
        self.created_at = datetime.now().isoformat()
        self.updated_at = self.created_at
        self.last_run = None
        self.next_run = None
        self.run_count = 0
        self.status = "pending"  # pending, running, completed, failed
        self.last_result = None

    @handle_exceptions
    def set_schedule(self, schedule_type: str, schedule_config: Dict[str, Any]) -> None:
        """Set the task schedule and recompute ``next_run``.

        Recognised ``schedule_config`` keys per schedule type:
            interval: "minutes" (default 60)
            daily:    "time" as "HH:MM" (default "09:00")
            weekly:   "day" weekday name (default "monday"; unknown names
                      fall back to Monday), "time"
            monthly:  "day" day of month (default 1), "time"
            once:     "date" as "YYYY-MM-DD", "time"

        Args:
            schedule_type: Type of schedule ('interval', 'daily', 'weekly', 'monthly', 'once')
            schedule_config: Schedule configuration
        """
        self.schedule_type = schedule_type
        self.schedule_config = schedule_config
        self.updated_at = datetime.now().isoformat()
        self._calculate_next_run()

    @staticmethod
    def _parse_time(time_str: str) -> tuple:
        """Split an "HH:MM" string into ``(hour, minute)`` integers.

        Raises:
            ValueError: If the string does not consist of exactly two
                integer fields separated by ':'.
        """
        hour, minute = map(int, time_str.split(":"))
        return hour, minute

    @staticmethod
    def _monthly_occurrence(year: int, month: int, day: int,
                            hour: int, minute: int) -> datetime:
        """Return ``day`` of the given month at ``hour:minute``.

        A day past the end of the month (e.g. 31 in a 30-day month) is
        clamped to the month's last day instead of raising ValueError.
        """
        last_day = calendar.monthrange(year, month)[1]
        return datetime(year, month, min(day, last_day), hour, minute)

    def _next_once(self, now: datetime) -> Optional[datetime]:
        """Return the configured one-off run time, or None if unusable.

        None is returned when no date is configured, when the date/time
        cannot be parsed, or when the resulting moment is not in the future.
        """
        date_str = self.schedule_config.get("date", "")
        if not date_str:
            return None
        try:
            date_obj = datetime.strptime(date_str, "%Y-%m-%d")
            hour, minute = self._parse_time(self.schedule_config.get("time", "09:00"))
            candidate = date_obj.replace(hour=hour, minute=minute)
        except (ValueError, TypeError, AttributeError):
            # Malformed date/time configuration: the task simply never runs.
            return None
        return candidate if candidate > now else None

    def _calculate_next_run(self) -> None:
        """Calculate the next run time based on the schedule.

        Updates ``self.next_run`` in place. An unset or unrecognised
        ``schedule_type`` leaves ``next_run`` untouched.
        """
        now = datetime.now()
        config = self.schedule_config
        kind = self.schedule_type

        if kind == "interval":
            next_run = now + timedelta(minutes=config.get("minutes", 60))

        elif kind == "daily":
            hour, minute = self._parse_time(config.get("time", "09:00"))
            next_run = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
            if next_run <= now:
                next_run += timedelta(days=1)

        elif kind == "weekly":
            day_name = config.get("day", "monday").lower()
            hour, minute = self._parse_time(config.get("time", "09:00"))
            target_weekday = self._WEEKDAYS.get(day_name, 0)
            # 0 means "today"; only roll over to next week when today's
            # slot has already passed, mirroring the daily rule.
            days_ahead = (target_weekday - now.weekday()) % 7
            next_run = (now.replace(hour=hour, minute=minute, second=0, microsecond=0)
                        + timedelta(days=days_ahead))
            if next_run <= now:
                next_run += timedelta(days=7)

        elif kind == "monthly":
            day = config.get("day", 1)
            hour, minute = self._parse_time(config.get("time", "09:00"))
            next_run = self._monthly_occurrence(now.year, now.month, day, hour, minute)
            if next_run <= now:
                # This month's slot has passed; use the same day next month.
                if now.month == 12:
                    year, month = now.year + 1, 1
                else:
                    year, month = now.year, now.month + 1
                next_run = self._monthly_occurrence(year, month, day, hour, minute)

        elif kind == "once":
            next_run = self._next_once(now)

        else:
            return

        self.next_run = next_run.isoformat() if next_run is not None else None

    @handle_exceptions
    def enable(self) -> None:
        """Enable the task and recompute its next run time."""
        self.enabled = True
        self.updated_at = datetime.now().isoformat()
        self._calculate_next_run()

    @handle_exceptions
    def disable(self) -> None:
        """Disable the task and clear its next run time."""
        self.enabled = False
        self.updated_at = datetime.now().isoformat()
        self.next_run = None

    @handle_exceptions
    def to_dict(self) -> Dict[str, Any]:
        """Convert task to dictionary.

        Returns:
            Task as dictionary, with one key per instance attribute
        """
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "task_type": self.task_type,
            "task_config": self.task_config,
            "schedule_type": self.schedule_type,
            "schedule_config": self.schedule_config,
            "enabled": self.enabled,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
            "last_run": self.last_run,
            "next_run": self.next_run,
            "run_count": self.run_count,
            "status": self.status,
            "last_result": self.last_result,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ScheduledTask':
        """Create task from dictionary.

        "name", "task_type" and "task_config" are required. Every other
        key is optional and falls back to the value a freshly constructed
        task would have.

        Args:
            data: Task data, in the shape produced by ``to_dict``

        Returns:
            ScheduledTask instance

        Raises:
            KeyError: If a required key is missing.
        """
        task = cls(data["name"], data["task_type"], data["task_config"],
                   data.get("description", ""))
        task.id = data.get("id", task.id)
        task.schedule_type = data.get("schedule_type")
        task.schedule_config = data.get("schedule_config", {})
        task.enabled = data.get("enabled", True)
        task.created_at = data.get("created_at", task.created_at)
        task.updated_at = data.get("updated_at", task.created_at)
        task.last_run = data.get("last_run")
        task.next_run = data.get("next_run")
        task.run_count = data.get("run_count", 0)
        task.status = data.get("status", "pending")
        task.last_result = data.get("last_result")
        return task
class TaskManager:
    """Manager for scheduled tasks.

    Keeps tasks in an in-memory dict keyed by task id, persists them under
    the "scheduled_tasks" storage key, and can run a background thread
    that executes tasks whose ``next_run`` has arrived.
    """

    def __init__(self):
        """Initialize task manager and load previously saved tasks."""
        self.tasks = {}
        self.scheduler = schedule.Scheduler()
        self.scheduler_thread = None
        self.running = False
        # Dispatch table: task_type -> handler taking the task and
        # returning a result dict.
        self.task_handlers = {
            "notification": self._handle_notification_task,
            "data_update": self._handle_data_update_task,
            "api_call": self._handle_api_call_task,
            "backup": self._handle_backup_task,
            "cleanup": self._handle_cleanup_task,
            "reminder": self._handle_reminder_task,
            "batch_process": self._handle_batch_process_task,
            "sync": self._handle_sync_task,
        }
        self.load_tasks()

    @handle_exceptions
    def load_tasks(self) -> None:
        """Load tasks from storage.

        A record that cannot be turned into a task is logged and skipped,
        so one malformed entry does not prevent the rest from loading.
        Failures are logged, not raised.
        """
        try:
            tasks_data = load_data("scheduled_tasks", default=[])
            for task_data in tasks_data:
                try:
                    task = ScheduledTask.from_dict(task_data)
                except (KeyError, TypeError, AttributeError) as e:
                    logger.error(f"Skipping malformed scheduled task record: {e!r}")
                    continue
                self.tasks[task.id] = task
            logger.info(f"Loaded {len(self.tasks)} scheduled tasks")
        except Exception as e:
            logger.error(f"Failed to load scheduled tasks: {str(e)}")

    @handle_exceptions
    def save_tasks(self) -> None:
        """Save all tasks to storage; failures are logged, not raised."""
        try:
            tasks_data = [task.to_dict() for task in self.tasks.values()]
            save_data("scheduled_tasks", tasks_data)
            logger.info(f"Saved {len(self.tasks)} scheduled tasks")
        except Exception as e:
            logger.error(f"Failed to save scheduled tasks: {str(e)}")

    @handle_exceptions
    def create_task(self, name: str, task_type: str, task_config: Dict[str, Any],
                    description: Optional[str] = None) -> ScheduledTask:
        """Create a new scheduled task, register it and persist.

        Args:
            name: Task name
            task_type: Type of task
            task_config: Task configuration
            description: Task description (optional)

        Returns:
            Created task
        """
        task = ScheduledTask(name, task_type, task_config, description)
        self.tasks[task.id] = task
        self.save_tasks()
        return task

    @handle_exceptions
    def get_task(self, task_id: str) -> Optional[ScheduledTask]:
        """Get task by ID.

        Args:
            task_id: Task ID

        Returns:
            Task if found, None otherwise
        """
        return self.tasks.get(task_id)

    @handle_exceptions
    def update_task(self, task: ScheduledTask) -> None:
        """Store an updated task and persist.

        Args:
            task: Task to update

        Raises:
            AutomationError: If no task with this id is registered.
        """
        if task.id not in self.tasks:
            raise AutomationError(f"Task not found: {task.id}")
        task.updated_at = datetime.now().isoformat()
        self.tasks[task.id] = task
        self.save_tasks()

    @handle_exceptions
    def delete_task(self, task_id: str) -> None:
        """Delete task and persist.

        Args:
            task_id: Task ID

        Raises:
            AutomationError: If no task with this id is registered.
        """
        if task_id not in self.tasks:
            raise AutomationError(f"Task not found: {task_id}")
        del self.tasks[task_id]
        self.save_tasks()

    @handle_exceptions
    def get_all_tasks(self) -> List[ScheduledTask]:
        """Get all tasks.

        Returns:
            List of all tasks
        """
        return list(self.tasks.values())

    @handle_exceptions
    def get_pending_tasks(self) -> List[ScheduledTask]:
        """Get tasks that are due to run.

        Returns:
            Enabled tasks whose ``next_run`` is set and not later than now
        """
        # Both sides are isoformat() strings, compared as text.
        now = datetime.now().isoformat()
        return [
            task for task in self.tasks.values()
            if task.enabled and task.next_run and task.next_run <= now
        ]

    @handle_exceptions
    def start_scheduler(self) -> None:
        """Start the scheduler thread (no-op if it is already running)."""
        if self.scheduler_thread is not None and self.scheduler_thread.is_alive():
            return
        self.running = True
        self.scheduler_thread = threading.Thread(target=self._scheduler_loop)
        self.scheduler_thread.daemon = True
        self.scheduler_thread.start()
        logger.info("Task scheduler started")

    @handle_exceptions
    def stop_scheduler(self) -> None:
        """Stop the scheduler thread.

        Clears the running flag, waits up to one second for the thread to
        finish, then drops the reference to it.
        """
        self.running = False
        if self.scheduler_thread is not None:
            self.scheduler_thread.join(timeout=1.0)
            self.scheduler_thread = None
        logger.info("Task scheduler stopped")

    def _scheduler_loop(self) -> None:
        """Scheduler thread loop: run due tasks roughly once per second."""
        while self.running:
            try:
                for task in self.get_pending_tasks():
                    self._execute_task(task)
                # Run jobs registered on the schedule.Scheduler instance
                self.scheduler.run_pending()
            except Exception as e:
                logger.error(f"Scheduler error: {str(e)}")
            # Sleep outside the try so a failing iteration cannot turn the
            # loop into a busy spin.
            time.sleep(1)

    def _execute_task(self, task: ScheduledTask) -> None:
        """Execute a scheduled task and record the outcome.

        Every attempt increments ``run_count``, whether it succeeded,
        raised, or had no handler. This lets a 'once' task be retired
        after a failed attempt instead of staying due indefinitely.

        Args:
            task: Task to execute
        """
        failure = None
        try:
            task.status = "running"
            task.last_run = datetime.now().isoformat()
            self.update_task(task)

            handler = self.task_handlers.get(task.task_type)
            if handler:
                task.last_result = handler(task)
                task.status = "completed"
            else:
                task.last_result = {"error": f"Unknown task type: {task.task_type}"}
                task.status = "failed"
        except Exception as e:
            failure = e
            task.status = "failed"
            task.last_result = {"error": str(e)}

        # Bookkeeping common to every outcome
        task.run_count += 1
        self._calculate_next_run(task)
        self.update_task(task)

        if failure is None:
            logger.info(f"Task executed: {task.name}")
        else:
            logger.error(f"Task execution error: {str(failure)}")

    def _calculate_next_run(self, task: ScheduledTask) -> None:
        """Calculate the next run time for a task after an execution.

        Disabled or unscheduled tasks get ``next_run = None``. A 'once'
        task that has already been attempted is disabled. Every other
        schedule type is delegated to the task's own calculation.

        Args:
            task: Task to update
        """
        if not task.enabled or not task.schedule_type:
            task.next_run = None
            return

        if task.schedule_type == "once":
            # For one-time tasks, if they've run, don't schedule again
            if task.run_count > 0:
                task.next_run = None
                task.enabled = False
            return

        task._calculate_next_run()

    def _handle_notification_task(self, task: ScheduledTask) -> Dict[str, Any]:
        """Handle notification task (placeholder: only logs the notification).

        Reads "notification_type" (default "app"), "title" and "message"
        from the task config, plus "recipient" for email and "chat_id"
        for telegram.

        Args:
            task: Task to handle

        Returns:
            Task result
        """
        config = task.task_config
        notification_type = config.get("notification_type", "app")
        title = config.get("title", "")
        message = config.get("message", "")

        # Placeholder for actual notification sending
        if notification_type == "app":
            logger.info(f"App notification: {title} - {message}")
        elif notification_type == "email":
            recipient = config.get("recipient", "")
            logger.info(f"Email notification to {recipient}: {title} - {message}")
        elif notification_type == "telegram":
            chat_id = config.get("chat_id", "")
            logger.info(f"Telegram notification to {chat_id}: {title} - {message}")

        return {"status": "success", "message": f"Sent {notification_type} notification"}

    def _handle_data_update_task(self, task: ScheduledTask) -> Dict[str, Any]:
        """Handle data update task.

        Loads the store named by "data_type", applies the "updates"
        key/value pairs to the entry "data_id" and saves the store back.

        Args:
            task: Task to handle

        Returns:
            Task result; status "error" when the entry does not exist
        """
        config = task.task_config
        data_type = config.get("data_type", "")
        data_id = config.get("data_id", "")
        updates = config.get("updates", {})

        data = load_data(data_type, {})
        if data_id not in data:
            return {"status": "error", "message": f"Data not found: {data_type}/{data_id}"}

        for key, value in updates.items():
            data[data_id][key] = value
        save_data(data_type, data)

        logger.info(f"Updated {data_type} data: {data_id}")
        return {"status": "success", "message": f"Updated {data_type} data: {data_id}"}

    def _handle_api_call_task(self, task: ScheduledTask) -> Dict[str, Any]:
        """Handle API call task (placeholder: only logs the call).

        Reads "url" and "method" (default "GET") from the task config.
        No request is made yet, so no other config keys are read.

        Args:
            task: Task to handle

        Returns:
            Task result
        """
        config = task.task_config
        url = config.get("url", "")
        method = config.get("method", "GET")

        # Placeholder for actual API call
        logger.info(f"API call: {method} {url}")
        return {"status": "success", "message": f"Made API call: {method} {url}"}

    def _handle_backup_task(self, task: ScheduledTask) -> Dict[str, Any]:
        """Handle backup task (placeholder: only logs the backup).

        Reads "backup_type" (default "all") and "destination" (default
        "local") from the task config.

        Args:
            task: Task to handle

        Returns:
            Task result
        """
        config = task.task_config
        backup_type = config.get("backup_type", "all")
        destination = config.get("destination", "local")

        # Placeholder for actual backup
        logger.info(f"Backup: {backup_type} to {destination}")
        return {"status": "success", "message": f"Created backup: {backup_type} to {destination}"}

    def _handle_cleanup_task(self, task: ScheduledTask) -> Dict[str, Any]:
        """Handle cleanup task (placeholder: only logs the cleanup).

        Reads "cleanup_type" and "older_than_days" (default 30) from the
        task config.

        Args:
            task: Task to handle

        Returns:
            Task result
        """
        config = task.task_config
        cleanup_type = config.get("cleanup_type", "")
        older_than_days = config.get("older_than_days", 30)

        # Placeholder for actual cleanup
        logger.info(f"Cleanup: {cleanup_type} older than {older_than_days} days")
        return {"status": "success", "message": f"Performed cleanup: {cleanup_type}"}

    def _handle_reminder_task(self, task: ScheduledTask) -> Dict[str, Any]:
        """Handle reminder task (placeholder: only logs the reminder).

        Reads "reminder_type" (default "task"), "item_id" and "message"
        from the task config.

        Args:
            task: Task to handle

        Returns:
            Task result
        """
        config = task.task_config
        reminder_type = config.get("reminder_type", "task")
        item_id = config.get("item_id", "")
        message = config.get("message", "")

        # Placeholder for actual reminder
        logger.info(f"Reminder: {reminder_type} {item_id} - {message}")
        return {"status": "success", "message": f"Sent reminder: {reminder_type} {item_id}"}

    def _handle_batch_process_task(self, task: ScheduledTask) -> Dict[str, Any]:
        """Handle batch process task (placeholder: only logs the batch).

        Reads "process_type", "items" (default empty list) and "action"
        from the task config.

        Args:
            task: Task to handle

        Returns:
            Task result
        """
        config = task.task_config
        process_type = config.get("process_type", "")
        items = config.get("items", [])
        action = config.get("action", "")

        # Placeholder for actual batch processing
        logger.info(f"Batch process: {action} on {len(items)} {process_type} items")
        return {"status": "success", "message": f"Processed {len(items)} {process_type} items"}

    def _handle_sync_task(self, task: ScheduledTask) -> Dict[str, Any]:
        """Handle sync task (placeholder: only logs the sync).

        Reads "sync_type", "source" and "destination" from the task config.

        Args:
            task: Task to handle

        Returns:
            Task result
        """
        config = task.task_config
        sync_type = config.get("sync_type", "")
        source = config.get("source", "")
        destination = config.get("destination", "")

        # Placeholder for actual sync
        logger.info(f"Sync: {sync_type} from {source} to {destination}")
        return {"status": "success", "message": f"Synced {sync_type} data"}
# Create a global instance of the task manager.
# Constructing TaskManager calls load_tasks(), so importing this module
# reads the "scheduled_tasks" store. The scheduler thread is not started
# here; callers start it with task_manager.start_scheduler().
task_manager = TaskManager()