|
from typing import Dict, Any |
|
from aiflows.base_flows.atomic import AtomicFlow |
|
class UpdatePlanAtomicFlow(AtomicFlow):
    """Overwrite the plan file with an updated plan.

    Called by the controller when it realizes that one step of the plan is
    done. The controller provides the updated plan, which is expected to be
    exactly the same as the old plan except that the finished step is marked
    as done. This flow does not compare the two plans; it simply replaces the
    content of the plan file.

    *Input Interface*:

    - `updated_plan`: the updated plan, exactly the same as the old plan,
      except the step that is done is marked as done.

    The input data must additionally carry `memory_files`, a mapping whose
    `plan` entry is the location of the plan file (see `_check_input`).

    *Output Interface*:

    - `result`: the result of the operation

    *Configuration Parameters*:

    - `input_interface`: the input interface of the atomic flow
    - `output_interface`: the output interface of the atomic flow
    """

    def _check_input(self, input_data: Dict[str, Any]) -> None:
        """Check that the input data names the plan file to write to.

        The checks raise ``AssertionError`` explicitly instead of using the
        ``assert`` statement, so they are still enforced when Python runs
        with ``-O`` (which strips ``assert`` statements).

        :param input_data: the input data to be checked
        :type input_data: Dict[str, Any]
        :raises AssertionError: if `memory_files` is missing from the input
            data, or if `memory_files` has no `plan` entry
        """
        if "memory_files" not in input_data:
            raise AssertionError("memory_files not passed to UpdatePlanAtomicFlow.yaml")
        if "plan" not in input_data["memory_files"]:
            raise AssertionError("plan not in memory_files")

    def _call(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Write the updated plan, followed by a newline, to the plan file.

        Any exception raised while preparing or writing the plan is caught
        and reported in the returned dictionary instead of propagating.

        :param input_data: the input data; reads `memory_files["plan"]`
            (file location) and `updated_plan` (text to write)
        :type input_data: Dict[str, Any]
        :return: a dictionary with a `result` message and a `summary` message
        :rtype: Dict[str, Any]
        """
        try:
            plan_file_location = input_data["memory_files"]["plan"]
            # Build the complete text BEFORE opening the file: mode "w"
            # truncates on open, so a failure here (missing key, non-string
            # plan) must happen while the previous plan is still intact.
            plan_text = input_data["updated_plan"] + "\n"
            # NOTE(review): the platform default encoding is kept on purpose;
            # readers of the plan file are not visible from here -- confirm
            # before pinning an explicit encoding.
            with open(plan_file_location, "w") as file:
                file.write(plan_text)
        except Exception as e:
            # Deliberate best-effort: the failure is handed back to the
            # caller as a message rather than raised.
            return {
                "result": f"Error occurred: {str(e)}",
                "summary": f"Jarvis/UpdatePlanFlow: error occurred while writing updated plan: {str(e)}",
            }
        return {
            "result": "updated plan saved to the plan file and has overriden the previous plan",
            "summary": f"Jarvis/UpdatePlanFlow: updated plan saved to {plan_file_location}",
        }

    def run(
        self,
        input_data: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Run the atomic flow: validate the input, then write the plan.

        :param input_data: the input data
        :type input_data: Dict[str, Any]
        :return: the result of the operation
        :rtype: Dict[str, Any]
        :raises AssertionError: if the input data does not contain the plan
            file location
        """
        self._check_input(input_data)
        return self._call(input_data)