JarvisFlowModule / Planner_JarvisFlow.py
Tachi67's picture
add more comments & docs, typo fix
343bc6e verified
raw
history blame
4.29 kB
from typing import Dict, Any
import os
from flow_modules.aiflows.PlanWriterFlowModule import PlanWriterFlow
from aiflows.base_flows import CircularFlow
class Planner_JarvisFlow(PlanWriterFlow):
    """Planner used by Jarvis to generate a plan.

    This flow inherits from PlanWriterFlow
    (https://huggingface.co/aiflows/PlanWriterFlowModule) and only customizes how the
    output payload of the subflows is post-processed (see `detect_finish_or_continue`).

    *Input Interface*:

    - `goal` (str): the goal of the planner, the goal comes from the user's query when calling Jarvis.
    - `memory_files` (dict): a dictionary of memory files, the keys are the names of the memory files,
      the values are the locations of the memory files.

    *Output Interface*:

    - `plan` (str): the generated plan, the plan string will be written to the plan file and returned
      to the flow state of the Jarvis flow.
    - `summary` (str): the summary of the planner.
    - `status` (str): the status of the planner, can be "finished" or "unfinished".

    *Configuration Parameters*:

    - Also refer to PlanWriterFlow
      (https://huggingface.co/aiflows/PlanWriterFlowModule/blob/main/PlanWriterFlow.py)
      for more configuration parameters.
    - `input_interface`: the input interface of the flow.
    - `output_interface`: the output interface of the flow.
    - `subflows_config`: the configuration of the subflows of the flow.
    - `early_exit_key`: the key of the early exit signal in the output payload.
    - `topology`: the topology of the subflows.
    """

    @staticmethod
    def _remove_temp_plan_file(temp_plan_file_location) -> None:
        """Delete the temporary plan file, tolerating the case where it is already gone.

        :param temp_plan_file_location: path of the temporary plan file.
        :type temp_plan_file_location: str
        """
        # EAFP instead of `os.path.exists` + `os.remove`: the file may disappear between
        # the check and the removal, and a missing temp file is not an error here.
        try:
            os.remove(temp_plan_file_location)
        except FileNotFoundError:
            pass

    @CircularFlow.output_msg_payload_processor
    def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow) -> Dict[str, Any]:
        """Decide, from the `command` in the output payload, whether the planner finishes or continues.

        - `finish`: the temp plan file is deleted, the plan held in the flow state is written to the
          plan file (`memory_files["plan"]`), and an early-exit payload with status "finished" is returned.
        - `manual_finish`: the temp plan file is deleted and an early-exit payload with status
          "unfinished" is returned.
        - `write_plan`: the plan file location is injected into `command_args` and the (mutated)
          payload is passed on.
        - any other command: the payload is passed on unchanged.

        :param output_payload: the output payload of the flow; must contain the key `command`.
        :type output_payload: Dict[str, Any]
        :param src_flow: the flow that generates the output payload.
        :type src_flow: Flow
        :return: the early-exit payload, or the (possibly updated) output payload.
        :rtype: Dict[str, Any]
        """
        command = output_payload["command"]

        if command == "finish":
            # ~~~ fetch temp file location, plan content, memory file (of upper level flow e.g. ExtLib) from flow state
            fetched_state = self._fetch_state_attributes_by_keys(
                keys=["temp_plan_file_location", "plan", "memory_files"]
            )
            temp_plan_file_location = fetched_state["temp_plan_file_location"]
            plan_content = fetched_state["plan"]
            plan_file_location = fetched_state["memory_files"]["plan"]

            # ~~~ delete the temp plan file ~~~
            self._remove_temp_plan_file(temp_plan_file_location)

            # ~~~ write plan content to plan file ~~~
            with open(plan_file_location, 'w') as file:
                file.write(plan_content)

            # ~~~ return the plan content ~~~
            return {
                "EARLY_EXIT": True,
                "plan": plan_content,
                "summary": "Jarvis/PlanWriter: " + output_payload["command_args"]["summary"],
                "status": "finished"
            }

        if command == "manual_finish":
            # ~~~ delete the temp plan file ~~~
            fetched_state = self._fetch_state_attributes_by_keys(keys=["temp_plan_file_location"])
            self._remove_temp_plan_file(fetched_state["temp_plan_file_location"])

            # ~~~ return the manual quit status ~~~
            return {
                "EARLY_EXIT": True,
                "plan": "no plan was generated",
                "summary": "Jarvis/PlanWriter: PlanWriter was terminated explicitly by the user, process is unfinished",
                "status": "unfinished"
            }

        if command == "write_plan":
            # ~~~ tell the downstream flow where the plan file lives ~~~
            fetched_state = self._fetch_state_attributes_by_keys(keys=["memory_files"])
            output_payload["command_args"]["plan_file_location"] = fetched_state["memory_files"]["plan"]

        # ~~~ `write_plan` and every other command: hand the payload on ~~~
        return output_payload