File size: 3,857 Bytes
4462cc9 c6f2abf 4462cc9 12f0756 ee30ef2 12f0756 4462cc9 8d743fd 4462cc9 8d743fd 4462cc9 8d743fd 4462cc9 8d743fd 4462cc9 c6a7812 8d743fd c6a7812 8d743fd c6a7812 4462cc9 0e7961d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
import logging
import os
from typing import Dict, Any

from aiflows.base_flows import CircularFlow
from flow_modules.Tachi67.ContentWriterFlowModule import ContentWriterFlow
class PlanWriterFlow(ContentWriterFlow):
    """This flow inherits from ContentWriterFlow.

    In the subflow of the executor, we specify the InteractivePlanGenFlow
    (https://huggingface.co/Tachi67/InteractivePlanGenFlowModule)

    *Input Interface*:
    - `goal`

    *Output Interface*:
    - `plan`
    - `result`
    - `summary`
    - `status`
    """

    @staticmethod
    def _remove_if_exists(path):
        """Delete the file at ``path`` if it currently exists."""
        if os.path.exists(path):
            os.remove(path)

    def _on_reach_max_round(self):
        """Record an "unfinished" outcome in the flow state.

        NOTE(review): presumably invoked by the parent flow when the round
        limit is hit without an early exit -- confirm against CircularFlow.
        """
        self._state_update_dict({
            "plan": "The maximum amount of rounds was reached before the model generated the plan.",
            "status": "unfinished"
        })

    @CircularFlow.output_msg_payload_processor
    def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow) -> Dict[str, Any]:
        """Post-process a subflow payload according to its ``command``.

        * ``finish``: remove the temp plan file, write the plan held in the
          flow state to the plan file, and return an early-exit payload with
          status ``"finished"``.
        * ``manual_finish``: remove the temp plan file and return an
          early-exit payload with status ``"unfinished"``.
        * ``write_plan``: add the plan file location to
          ``output_payload["command_args"]`` and pass the payload on.
        * anything else: pass the payload on unchanged.

        :param output_payload: payload of the subflow; must contain
            ``"command"`` (and ``"command_args"`` for ``finish`` and
            ``write_plan``).
        :param src_flow: the flow that produced the payload (unused here).
        :return: the payload to be forwarded / stored by the flow.
        """
        command = output_payload["command"]

        if command == "finish":
            # ~~~ fetch temp file location, plan content, memory file (of upper level flow e.g. ExtLib) from flow state
            fetched_state = self._fetch_state_attributes_by_keys(
                keys=["temp_plan_file_location", "plan", "memory_files"]
            )
            temp_plan_file_location = fetched_state["temp_plan_file_location"]
            plan_content = fetched_state["plan"]
            plan_file_location = fetched_state["memory_files"]["plan"]

            # ~~~ delete the temp plan file ~~~
            self._remove_if_exists(temp_plan_file_location)

            # ~~~ write plan content to plan file ~~~
            with open(plan_file_location, 'w') as file:
                file.write(plan_content)

            # ~~~ return the plan content ~~~
            return {
                "EARLY_EXIT": True,
                "plan": plan_content,
                "summary": "ExtendLibrary/PlanWriter: " + output_payload["command_args"]["summary"],
                "status": "finished"
            }

        if command == "manual_finish":
            # ~~~ delete the temp plan file ~~~
            fetched_state = self._fetch_state_attributes_by_keys(
                keys=["temp_plan_file_location"]
            )
            self._remove_if_exists(fetched_state["temp_plan_file_location"])

            # ~~~ return the manual quit status ~~~
            return {
                "EARLY_EXIT": True,
                "plan": "no plan was generated",
                "summary": "ExtendLibrary/PlanWriter: PlanWriter was terminated explicitly by the user, process is unfinished",
                "status": "unfinished"
            }

        if command == "write_plan":
            # The executor needs to know where the plan file lives.
            fetched_state = self._fetch_state_attributes_by_keys(keys=["memory_files"])
            plan_file_location = fetched_state["memory_files"]["plan"]
            output_payload["command_args"]["plan_file_location"] = plan_file_location

        return output_payload

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run the flow on ``input_data`` and return the output read from the state.

        :param input_data: data stored into the flow state before running.
        :return: whatever ``_get_output_from_state`` yields after the run.
        """
        # ~~~ sets the input_data in the flow_state dict ~~~
        self._state_update_dict(update_data=input_data)

        max_rounds = self.flow_config.get("max_rounds", 1)
        if max_rounds is None:
            # The module defines no `log` object, so obtain a logger here;
            # referencing the undefined name raised NameError on this path.
            logging.getLogger(__name__).info(
                "Running %s without `max_rounds` until the early exit condition is met.",
                self.flow_config["name"],
            )

        self._sequential_run(max_rounds=max_rounds)

        # Read the output before resetting, since the reset clears the state
        # (NOTE(review): assumed from `full_reset=True` -- confirm).
        output = self._get_output_from_state()

        self.reset(full_reset=True, recursive=True, src_flow=self)

        return output