from aiflows.base_flows import CompositeFlow
from aiflows.utils import logging
from aiflows.messages import FlowMessage
from aiflows.interfaces import KeyInterface
from aiflows.data_transformations import RegexFirstOccurrenceExtractor, EndOfInteraction

log = logging.get_logger(f"aiflows.{__name__}")


class ChatHumanFlowModule(CompositeFlow):
    r"""Chat loop between a human user and an assistant.

    This class implements a Chat Human Flow Module. It is a flow that consists
    of two sub-flows that are executed circularly. It contains the following
    subflows:

    - A User Flow: A flow that makes queries to the Assistant Flow.
      E.g. the user asks the assistant (LLM) a question.
    - An Assistant Flow: A flow that responds to queries made by the User Flow.
      E.g. the assistant (LLM) answers the user's question.

    To end the interaction, the user types the end-of-interaction marker that
    is detected by the ``EndOfInteraction`` transformation built from the
    ``end_of_interaction`` entry of the flow config.
    NOTE(review): the literal marker is not visible in this file -- check the
    flow's configuration.

    An illustration of the flow is as follows::

        |------> User Flow -----------> |
        ^                               |
        |                               |
        |                               v
        |<------ Assistant Flow <-------|

    *Configuration Parameters*:

    - `name` (str): The name of the flow. Default: "ChatHumanFlowModule"
    - `description` (str): A description of the flow. This description is used
      to generate the help message of the flow. Default: "Flow that enables
      chatting between a ChatAtomicFlow and a user providing the input."
    - `max_rounds` (int): The maximum number of rounds the flow can run for.
      Default: None, which means that there is no limit on the number of
      rounds.
    - `early_exit_key` (str): The key that is used to exit the flow.
      Default: "end_of_interaction"
    - `regex_first_occurrence_extractor` (Dict[str,Any]): Keyword arguments
      used to build the ``RegexFirstOccurrenceExtractor`` applied to the
      user's message before it is sent to the assistant. Required.
    - `end_of_interaction` (Dict[str,Any]): Keyword arguments used to build
      the ``EndOfInteraction`` transformation applied to the user's message
      before it is sent to the assistant. Required.
    - `subflows_config` (Dict[str,Any]): A dictionary of subflows
      configurations. The code accesses the subflows as
      ``self.subflows["Assistant"]`` and ``self.subflows["User"]``. Default:

        - Assistant Flow: The configuration of the Assistant Flow. By
          default, it is a ChatAtomicFlow. Its default parameters are defined
          in ChatAtomicFlowModule.
        - User Flow: The configuration of the User Flow. By default, it is a
          HumanStandardInputFlow. Its default parameters are defined in
          HumanStandardInputFlowModule.

    - `topology` (List[Dict[str,Any]]): The topology of the flow, which is
      "circular". By default, the topology is the one shown in the
      illustration above (the topology is also described in
      ChatHumanFlowModule.yaml).
      NOTE(review): ``run`` in this file alternates between the two subflows
      based on ``flow_state["last_state"]`` and does not read `topology`.

    *Input Interface*:

    - None. By default, the input interface doesn't expect any input.

    *Output Interface*:

    - `end_of_interaction` (bool): Whether the interaction is finished or not.

    The response packaged by ``generate_reply`` additionally carries
    `user_inputs` and `assistant_outputs` (the lists accumulated in the flow
    state).

    :param \**kwargs: Arguments to be passed to the parent class
        (``CompositeFlow``) constructor.
    :type \**kwargs: Dict[str, Any]
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # Both transformations are configured entirely from the flow config.
        self.regex_extractor = RegexFirstOccurrenceExtractor(
            **self.flow_config["regex_first_occurrence_extractor"]
        )
        self.end_of_interaction = EndOfInteraction(
            **self.flow_config["end_of_interaction"]
        )

        # Applied to messages going to the assistant: the human's text is
        # exposed under "query", then the extractor and the end-of-interaction
        # transformation are applied.
        self.input_interface_assistant = KeyInterface(
            keys_to_rename={"human_input": "query"},
            additional_transformations=[self.regex_extractor, self.end_of_interaction],
        )

        # Applied to messages going to the user; constructed with no arguments.
        self.input_interface_user = KeyInterface()

    def set_up_flow_state(self) -> None:
        """Initialise the conversation-tracking entries of the flow state.

        After the parent set-up, the following keys are (re)set:

        - ``last_state``: ``None`` (which subflow was called last).
        - ``current_round``: ``0``.
        - ``user_inputs`` / ``assistant_outputs``: empty lists.
        - ``input_message``: ``None`` (the message that started the chat).
        - ``end_of_interaction``: ``False``.
        """
        super().set_up_flow_state()
        self.flow_state["last_state"] = None
        self.flow_state["current_round"] = 0
        self.flow_state["user_inputs"] = []
        self.flow_state["assistant_outputs"] = []
        self.flow_state["input_message"] = None
        self.flow_state["end_of_interaction"] = False

    @classmethod
    def type(cls) -> str:
        """Return the type name of the flow."""
        return "OpenAIChatHumanFlowModule"

    def max_rounds_reached(self) -> bool:
        """Tell whether the configured round limit has been reached.

        :return: ``True`` if ``max_rounds`` is set (not ``None``) and
            ``current_round`` is greater than or equal to it, ``False``
            otherwise.
        :rtype: bool
        """
        max_rounds = self.flow_config["max_rounds"]
        # A limit of None means "unlimited"; the round counter is then not read.
        return max_rounds is not None and self.flow_state["current_round"] >= max_rounds

    def generate_reply(self) -> None:
        """Package the conversation history and send it as the flow's reply.

        The reply is addressed using the message stored in
        ``flow_state["input_message"]`` and contains the accumulated
        ``user_inputs``, ``assistant_outputs`` and the ``end_of_interaction``
        flag.
        """
        reply = self.package_output_message(
            input_message=self.flow_state["input_message"],
            response={
                "user_inputs": self.flow_state["user_inputs"],
                "assistant_outputs": self.flow_state["assistant_outputs"],
                "end_of_interaction": self.flow_state["end_of_interaction"],
            },
        )
        self.send_message(reply)

    def call_to_user(self, input_message: FlowMessage) -> None:
        """Record the assistant's answer and hand the turn to the User Flow.

        If the round limit has been reached, the final reply is generated
        instead of calling the User Flow. In both cases the round counter is
        incremented and ``last_state`` becomes ``"User"``.

        :param input_message: The message received from the Assistant Flow;
            its data must contain ``"api_output"``.
        :type input_message: FlowMessage
        """
        msg = self.input_interface_user(input_message)
        self.flow_state["assistant_outputs"].append(msg.data["api_output"])

        # Re-package the data as a fresh input message for the subflow.
        message = self.package_input_message(data=msg.data)

        if self.max_rounds_reached():
            self.generate_reply()
        else:
            self.subflows["User"].get_reply(message)

        self.flow_state["last_state"] = "User"
        self.flow_state["current_round"] += 1

    def call_to_assistant(self, input_message: FlowMessage) -> None:
        """Hand the turn to the Assistant Flow, or finish the interaction.

        On the very first call (``last_state`` is ``None``) the incoming
        message is stored as the one to reply to; on later calls the user's
        query is appended to ``user_inputs``. If the transformed message
        flags ``end_of_interaction``, the final reply is generated instead of
        calling the Assistant Flow. In both cases ``last_state`` becomes
        ``"Assistant"``.

        :param input_message: The message received by the flow (initial
            input, or the User Flow's output).
        :type input_message: FlowMessage
        """
        message = self.input_interface_assistant(input_message)
        # Re-package the transformed data as a fresh input message.
        message = self.package_input_message(data=message.data)

        if self.flow_state["last_state"] is None:
            # First turn: remember the message the final reply answers.
            self.flow_state["input_message"] = input_message
        else:
            # NOTE(review): "query" is read from the incoming message object,
            # not the re-packaged one; this presumably relies on the interface
            # having renamed "human_input" on that same object -- confirm.
            self.flow_state["user_inputs"].append(input_message.data["query"])

        if message.data["end_of_interaction"]:
            self.flow_state["end_of_interaction"] = True
            self.generate_reply()
        else:
            self.subflows["Assistant"].get_reply(message)

        self.flow_state["last_state"] = "Assistant"

    def run(self, input_message: FlowMessage) -> None:
        """Route the incoming message to the subflow whose turn it is.

        The assistant is called when nothing has been called yet or when the
        user was called last; the user is called when the assistant was called
        last. Any other ``last_state`` value results in no action.

        :param input_message: The input message to the flow.
        :type input_message: FlowMessage
        """
        last_state = self.flow_state["last_state"]

        if last_state is None or last_state == "User":
            self.call_to_assistant(input_message=input_message)
        elif last_state == "Assistant":
            self.call_to_user(input_message=input_message)