import os
import re
import time
import json
from typing import Dict, List, Any, Optional, Union


class TaskExecutor:
    """Task execution engine for the autonomous AI agent

    This module provides capabilities for executing various tasks including:
    1. Task planning and execution
    2. Progress tracking
    3. Result formatting
    4. Error handling

    Status values written to ``task_status``: "idle", "in_progress",
    "in_progress (i/n)" while subtasks run, "completed", "failed" and
    "cancelled".
    """

    # Leading "1." style numbering or "-" bullet stripped from subtask lines.
    _LIST_MARKER = re.compile(r'^\d+\.\s*|^-\s*')

    def __init__(self, reasoning_engine):
        """Initialize the task executor

        Args:
            reasoning_engine: The reasoning engine to use for task planning.
                This class calls its ``generate_text(prompt, max_length=...)``
                and ``decompose_task(task_description)`` methods.
        """
        self.reasoning_engine = reasoning_engine
        self.current_task = None
        self.task_status = "idle"
        self.task_history = []
        self.max_history_length = 10

    def execute_task(self, task_description: str) -> Dict[str, Any]:
        """Execute a task based on the description

        The task is analysed, decomposed into subtasks, planned, executed
        subtask by subtask, synthesised and finally reflected upon. Every
        step is a call into the reasoning engine.

        Cancellation is cooperative: if ``cancel_task`` is called while this
        method runs, execution stops at the next checkpoint (before each
        subtask, before synthesis and before completion).

        Args:
            task_description: Description of the task to execute

        Returns:
            Dictionary containing task results and status. The "status" key
            is "completed", "cancelled" or "failed"; failed records carry an
            "error" message instead of results. The same dictionary is
            appended to the task history.
        """
        self.current_task = task_description
        self.task_status = "in_progress"
        start_time = time.time()

        # NOTE(review): prompt wording is runtime text and is reproduced
        # exactly as found. If the prompts were meant to be multi-line,
        # restore the line breaks deliberately in a separate change.
        try:
            # First, analyze the task to understand its requirements and constraints
            analysis_prompt = (
                f"I need to analyze this task to understand its requirements and constraints: "
                f"Task: {task_description} "
                f"Task Analysis: "
                f"1. What is the main objective of this task? "
                f"2. What are the key requirements? "
                f"3. What constraints or limitations should I be aware of? "
                f"4. What resources or information do I need to complete this task? "
                f"Analysis:"
            )
            task_analysis = self.reasoning_engine.generate_text(analysis_prompt, max_length=768)

            # Decompose the task into subtasks with the analysis in mind
            decomposition_prompt = (
                f"Based on my analysis of the task: {task_analysis} "
                f"I need to break down this task into smaller, manageable subtasks: "
                f"Task: {task_description} "
                f"Subtasks: 1. \n"
            )
            decomposition = self.reasoning_engine.generate_text(decomposition_prompt, max_length=1024)

            subtasks = self._parse_subtasks(decomposition)
            # If parsing failed, use the reasoning engine's decompose_task method as fallback
            if not subtasks:
                subtasks = self.reasoning_engine.decompose_task(task_description)

            # Generate a detailed plan for executing the task
            planning_prompt = (
                f"I need to create a detailed plan to execute this task: {task_description} "
                f"Task Analysis: {task_analysis} "
                f"The task has been broken down into these subtasks: {json.dumps(subtasks, indent=2)} "
                f"Detailed step-by-step plan (including how to handle potential issues): 1. "
            )
            plan = self.reasoning_engine.generate_text(planning_prompt, max_length=1024)

            # Track progress of each subtask with more detailed execution
            subtask_results: List[Dict[str, Any]] = []
            total = len(subtasks)
            for i, subtask in enumerate(subtasks):
                # Check before the status is overwritten below, otherwise a
                # cancellation request would be lost.
                if self._is_cancelled():
                    return self._record_cancelled(task_description, plan, subtask_results, start_time)

                self.task_status = f"in_progress ({i+1}/{total})"
                subtask_results.append(
                    self._execute_subtask(task_description, subtask, i, total, subtask_results)
                )

            if self._is_cancelled():
                return self._record_cancelled(task_description, plan, subtask_results, start_time)

            # Compile the final results with synthesis
            compilation_prompt = (
                f"I have executed all subtasks for the main task: {task_description} "
                f"Here are the results of each subtask: {json.dumps(subtask_results, indent=2)} "
                f"I need to synthesize these results into a coherent final result "
                f"that addresses the original task completely. "
                f"Final synthesized result:"
            )
            final_result = self.reasoning_engine.generate_text(compilation_prompt, max_length=1024)

            # Self-reflection on the task execution
            reflection_prompt = (
                f"I need to reflect on my execution of this task: "
                f"Task: {task_description} "
                f"My approach: {plan} "
                f"Final result: {final_result} "
                f"Reflection on what went well and what could be improved:"
            )
            reflection = self.reasoning_engine.generate_text(reflection_prompt, max_length=512)

            # A cancellation accepted during synthesis/reflection must not be
            # silently replaced by "completed".
            if self._is_cancelled():
                return self._record_cancelled(task_description, plan, subtask_results, start_time)

            self.task_status = "completed"
            return self._record_task({
                "task": task_description,
                "plan": plan,
                "subtasks": subtask_results,
                "result": final_result,
                "reflection": reflection,
                "status": self.task_status,
                "execution_time": time.time() - start_time,
                "timestamp": time.time(),
            })

        except Exception as e:
            # Top-level boundary: any failure in the pipeline is reported to
            # the caller as a "failed" record rather than raised.
            self.task_status = "failed"
            return self._record_task({
                "task": task_description,
                "status": self.task_status,
                "error": str(e),
                "execution_time": time.time() - start_time,
                "timestamp": time.time(),
            })

    @classmethod
    def _parse_subtasks(cls, decomposition: str) -> List[str]:
        """Extract subtask descriptions from numbered or "- " bulleted lines."""
        subtasks = []
        for raw_line in decomposition.split('\n'):
            line = raw_line.strip()
            if not line:
                continue
            if line[0].isdigit() or line.startswith('- '):
                # Remove numbering or bullet points
                cleaned_line = cls._LIST_MARKER.sub('', line).strip()
                if cleaned_line:
                    subtasks.append(cleaned_line)
        return subtasks

    def _execute_subtask(
        self,
        task_description: str,
        subtask: Any,
        index: int,
        total: int,
        previous_results: List[Dict[str, Any]],
    ) -> Dict[str, Any]:
        """Run one subtask through the reasoning engine and self-evaluate it."""
        if previous_results:
            previous = json.dumps([r['result'] for r in previous_results], indent=2)
        else:
            previous = 'None yet'

        subtask_prompt = (
            f"I am executing this subtask as part of the larger task: "
            f"Main Task: {task_description} "
            f"Current Subtask ({index+1}/{total}): {subtask} "
            f"Previous Results: {previous} "
            f"I will now execute this subtask carefully and report the detailed results:"
        )
        result = self.reasoning_engine.generate_text(subtask_prompt, max_length=768)

        # Evaluate the quality of the result
        evaluation_prompt = (
            f"I need to evaluate the quality of my execution of this subtask: "
            f"Subtask: {subtask} "
            f"Execution Result: {result} "
            f"Evaluation (rate from 1-10 and explain):"
        )
        evaluation = self.reasoning_engine.generate_text(evaluation_prompt, max_length=256)

        return {
            "subtask": subtask,
            "result": result,
            "evaluation": evaluation,
        }

    def _is_cancelled(self) -> bool:
        """Return True when cancel_task has flagged the running task."""
        return self.task_status == "cancelled"

    def _record_cancelled(
        self,
        task_description: str,
        plan: Any,
        subtask_results: List[Dict[str, Any]],
        start_time: float,
    ) -> Dict[str, Any]:
        """Store and return a record for a task stopped by cancel_task."""
        return self._record_task({
            "task": task_description,
            "plan": plan,
            "subtasks": subtask_results,
            "status": "cancelled",
            "execution_time": time.time() - start_time,
            "timestamp": time.time(),
        })

    def _record_task(self, task_record: Dict[str, Any]) -> Dict[str, Any]:
        """Append a record to the history, keeping at most max_history_length."""
        self.task_history.append(task_record)
        # Trim by count so the limit also holds for failures and for a limit
        # of 0 (a negative-index slice would keep everything in that case).
        excess = len(self.task_history) - self.max_history_length
        if excess > 0:
            del self.task_history[:excess]
        return task_record

    def get_task_status(self) -> Dict[str, Any]:
        """Get the current status of task execution

        Returns:
            Dictionary containing task status information: the most recently
            started task, its status string and the number of history records
        """
        return {
            "current_task": self.current_task,
            "status": self.task_status,
            "history_length": len(self.task_history),
        }

    def get_task_history(self) -> List[Dict[str, Any]]:
        """Get the history of executed tasks

        Returns:
            List of task records (the executor's own list, not a copy)
        """
        return self.task_history

    def cancel_task(self) -> Dict[str, Any]:
        """Cancel the currently executing task

        Only flags the task as cancelled; ``execute_task`` notices the flag
        at its next checkpoint and records the cancelled task itself. The
        history is not touched here because the running task has no history
        record yet.

        Returns:
            Dictionary containing cancellation status
        """
        # The status reads "in_progress (i/n)" while subtasks are running,
        # so match on the prefix rather than the exact string.
        if self.task_status.startswith("in_progress"):
            self.task_status = "cancelled"
            message = "Task cancelled successfully"
        else:
            message = "No task in progress to cancel"

        return {
            "task": self.current_task,
            "status": self.task_status,
            "message": message,
        }