|
import json |
|
import time |
|
import uuid |
|
from typing import Dict, List, Any, Optional, Callable, Union |
|
from datetime import datetime, timedelta |
|
import threading |
|
import schedule |
|
|
|
from utils.logging import setup_logger |
|
from utils.error_handling import handle_exceptions, AutomationError |
|
from utils.storage import load_data, save_data |
|
|
|
|
|
logger = setup_logger(__name__) |
|
|
|
class Workflow: |
|
"""Smart Workflow for IF-THEN automation""" |
|
|
|
def __init__(self, name: str, description: Optional[str] = None): |
|
"""Initialize a workflow |
|
|
|
Args: |
|
name: Workflow name |
|
description: Workflow description (optional) |
|
""" |
|
self.id = str(uuid.uuid4()) |
|
self.name = name |
|
self.description = description or "" |
|
self.triggers = [] |
|
self.actions = [] |
|
self.conditions = [] |
|
self.enabled = True |
|
self.created_at = datetime.now().isoformat() |
|
self.updated_at = self.created_at |
|
self.last_run = None |
|
self.run_count = 0 |
|
|
|
@handle_exceptions |
|
def add_trigger(self, trigger_type: str, trigger_config: Dict[str, Any]) -> None: |
|
"""Add a trigger to the workflow |
|
|
|
Args: |
|
trigger_type: Type of trigger (e.g., 'time', 'event', 'data_change') |
|
trigger_config: Trigger configuration |
|
""" |
|
self.triggers.append({ |
|
"id": str(uuid.uuid4()), |
|
"type": trigger_type, |
|
"config": trigger_config |
|
}) |
|
self.updated_at = datetime.now().isoformat() |
|
|
|
@handle_exceptions |
|
def add_condition(self, condition_type: str, condition_config: Dict[str, Any]) -> None: |
|
"""Add a condition to the workflow |
|
|
|
Args: |
|
condition_type: Type of condition (e.g., 'compare', 'exists', 'contains') |
|
condition_config: Condition configuration |
|
""" |
|
self.conditions.append({ |
|
"id": str(uuid.uuid4()), |
|
"type": condition_type, |
|
"config": condition_config |
|
}) |
|
self.updated_at = datetime.now().isoformat() |
|
|
|
@handle_exceptions |
|
def add_action(self, action_type: str, action_config: Dict[str, Any]) -> None: |
|
"""Add an action to the workflow |
|
|
|
Args: |
|
action_type: Type of action (e.g., 'notification', 'data_update', 'api_call') |
|
action_config: Action configuration |
|
""" |
|
self.actions.append({ |
|
"id": str(uuid.uuid4()), |
|
"type": action_type, |
|
"config": action_config |
|
}) |
|
self.updated_at = datetime.now().isoformat() |
|
|
|
@handle_exceptions |
|
def enable(self) -> None: |
|
"""Enable the workflow""" |
|
self.enabled = True |
|
self.updated_at = datetime.now().isoformat() |
|
|
|
@handle_exceptions |
|
def disable(self) -> None: |
|
"""Disable the workflow""" |
|
self.enabled = False |
|
self.updated_at = datetime.now().isoformat() |
|
|
|
@handle_exceptions |
|
def to_dict(self) -> Dict[str, Any]: |
|
"""Convert workflow to dictionary |
|
|
|
Returns: |
|
Workflow as dictionary |
|
""" |
|
return { |
|
"id": self.id, |
|
"name": self.name, |
|
"description": self.description, |
|
"triggers": self.triggers, |
|
"conditions": self.conditions, |
|
"actions": self.actions, |
|
"enabled": self.enabled, |
|
"created_at": self.created_at, |
|
"updated_at": self.updated_at, |
|
"last_run": self.last_run, |
|
"run_count": self.run_count |
|
} |
|
|
|
@classmethod |
|
def from_dict(cls, data: Dict[str, Any]) -> 'Workflow': |
|
"""Create workflow from dictionary |
|
|
|
Args: |
|
data: Workflow data |
|
|
|
Returns: |
|
Workflow instance |
|
""" |
|
workflow = cls(data["name"], data.get("description", "")) |
|
workflow.id = data["id"] |
|
workflow.triggers = data["triggers"] |
|
workflow.conditions = data["conditions"] |
|
workflow.actions = data["actions"] |
|
workflow.enabled = data["enabled"] |
|
workflow.created_at = data["created_at"] |
|
workflow.updated_at = data["updated_at"] |
|
workflow.last_run = data.get("last_run") |
|
workflow.run_count = data.get("run_count", 0) |
|
return workflow |
|
|
|
|
|
class WorkflowManager: |
|
"""Manager for smart workflows""" |
|
|
|
def __init__(self): |
|
"""Initialize workflow manager""" |
|
self.workflows = {} |
|
self.event_handlers = {} |
|
self.data_change_handlers = {} |
|
self.scheduler = schedule.Scheduler() |
|
self.scheduler_thread = None |
|
self.running = False |
|
self.load_workflows() |
|
|
|
@handle_exceptions |
|
def load_workflows(self) -> None: |
|
"""Load workflows from storage""" |
|
try: |
|
workflows_data = load_data("workflows", default=[]) |
|
for workflow_data in workflows_data: |
|
workflow = Workflow.from_dict(workflow_data) |
|
self.workflows[workflow.id] = workflow |
|
logger.info(f"Loaded {len(self.workflows)} workflows") |
|
except Exception as e: |
|
logger.error(f"Failed to load workflows: {str(e)}") |
|
|
|
@handle_exceptions |
|
def save_workflows(self) -> None: |
|
"""Save workflows to storage""" |
|
try: |
|
workflows_data = [workflow.to_dict() for workflow in self.workflows.values()] |
|
save_data("workflows", workflows_data) |
|
logger.info(f"Saved {len(self.workflows)} workflows") |
|
except Exception as e: |
|
logger.error(f"Failed to save workflows: {str(e)}") |
|
|
|
@handle_exceptions |
|
def create_workflow(self, name: str, description: Optional[str] = None) -> Workflow: |
|
"""Create a new workflow |
|
|
|
Args: |
|
name: Workflow name |
|
description: Workflow description (optional) |
|
|
|
Returns: |
|
Created workflow |
|
""" |
|
workflow = Workflow(name, description) |
|
self.workflows[workflow.id] = workflow |
|
self.save_workflows() |
|
return workflow |
|
|
|
@handle_exceptions |
|
def get_workflow(self, workflow_id: str) -> Optional[Workflow]: |
|
"""Get workflow by ID |
|
|
|
Args: |
|
workflow_id: Workflow ID |
|
|
|
Returns: |
|
Workflow if found, None otherwise |
|
""" |
|
return self.workflows.get(workflow_id) |
|
|
|
@handle_exceptions |
|
def update_workflow(self, workflow: Workflow) -> None: |
|
"""Update workflow |
|
|
|
Args: |
|
workflow: Workflow to update |
|
""" |
|
if workflow.id in self.workflows: |
|
workflow.updated_at = datetime.now().isoformat() |
|
self.workflows[workflow.id] = workflow |
|
self.save_workflows() |
|
else: |
|
raise AutomationError(f"Workflow not found: {workflow.id}") |
|
|
|
@handle_exceptions |
|
def delete_workflow(self, workflow_id: str) -> None: |
|
"""Delete workflow |
|
|
|
Args: |
|
workflow_id: Workflow ID |
|
""" |
|
if workflow_id in self.workflows: |
|
del self.workflows[workflow_id] |
|
self.save_workflows() |
|
else: |
|
raise AutomationError(f"Workflow not found: {workflow_id}") |
|
|
|
@handle_exceptions |
|
def get_all_workflows(self) -> List[Workflow]: |
|
"""Get all workflows |
|
|
|
Returns: |
|
List of all workflows |
|
""" |
|
return list(self.workflows.values()) |
|
|
|
@handle_exceptions |
|
def register_event_handler(self, event_type: str, handler: Callable) -> None: |
|
"""Register event handler |
|
|
|
Args: |
|
event_type: Type of event |
|
handler: Event handler function |
|
""" |
|
if event_type not in self.event_handlers: |
|
self.event_handlers[event_type] = [] |
|
self.event_handlers[event_type].append(handler) |
|
|
|
@handle_exceptions |
|
def register_data_change_handler(self, data_type: str, handler: Callable) -> None: |
|
"""Register data change handler |
|
|
|
Args: |
|
data_type: Type of data |
|
handler: Data change handler function |
|
""" |
|
if data_type not in self.data_change_handlers: |
|
self.data_change_handlers[data_type] = [] |
|
self.data_change_handlers[data_type].append(handler) |
|
|
|
@handle_exceptions |
|
def trigger_event(self, event_type: str, event_data: Dict[str, Any]) -> None: |
|
"""Trigger an event |
|
|
|
Args: |
|
event_type: Type of event |
|
event_data: Event data |
|
""" |
|
|
|
if event_type in self.event_handlers: |
|
for handler in self.event_handlers[event_type]: |
|
try: |
|
handler(event_data) |
|
except Exception as e: |
|
logger.error(f"Event handler error: {str(e)}") |
|
|
|
|
|
for workflow in self.workflows.values(): |
|
if not workflow.enabled: |
|
continue |
|
|
|
for trigger in workflow.triggers: |
|
if trigger["type"] == "event" and trigger["config"].get("event_type") == event_type: |
|
self._process_workflow(workflow, event_data) |
|
|
|
@handle_exceptions |
|
def notify_data_change(self, data_type: str, data_id: str, data: Dict[str, Any]) -> None: |
|
"""Notify of data change |
|
|
|
Args: |
|
data_type: Type of data |
|
data_id: Data ID |
|
data: Changed data |
|
""" |
|
|
|
if data_type in self.data_change_handlers: |
|
for handler in self.data_change_handlers[data_type]: |
|
try: |
|
handler(data_id, data) |
|
except Exception as e: |
|
logger.error(f"Data change handler error: {str(e)}") |
|
|
|
|
|
for workflow in self.workflows.values(): |
|
if not workflow.enabled: |
|
continue |
|
|
|
for trigger in workflow.triggers: |
|
if trigger["type"] == "data_change" and trigger["config"].get("data_type") == data_type: |
|
self._process_workflow(workflow, {"data_type": data_type, "data_id": data_id, "data": data}) |
|
|
|
def _process_workflow(self, workflow: Workflow, context: Dict[str, Any]) -> None: |
|
"""Process workflow |
|
|
|
Args: |
|
workflow: Workflow to process |
|
context: Processing context |
|
""" |
|
try: |
|
|
|
if not self._check_conditions(workflow, context): |
|
return |
|
|
|
|
|
for action in workflow.actions: |
|
self._execute_action(action, context) |
|
|
|
|
|
workflow.last_run = datetime.now().isoformat() |
|
workflow.run_count += 1 |
|
self.update_workflow(workflow) |
|
|
|
logger.info(f"Workflow executed: {workflow.name}") |
|
except Exception as e: |
|
logger.error(f"Workflow execution error: {str(e)}") |
|
|
|
def _check_conditions(self, workflow: Workflow, context: Dict[str, Any]) -> bool: |
|
"""Check workflow conditions |
|
|
|
Args: |
|
workflow: Workflow to check |
|
context: Processing context |
|
|
|
Returns: |
|
True if all conditions are met, False otherwise |
|
""" |
|
|
|
if not workflow.conditions: |
|
return True |
|
|
|
for condition in workflow.conditions: |
|
condition_type = condition["type"] |
|
config = condition["config"] |
|
|
|
if condition_type == "compare": |
|
|
|
left_value = self._get_value(config.get("left_value"), config.get("left_type"), context) |
|
right_value = self._get_value(config.get("right_value"), config.get("right_type"), context) |
|
operator = config.get("operator", "==") |
|
|
|
if not self._compare_values(left_value, right_value, operator): |
|
return False |
|
|
|
elif condition_type == "exists": |
|
|
|
path = config.get("path", "") |
|
data_type = config.get("data_type", "context") |
|
|
|
if not self._check_exists(path, data_type, context): |
|
return False |
|
|
|
elif condition_type == "contains": |
|
|
|
container = self._get_value(config.get("container"), config.get("container_type"), context) |
|
value = self._get_value(config.get("value"), config.get("value_type"), context) |
|
|
|
if not self._check_contains(container, value): |
|
return False |
|
|
|
return True |
|
|
|
def _get_value(self, value: Any, value_type: str, context: Dict[str, Any]) -> Any: |
|
"""Get value based on type |
|
|
|
Args: |
|
value: Value or path |
|
value_type: Type of value ('literal', 'context', 'data') |
|
context: Processing context |
|
|
|
Returns: |
|
Resolved value |
|
""" |
|
if value_type == "literal": |
|
return value |
|
|
|
elif value_type == "context": |
|
|
|
path_parts = value.split(".") |
|
current = context |
|
|
|
for part in path_parts: |
|
if isinstance(current, dict) and part in current: |
|
current = current[part] |
|
else: |
|
return None |
|
|
|
return current |
|
|
|
elif value_type == "data": |
|
|
|
data_type, data_id, field = value.split(".", 2) |
|
data = load_data(data_type, {}) |
|
|
|
if data_id in data: |
|
item = data[data_id] |
|
path_parts = field.split(".") |
|
current = item |
|
|
|
for part in path_parts: |
|
if isinstance(current, dict) and part in current: |
|
current = current[part] |
|
else: |
|
return None |
|
|
|
return current |
|
|
|
return None |
|
|
|
def _compare_values(self, left_value: Any, right_value: Any, operator: str) -> bool: |
|
"""Compare two values |
|
|
|
Args: |
|
left_value: Left value |
|
right_value: Right value |
|
operator: Comparison operator |
|
|
|
Returns: |
|
Comparison result |
|
""" |
|
if operator == "==": |
|
return left_value == right_value |
|
elif operator == "!=": |
|
return left_value != right_value |
|
elif operator == "<": |
|
return left_value < right_value |
|
elif operator == "<=": |
|
return left_value <= right_value |
|
elif operator == ">": |
|
return left_value > right_value |
|
elif operator == ">=": |
|
return left_value >= right_value |
|
elif operator == "contains": |
|
return right_value in left_value |
|
elif operator == "starts_with": |
|
return str(left_value).startswith(str(right_value)) |
|
elif operator == "ends_with": |
|
return str(left_value).endswith(str(right_value)) |
|
|
|
return False |
|
|
|
def _check_exists(self, path: str, data_type: str, context: Dict[str, Any]) -> bool: |
|
"""Check if a value exists |
|
|
|
Args: |
|
path: Path to value |
|
data_type: Type of data ('context', 'data') |
|
context: Processing context |
|
|
|
Returns: |
|
True if value exists, False otherwise |
|
""" |
|
if data_type == "context": |
|
|
|
path_parts = path.split(".") |
|
current = context |
|
|
|
for part in path_parts: |
|
if isinstance(current, dict) and part in current: |
|
current = current[part] |
|
else: |
|
return False |
|
|
|
return True |
|
|
|
elif data_type == "data": |
|
|
|
data_type, data_id, field = path.split(".", 2) |
|
data = load_data(data_type, {}) |
|
|
|
if data_id in data: |
|
item = data[data_id] |
|
path_parts = field.split(".") |
|
current = item |
|
|
|
for part in path_parts: |
|
if isinstance(current, dict) and part in current: |
|
current = current[part] |
|
else: |
|
return False |
|
|
|
return True |
|
|
|
return False |
|
|
|
def _check_contains(self, container: Any, value: Any) -> bool: |
|
"""Check if container contains value |
|
|
|
Args: |
|
container: Container value |
|
value: Value to check |
|
|
|
Returns: |
|
True if container contains value, False otherwise |
|
""" |
|
if container is None or value is None: |
|
return False |
|
|
|
if isinstance(container, (list, tuple, set)): |
|
return value in container |
|
elif isinstance(container, dict): |
|
return value in container or value in container.values() |
|
elif isinstance(container, str): |
|
return str(value) in container |
|
|
|
return False |
|
|
|
def _execute_action(self, action: Dict[str, Any], context: Dict[str, Any]) -> None: |
|
"""Execute workflow action |
|
|
|
Args: |
|
action: Action to execute |
|
context: Processing context |
|
""" |
|
action_type = action["type"] |
|
config = action["config"] |
|
|
|
if action_type == "notification": |
|
|
|
notification_type = config.get("notification_type", "app") |
|
title = self._resolve_template(config.get("title", ""), context) |
|
message = self._resolve_template(config.get("message", ""), context) |
|
|
|
if notification_type == "app": |
|
|
|
logger.info(f"App notification: {title} - {message}") |
|
|
|
elif notification_type == "email": |
|
|
|
recipient = config.get("recipient", "") |
|
logger.info(f"Email notification to {recipient}: {title} - {message}") |
|
|
|
elif notification_type == "telegram": |
|
|
|
chat_id = config.get("chat_id", "") |
|
logger.info(f"Telegram notification to {chat_id}: {title} - {message}") |
|
|
|
elif action_type == "data_update": |
|
|
|
data_type = config.get("data_type", "") |
|
data_id = self._resolve_template(config.get("data_id", ""), context) |
|
updates = config.get("updates", {}) |
|
|
|
|
|
resolved_updates = {} |
|
for key, value in updates.items(): |
|
if isinstance(value, str): |
|
resolved_updates[key] = self._resolve_template(value, context) |
|
else: |
|
resolved_updates[key] = value |
|
|
|
|
|
data = load_data(data_type, {}) |
|
|
|
|
|
if data_id in data: |
|
for key, value in resolved_updates.items(): |
|
data[data_id][key] = value |
|
|
|
|
|
save_data(data_type, data) |
|
logger.info(f"Updated {data_type} data: {data_id}") |
|
|
|
elif action_type == "api_call": |
|
|
|
url = self._resolve_template(config.get("url", ""), context) |
|
method = config.get("method", "GET") |
|
headers = config.get("headers", {}) |
|
body = config.get("body", {}) |
|
|
|
logger.info(f"API call: {method} {url}") |
|
|
|
elif action_type == "function_call": |
|
|
|
function_name = config.get("function", "") |
|
args = config.get("args", {}) |
|
|
|
logger.info(f"Function call: {function_name}") |
|
|
|
def _resolve_template(self, template: str, context: Dict[str, Any]) -> str: |
|
"""Resolve template with context values |
|
|
|
Args: |
|
template: Template string |
|
context: Processing context |
|
|
|
Returns: |
|
Resolved template |
|
""" |
|
if not template or not isinstance(template, str): |
|
return template |
|
|
|
result = template |
|
|
|
|
|
import re |
|
variables = re.findall(r'\{\{([^}]+)\}\}', template) |
|
|
|
for var in variables: |
|
|
|
path_parts = var.strip().split(".") |
|
current = context |
|
|
|
try: |
|
for part in path_parts: |
|
if isinstance(current, dict) and part in current: |
|
current = current[part] |
|
else: |
|
current = None |
|
break |
|
|
|
|
|
if current is not None: |
|
result = result.replace(f"{{{{{var}}}}}", str(current)) |
|
except: |
|
pass |
|
|
|
return result |
|
|
|
@handle_exceptions |
|
def start_scheduler(self) -> None: |
|
"""Start the scheduler thread""" |
|
if self.scheduler_thread is not None and self.scheduler_thread.is_alive(): |
|
return |
|
|
|
self.running = True |
|
self.scheduler_thread = threading.Thread(target=self._scheduler_loop) |
|
self.scheduler_thread.daemon = True |
|
self.scheduler_thread.start() |
|
logger.info("Workflow scheduler started") |
|
|
|
@handle_exceptions |
|
def stop_scheduler(self) -> None: |
|
"""Stop the scheduler thread""" |
|
self.running = False |
|
if self.scheduler_thread is not None: |
|
self.scheduler_thread.join(timeout=1.0) |
|
self.scheduler_thread = None |
|
logger.info("Workflow scheduler stopped") |
|
|
|
def _scheduler_loop(self) -> None: |
|
"""Scheduler thread loop""" |
|
while self.running: |
|
try: |
|
self.scheduler.run_pending() |
|
time.sleep(1) |
|
except Exception as e: |
|
logger.error(f"Scheduler error: {str(e)}") |
|
|
|
@handle_exceptions |
|
def schedule_workflow(self, workflow_id: str, schedule_type: str, schedule_config: Dict[str, Any]) -> None: |
|
"""Schedule a workflow |
|
|
|
Args: |
|
workflow_id: Workflow ID |
|
schedule_type: Type of schedule ('interval', 'daily', 'weekly', 'monthly') |
|
schedule_config: Schedule configuration |
|
""" |
|
workflow = self.get_workflow(workflow_id) |
|
if not workflow: |
|
raise AutomationError(f"Workflow not found: {workflow_id}") |
|
|
|
|
|
def job(): |
|
self._process_workflow(workflow, {"trigger": "schedule", "schedule_type": schedule_type}) |
|
|
|
|
|
if schedule_type == "interval": |
|
interval_minutes = schedule_config.get("minutes", 60) |
|
self.scheduler.every(interval_minutes).minutes.do(job) |
|
logger.info(f"Scheduled workflow {workflow.name} to run every {interval_minutes} minutes") |
|
|
|
elif schedule_type == "daily": |
|
time_str = schedule_config.get("time", "09:00") |
|
self.scheduler.every().day.at(time_str).do(job) |
|
logger.info(f"Scheduled workflow {workflow.name} to run daily at {time_str}") |
|
|
|
elif schedule_type == "weekly": |
|
day = schedule_config.get("day", "monday").lower() |
|
time_str = schedule_config.get("time", "09:00") |
|
|
|
if day == "monday": |
|
self.scheduler.every().monday.at(time_str).do(job) |
|
elif day == "tuesday": |
|
self.scheduler.every().tuesday.at(time_str).do(job) |
|
elif day == "wednesday": |
|
self.scheduler.every().wednesday.at(time_str).do(job) |
|
elif day == "thursday": |
|
self.scheduler.every().thursday.at(time_str).do(job) |
|
elif day == "friday": |
|
self.scheduler.every().friday.at(time_str).do(job) |
|
elif day == "saturday": |
|
self.scheduler.every().saturday.at(time_str).do(job) |
|
elif day == "sunday": |
|
self.scheduler.every().sunday.at(time_str).do(job) |
|
|
|
logger.info(f"Scheduled workflow {workflow.name} to run weekly on {day} at {time_str}") |
|
|
|
elif schedule_type == "monthly": |
|
day = schedule_config.get("day", 1) |
|
time_str = schedule_config.get("time", "09:00") |
|
|
|
|
|
def monthly_job(): |
|
now = datetime.now() |
|
if now.day == day: |
|
job() |
|
|
|
|
|
self.scheduler.every().day.at(time_str).do(monthly_job) |
|
logger.info(f"Scheduled workflow {workflow.name} to run monthly on day {day} at {time_str}") |
|
|
|
|
|
self.start_scheduler() |
|
|
|
|
|
|
|
workflow_manager = WorkflowManager() |