JarvisFlowModule / UpdatePlanAtomicFlow.py
Tachi67's picture
add more comments & docs, typo fix
343bc6e verified
from typing import Dict, Any
from aiflows.base_flows.atomic import AtomicFlow
class UpdatePlanAtomicFlow(AtomicFlow):
    """Overwrite the plan file with an updated plan.

    This flow is called by the controller when it realizes one step of the plan
    is done. The controller provides the updated plan, which is exactly the same
    as the old plan except that the finished step is marked as done.

    *Input Interface*:

    - `updated_plan`: the updated plan, exactly the same as the old plan, except
      the step that is done is marked as done.

    *Output Interface*:

    - `result`: the result of the operation

    *Configuration Parameters*:

    - `input_interface`: the input interface of the atomic flow
    - `output_interface`: the output interface of the atomic flow
    """

    def _check_input(self, input_data: Dict[str, Any]):
        """Check that the input data carries the plan file location.

        :param input_data: the input data to be checked
        :type input_data: Dict[str, Any]
        :raises AssertionError: if `memory_files` is missing from the input data
        :raises AssertionError: if `memory_files` has no `plan` entry
        """
        # Raised explicitly instead of via `assert` so that the validation still
        # runs when Python is started with -O (which strips assert statements).
        # The exception type and messages are unchanged.
        if "memory_files" not in input_data:
            raise AssertionError("memory_files not passed to UpdatePlanAtomicFlow.yaml")
        if "plan" not in input_data["memory_files"]:
            raise AssertionError("plan not in memory_files")

    def _call(self, input_data: Dict[str, Any]):
        """Write the updated plan to the plan file, replacing its content.

        Any failure (missing `updated_plan`, non-string plan, I/O error) is
        reported through the returned dictionary rather than raised.

        :param input_data: the input data; reads `memory_files["plan"]` (file
            location) and `updated_plan` (text to write)
        :type input_data: Dict[str, Any]
        :return: a dictionary with a `result` and a `summary` message
        :rtype: Dict[str, Any]
        """
        try:
            plan_file_location = input_data["memory_files"]["plan"]
            # Build the complete text *before* opening the file: mode 'w'
            # truncates on open, so a failure here (e.g. a non-string plan)
            # must not be allowed to wipe the existing plan.
            plan_text = input_data["updated_plan"] + "\n"
            # NOTE(review): encoding is left to the platform default, as in the
            # original; confirm it matches how the plan file is read elsewhere.
            with open(plan_file_location, 'w') as file:
                file.write(plan_text)
            return {
                "result": "updated plan saved to the plan file and has overriden the previous plan",
                "summary": f"Jarvis/UpdatePlanFlow: updated plan saved to {plan_file_location}",
            }
        except Exception as e:
            # Deliberately broad: errors are handed back as the flow's result
            # instead of propagating to the caller.
            return {
                "result": f"Error occurred: {e}",
                "summary": f"Jarvis/UpdatePlanFlow: error occurred while writing updated plan: {e}",
            }

    def run(
        self,
        input_data: Dict[str, Any]
    ):
        """Run the atomic flow: validate the input, then write the plan.

        :param input_data: the input data
        :type input_data: Dict[str, Any]
        :return: the result of the operation
        :rtype: Dict[str, Any]
        :raises AssertionError: if the input data fails validation
        """
        self._check_input(input_data)
        return self._call(input_data)