from typing import Any, Dict

from aiflows.messages import UpdateMessage_Generic
from aiflows.utils import logging
from flow_modules.aiflows.HumanStandardInputFlowModule import HumanStandardInputFlow

log = logging.get_logger(f"aiflows.{__name__}")


class IntermediateAns_Jarvis(HumanStandardInputFlow):
    """Show an intermediate answer to the user and collect their feedback.

    This class inherits from the HumanStandardInputFlow class. It is used to
    give an intermediate answer to the user. The user is then able to provide
    feedback on the intermediate result. Depending on the user's feedback, the
    controller will decide to do different things (e.g. continue, re-plan,
    etc.)

    *Input Interface*:

    - `answer`: The intermediate answer to the question asked by the user.

    *Output Interface*:

    - `result`: User's response to the intermediate answer.
    - `summary`: A summary of the action.

    *Configuration parameters*:

    - `query_message_prompt_template`: The template of the message that is
      shown to the user.
    - `request_multi_line_input_flag`: A flag that indicates whether the user
      can give a multi-line input.
    - `end_of_input_string`: The string that indicates the end of the input.
    """

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Present the intermediate answer and return the user's feedback.

        The query message is rendered from `query_message_prompt_template`
        and `input_data`, recorded as a state-update message, logged, and
        then the user's reply is read via the inherited `_read_input`.

        :param input_data: The input data of the flow; must contain the key
            `answer`.
        :return: A dictionary with `result` (the user's reply) and `summary`
            (a one-line description of the answer and the feedback).
        :raises KeyError: If `input_data` has no `answer` key. Note that this
            is only raised after the user has already been prompted.
        """
        query_message = self._get_message(
            self.query_message_prompt_template, input_data
        )

        # Record the prompt in the flow's message history before showing it.
        flow_name = self.flow_config["name"]
        state_update_message = UpdateMessage_Generic(
            created_by=flow_name,
            updated_flow=flow_name,
            data={"query_message": query_message},
        )
        self._log_message(state_update_message)

        log.info(query_message)
        human_input = self._read_input()

        answer = input_data["answer"]
        return {
            "result": human_input,
            "summary": f"Intermediate Answer provided: {answer}; feedback of user: {human_input}",
        }