mona / utils /automation /workflow.py
mrradix's picture
Update utils/automation/workflow.py
4083204 verified
import json
import time
import uuid
from typing import Dict, List, Any, Optional, Callable, Union
from datetime import datetime, timedelta
import threading
import schedule
from utils.logging import setup_logger
from utils.error_handling import handle_exceptions, AutomationError
from utils.storage import load_data, save_data
# Initialize logger
logger = setup_logger(__name__)
class Workflow:
"""Smart Workflow for IF-THEN automation"""
def __init__(self, name: str, description: Optional[str] = None):
"""Initialize a workflow
Args:
name: Workflow name
description: Workflow description (optional)
"""
self.id = str(uuid.uuid4())
self.name = name
self.description = description or ""
self.triggers = []
self.actions = []
self.conditions = []
self.enabled = True
self.created_at = datetime.now().isoformat()
self.updated_at = self.created_at
self.last_run = None
self.run_count = 0
@handle_exceptions
def add_trigger(self, trigger_type: str, trigger_config: Dict[str, Any]) -> None:
"""Add a trigger to the workflow
Args:
trigger_type: Type of trigger (e.g., 'time', 'event', 'data_change')
trigger_config: Trigger configuration
"""
self.triggers.append({
"id": str(uuid.uuid4()),
"type": trigger_type,
"config": trigger_config
})
self.updated_at = datetime.now().isoformat()
@handle_exceptions
def add_condition(self, condition_type: str, condition_config: Dict[str, Any]) -> None:
"""Add a condition to the workflow
Args:
condition_type: Type of condition (e.g., 'compare', 'exists', 'contains')
condition_config: Condition configuration
"""
self.conditions.append({
"id": str(uuid.uuid4()),
"type": condition_type,
"config": condition_config
})
self.updated_at = datetime.now().isoformat()
@handle_exceptions
def add_action(self, action_type: str, action_config: Dict[str, Any]) -> None:
"""Add an action to the workflow
Args:
action_type: Type of action (e.g., 'notification', 'data_update', 'api_call')
action_config: Action configuration
"""
self.actions.append({
"id": str(uuid.uuid4()),
"type": action_type,
"config": action_config
})
self.updated_at = datetime.now().isoformat()
@handle_exceptions
def enable(self) -> None:
"""Enable the workflow"""
self.enabled = True
self.updated_at = datetime.now().isoformat()
@handle_exceptions
def disable(self) -> None:
"""Disable the workflow"""
self.enabled = False
self.updated_at = datetime.now().isoformat()
@handle_exceptions
def to_dict(self) -> Dict[str, Any]:
"""Convert workflow to dictionary
Returns:
Workflow as dictionary
"""
return {
"id": self.id,
"name": self.name,
"description": self.description,
"triggers": self.triggers,
"conditions": self.conditions,
"actions": self.actions,
"enabled": self.enabled,
"created_at": self.created_at,
"updated_at": self.updated_at,
"last_run": self.last_run,
"run_count": self.run_count
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Workflow':
"""Create workflow from dictionary
Args:
data: Workflow data
Returns:
Workflow instance
"""
workflow = cls(data["name"], data.get("description", ""))
workflow.id = data["id"]
workflow.triggers = data["triggers"]
workflow.conditions = data["conditions"]
workflow.actions = data["actions"]
workflow.enabled = data["enabled"]
workflow.created_at = data["created_at"]
workflow.updated_at = data["updated_at"]
workflow.last_run = data.get("last_run")
workflow.run_count = data.get("run_count", 0)
return workflow
class WorkflowManager:
"""Manager for smart workflows"""
def __init__(self):
"""Initialize workflow manager"""
self.workflows = {}
self.event_handlers = {}
self.data_change_handlers = {}
self.scheduler = schedule.Scheduler()
self.scheduler_thread = None
self.running = False
self.load_workflows()
@handle_exceptions
def load_workflows(self) -> None:
"""Load workflows from storage"""
try:
workflows_data = load_data("workflows", default=[])
for workflow_data in workflows_data:
workflow = Workflow.from_dict(workflow_data)
self.workflows[workflow.id] = workflow
logger.info(f"Loaded {len(self.workflows)} workflows")
except Exception as e:
logger.error(f"Failed to load workflows: {str(e)}")
@handle_exceptions
def save_workflows(self) -> None:
"""Save workflows to storage"""
try:
workflows_data = [workflow.to_dict() for workflow in self.workflows.values()]
save_data("workflows", workflows_data)
logger.info(f"Saved {len(self.workflows)} workflows")
except Exception as e:
logger.error(f"Failed to save workflows: {str(e)}")
@handle_exceptions
def create_workflow(self, name: str, description: Optional[str] = None) -> Workflow:
"""Create a new workflow
Args:
name: Workflow name
description: Workflow description (optional)
Returns:
Created workflow
"""
workflow = Workflow(name, description)
self.workflows[workflow.id] = workflow
self.save_workflows()
return workflow
@handle_exceptions
def get_workflow(self, workflow_id: str) -> Optional[Workflow]:
"""Get workflow by ID
Args:
workflow_id: Workflow ID
Returns:
Workflow if found, None otherwise
"""
return self.workflows.get(workflow_id)
@handle_exceptions
def update_workflow(self, workflow: Workflow) -> None:
"""Update workflow
Args:
workflow: Workflow to update
"""
if workflow.id in self.workflows:
workflow.updated_at = datetime.now().isoformat()
self.workflows[workflow.id] = workflow
self.save_workflows()
else:
raise AutomationError(f"Workflow not found: {workflow.id}")
@handle_exceptions
def delete_workflow(self, workflow_id: str) -> None:
"""Delete workflow
Args:
workflow_id: Workflow ID
"""
if workflow_id in self.workflows:
del self.workflows[workflow_id]
self.save_workflows()
else:
raise AutomationError(f"Workflow not found: {workflow_id}")
@handle_exceptions
def get_all_workflows(self) -> List[Workflow]:
"""Get all workflows
Returns:
List of all workflows
"""
return list(self.workflows.values())
@handle_exceptions
def register_event_handler(self, event_type: str, handler: Callable) -> None:
"""Register event handler
Args:
event_type: Type of event
handler: Event handler function
"""
if event_type not in self.event_handlers:
self.event_handlers[event_type] = []
self.event_handlers[event_type].append(handler)
@handle_exceptions
def register_data_change_handler(self, data_type: str, handler: Callable) -> None:
"""Register data change handler
Args:
data_type: Type of data
handler: Data change handler function
"""
if data_type not in self.data_change_handlers:
self.data_change_handlers[data_type] = []
self.data_change_handlers[data_type].append(handler)
@handle_exceptions
def trigger_event(self, event_type: str, event_data: Dict[str, Any]) -> None:
"""Trigger an event
Args:
event_type: Type of event
event_data: Event data
"""
# Call event handlers
if event_type in self.event_handlers:
for handler in self.event_handlers[event_type]:
try:
handler(event_data)
except Exception as e:
logger.error(f"Event handler error: {str(e)}")
# Process workflows with event triggers
for workflow in self.workflows.values():
if not workflow.enabled:
continue
for trigger in workflow.triggers:
if trigger["type"] == "event" and trigger["config"].get("event_type") == event_type:
self._process_workflow(workflow, event_data)
@handle_exceptions
def notify_data_change(self, data_type: str, data_id: str, data: Dict[str, Any]) -> None:
"""Notify of data change
Args:
data_type: Type of data
data_id: Data ID
data: Changed data
"""
# Call data change handlers
if data_type in self.data_change_handlers:
for handler in self.data_change_handlers[data_type]:
try:
handler(data_id, data)
except Exception as e:
logger.error(f"Data change handler error: {str(e)}")
# Process workflows with data change triggers
for workflow in self.workflows.values():
if not workflow.enabled:
continue
for trigger in workflow.triggers:
if trigger["type"] == "data_change" and trigger["config"].get("data_type") == data_type:
self._process_workflow(workflow, {"data_type": data_type, "data_id": data_id, "data": data})
def _process_workflow(self, workflow: Workflow, context: Dict[str, Any]) -> None:
"""Process workflow
Args:
workflow: Workflow to process
context: Processing context
"""
try:
# Check conditions
if not self._check_conditions(workflow, context):
return
# Execute actions
for action in workflow.actions:
self._execute_action(action, context)
# Update workflow stats
workflow.last_run = datetime.now().isoformat()
workflow.run_count += 1
self.update_workflow(workflow)
logger.info(f"Workflow executed: {workflow.name}")
except Exception as e:
logger.error(f"Workflow execution error: {str(e)}")
def _check_conditions(self, workflow: Workflow, context: Dict[str, Any]) -> bool:
"""Check workflow conditions
Args:
workflow: Workflow to check
context: Processing context
Returns:
True if all conditions are met, False otherwise
"""
# If no conditions, return True
if not workflow.conditions:
return True
for condition in workflow.conditions:
condition_type = condition["type"]
config = condition["config"]
if condition_type == "compare":
# Compare two values
left_value = self._get_value(config.get("left_value"), config.get("left_type"), context)
right_value = self._get_value(config.get("right_value"), config.get("right_type"), context)
operator = config.get("operator", "==")
if not self._compare_values(left_value, right_value, operator):
return False
elif condition_type == "exists":
# Check if a value exists
path = config.get("path", "")
data_type = config.get("data_type", "context")
if not self._check_exists(path, data_type, context):
return False
elif condition_type == "contains":
# Check if a value contains another value
container = self._get_value(config.get("container"), config.get("container_type"), context)
value = self._get_value(config.get("value"), config.get("value_type"), context)
if not self._check_contains(container, value):
return False
return True
def _get_value(self, value: Any, value_type: str, context: Dict[str, Any]) -> Any:
"""Get value based on type
Args:
value: Value or path
value_type: Type of value ('literal', 'context', 'data')
context: Processing context
Returns:
Resolved value
"""
if value_type == "literal":
return value
elif value_type == "context":
# Get value from context
path_parts = value.split(".")
current = context
for part in path_parts:
if isinstance(current, dict) and part in current:
current = current[part]
else:
return None
return current
elif value_type == "data":
# Get value from data storage
data_type, data_id, field = value.split(".", 2)
data = load_data(data_type, {})
if data_id in data:
item = data[data_id]
path_parts = field.split(".")
current = item
for part in path_parts:
if isinstance(current, dict) and part in current:
current = current[part]
else:
return None
return current
return None
def _compare_values(self, left_value: Any, right_value: Any, operator: str) -> bool:
"""Compare two values
Args:
left_value: Left value
right_value: Right value
operator: Comparison operator
Returns:
Comparison result
"""
if operator == "==":
return left_value == right_value
elif operator == "!=":
return left_value != right_value
elif operator == "<":
return left_value < right_value
elif operator == "<=":
return left_value <= right_value
elif operator == ">":
return left_value > right_value
elif operator == ">=":
return left_value >= right_value
elif operator == "contains":
return right_value in left_value
elif operator == "starts_with":
return str(left_value).startswith(str(right_value))
elif operator == "ends_with":
return str(left_value).endswith(str(right_value))
return False
def _check_exists(self, path: str, data_type: str, context: Dict[str, Any]) -> bool:
"""Check if a value exists
Args:
path: Path to value
data_type: Type of data ('context', 'data')
context: Processing context
Returns:
True if value exists, False otherwise
"""
if data_type == "context":
# Check in context
path_parts = path.split(".")
current = context
for part in path_parts:
if isinstance(current, dict) and part in current:
current = current[part]
else:
return False
return True
elif data_type == "data":
# Check in data storage
data_type, data_id, field = path.split(".", 2)
data = load_data(data_type, {})
if data_id in data:
item = data[data_id]
path_parts = field.split(".")
current = item
for part in path_parts:
if isinstance(current, dict) and part in current:
current = current[part]
else:
return False
return True
return False
def _check_contains(self, container: Any, value: Any) -> bool:
"""Check if container contains value
Args:
container: Container value
value: Value to check
Returns:
True if container contains value, False otherwise
"""
if container is None or value is None:
return False
if isinstance(container, (list, tuple, set)):
return value in container
elif isinstance(container, dict):
return value in container or value in container.values()
elif isinstance(container, str):
return str(value) in container
return False
def _execute_action(self, action: Dict[str, Any], context: Dict[str, Any]) -> None:
"""Execute workflow action
Args:
action: Action to execute
context: Processing context
"""
action_type = action["type"]
config = action["config"]
if action_type == "notification":
# Send notification
notification_type = config.get("notification_type", "app")
title = self._resolve_template(config.get("title", ""), context)
message = self._resolve_template(config.get("message", ""), context)
if notification_type == "app":
# App notification (placeholder)
logger.info(f"App notification: {title} - {message}")
elif notification_type == "email":
# Email notification (placeholder)
recipient = config.get("recipient", "")
logger.info(f"Email notification to {recipient}: {title} - {message}")
elif notification_type == "telegram":
# Telegram notification (placeholder)
chat_id = config.get("chat_id", "")
logger.info(f"Telegram notification to {chat_id}: {title} - {message}")
elif action_type == "data_update":
# Update data
data_type = config.get("data_type", "")
data_id = self._resolve_template(config.get("data_id", ""), context)
updates = config.get("updates", {})
# Resolve template values in updates
resolved_updates = {}
for key, value in updates.items():
if isinstance(value, str):
resolved_updates[key] = self._resolve_template(value, context)
else:
resolved_updates[key] = value
# Load data
data = load_data(data_type, {})
# Update data
if data_id in data:
for key, value in resolved_updates.items():
data[data_id][key] = value
# Save data
save_data(data_type, data)
logger.info(f"Updated {data_type} data: {data_id}")
elif action_type == "api_call":
# Make API call (placeholder)
url = self._resolve_template(config.get("url", ""), context)
method = config.get("method", "GET")
headers = config.get("headers", {})
body = config.get("body", {})
logger.info(f"API call: {method} {url}")
elif action_type == "function_call":
# Call function (placeholder)
function_name = config.get("function", "")
args = config.get("args", {})
logger.info(f"Function call: {function_name}")
def _resolve_template(self, template: str, context: Dict[str, Any]) -> str:
"""Resolve template with context values
Args:
template: Template string
context: Processing context
Returns:
Resolved template
"""
if not template or not isinstance(template, str):
return template
result = template
# Find all {{variable}} patterns
import re
variables = re.findall(r'\{\{([^}]+)\}\}', template)
for var in variables:
# Get value from context
path_parts = var.strip().split(".")
current = context
try:
for part in path_parts:
if isinstance(current, dict) and part in current:
current = current[part]
else:
current = None
break
# Replace in template
if current is not None:
result = result.replace(f"{{{{{var}}}}}", str(current))
except:
pass
return result
@handle_exceptions
def start_scheduler(self) -> None:
"""Start the scheduler thread"""
if self.scheduler_thread is not None and self.scheduler_thread.is_alive():
return
self.running = True
self.scheduler_thread = threading.Thread(target=self._scheduler_loop)
self.scheduler_thread.daemon = True
self.scheduler_thread.start()
logger.info("Workflow scheduler started")
@handle_exceptions
def stop_scheduler(self) -> None:
"""Stop the scheduler thread"""
self.running = False
if self.scheduler_thread is not None:
self.scheduler_thread.join(timeout=1.0)
self.scheduler_thread = None
logger.info("Workflow scheduler stopped")
def _scheduler_loop(self) -> None:
"""Scheduler thread loop"""
while self.running:
try:
self.scheduler.run_pending()
time.sleep(1)
except Exception as e:
logger.error(f"Scheduler error: {str(e)}")
@handle_exceptions
def schedule_workflow(self, workflow_id: str, schedule_type: str, schedule_config: Dict[str, Any]) -> None:
"""Schedule a workflow
Args:
workflow_id: Workflow ID
schedule_type: Type of schedule ('interval', 'daily', 'weekly', 'monthly')
schedule_config: Schedule configuration
"""
workflow = self.get_workflow(workflow_id)
if not workflow:
raise AutomationError(f"Workflow not found: {workflow_id}")
# Create a job function
def job():
self._process_workflow(workflow, {"trigger": "schedule", "schedule_type": schedule_type})
# Schedule based on type
if schedule_type == "interval":
interval_minutes = schedule_config.get("minutes", 60)
self.scheduler.every(interval_minutes).minutes.do(job)
logger.info(f"Scheduled workflow {workflow.name} to run every {interval_minutes} minutes")
elif schedule_type == "daily":
time_str = schedule_config.get("time", "09:00")
self.scheduler.every().day.at(time_str).do(job)
logger.info(f"Scheduled workflow {workflow.name} to run daily at {time_str}")
elif schedule_type == "weekly":
day = schedule_config.get("day", "monday").lower()
time_str = schedule_config.get("time", "09:00")
if day == "monday":
self.scheduler.every().monday.at(time_str).do(job)
elif day == "tuesday":
self.scheduler.every().tuesday.at(time_str).do(job)
elif day == "wednesday":
self.scheduler.every().wednesday.at(time_str).do(job)
elif day == "thursday":
self.scheduler.every().thursday.at(time_str).do(job)
elif day == "friday":
self.scheduler.every().friday.at(time_str).do(job)
elif day == "saturday":
self.scheduler.every().saturday.at(time_str).do(job)
elif day == "sunday":
self.scheduler.every().sunday.at(time_str).do(job)
logger.info(f"Scheduled workflow {workflow.name} to run weekly on {day} at {time_str}")
elif schedule_type == "monthly":
day = schedule_config.get("day", 1)
time_str = schedule_config.get("time", "09:00")
# For monthly, we need to check the day in the job
def monthly_job():
now = datetime.now()
if now.day == day:
job()
# Schedule to run daily at the specified time, but only execute on the specified day
self.scheduler.every().day.at(time_str).do(monthly_job)
logger.info(f"Scheduled workflow {workflow.name} to run monthly on day {day} at {time_str}")
# Start scheduler if not already running
self.start_scheduler()
# Create a global instance of the workflow manager
workflow_manager = WorkflowManager()