|
import json |
|
import uuid |
|
from typing import Dict, List, Any, Optional, Callable, Union |
|
from datetime import datetime |
|
import threading |
|
|
|
from utils.logging import setup_logger |
|
from utils.error_handling import handle_exceptions, AutomationError |
|
from utils.storage import load_data, save_data |
|
|
|
|
|
# Module-level logger, named after this module and shared by every class below.
logger = setup_logger(__name__)
|
|
|
class BatchOperation:
    """A batch operation applied to many items at once.

    Holds the operation's configuration, its lifecycle state and its result
    counters. The status starts as ``"pending"`` and is moved by ``start``
    (to ``"running"``), ``complete`` (to ``"completed"``), ``fail`` (to
    ``"failed"``) and ``cancel`` (to ``"cancelled"``). All timestamps are
    ISO-8601 strings produced by ``datetime.now().isoformat()``.
    """

    def __init__(self, name: str, operation_type: str,
                 target_items: List[Dict[str, Any]], operation_config: Dict[str, Any],
                 description: Optional[str] = None):
        """Initialize a batch operation.

        Args:
            name: Operation name
            operation_type: Type of operation (update, delete, tag, categorize, etc.)
            target_items: List of items to process
            operation_config: Operation configuration
            description: Operation description (optional); stored as "" when omitted
        """
        self.id = str(uuid.uuid4())
        self.name = name
        self.description = description or ""
        self.operation_type = operation_type
        self.target_items = target_items
        self.operation_config = operation_config
        self.created_at = datetime.now().isoformat()
        self.updated_at = self.created_at
        self.started_at: Optional[str] = None
        self.completed_at: Optional[str] = None
        self.status = "pending"
        self.progress = 0  # percentage, 0-100
        self.results: Dict[str, Any] = {
            "total": len(target_items),
            "processed": 0,
            "succeeded": 0,
            "failed": 0,
            "skipped": 0,
            "errors": [],
        }
        self.dry_run = False

    @handle_exceptions
    def start(self) -> None:
        """Move the operation from ``pending`` to ``running``.

        Raises:
            AutomationError: If the operation is not currently pending.
        """
        if self.status != "pending":
            raise AutomationError(f"Cannot start operation in {self.status} state")

        self.status = "running"
        self.started_at = datetime.now().isoformat()
        self.updated_at = self.started_at

    @handle_exceptions
    def complete(self) -> None:
        """Move the operation from ``running`` to ``completed`` at 100% progress.

        Raises:
            AutomationError: If the operation is not currently running.
        """
        if self.status != "running":
            raise AutomationError(f"Cannot complete operation in {self.status} state")

        self.status = "completed"
        self.completed_at = datetime.now().isoformat()
        self.updated_at = self.completed_at
        self.progress = 100

    @handle_exceptions
    def fail(self, error: str) -> None:
        """Mark the operation as failed and record the error.

        No state check is made: this may be called from any status.

        Args:
            error: Error message
        """
        self.status = "failed"
        self.updated_at = datetime.now().isoformat()
        self.results["errors"].append(error)

    @handle_exceptions
    def cancel(self) -> None:
        """Cancel a pending or running operation.

        Raises:
            AutomationError: If the operation is neither pending nor running.
        """
        if self.status not in ["pending", "running"]:
            raise AutomationError(f"Cannot cancel operation in {self.status} state")

        self.status = "cancelled"
        self.updated_at = datetime.now().isoformat()

    @handle_exceptions
    def update_progress(self, processed: int, succeeded: int, failed: int, skipped: int) -> None:
        """Record the result counters and recompute the progress percentage.

        Args:
            processed: Number of items processed
            succeeded: Number of items processed successfully
            failed: Number of items that failed
            skipped: Number of items skipped
        """
        self.results["processed"] = processed
        self.results["succeeded"] = succeeded
        self.results["failed"] = failed
        self.results["skipped"] = skipped

        total = self.results["total"]
        if total > 0:
            # Integer arithmetic on purpose: int(processed / total * 100) can
            # land one point low from float rounding (29 of 100 gave 28).
            self.progress = min(100, processed * 100 // total)

        self.updated_at = datetime.now().isoformat()

    @handle_exceptions
    def add_error(self, error: str) -> None:
        """Append an error message to the results.

        Args:
            error: Error message
        """
        self.results["errors"].append(error)
        self.updated_at = datetime.now().isoformat()

    @handle_exceptions
    def set_dry_run(self, dry_run: bool) -> None:
        """Set dry run mode.

        Args:
            dry_run: Whether to run in dry run mode
        """
        self.dry_run = dry_run
        self.updated_at = datetime.now().isoformat()

    @handle_exceptions
    def to_dict(self) -> Dict[str, Any]:
        """Convert the operation to a dictionary.

        The returned dictionary references the operation's own
        ``target_items``, ``operation_config`` and ``results`` objects; they
        are not copied.

        Returns:
            Operation as dictionary
        """
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "operation_type": self.operation_type,
            "target_items": self.target_items,
            "operation_config": self.operation_config,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
            "started_at": self.started_at,
            "completed_at": self.completed_at,
            "status": self.status,
            "progress": self.progress,
            "results": self.results,
            "dry_run": self.dry_run,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'BatchOperation':
        """Create an operation from a dictionary such as ``to_dict`` produces.

        Args:
            data: Operation data

        Returns:
            BatchOperation instance

        Raises:
            KeyError: If a required key (name, operation_type, target_items,
                operation_config, id, created_at, updated_at, status,
                progress, results) is missing.
        """
        operation = cls(
            data["name"],
            data["operation_type"],
            data["target_items"],
            data["operation_config"],
            data.get("description", ""),
        )
        operation.id = data["id"]
        operation.created_at = data["created_at"]
        operation.updated_at = data["updated_at"]
        operation.started_at = data.get("started_at")
        operation.completed_at = data.get("completed_at")
        operation.status = data["status"]
        operation.progress = data["progress"]

        # Overlay the stored counters on the defaults: a record that lacks a
        # counter (e.g. "errors") still yields a complete results dict, and
        # the instance no longer shares its results dict with the caller.
        restored = {**operation.results, **data["results"]}
        restored["errors"] = list(restored.get("errors") or [])
        operation.results = restored

        operation.dry_run = data.get("dry_run", False)
        return operation
|
|
|
|
|
class BatchProcessor:
    """Create, persist and execute batch operations.

    Operations are held in memory keyed by ID, persisted through
    ``load_data`` / ``save_data`` under the ``"batch_operations"`` key, and
    executed on daemon worker threads started by ``execute_operation``.
    """

    def __init__(self):
        """Initialize the processor and load previously saved operations."""
        self.operations: Dict[str, BatchOperation] = {}
        # operation_id -> worker thread, for operations currently executing
        self.running_operations: Dict[str, threading.Thread] = {}
        # operation_type -> handler method
        self.operation_handlers: Dict[str, Callable[[BatchOperation], None]] = {
            "update": self._handle_update_operation,
            "delete": self._handle_delete_operation,
            "tag": self._handle_tag_operation,
            "categorize": self._handle_categorize_operation,
            "status_change": self._handle_status_change_operation,
            "priority_change": self._handle_priority_change_operation,
            "due_date_change": self._handle_due_date_change_operation,
            "assign": self._handle_assign_operation,
            "export": self._handle_export_operation,
            "import": self._handle_import_operation,
            "custom": self._handle_custom_operation,
        }
        self.load_operations()

    # ------------------------------------------------------------------
    # Persistence and CRUD
    # ------------------------------------------------------------------

    @handle_exceptions
    def load_operations(self) -> None:
        """Load operations from storage.

        A record that cannot be turned into a ``BatchOperation`` is logged
        and skipped so that the remaining records still load. Any other
        failure is logged and leaves the already loaded operations in place.
        """
        try:
            operations_data = load_data("batch_operations", default=[])
            for operation_data in operations_data:
                try:
                    operation = BatchOperation.from_dict(operation_data)
                except Exception as e:
                    logger.error(f"Skipping invalid batch operation record: {str(e)}")
                    continue
                self.operations[operation.id] = operation
            logger.info(f"Loaded {len(self.operations)} batch operations")
        except Exception as e:
            logger.error(f"Failed to load batch operations: {str(e)}")

    @handle_exceptions
    def save_operations(self) -> None:
        """Save all operations to storage; failures are logged, not raised."""
        try:
            # Iterate a snapshot: worker threads and callers may add or remove
            # operations while this runs, and iterating the live dict would
            # then raise "dictionary changed size during iteration".
            snapshot = list(self.operations.values())
            operations_data = [operation.to_dict() for operation in snapshot]
            save_data("batch_operations", operations_data)
            logger.info(f"Saved {len(operations_data)} batch operations")
        except Exception as e:
            logger.error(f"Failed to save batch operations: {str(e)}")

    @handle_exceptions
    def create_operation(self, name: str, operation_type: str,
                         target_items: List[Dict[str, Any]], operation_config: Dict[str, Any],
                         description: Optional[str] = None, dry_run: bool = False) -> BatchOperation:
        """Create, register and persist a new batch operation.

        Args:
            name: Operation name
            operation_type: Type of operation
            target_items: List of items to process
            operation_config: Operation configuration
            description: Operation description (optional)
            dry_run: Whether to run in dry run mode

        Returns:
            Created operation
        """
        operation = BatchOperation(name, operation_type, target_items, operation_config, description)
        operation.set_dry_run(dry_run)
        self.operations[operation.id] = operation
        self.save_operations()
        return operation

    @handle_exceptions
    def get_operation(self, operation_id: str) -> Optional[BatchOperation]:
        """Get operation by ID.

        Args:
            operation_id: Operation ID

        Returns:
            Operation if found, None otherwise
        """
        return self.operations.get(operation_id)

    @handle_exceptions
    def update_operation(self, operation: BatchOperation) -> None:
        """Store an updated operation, refresh its ``updated_at`` and persist.

        Args:
            operation: Operation to update

        Raises:
            AutomationError: If no operation with that ID is registered.
        """
        if operation.id not in self.operations:
            raise AutomationError(f"Operation not found: {operation.id}")

        operation.updated_at = datetime.now().isoformat()
        self.operations[operation.id] = operation
        self.save_operations()

    @handle_exceptions
    def delete_operation(self, operation_id: str) -> None:
        """Remove an operation and persist the change.

        Args:
            operation_id: Operation ID

        Raises:
            AutomationError: If no operation with that ID is registered.
        """
        if operation_id not in self.operations:
            raise AutomationError(f"Operation not found: {operation_id}")

        del self.operations[operation_id]
        self.save_operations()

    @handle_exceptions
    def get_all_operations(self) -> List[BatchOperation]:
        """Get all operations.

        Returns:
            List of all operations
        """
        return list(self.operations.values())

    # ------------------------------------------------------------------
    # Execution
    # ------------------------------------------------------------------

    @handle_exceptions
    def execute_operation(self, operation_id: str) -> None:
        """Start executing a pending operation on a daemon thread.

        Args:
            operation_id: Operation ID

        Raises:
            AutomationError: If the operation does not exist or is not pending.
        """
        operation = self.get_operation(operation_id)
        if not operation:
            raise AutomationError(f"Operation not found: {operation_id}")

        if operation.status != "pending":
            raise AutomationError(f"Cannot execute operation in {operation.status} state")

        operation.start()
        self.update_operation(operation)

        thread = threading.Thread(target=self._execute_operation_thread, args=(operation_id,))
        thread.daemon = True

        # Register before starting: a fast worker could otherwise finish and
        # run its cleanup before the entry exists, leaving it behind forever.
        self.running_operations[operation_id] = thread
        try:
            thread.start()
        except Exception:
            self.running_operations.pop(operation_id, None)
            raise

    def _execute_operation_thread(self, operation_id: str) -> None:
        """Run an operation's handler; this is the worker thread's target.

        On success the operation is completed. If it was cancelled while the
        handler ran, it stays cancelled. Any exception marks it failed. The
        ``running_operations`` entry is removed on every exit path.

        Args:
            operation_id: Operation ID
        """
        try:
            operation = self.get_operation(operation_id)
            if not operation:
                return

            try:
                handler = self.operation_handlers.get(operation.operation_type)
                if not handler:
                    raise AutomationError(f"Unknown operation type: {operation.operation_type}")

                handler(operation)

                # A cancelled operation must not be completed: complete()
                # only accepts the "running" state.
                if operation.status != "cancelled":
                    operation.complete()
                self.update_operation(operation)

            except Exception as e:
                operation.fail(str(e))
                self.update_operation(operation)
                logger.error(f"Operation execution error: {str(e)}")
        finally:
            self.running_operations.pop(operation_id, None)

    @handle_exceptions
    def cancel_operation(self, operation_id: str) -> None:
        """Cancel a pending or running operation.

        A running handler notices the cancellation before its next item and
        stops there; changes already applied to earlier items are kept.

        Args:
            operation_id: Operation ID

        Raises:
            AutomationError: If the operation does not exist or cannot be
                cancelled in its current state.
        """
        operation = self.get_operation(operation_id)
        if not operation:
            raise AutomationError(f"Operation not found: {operation_id}")

        operation.cancel()
        self.update_operation(operation)

    # ------------------------------------------------------------------
    # Shared handler machinery
    # ------------------------------------------------------------------

    def _process_items(self, operation: BatchOperation, data: Dict[str, Any],
                       apply: Callable[[Any], None], failure_prefix: str, *,
                       run_in_dry_run: bool = False,
                       exclude_id: Optional[str] = None) -> None:
        """Apply ``apply`` to each target item and keep the counters current.

        An item is skipped when it has no ``id``, when its id is not a key
        of ``data``, or when its id equals ``exclude_id``. An exception
        raised by ``apply`` counts the item as failed and is recorded as
        ``"<failure_prefix> <item_id>: <error>"``. Counters are written after
        every item, and the loop stops early once the operation is cancelled.

        Args:
            operation: Operation being executed
            data: Stored records keyed by item id
            apply: Callback receiving the item id
            failure_prefix: Leading text of the per-item error message
            run_in_dry_run: Call ``apply`` even in dry run mode (the callback
                is then responsible for not persisting anything)
            exclude_id: Item id that is always skipped (optional)
        """
        processed = succeeded = failed = skipped = 0

        for item in operation.target_items:
            if operation.status == "cancelled":
                break

            processed += 1
            item_id = item.get("id")

            if not item_id or item_id not in data or item_id == exclude_id:
                skipped += 1
            else:
                try:
                    if run_in_dry_run or not operation.dry_run:
                        apply(item_id)
                    succeeded += 1
                except Exception as e:
                    failed += 1
                    operation.add_error(f"{failure_prefix} {item_id}: {str(e)}")

            operation.update_progress(processed, succeeded, failed, skipped)

    def _set_field_on_items(self, operation: BatchOperation, field: str,
                            failure_prefix: str) -> None:
        """Set one field on every target item to the configured value.

        The value is read from ``operation_config[field]`` (default ``""``)
        and written to ``data[item_id][field]``.

        Args:
            operation: Operation to handle
            field: Name of both the config key and the item field
            failure_prefix: Leading text of the per-item error message
        """
        config = operation.operation_config
        data_type = config.get("data_type", "")
        value = config.get(field, "")

        data = load_data(data_type, {})

        def assign(item_id: Any) -> None:
            data[item_id][field] = value

        self._process_items(operation, data, assign, failure_prefix)

        if not operation.dry_run:
            save_data(data_type, data)

    # ------------------------------------------------------------------
    # Handlers
    # ------------------------------------------------------------------

    def _handle_update_operation(self, operation: BatchOperation) -> None:
        """Handle update operation: copy ``config["updates"]`` onto each item.

        Args:
            operation: Operation to handle
        """
        config = operation.operation_config
        data_type = config.get("data_type", "")
        updates = config.get("updates", {})

        data = load_data(data_type, {})

        def update(item_id: Any) -> None:
            for key, value in updates.items():
                data[item_id][key] = value

        self._process_items(operation, data, update, "Failed to update item")

        if not operation.dry_run:
            save_data(data_type, data)

    def _handle_delete_operation(self, operation: BatchOperation) -> None:
        """Handle delete operation: remove each target item from storage.

        Args:
            operation: Operation to handle
        """
        config = operation.operation_config
        data_type = config.get("data_type", "")

        data = load_data(data_type, {})

        def delete(item_id: Any) -> None:
            del data[item_id]

        self._process_items(operation, data, delete, "Failed to delete item")

        if not operation.dry_run:
            save_data(data_type, data)

    def _handle_tag_operation(self, operation: BatchOperation) -> None:
        """Handle tag operation.

        ``config["action"]`` selects the behaviour: ``"add"`` appends the
        configured tags that are missing, ``"remove"`` drops them, and
        ``"replace"`` substitutes the configured list. Any other action is
        recorded as a failure for each item. The new tag list is computed
        even in dry run mode, so an invalid action is still reported.

        Args:
            operation: Operation to handle
        """
        config = operation.operation_config
        data_type = config.get("data_type", "")
        action = config.get("action", "add")
        tags = config.get("tags", [])

        data = load_data(data_type, {})

        def retag(item_id: Any) -> None:
            current_tags = data[item_id].get("tags", [])

            if action == "add":
                new_tags = current_tags.copy()
                for tag in tags:
                    if tag not in new_tags:
                        new_tags.append(tag)
            elif action == "remove":
                new_tags = [tag for tag in current_tags if tag not in tags]
            elif action == "replace":
                new_tags = tags.copy()
            else:
                raise AutomationError(f"Unknown tag action: {action}")

            if not operation.dry_run:
                data[item_id]["tags"] = new_tags

        self._process_items(operation, data, retag, "Failed to update tags for item",
                            run_in_dry_run=True)

        if not operation.dry_run:
            save_data(data_type, data)

    def _handle_categorize_operation(self, operation: BatchOperation) -> None:
        """Handle categorize operation: set each item's ``category``.

        Args:
            operation: Operation to handle
        """
        self._set_field_on_items(operation, "category", "Failed to categorize item")

    def _handle_status_change_operation(self, operation: BatchOperation) -> None:
        """Handle status change operation: set each item's ``status``.

        Args:
            operation: Operation to handle
        """
        self._set_field_on_items(operation, "status", "Failed to change status for item")

    def _handle_priority_change_operation(self, operation: BatchOperation) -> None:
        """Handle priority change operation: set each item's ``priority``.

        Args:
            operation: Operation to handle
        """
        self._set_field_on_items(operation, "priority", "Failed to change priority for item")

    def _handle_due_date_change_operation(self, operation: BatchOperation) -> None:
        """Handle due date change operation: set each item's ``due_date``.

        Args:
            operation: Operation to handle
        """
        self._set_field_on_items(operation, "due_date", "Failed to change due date for item")

    def _handle_assign_operation(self, operation: BatchOperation) -> None:
        """Handle assign operation: set each item's ``assignee``.

        Args:
            operation: Operation to handle
        """
        self._set_field_on_items(operation, "assignee", "Failed to assign item")

    def _handle_export_operation(self, operation: BatchOperation) -> None:
        """Handle export operation.

        Collects the target items and, unless in dry run mode or cancelled,
        writes them to ``config["file_path"]``. Only the ``"json"`` format is
        implemented; ``"csv"``, ``"markdown"``, an unknown format or a
        missing file path are recorded as errors instead of being ignored.

        Args:
            operation: Operation to handle
        """
        config = operation.operation_config
        data_type = config.get("data_type", "")
        export_format = config.get("format", "json")
        file_path = config.get("file_path", "")

        data = load_data(data_type, {})
        export_data: Dict[Any, Any] = {}

        def collect(item_id: Any) -> None:
            export_data[item_id] = data[item_id]

        # Collecting has no side effects, so it also runs in dry run mode.
        self._process_items(operation, data, collect, "Failed to export item",
                            run_in_dry_run=True)

        if operation.dry_run or operation.status == "cancelled":
            return

        if not file_path:
            operation.add_error("No export file path provided")
            return

        try:
            if export_format == "json":
                with open(file_path, "w", encoding="utf-8") as f:
                    json.dump(export_data, f, indent=2)
            elif export_format in ("csv", "markdown"):
                # Previously a silent no-op that looked like a successful export.
                operation.add_error(f"Export format not implemented: {export_format}")
            else:
                operation.add_error(f"Unknown export format: {export_format}")
        except Exception as e:
            operation.add_error(f"Failed to export data: {str(e)}")

    def _handle_import_operation(self, operation: BatchOperation) -> None:
        """Handle import operation.

        Reads a JSON object keyed by item id from ``config["file_path"]`` and
        merges it into the stored data. For ids that already exist,
        ``config["merge_strategy"]`` decides: ``"overwrite"`` replaces the
        record, ``"merge"`` copies the imported keys onto it, ``"skip"``
        leaves it alone; anything else is recorded as a failure. New ids are
        always added. Only the ``"json"`` format is implemented.

        Args:
            operation: Operation to handle

        Raises:
            AutomationError: If the file does not contain a JSON object.
        """
        config = operation.operation_config
        data_type = config.get("data_type", "")
        import_format = config.get("format", "json")
        file_path = config.get("file_path", "")
        merge_strategy = config.get("merge_strategy", "overwrite")

        if import_format in ("csv", "markdown"):
            # Previously a silent no-op that looked like a successful import.
            operation.add_error(f"Import format not implemented: {import_format}")
            return
        if import_format != "json":
            operation.add_error(f"Unknown import format: {import_format}")
            return
        if not file_path:
            # Used to be misreported as an unknown import format.
            operation.add_error("No import file path provided")
            return

        try:
            with open(file_path, "r", encoding="utf-8") as f:
                import_data = json.load(f)
        except Exception as e:
            operation.add_error(f"Failed to load import data: {str(e)}")
            return

        if not isinstance(import_data, dict):
            raise AutomationError("Import data must be a JSON object keyed by item ID")

        current_data = load_data(data_type, {})

        # The work here is the imported records, not target_items, so base
        # the progress percentage on their count.
        operation.results["total"] = len(import_data)

        processed = succeeded = failed = skipped = 0

        for item_id, item_data in import_data.items():
            if operation.status == "cancelled":
                break

            processed += 1

            try:
                if item_id not in current_data or merge_strategy == "overwrite":
                    if not operation.dry_run:
                        current_data[item_id] = item_data
                    succeeded += 1
                elif merge_strategy == "merge":
                    if not operation.dry_run:
                        for key, value in item_data.items():
                            current_data[item_id][key] = value
                    succeeded += 1
                elif merge_strategy == "skip":
                    skipped += 1
                else:
                    operation.add_error(f"Unknown merge strategy: {merge_strategy}")
                    failed += 1
            except Exception as e:
                failed += 1
                operation.add_error(f"Failed to import item {item_id}: {str(e)}")

            operation.update_progress(processed, succeeded, failed, skipped)

        if not operation.dry_run:
            save_data(data_type, current_data)

    def _handle_custom_operation(self, operation: BatchOperation) -> None:
        """Handle custom operation by dispatching on ``config["custom_type"]``.

        Supported types are ``"archive"``, ``"duplicate"`` and ``"merge"``;
        any other value is recorded as an error on the operation.

        Args:
            operation: Operation to handle
        """
        custom_type = operation.operation_config.get("custom_type", "")

        custom_handlers = {
            "archive": self._handle_archive_operation,
            "duplicate": self._handle_duplicate_operation,
            "merge": self._handle_merge_operation,
        }
        handler = custom_handlers.get(custom_type)
        if handler is None:
            operation.add_error(f"Unknown custom operation type: {custom_type}")
            return

        handler(operation)

    def _handle_archive_operation(self, operation: BatchOperation) -> None:
        """Handle archive operation.

        Moves each target item from ``<data_type>`` to
        ``<data_type>_archives`` and stamps it with ``archived_at``.

        Args:
            operation: Operation to handle
        """
        config = operation.operation_config
        data_type = config.get("data_type", "")
        archive_type = f"{data_type}_archives"

        data = load_data(data_type, {})
        archives = load_data(archive_type, {})

        def archive(item_id: Any) -> None:
            record = data[item_id]
            # Stamp first: if the record cannot be stamped, nothing has moved.
            record["archived_at"] = datetime.now().isoformat()
            archives[item_id] = record
            del data[item_id]

        self._process_items(operation, data, archive, "Failed to archive item")

        if not operation.dry_run:
            # Write the archive first: if the second save fails, the items
            # exist in both stores instead of in neither.
            save_data(archive_type, archives)
            save_data(data_type, data)

    def _handle_duplicate_operation(self, operation: BatchOperation) -> None:
        """Handle duplicate operation.

        Stores a shallow copy of each target item under a fresh UUID, with
        new ``created_at`` / ``updated_at`` timestamps and a ``"Copy of "``
        title prefix.

        Args:
            operation: Operation to handle
        """
        config = operation.operation_config
        data_type = config.get("data_type", "")

        data = load_data(data_type, {})

        def duplicate(item_id: Any) -> None:
            new_id = str(uuid.uuid4())
            timestamp = datetime.now().isoformat()
            new_item = data[item_id].copy()
            new_item["id"] = new_id
            new_item["created_at"] = timestamp
            new_item["updated_at"] = timestamp
            new_item["title"] = f"Copy of {new_item.get('title', '')}"
            data[new_id] = new_item

        self._process_items(operation, data, duplicate, "Failed to duplicate item")

        if not operation.dry_run:
            save_data(data_type, data)

    def _handle_merge_operation(self, operation: BatchOperation) -> None:
        """Handle merge operation.

        Folds each target item into the item ``config["target_id"]`` and then
        deletes it. Only the fields listed in ``config["merge_fields"]`` that
        the source item actually has are merged:

        * ``content`` / ``description``: non-empty texts joined by a blank line
        * ``tags``: de-duplicated union, target's tags first, order preserved
        * ``attachments`` / ``comments``: source list appended to the target's

        Other field names are ignored. The target item itself is skipped if
        it appears among the target items. A missing target is recorded as an
        error and nothing is processed.

        Args:
            operation: Operation to handle
        """
        config = operation.operation_config
        data_type = config.get("data_type", "")
        target_id = config.get("target_id", "")
        merge_fields = config.get("merge_fields", [])

        data = load_data(data_type, {})

        if not target_id or target_id not in data:
            operation.add_error(f"Target item not found: {target_id}")
            return

        target = data[target_id]

        def merge(item_id: Any) -> None:
            source = data[item_id]

            for field in merge_fields:
                if field not in source:
                    continue

                if field in ("content", "description"):
                    # Drop empty sides so the result carries no stray
                    # leading or trailing blank lines.
                    texts = [str(text) for text in (target.get(field, ""), source[field]) if text]
                    target[field] = "\n\n".join(texts)

                elif field == "tags":
                    # Order-preserving union; a set would shuffle the tags.
                    merged_tags: List[Any] = []
                    for tag in [*target.get("tags", []), *source["tags"]]:
                        if tag not in merged_tags:
                            merged_tags.append(tag)
                    target["tags"] = merged_tags

                elif field in ("attachments", "comments"):
                    target[field] = target.get(field, []) + source[field]

            del data[item_id]

        self._process_items(operation, data, merge, "Failed to merge item",
                            exclude_id=target_id)

        if not operation.dry_run:
            save_data(data_type, data)
|
|
|
|
|
|
|
# Shared module-level processor. Constructing it calls load_operations(), so
# importing this module reads the persisted batch operations from storage.
batch_processor = BatchProcessor()