RunCodeFlowModule / RunCodeAskUserFlow.py
Tachi67's picture
Update RunCodeAskUserFlow.py
ee57ac6
raw
history blame
1.29 kB
from flow_modules.aiflows.HumanStandardInputFlowModule import HumanStandardInputFlow
from typing import Dict, Any
from aiflows.messages import UpdateMessage_Generic
from aiflows.utils import logging
# Module-level logger from the aiflows logging utility, named "flows.<module name>".
log = logging.get_logger(f"flows.{__name__}")
class RunCodeAskUserFlow(HumanStandardInputFlow):
    """Ask the user for feedback on code that has just been executed.

    Refer to: https://huggingface.co/Tachi67/ExtendLibraryFlowModule/blob/main/ExtLibAskUserFlow.py
    """

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Show the query to the user and bundle the reply with the run details.

        Args:
            input_data: Must contain ``code_ran`` and ``interpreter_output``.
                It is also handed to ``self._get_message`` together with
                ``self.query_message_prompt_template`` to render the query.

        Returns:
            A dict with two keys: ``result`` (the executed code, the
            interpreter output and the user's feedback as one text block) and
            ``summary`` (the same text prefixed with ``"Coder/run_code: \\n"``).

        Raises:
            KeyError: If ``code_ran`` or ``interpreter_output`` is missing
                from ``input_data``. This is raised only after the user input
                has been read.
        """
        # Render the prompt (helper presumably inherited from the base flow).
        prompt_text = self._get_message(self.query_message_prompt_template, input_data)

        # Record the rendered query as a state update authored by this flow.
        flow_name = self.flow_config["name"]
        self._log_message(
            UpdateMessage_Generic(
                created_by=flow_name,
                updated_flow=flow_name,
                data={"query_message": prompt_text},
            )
        )

        log.info(prompt_text)
        user_feedback = self._read_input()

        # The literal's lines deliberately start at column 0: any leading
        # whitespace here would become part of the emitted text.
        report = f"""
The following code was ran:
{input_data['code_ran']}
The execution result is:
{input_data['interpreter_output']}
The user's feedback is:
{user_feedback}
"""
        return {
            "result": report,
            "summary": "Coder/run_code: \n" + report,
        }