|
import json |
|
import time |
|
import uuid |
|
from typing import Dict, List, Any, Optional, Callable, Union |
|
from datetime import datetime, timedelta |
|
import threading |
|
import schedule |
|
|
|
from utils.logging import setup_logger |
|
from utils.error_handling import handle_exceptions, AutomationError |
|
from utils.storage import load_data, save_data |
|
|
|
|
|
logger = setup_logger(__name__) |
|
|
|
class ScheduledTask: |
|
"""Scheduled task for time-based triggers""" |
|
|
|
def __init__(self, name: str, task_type: str, task_config: Dict[str, Any], |
|
description: Optional[str] = None): |
|
"""Initialize a scheduled task |
|
|
|
Args: |
|
name: Task name |
|
task_type: Type of task (e.g., 'notification', 'data_update', 'api_call') |
|
task_config: Task configuration |
|
description: Task description (optional) |
|
""" |
|
self.id = str(uuid.uuid4()) |
|
self.name = name |
|
self.description = description or "" |
|
self.task_type = task_type |
|
self.task_config = task_config |
|
self.schedule_type = None |
|
self.schedule_config = {} |
|
self.enabled = True |
|
self.created_at = datetime.now().isoformat() |
|
self.updated_at = self.created_at |
|
self.last_run = None |
|
self.next_run = None |
|
self.run_count = 0 |
|
self.status = "pending" |
|
self.last_result = None |
|
|
|
@handle_exceptions |
|
def set_schedule(self, schedule_type: str, schedule_config: Dict[str, Any]) -> None: |
|
"""Set task schedule |
|
|
|
Args: |
|
schedule_type: Type of schedule ('interval', 'daily', 'weekly', 'monthly', 'once') |
|
schedule_config: Schedule configuration |
|
""" |
|
self.schedule_type = schedule_type |
|
self.schedule_config = schedule_config |
|
self.updated_at = datetime.now().isoformat() |
|
|
|
|
|
self._calculate_next_run() |
|
|
|
def _calculate_next_run(self) -> None: |
|
"""Calculate the next run time based on schedule""" |
|
now = datetime.now() |
|
|
|
if self.schedule_type == "interval": |
|
interval_minutes = self.schedule_config.get("minutes", 60) |
|
self.next_run = (now + timedelta(minutes=interval_minutes)).isoformat() |
|
|
|
elif self.schedule_type == "daily": |
|
time_str = self.schedule_config.get("time", "09:00") |
|
hour, minute = map(int, time_str.split(":")) |
|
next_run = now.replace(hour=hour, minute=minute, second=0, microsecond=0) |
|
|
|
if next_run <= now: |
|
next_run = next_run + timedelta(days=1) |
|
|
|
self.next_run = next_run.isoformat() |
|
|
|
elif self.schedule_type == "weekly": |
|
day = self.schedule_config.get("day", "monday").lower() |
|
time_str = self.schedule_config.get("time", "09:00") |
|
hour, minute = map(int, time_str.split(":")) |
|
|
|
|
|
day_map = { |
|
"monday": 0, "tuesday": 1, "wednesday": 2, "thursday": 3, |
|
"friday": 4, "saturday": 5, "sunday": 6 |
|
} |
|
target_weekday = day_map.get(day, 0) |
|
|
|
|
|
days_ahead = target_weekday - now.weekday() |
|
if days_ahead <= 0: |
|
days_ahead += 7 |
|
|
|
next_run = now.replace(hour=hour, minute=minute, second=0, microsecond=0) + timedelta(days=days_ahead) |
|
self.next_run = next_run.isoformat() |
|
|
|
elif self.schedule_type == "monthly": |
|
day = self.schedule_config.get("day", 1) |
|
time_str = self.schedule_config.get("time", "09:00") |
|
hour, minute = map(int, time_str.split(":")) |
|
|
|
|
|
if day <= now.day: |
|
|
|
if now.month == 12: |
|
next_run = datetime(now.year + 1, 1, day, hour, minute) |
|
else: |
|
next_run = datetime(now.year, now.month + 1, day, hour, minute) |
|
else: |
|
|
|
next_run = datetime(now.year, now.month, day, hour, minute) |
|
|
|
self.next_run = next_run.isoformat() |
|
|
|
elif self.schedule_type == "once": |
|
date_str = self.schedule_config.get("date", "") |
|
time_str = self.schedule_config.get("time", "09:00") |
|
|
|
if date_str: |
|
try: |
|
date_obj = datetime.strptime(date_str, "%Y-%m-%d") |
|
hour, minute = map(int, time_str.split(":")) |
|
next_run = date_obj.replace(hour=hour, minute=minute) |
|
|
|
if next_run > now: |
|
self.next_run = next_run.isoformat() |
|
else: |
|
self.next_run = None |
|
except: |
|
self.next_run = None |
|
else: |
|
self.next_run = None |
|
|
|
@handle_exceptions |
|
def enable(self) -> None: |
|
"""Enable the task""" |
|
self.enabled = True |
|
self.updated_at = datetime.now().isoformat() |
|
self._calculate_next_run() |
|
|
|
@handle_exceptions |
|
def disable(self) -> None: |
|
"""Disable the task""" |
|
self.enabled = False |
|
self.updated_at = datetime.now().isoformat() |
|
self.next_run = None |
|
|
|
@handle_exceptions |
|
def to_dict(self) -> Dict[str, Any]: |
|
"""Convert task to dictionary |
|
|
|
Returns: |
|
Task as dictionary |
|
""" |
|
return { |
|
"id": self.id, |
|
"name": self.name, |
|
"description": self.description, |
|
"task_type": self.task_type, |
|
"task_config": self.task_config, |
|
"schedule_type": self.schedule_type, |
|
"schedule_config": self.schedule_config, |
|
"enabled": self.enabled, |
|
"created_at": self.created_at, |
|
"updated_at": self.updated_at, |
|
"last_run": self.last_run, |
|
"next_run": self.next_run, |
|
"run_count": self.run_count, |
|
"status": self.status, |
|
"last_result": self.last_result |
|
} |
|
|
|
@classmethod |
|
def from_dict(cls, data: Dict[str, Any]) -> 'ScheduledTask': |
|
"""Create task from dictionary |
|
|
|
Args: |
|
data: Task data |
|
|
|
Returns: |
|
ScheduledTask instance |
|
""" |
|
task = cls(data["name"], data["task_type"], data["task_config"], data.get("description", "")) |
|
task.id = data["id"] |
|
task.schedule_type = data.get("schedule_type") |
|
task.schedule_config = data.get("schedule_config", {}) |
|
task.enabled = data["enabled"] |
|
task.created_at = data["created_at"] |
|
task.updated_at = data["updated_at"] |
|
task.last_run = data.get("last_run") |
|
task.next_run = data.get("next_run") |
|
task.run_count = data.get("run_count", 0) |
|
task.status = data.get("status", "pending") |
|
task.last_result = data.get("last_result") |
|
return task |
|
|
|
|
|
class TaskManager: |
|
"""Manager for scheduled tasks""" |
|
|
|
def __init__(self): |
|
"""Initialize task manager""" |
|
self.tasks = {} |
|
self.scheduler = schedule.Scheduler() |
|
self.scheduler_thread = None |
|
self.running = False |
|
self.task_handlers = { |
|
"notification": self._handle_notification_task, |
|
"data_update": self._handle_data_update_task, |
|
"api_call": self._handle_api_call_task, |
|
"backup": self._handle_backup_task, |
|
"cleanup": self._handle_cleanup_task, |
|
"reminder": self._handle_reminder_task, |
|
"batch_process": self._handle_batch_process_task, |
|
"sync": self._handle_sync_task |
|
} |
|
self.load_tasks() |
|
|
|
@handle_exceptions |
|
def load_tasks(self) -> None: |
|
"""Load tasks from storage""" |
|
try: |
|
tasks_data = load_data("scheduled_tasks", default=[]) |
|
for task_data in tasks_data: |
|
task = ScheduledTask.from_dict(task_data) |
|
self.tasks[task.id] = task |
|
logger.info(f"Loaded {len(self.tasks)} scheduled tasks") |
|
except Exception as e: |
|
logger.error(f"Failed to load scheduled tasks: {str(e)}") |
|
|
|
@handle_exceptions |
|
def save_tasks(self) -> None: |
|
"""Save tasks to storage""" |
|
try: |
|
tasks_data = [task.to_dict() for task in self.tasks.values()] |
|
save_data("scheduled_tasks", tasks_data) |
|
logger.info(f"Saved {len(self.tasks)} scheduled tasks") |
|
except Exception as e: |
|
logger.error(f"Failed to save scheduled tasks: {str(e)}") |
|
|
|
@handle_exceptions |
|
def create_task(self, name: str, task_type: str, task_config: Dict[str, Any], |
|
description: Optional[str] = None) -> ScheduledTask: |
|
"""Create a new scheduled task |
|
|
|
Args: |
|
name: Task name |
|
task_type: Type of task |
|
task_config: Task configuration |
|
description: Task description (optional) |
|
|
|
Returns: |
|
Created task |
|
""" |
|
task = ScheduledTask(name, task_type, task_config, description) |
|
self.tasks[task.id] = task |
|
self.save_tasks() |
|
return task |
|
|
|
@handle_exceptions |
|
def get_task(self, task_id: str) -> Optional[ScheduledTask]: |
|
"""Get task by ID |
|
|
|
Args: |
|
task_id: Task ID |
|
|
|
Returns: |
|
Task if found, None otherwise |
|
""" |
|
return self.tasks.get(task_id) |
|
|
|
@handle_exceptions |
|
def update_task(self, task: ScheduledTask) -> None: |
|
"""Update task |
|
|
|
Args: |
|
task: Task to update |
|
""" |
|
if task.id in self.tasks: |
|
task.updated_at = datetime.now().isoformat() |
|
self.tasks[task.id] = task |
|
self.save_tasks() |
|
else: |
|
raise AutomationError(f"Task not found: {task.id}") |
|
|
|
@handle_exceptions |
|
def delete_task(self, task_id: str) -> None: |
|
"""Delete task |
|
|
|
Args: |
|
task_id: Task ID |
|
""" |
|
if task_id in self.tasks: |
|
del self.tasks[task_id] |
|
self.save_tasks() |
|
else: |
|
raise AutomationError(f"Task not found: {task_id}") |
|
|
|
@handle_exceptions |
|
def get_all_tasks(self) -> List[ScheduledTask]: |
|
"""Get all tasks |
|
|
|
Returns: |
|
List of all tasks |
|
""" |
|
return list(self.tasks.values()) |
|
|
|
@handle_exceptions |
|
def get_pending_tasks(self) -> List[ScheduledTask]: |
|
"""Get pending tasks |
|
|
|
Returns: |
|
List of pending tasks |
|
""" |
|
now = datetime.now().isoformat() |
|
return [ |
|
task for task in self.tasks.values() |
|
if task.enabled and task.next_run and task.next_run <= now |
|
] |
|
|
|
@handle_exceptions |
|
def start_scheduler(self) -> None: |
|
"""Start the scheduler thread""" |
|
if self.scheduler_thread is not None and self.scheduler_thread.is_alive(): |
|
return |
|
|
|
self.running = True |
|
self.scheduler_thread = threading.Thread(target=self._scheduler_loop) |
|
self.scheduler_thread.daemon = True |
|
self.scheduler_thread.start() |
|
logger.info("Task scheduler started") |
|
|
|
@handle_exceptions |
|
def stop_scheduler(self) -> None: |
|
"""Stop the scheduler thread""" |
|
self.running = False |
|
if self.scheduler_thread is not None: |
|
self.scheduler_thread.join(timeout=1.0) |
|
self.scheduler_thread = None |
|
logger.info("Task scheduler stopped") |
|
|
|
def _scheduler_loop(self) -> None: |
|
"""Scheduler thread loop""" |
|
while self.running: |
|
try: |
|
|
|
pending_tasks = self.get_pending_tasks() |
|
for task in pending_tasks: |
|
self._execute_task(task) |
|
|
|
|
|
self.scheduler.run_pending() |
|
|
|
|
|
time.sleep(1) |
|
except Exception as e: |
|
logger.error(f"Scheduler error: {str(e)}") |
|
|
|
def _execute_task(self, task: ScheduledTask) -> None: |
|
"""Execute a scheduled task |
|
|
|
Args: |
|
task: Task to execute |
|
""" |
|
try: |
|
|
|
task.status = "running" |
|
task.last_run = datetime.now().isoformat() |
|
self.update_task(task) |
|
|
|
|
|
handler = self.task_handlers.get(task.task_type) |
|
if handler: |
|
result = handler(task) |
|
task.last_result = result |
|
task.status = "completed" |
|
else: |
|
task.last_result = {"error": f"Unknown task type: {task.task_type}"} |
|
task.status = "failed" |
|
|
|
|
|
task.run_count += 1 |
|
self._calculate_next_run(task) |
|
self.update_task(task) |
|
|
|
logger.info(f"Task executed: {task.name}") |
|
except Exception as e: |
|
|
|
task.status = "failed" |
|
task.last_result = {"error": str(e)} |
|
self._calculate_next_run(task) |
|
self.update_task(task) |
|
logger.error(f"Task execution error: {str(e)}") |
|
|
|
def _calculate_next_run(self, task: ScheduledTask) -> None: |
|
"""Calculate the next run time for a task |
|
|
|
Args: |
|
task: Task to update |
|
""" |
|
if not task.enabled or not task.schedule_type: |
|
task.next_run = None |
|
return |
|
|
|
now = datetime.now() |
|
|
|
if task.schedule_type == "interval": |
|
interval_minutes = task.schedule_config.get("minutes", 60) |
|
task.next_run = (now + timedelta(minutes=interval_minutes)).isoformat() |
|
|
|
elif task.schedule_type == "daily": |
|
time_str = task.schedule_config.get("time", "09:00") |
|
hour, minute = map(int, time_str.split(":")) |
|
next_run = now.replace(hour=hour, minute=minute, second=0, microsecond=0) |
|
|
|
if next_run <= now: |
|
next_run = next_run + timedelta(days=1) |
|
|
|
task.next_run = next_run.isoformat() |
|
|
|
elif task.schedule_type == "weekly": |
|
day = task.schedule_config.get("day", "monday").lower() |
|
time_str = task.schedule_config.get("time", "09:00") |
|
hour, minute = map(int, time_str.split(":")) |
|
|
|
|
|
day_map = { |
|
"monday": 0, "tuesday": 1, "wednesday": 2, "thursday": 3, |
|
"friday": 4, "saturday": 5, "sunday": 6 |
|
} |
|
target_weekday = day_map.get(day, 0) |
|
|
|
|
|
days_ahead = target_weekday - now.weekday() |
|
if days_ahead <= 0: |
|
days_ahead += 7 |
|
|
|
next_run = now.replace(hour=hour, minute=minute, second=0, microsecond=0) + timedelta(days=days_ahead) |
|
task.next_run = next_run.isoformat() |
|
|
|
elif task.schedule_type == "monthly": |
|
day = task.schedule_config.get("day", 1) |
|
time_str = task.schedule_config.get("time", "09:00") |
|
hour, minute = map(int, time_str.split(":")) |
|
|
|
|
|
if day <= now.day: |
|
|
|
if now.month == 12: |
|
next_run = datetime(now.year + 1, 1, day, hour, minute) |
|
else: |
|
next_run = datetime(now.year, now.month + 1, day, hour, minute) |
|
else: |
|
|
|
next_run = datetime(now.year, now.month, day, hour, minute) |
|
|
|
task.next_run = next_run.isoformat() |
|
|
|
elif task.schedule_type == "once": |
|
|
|
if task.run_count > 0: |
|
task.next_run = None |
|
task.enabled = False |
|
|
|
def _handle_notification_task(self, task: ScheduledTask) -> Dict[str, Any]: |
|
"""Handle notification task |
|
|
|
Args: |
|
task: Task to handle |
|
|
|
Returns: |
|
Task result |
|
""" |
|
config = task.task_config |
|
notification_type = config.get("notification_type", "app") |
|
title = config.get("title", "") |
|
message = config.get("message", "") |
|
|
|
|
|
if notification_type == "app": |
|
|
|
logger.info(f"App notification: {title} - {message}") |
|
|
|
elif notification_type == "email": |
|
|
|
recipient = config.get("recipient", "") |
|
logger.info(f"Email notification to {recipient}: {title} - {message}") |
|
|
|
elif notification_type == "telegram": |
|
|
|
chat_id = config.get("chat_id", "") |
|
logger.info(f"Telegram notification to {chat_id}: {title} - {message}") |
|
|
|
return {"status": "success", "message": f"Sent {notification_type} notification"} |
|
|
|
def _handle_data_update_task(self, task: ScheduledTask) -> Dict[str, Any]: |
|
"""Handle data update task |
|
|
|
Args: |
|
task: Task to handle |
|
|
|
Returns: |
|
Task result |
|
""" |
|
config = task.task_config |
|
data_type = config.get("data_type", "") |
|
data_id = config.get("data_id", "") |
|
updates = config.get("updates", {}) |
|
|
|
|
|
data = load_data(data_type, {}) |
|
|
|
|
|
if data_id in data: |
|
for key, value in updates.items(): |
|
data[data_id][key] = value |
|
|
|
|
|
save_data(data_type, data) |
|
logger.info(f"Updated {data_type} data: {data_id}") |
|
return {"status": "success", "message": f"Updated {data_type} data: {data_id}"} |
|
else: |
|
return {"status": "error", "message": f"Data not found: {data_type}/{data_id}"} |
|
|
|
def _handle_api_call_task(self, task: ScheduledTask) -> Dict[str, Any]: |
|
"""Handle API call task |
|
|
|
Args: |
|
task: Task to handle |
|
|
|
Returns: |
|
Task result |
|
""" |
|
config = task.task_config |
|
url = config.get("url", "") |
|
method = config.get("method", "GET") |
|
headers = config.get("headers", {}) |
|
body = config.get("body", {}) |
|
|
|
|
|
logger.info(f"API call: {method} {url}") |
|
|
|
return {"status": "success", "message": f"Made API call: {method} {url}"} |
|
|
|
def _handle_backup_task(self, task: ScheduledTask) -> Dict[str, Any]: |
|
"""Handle backup task |
|
|
|
Args: |
|
task: Task to handle |
|
|
|
Returns: |
|
Task result |
|
""" |
|
config = task.task_config |
|
backup_type = config.get("backup_type", "all") |
|
destination = config.get("destination", "local") |
|
|
|
|
|
logger.info(f"Backup: {backup_type} to {destination}") |
|
|
|
return {"status": "success", "message": f"Created backup: {backup_type} to {destination}"} |
|
|
|
def _handle_cleanup_task(self, task: ScheduledTask) -> Dict[str, Any]: |
|
"""Handle cleanup task |
|
|
|
Args: |
|
task: Task to handle |
|
|
|
Returns: |
|
Task result |
|
""" |
|
config = task.task_config |
|
cleanup_type = config.get("cleanup_type", "") |
|
older_than_days = config.get("older_than_days", 30) |
|
|
|
|
|
logger.info(f"Cleanup: {cleanup_type} older than {older_than_days} days") |
|
|
|
return {"status": "success", "message": f"Performed cleanup: {cleanup_type}"} |
|
|
|
def _handle_reminder_task(self, task: ScheduledTask) -> Dict[str, Any]: |
|
"""Handle reminder task |
|
|
|
Args: |
|
task: Task to handle |
|
|
|
Returns: |
|
Task result |
|
""" |
|
config = task.task_config |
|
reminder_type = config.get("reminder_type", "task") |
|
item_id = config.get("item_id", "") |
|
message = config.get("message", "") |
|
|
|
|
|
logger.info(f"Reminder: {reminder_type} {item_id} - {message}") |
|
|
|
return {"status": "success", "message": f"Sent reminder: {reminder_type} {item_id}"} |
|
|
|
def _handle_batch_process_task(self, task: ScheduledTask) -> Dict[str, Any]: |
|
"""Handle batch process task |
|
|
|
Args: |
|
task: Task to handle |
|
|
|
Returns: |
|
Task result |
|
""" |
|
config = task.task_config |
|
process_type = config.get("process_type", "") |
|
items = config.get("items", []) |
|
action = config.get("action", "") |
|
|
|
|
|
logger.info(f"Batch process: {action} on {len(items)} {process_type} items") |
|
|
|
return {"status": "success", "message": f"Processed {len(items)} {process_type} items"} |
|
|
|
def _handle_sync_task(self, task: ScheduledTask) -> Dict[str, Any]: |
|
"""Handle sync task |
|
|
|
Args: |
|
task: Task to handle |
|
|
|
Returns: |
|
Task result |
|
""" |
|
config = task.task_config |
|
sync_type = config.get("sync_type", "") |
|
source = config.get("source", "") |
|
destination = config.get("destination", "") |
|
|
|
|
|
logger.info(f"Sync: {sync_type} from {source} to {destination}") |
|
|
|
return {"status": "success", "message": f"Synced {sync_type} data"} |
|
|
|
|
|
|
|
task_manager = TaskManager() |