|
from aiflows.base_flows import CompositeFlow |
|
from aiflows.utils import logging |
|
from aiflows.messages import FlowMessage |
|
from aiflows.interfaces import KeyInterface |
|
from aiflows.data_transformations import RegexFirstOccurrenceExtractor,EndOfInteraction |
|
log = logging.get_logger(f"aiflows.{__name__}") |
|
|
|
|
|
class ChatHumanFlowModule(CompositeFlow): |
|
""" This class implements a Chat Human Flow Module. It is a flow that consists of two sub-flows that are executed circularly. It Contains the following subflows: |
|
|
|
- A User Flow: A flow makes queries to the Assistant Flow. E.g. The user asks the assistant (LLM) a question. |
|
- A Assistant Flow: A flow that responds to queries made by the User Flow. E.g. The assistant (LLM) answers the user's question. |
|
|
|
To end the interaction, the user must type "\<END\>" |
|
|
|
An illustration of the flow is as follows: |
|
|
|
|------> User Flow -----------> | |
|
^ | |
|
| | |
|
| v |
|
|<------ Assistant Flow <-------| |
|
|
|
*Configuration Parameters*: |
|
|
|
- `name` (str): The name of the flow. Default: "ChatHumanFlowModule" |
|
- `description` (str): A description of the flow. This description is used to generate the help message of the flow. |
|
Default: "Flow that enables chatting between a ChatAtomicFlow and a user providing the input." |
|
- `max_rounds` (int): The maximum number of rounds the flow can run for. Default: None, which means that there is no limit on the number of rounds. |
|
- `early_exit_key` (str): The key that is used to exit the flow. Default: "end_of_interaction" |
|
- `subflows_config` (Dict[str,Any]): A dictionary of subflows configurations. Default: |
|
- `Assistant Flow`: The configuration of the Assistant Flow. By default, it a ChatAtomicFlow. It default parmaters are defined in ChatAtomicFlowModule. |
|
- `User Flow`: The configuration of the User Flow. By default, it a HumanStandardInputFlow. It default parmaters are defined in HumanStandardInputFlowModule. |
|
- `topology` (str): (List[Dict[str,Any]]): The topology of the flow which is "circular". |
|
By default, the topology is the one shown in the illustration above (the topology is also described in ChatHumanFlowModule.yaml). |
|
|
|
*Input Interface*: |
|
|
|
- None. By default, the input interface doesn't expect any input. |
|
|
|
*Output Interface*: |
|
|
|
- `end_of_interaction` (bool): Whether the interaction is finished or not. |
|
|
|
:param \**kwargs: Arguments to be passed to the parent class CircularFlow constructor. |
|
:type \**kwargs: Dict[str, Any] |
|
""" |
|
|
|
def __init__(self, **kwargs): |
|
super().__init__(**kwargs) |
|
|
|
|
|
self.regex_extractor = RegexFirstOccurrenceExtractor(**self.flow_config["regex_first_occurrence_extractor"]) |
|
|
|
|
|
self.end_of_interaction = EndOfInteraction(**self.flow_config["end_of_interaction"]) |
|
|
|
self.input_interface_assistant = KeyInterface( |
|
keys_to_rename = {"human_input": "query"}, |
|
additional_transformations = [self.regex_extractor, self.end_of_interaction] |
|
) |
|
|
|
self.input_interface_user = KeyInterface() |
|
|
|
def set_up_flow_state(self): |
|
""" This method sets up the flow state. It is called when the flow is executed.""" |
|
super().set_up_flow_state() |
|
self.flow_state["last_state"] = None |
|
self.flow_state["current_round"] = 0 |
|
self.flow_state["user_inputs"] = [] |
|
self.flow_state["assistant_outputs"] = [] |
|
self.flow_state["input_message"] = None |
|
self.flow_state["end_of_interaction"] = False |
|
|
|
@classmethod |
|
def type(cls): |
|
""" This method returns the type of the flow.""" |
|
return "OpenAIChatHumanFlowModule" |
|
|
|
def max_rounds_reached(self): |
|
""" This method checks if the maximum number of rounds has been reached. If the maximum number of rounds has been reached, it returns True. Otherwise, it returns False.""" |
|
return self.flow_config["max_rounds"] is not None and self.flow_state["current_round"] >= self.flow_config["max_rounds"] |
|
|
|
def generate_reply(self): |
|
""" This method generates the reply message. It is called when the interaction is finished. |
|
:param input_message: The input message to the flow. |
|
:type input_message: FlowMessage |
|
""" |
|
reply = self.package_output_message( |
|
input_message = self.flow_state["input_message"], |
|
response = { |
|
"user_inputs": self.flow_state["user_inputs"], |
|
"assistant_outputs": self.flow_state["assistant_outputs"], |
|
"end_of_interaction": self.flow_state["end_of_interaction"] |
|
}, |
|
) |
|
|
|
self.send_message( |
|
reply |
|
) |
|
|
|
|
|
|
|
def call_to_user(self,input_message): |
|
""" This method calls the User Flow. (Human) |
|
|
|
:param input_message: The input message to the flow. |
|
:type input_message: FlowMessage |
|
""" |
|
msg = self.input_interface_user(input_message) |
|
self.flow_state["assistant_outputs"].append(msg.data["api_output"]) |
|
message = self.package_input_message(data=msg.data) |
|
|
|
if self.max_rounds_reached(): |
|
self.generate_reply() |
|
else: |
|
self.subflows["User"].get_reply( |
|
message, |
|
) |
|
self.flow_state["last_state"] = "User" |
|
|
|
self.flow_state["current_round"] += 1 |
|
|
|
|
|
|
|
def call_to_assistant(self,input_message): |
|
""" This method calls the Assistant Flow. |
|
|
|
:param input_message: The input message to the flow. |
|
:type input_message: FlowMessage |
|
""" |
|
message = self.input_interface_assistant(input_message) |
|
message = self.package_input_message(data=message.data) |
|
|
|
if self.flow_state["last_state"] is None: |
|
self.flow_state["input_message"] = input_message |
|
|
|
else: |
|
self.flow_state["user_inputs"].append(input_message.data["query"]) |
|
|
|
if message.data["end_of_interaction"]: |
|
self.flow_state["end_of_interaction"] = True |
|
self.generate_reply() |
|
|
|
else: |
|
self.subflows["Assistant"].get_reply( |
|
message, |
|
) |
|
self.flow_state["last_state"] = "Assistant" |
|
|
|
def run(self,input_message: FlowMessage): |
|
""" This method runs the flow. It is the main method of the flow and it is called when the flow is executed. |
|
|
|
:param input_message: The input message to the flow. |
|
:type input_message: FlowMessage |
|
""" |
|
last_state = self.flow_state["last_state"] |
|
|
|
if last_state is None or last_state == "User": |
|
self.call_to_assistant(input_message=input_message) |
|
|
|
|
|
elif last_state == "Assistant": |
|
self.call_to_user(input_message=input_message) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|