# utils/automation/batch_processing.py
import copy
import json
import uuid
from typing import Dict, List, Any, Optional
from datetime import datetime
import threading
from utils.logging import setup_logger
from utils.error_handling import handle_exceptions, AutomationError
from utils.storage import load_data, save_data
# Initialize logger
logger = setup_logger(__name__)
class BatchOperation:
"""Batch operation for mass processing"""
def __init__(self, name: str, operation_type: str,
target_items: List[Dict[str, Any]], operation_config: Dict[str, Any],
description: Optional[str] = None):
"""Initialize a batch operation
Args:
name: Operation name
operation_type: Type of operation (update, delete, tag, categorize, etc.)
target_items: List of items to process
operation_config: Operation configuration
description: Operation description (optional)
"""
self.id = str(uuid.uuid4())
self.name = name
self.description = description or ""
self.operation_type = operation_type
self.target_items = target_items
self.operation_config = operation_config
self.created_at = datetime.now().isoformat()
self.updated_at = self.created_at
self.started_at = None
self.completed_at = None
self.status = "pending" # pending, running, completed, failed, cancelled
self.progress = 0 # 0-100
self.results = {
"total": len(target_items),
"processed": 0,
"succeeded": 0,
"failed": 0,
"skipped": 0,
"errors": []
}
self.dry_run = False # If True, simulate but don't actually perform changes
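        # Lifecycle: pending -> running -> (completed | failed | cancelled);
        # these transitions are enforced by start(), complete(), and cancel() below.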
@handle_exceptions
def start(self) -> None:
"""Start the batch operation"""
if self.status != "pending":
raise AutomationError(f"Cannot start operation in {self.status} state")
self.status = "running"
self.started_at = datetime.now().isoformat()
self.updated_at = self.started_at
@handle_exceptions
def complete(self) -> None:
"""Mark the batch operation as completed"""
if self.status != "running":
raise AutomationError(f"Cannot complete operation in {self.status} state")
self.status = "completed"
self.completed_at = datetime.now().isoformat()
self.updated_at = self.completed_at
self.progress = 100
@handle_exceptions
def fail(self, error: str) -> None:
"""Mark the batch operation as failed
Args:
error: Error message
"""
self.status = "failed"
self.updated_at = datetime.now().isoformat()
self.results["errors"].append(error)
@handle_exceptions
def cancel(self) -> None:
"""Cancel the batch operation"""
if self.status not in ["pending", "running"]:
raise AutomationError(f"Cannot cancel operation in {self.status} state")
self.status = "cancelled"
self.updated_at = datetime.now().isoformat()
@handle_exceptions
def update_progress(self, processed: int, succeeded: int, failed: int, skipped: int) -> None:
"""Update operation progress
Args:
processed: Number of items processed
succeeded: Number of items processed successfully
failed: Number of items that failed
skipped: Number of items skipped
"""
self.results["processed"] = processed
self.results["succeeded"] = succeeded
self.results["failed"] = failed
self.results["skipped"] = skipped
total = self.results["total"]
if total > 0:
self.progress = min(100, int((processed / total) * 100))
self.updated_at = datetime.now().isoformat()
@handle_exceptions
def add_error(self, error: str) -> None:
"""Add an error message
Args:
error: Error message
"""
self.results["errors"].append(error)
self.updated_at = datetime.now().isoformat()
@handle_exceptions
def set_dry_run(self, dry_run: bool) -> None:
"""Set dry run mode
Args:
dry_run: Whether to run in dry run mode
"""
self.dry_run = dry_run
self.updated_at = datetime.now().isoformat()
@handle_exceptions
def to_dict(self) -> Dict[str, Any]:
"""Convert operation to dictionary
Returns:
Operation as dictionary
"""
return {
"id": self.id,
"name": self.name,
"description": self.description,
"operation_type": self.operation_type,
"target_items": self.target_items,
"operation_config": self.operation_config,
"created_at": self.created_at,
"updated_at": self.updated_at,
"started_at": self.started_at,
"completed_at": self.completed_at,
"status": self.status,
"progress": self.progress,
"results": self.results,
"dry_run": self.dry_run
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'BatchOperation':
"""Create operation from dictionary
Args:
data: Operation data
Returns:
BatchOperation instance
"""
operation = cls(
data["name"],
data["operation_type"],
data["target_items"],
data["operation_config"],
data.get("description", "")
)
operation.id = data["id"]
operation.created_at = data["created_at"]
operation.updated_at = data["updated_at"]
operation.started_at = data.get("started_at")
operation.completed_at = data.get("completed_at")
operation.status = data["status"]
operation.progress = data["progress"]
operation.results = data["results"]
operation.dry_run = data.get("dry_run", False)
return operation
class BatchProcessor:
"""Processor for batch operations"""
def __init__(self):
"""Initialize batch processor"""
self.operations = {}
self.running_operations = {}
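        # NOTE: operations execute on daemon threads (see execute_operation);
        # access to self.operations and storage writes are not synchronized,
        # so this processor assumes low-concurrency, single-process use.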
self.operation_handlers = {
"update": self._handle_update_operation,
"delete": self._handle_delete_operation,
"tag": self._handle_tag_operation,
"categorize": self._handle_categorize_operation,
"status_change": self._handle_status_change_operation,
"priority_change": self._handle_priority_change_operation,
"due_date_change": self._handle_due_date_change_operation,
"assign": self._handle_assign_operation,
"export": self._handle_export_operation,
"import": self._handle_import_operation,
"custom": self._handle_custom_operation
}
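        # To support a new operation type, register a handler here keyed by
        # the operation_type string passed to create_operation().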
self.load_operations()
@handle_exceptions
def load_operations(self) -> None:
"""Load operations from storage"""
try:
operations_data = load_data("batch_operations", default=[])
for operation_data in operations_data:
operation = BatchOperation.from_dict(operation_data)
self.operations[operation.id] = operation
logger.info(f"Loaded {len(self.operations)} batch operations")
except Exception as e:
logger.error(f"Failed to load batch operations: {str(e)}")
@handle_exceptions
def save_operations(self) -> None:
"""Save operations to storage"""
try:
operations_data = [operation.to_dict() for operation in self.operations.values()]
save_data("batch_operations", operations_data)
logger.info(f"Saved {len(self.operations)} batch operations")
except Exception as e:
logger.error(f"Failed to save batch operations: {str(e)}")
@handle_exceptions
def create_operation(self, name: str, operation_type: str,
target_items: List[Dict[str, Any]], operation_config: Dict[str, Any],
description: Optional[str] = None, dry_run: bool = False) -> BatchOperation:
"""Create a new batch operation
Args:
name: Operation name
operation_type: Type of operation
target_items: List of items to process
operation_config: Operation configuration
description: Operation description (optional)
dry_run: Whether to run in dry run mode
Returns:
Created operation
"""
operation = BatchOperation(name, operation_type, target_items, operation_config, description)
operation.set_dry_run(dry_run)
self.operations[operation.id] = operation
self.save_operations()
return operation
@handle_exceptions
def get_operation(self, operation_id: str) -> Optional[BatchOperation]:
"""Get operation by ID
Args:
operation_id: Operation ID
Returns:
Operation if found, None otherwise
"""
return self.operations.get(operation_id)
@handle_exceptions
def update_operation(self, operation: BatchOperation) -> None:
"""Update operation
Args:
operation: Operation to update
"""
if operation.id in self.operations:
operation.updated_at = datetime.now().isoformat()
self.operations[operation.id] = operation
self.save_operations()
else:
raise AutomationError(f"Operation not found: {operation.id}")
@handle_exceptions
def delete_operation(self, operation_id: str) -> None:
"""Delete operation
Args:
operation_id: Operation ID
"""
if operation_id in self.operations:
del self.operations[operation_id]
self.save_operations()
else:
raise AutomationError(f"Operation not found: {operation_id}")
@handle_exceptions
def get_all_operations(self) -> List[BatchOperation]:
"""Get all operations
Returns:
List of all operations
"""
return list(self.operations.values())
@handle_exceptions
def execute_operation(self, operation_id: str) -> None:
"""Execute a batch operation
Args:
operation_id: Operation ID
"""
operation = self.get_operation(operation_id)
if not operation:
raise AutomationError(f"Operation not found: {operation_id}")
if operation.status != "pending":
raise AutomationError(f"Cannot execute operation in {operation.status} state")
# Start the operation
operation.start()
self.update_operation(operation)
# Start a thread to execute the operation
thread = threading.Thread(target=self._execute_operation_thread, args=(operation_id,))
thread.daemon = True
thread.start()
# Store the thread
self.running_operations[operation_id] = thread
def _execute_operation_thread(self, operation_id: str) -> None:
"""Execute a batch operation in a separate thread
Args:
operation_id: Operation ID
"""
operation = self.get_operation(operation_id)
if not operation:
return
try:
# Get the handler for this operation type
handler = self.operation_handlers.get(operation.operation_type)
if not handler:
raise AutomationError(f"Unknown operation type: {operation.operation_type}")
            # Execute the operation
            handler(operation)
            # Mark as completed unless the operation was cancelled mid-run;
            # calling complete() on a cancelled operation would raise and the
            # run would be wrongly marked as failed
            if operation.status == "running":
                operation.complete()
            self.update_operation(operation)
except Exception as e:
# Mark as failed
operation.fail(str(e))
self.update_operation(operation)
logger.error(f"Operation execution error: {str(e)}")
# Remove from running operations
if operation_id in self.running_operations:
del self.running_operations[operation_id]
@handle_exceptions
def cancel_operation(self, operation_id: str) -> None:
"""Cancel a running operation
Args:
operation_id: Operation ID
"""
operation = self.get_operation(operation_id)
if not operation:
raise AutomationError(f"Operation not found: {operation_id}")
# Mark as cancelled
operation.cancel()
self.update_operation(operation)
        # Cooperative cancellation: the worker thread skips complete() for a
        # cancelled operation; long-running handlers may also poll
        # operation.status to stop early
def _handle_update_operation(self, operation: BatchOperation) -> None:
"""Handle update operation
Args:
operation: Operation to handle
"""
config = operation.operation_config
data_type = config.get("data_type", "")
updates = config.get("updates", {})
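        # Expected config shape (illustrative values):
        #   {"data_type": "notes", "updates": {"status": "archived"}}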
# Load data
data = load_data(data_type, {})
# Process items
processed = 0
succeeded = 0
failed = 0
skipped = 0
for item in operation.target_items:
processed += 1
item_id = item.get("id")
if not item_id or item_id not in data:
skipped += 1
continue
try:
# Apply updates
if not operation.dry_run:
for key, value in updates.items():
data[item_id][key] = value
succeeded += 1
except Exception as e:
failed += 1
operation.add_error(f"Failed to update item {item_id}: {str(e)}")
# Update progress
operation.update_progress(processed, succeeded, failed, skipped)
# Save data
if not operation.dry_run:
save_data(data_type, data)
def _handle_delete_operation(self, operation: BatchOperation) -> None:
"""Handle delete operation
Args:
operation: Operation to handle
"""
config = operation.operation_config
data_type = config.get("data_type", "")
# Load data
data = load_data(data_type, {})
# Process items
processed = 0
succeeded = 0
failed = 0
skipped = 0
for item in operation.target_items:
processed += 1
item_id = item.get("id")
if not item_id or item_id not in data:
skipped += 1
continue
try:
# Delete item
if not operation.dry_run:
del data[item_id]
succeeded += 1
except Exception as e:
failed += 1
operation.add_error(f"Failed to delete item {item_id}: {str(e)}")
# Update progress
operation.update_progress(processed, succeeded, failed, skipped)
# Save data
if not operation.dry_run:
save_data(data_type, data)
def _handle_tag_operation(self, operation: BatchOperation) -> None:
"""Handle tag operation
Args:
operation: Operation to handle
"""
config = operation.operation_config
data_type = config.get("data_type", "")
action = config.get("action", "add") # add, remove, replace
tags = config.get("tags", [])
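        # Expected config shape (illustrative values):
        #   {"data_type": "notes", "action": "add", "tags": ["urgent", "q3"]}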
# Load data
data = load_data(data_type, {})
# Process items
processed = 0
succeeded = 0
failed = 0
skipped = 0
for item in operation.target_items:
processed += 1
item_id = item.get("id")
if not item_id or item_id not in data:
skipped += 1
continue
try:
# Get current tags
current_tags = data[item_id].get("tags", [])
# Apply tag operation
if action == "add":
# Add tags that don't already exist
new_tags = current_tags.copy()
for tag in tags:
if tag not in new_tags:
new_tags.append(tag)
elif action == "remove":
# Remove specified tags
new_tags = [tag for tag in current_tags if tag not in tags]
elif action == "replace":
# Replace all tags
new_tags = tags.copy()
else:
raise AutomationError(f"Unknown tag action: {action}")
# Update tags
if not operation.dry_run:
data[item_id]["tags"] = new_tags
succeeded += 1
except Exception as e:
failed += 1
operation.add_error(f"Failed to update tags for item {item_id}: {str(e)}")
# Update progress
operation.update_progress(processed, succeeded, failed, skipped)
# Save data
if not operation.dry_run:
save_data(data_type, data)
def _handle_categorize_operation(self, operation: BatchOperation) -> None:
"""Handle categorize operation
Args:
operation: Operation to handle
"""
config = operation.operation_config
data_type = config.get("data_type", "")
category = config.get("category", "")
# Load data
data = load_data(data_type, {})
# Process items
processed = 0
succeeded = 0
failed = 0
skipped = 0
for item in operation.target_items:
processed += 1
item_id = item.get("id")
if not item_id or item_id not in data:
skipped += 1
continue
try:
# Update category
if not operation.dry_run:
data[item_id]["category"] = category
succeeded += 1
except Exception as e:
failed += 1
operation.add_error(f"Failed to categorize item {item_id}: {str(e)}")
# Update progress
operation.update_progress(processed, succeeded, failed, skipped)
# Save data
if not operation.dry_run:
save_data(data_type, data)
def _handle_status_change_operation(self, operation: BatchOperation) -> None:
"""Handle status change operation
Args:
operation: Operation to handle
"""
config = operation.operation_config
data_type = config.get("data_type", "")
status = config.get("status", "")
# Load data
data = load_data(data_type, {})
# Process items
processed = 0
succeeded = 0
failed = 0
skipped = 0
for item in operation.target_items:
processed += 1
item_id = item.get("id")
if not item_id or item_id not in data:
skipped += 1
continue
try:
# Update status
if not operation.dry_run:
data[item_id]["status"] = status
succeeded += 1
except Exception as e:
failed += 1
operation.add_error(f"Failed to change status for item {item_id}: {str(e)}")
# Update progress
operation.update_progress(processed, succeeded, failed, skipped)
# Save data
if not operation.dry_run:
save_data(data_type, data)
def _handle_priority_change_operation(self, operation: BatchOperation) -> None:
"""Handle priority change operation
Args:
operation: Operation to handle
"""
config = operation.operation_config
data_type = config.get("data_type", "")
priority = config.get("priority", "")
# Load data
data = load_data(data_type, {})
# Process items
processed = 0
succeeded = 0
failed = 0
skipped = 0
for item in operation.target_items:
processed += 1
item_id = item.get("id")
if not item_id or item_id not in data:
skipped += 1
continue
try:
# Update priority
if not operation.dry_run:
data[item_id]["priority"] = priority
succeeded += 1
except Exception as e:
failed += 1
operation.add_error(f"Failed to change priority for item {item_id}: {str(e)}")
# Update progress
operation.update_progress(processed, succeeded, failed, skipped)
# Save data
if not operation.dry_run:
save_data(data_type, data)
def _handle_due_date_change_operation(self, operation: BatchOperation) -> None:
"""Handle due date change operation
Args:
operation: Operation to handle
"""
config = operation.operation_config
data_type = config.get("data_type", "")
due_date = config.get("due_date", "")
# Load data
data = load_data(data_type, {})
# Process items
processed = 0
succeeded = 0
failed = 0
skipped = 0
for item in operation.target_items:
processed += 1
item_id = item.get("id")
if not item_id or item_id not in data:
skipped += 1
continue
try:
# Update due date
if not operation.dry_run:
data[item_id]["due_date"] = due_date
succeeded += 1
except Exception as e:
failed += 1
operation.add_error(f"Failed to change due date for item {item_id}: {str(e)}")
# Update progress
operation.update_progress(processed, succeeded, failed, skipped)
# Save data
if not operation.dry_run:
save_data(data_type, data)
def _handle_assign_operation(self, operation: BatchOperation) -> None:
"""Handle assign operation
Args:
operation: Operation to handle
"""
config = operation.operation_config
data_type = config.get("data_type", "")
assignee = config.get("assignee", "")
# Load data
data = load_data(data_type, {})
# Process items
processed = 0
succeeded = 0
failed = 0
skipped = 0
for item in operation.target_items:
processed += 1
item_id = item.get("id")
if not item_id or item_id not in data:
skipped += 1
continue
try:
# Update assignee
if not operation.dry_run:
data[item_id]["assignee"] = assignee
succeeded += 1
except Exception as e:
failed += 1
operation.add_error(f"Failed to assign item {item_id}: {str(e)}")
# Update progress
operation.update_progress(processed, succeeded, failed, skipped)
# Save data
if not operation.dry_run:
save_data(data_type, data)
def _handle_export_operation(self, operation: BatchOperation) -> None:
"""Handle export operation
Args:
operation: Operation to handle
"""
config = operation.operation_config
data_type = config.get("data_type", "")
export_format = config.get("format", "json")
file_path = config.get("file_path", "")
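        # Expected config shape (illustrative values):
        #   {"data_type": "notes", "format": "json", "file_path": "export.json"}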
# Load data
data = load_data(data_type, {})
# Process items
processed = 0
succeeded = 0
failed = 0
skipped = 0
# Filter data to only include target items
export_data = {}
for item in operation.target_items:
processed += 1
item_id = item.get("id")
if not item_id or item_id not in data:
skipped += 1
continue
try:
# Add to export data
export_data[item_id] = data[item_id]
succeeded += 1
except Exception as e:
failed += 1
operation.add_error(f"Failed to export item {item_id}: {str(e)}")
# Update progress
operation.update_progress(processed, succeeded, failed, skipped)
        # Export data
        if not operation.dry_run and not file_path:
            operation.add_error("No file_path specified for export")
        elif not operation.dry_run:
            try:
                if export_format == "json":
                    with open(file_path, "w") as f:
                        json.dump(export_data, f, indent=2)
                elif export_format == "csv":
                    # Placeholder for CSV export
                    pass
                elif export_format == "markdown":
                    # Placeholder for Markdown export
                    pass
                else:
                    operation.add_error(f"Unknown export format: {export_format}")
            except Exception as e:
                operation.add_error(f"Failed to export data: {str(e)}")
def _handle_import_operation(self, operation: BatchOperation) -> None:
"""Handle import operation
Args:
operation: Operation to handle
"""
config = operation.operation_config
data_type = config.get("data_type", "")
import_format = config.get("format", "json")
file_path = config.get("file_path", "")
merge_strategy = config.get("merge_strategy", "overwrite") # overwrite, merge, skip
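        # Expected config shape (illustrative values):
        #   {"data_type": "notes", "format": "json",
        #    "file_path": "import.json", "merge_strategy": "merge"}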
# Load current data
current_data = load_data(data_type, {})
# Load import data
import_data = {}
        try:
            if not file_path:
                operation.add_error("No file_path specified for import")
                return
            if import_format == "json":
                with open(file_path, "r") as f:
                    import_data = json.load(f)
            elif import_format == "csv":
                # Placeholder for CSV import
                pass
            elif import_format == "markdown":
                # Placeholder for Markdown import
                pass
            else:
                operation.add_error(f"Unknown import format: {import_format}")
                return
        except Exception as e:
            operation.add_error(f"Failed to load import data: {str(e)}")
            return
# Process items
processed = 0
succeeded = 0
failed = 0
skipped = 0
for item_id, item_data in import_data.items():
processed += 1
try:
if item_id in current_data:
if merge_strategy == "overwrite":
# Overwrite existing item
if not operation.dry_run:
current_data[item_id] = item_data
succeeded += 1
elif merge_strategy == "merge":
# Merge with existing item
if not operation.dry_run:
for key, value in item_data.items():
current_data[item_id][key] = value
succeeded += 1
elif merge_strategy == "skip":
# Skip existing item
skipped += 1
else:
operation.add_error(f"Unknown merge strategy: {merge_strategy}")
failed += 1
else:
# Add new item
if not operation.dry_run:
current_data[item_id] = item_data
succeeded += 1
except Exception as e:
failed += 1
operation.add_error(f"Failed to import item {item_id}: {str(e)}")
# Update progress
operation.update_progress(processed, succeeded, failed, skipped)
# Save data
if not operation.dry_run:
save_data(data_type, current_data)
def _handle_custom_operation(self, operation: BatchOperation) -> None:
"""Handle custom operation
Args:
operation: Operation to handle
"""
config = operation.operation_config
custom_type = config.get("custom_type", "")
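        # Expected config shape (illustrative values):
        #   {"custom_type": "archive", "data_type": "notes"}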
# Process items based on custom type
if custom_type == "archive":
self._handle_archive_operation(operation)
elif custom_type == "duplicate":
self._handle_duplicate_operation(operation)
elif custom_type == "merge":
self._handle_merge_operation(operation)
else:
operation.add_error(f"Unknown custom operation type: {custom_type}")
def _handle_archive_operation(self, operation: BatchOperation) -> None:
"""Handle archive operation
Args:
operation: Operation to handle
"""
config = operation.operation_config
data_type = config.get("data_type", "")
# Load data
data = load_data(data_type, {})
archives = load_data(f"{data_type}_archives", {})
# Process items
processed = 0
succeeded = 0
failed = 0
skipped = 0
for item in operation.target_items:
processed += 1
item_id = item.get("id")
if not item_id or item_id not in data:
skipped += 1
continue
try:
# Move to archives
if not operation.dry_run:
archives[item_id] = data[item_id]
archives[item_id]["archived_at"] = datetime.now().isoformat()
del data[item_id]
succeeded += 1
except Exception as e:
failed += 1
operation.add_error(f"Failed to archive item {item_id}: {str(e)}")
# Update progress
operation.update_progress(processed, succeeded, failed, skipped)
# Save data
if not operation.dry_run:
save_data(data_type, data)
save_data(f"{data_type}_archives", archives)
def _handle_duplicate_operation(self, operation: BatchOperation) -> None:
"""Handle duplicate operation
Args:
operation: Operation to handle
"""
config = operation.operation_config
data_type = config.get("data_type", "")
# Load data
data = load_data(data_type, {})
# Process items
processed = 0
succeeded = 0
failed = 0
skipped = 0
for item in operation.target_items:
processed += 1
item_id = item.get("id")
if not item_id or item_id not in data:
skipped += 1
continue
try:
# Create duplicate
if not operation.dry_run:
new_id = str(uuid.uuid4())
                    new_item = copy.deepcopy(data[item_id])  # deep copy so nested lists/dicts are not shared with the original
new_item["id"] = new_id
new_item["created_at"] = datetime.now().isoformat()
new_item["updated_at"] = datetime.now().isoformat()
new_item["title"] = f"Copy of {new_item.get('title', '')}" # Add 'Copy of' prefix
data[new_id] = new_item
succeeded += 1
except Exception as e:
failed += 1
operation.add_error(f"Failed to duplicate item {item_id}: {str(e)}")
# Update progress
operation.update_progress(processed, succeeded, failed, skipped)
# Save data
if not operation.dry_run:
save_data(data_type, data)
def _handle_merge_operation(self, operation: BatchOperation) -> None:
"""Handle merge operation
Args:
operation: Operation to handle
"""
config = operation.operation_config
data_type = config.get("data_type", "")
target_id = config.get("target_id", "") # ID of the item to merge into
merge_fields = config.get("merge_fields", []) # Fields to merge
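        # Expected config shape (illustrative values):
        #   {"data_type": "notes", "target_id": "<uuid>",
        #    "merge_fields": ["content", "tags", "comments"]}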
# Load data
data = load_data(data_type, {})
# Check if target exists
if not target_id or target_id not in data:
operation.add_error(f"Target item not found: {target_id}")
return
# Process items
processed = 0
succeeded = 0
failed = 0
skipped = 0
for item in operation.target_items:
processed += 1
item_id = item.get("id")
if not item_id or item_id not in data or item_id == target_id:
skipped += 1
continue
try:
# Merge fields
if not operation.dry_run:
for field in merge_fields:
if field in data[item_id]:
# Handle different field types
if field == "content" or field == "description":
# Append text content
target_content = data[target_id].get(field, "")
item_content = data[item_id].get(field, "")
data[target_id][field] = f"{target_content}\n\n{item_content}"
elif field == "tags":
# Merge tags
target_tags = set(data[target_id].get("tags", []))
item_tags = set(data[item_id].get("tags", []))
data[target_id]["tags"] = list(target_tags.union(item_tags))
elif field == "attachments":
# Merge attachments
target_attachments = data[target_id].get("attachments", [])
item_attachments = data[item_id].get("attachments", [])
data[target_id]["attachments"] = target_attachments + item_attachments
elif field == "comments":
# Merge comments
target_comments = data[target_id].get("comments", [])
item_comments = data[item_id].get("comments", [])
data[target_id]["comments"] = target_comments + item_comments
# Delete the merged item
del data[item_id]
succeeded += 1
except Exception as e:
failed += 1
operation.add_error(f"Failed to merge item {item_id}: {str(e)}")
# Update progress
operation.update_progress(processed, succeeded, failed, skipped)
# Save data
if not operation.dry_run:
save_data(data_type, data)
# Create a global instance of the batch processor
batch_processor = BatchProcessor()
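# Minimal usage sketch, assuming the utils.storage backend is configured and a
# "notes" collection exists; the item IDs and tags below are illustrative.
if __name__ == "__main__":
    import time

    example_items = [{"id": "note-1"}, {"id": "note-2"}]  # hypothetical IDs
    op = batch_processor.create_operation(
        name="Tag Q3 notes",
        operation_type="tag",
        target_items=example_items,
        operation_config={"data_type": "notes", "action": "add", "tags": ["q3"]},
        dry_run=True,  # simulate only; no data is written
    )
    batch_processor.execute_operation(op.id)
    while op.status == "running":  # poll the cooperative status flag
        time.sleep(0.1)
    print(f"{op.name}: {op.status} ({op.progress}%) -> {op.results}")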