# ChatInteractiveFlowModule / ChatHumanFlowModule.py
# nbaldwin's picture
# Getting ready for merge
# 6f93da3
from aiflows.base_flows import CompositeFlow
from aiflows.utils import logging
from aiflows.messages import FlowMessage
from aiflows.interfaces import KeyInterface
from aiflows.data_transformations import RegexFirstOccurrenceExtractor,EndOfInteraction
# Module-level logger, named after this module under the "aiflows." prefix.
# (It is not referenced anywhere in the visible part of this file.)
log = logging.get_logger(f"aiflows.{__name__}")
class ChatHumanFlowModule(CompositeFlow):
    r""" This class implements a Chat Human Flow Module. It is a flow that consists of two sub-flows that are executed circularly. It contains the following subflows:

    - A User Flow: A flow that makes queries to the Assistant Flow. E.g. the user asks the assistant (LLM) a question.
    - An Assistant Flow: A flow that responds to queries made by the User Flow. E.g. the assistant (LLM) answers the user's question.

    To end the interaction, the user must type "\<END\>"

    An illustration of the flow is as follows:

        |------> User Flow -----------> |
        ^                               |
        |                               |
        |                               v
        |<------ Assistant Flow <-------|

    *Configuration Parameters*:

    - `name` (str): The name of the flow. Default: "ChatHumanFlowModule"
    - `description` (str): A description of the flow. This description is used to generate the help message of the flow.
      Default: "Flow that enables chatting between a ChatAtomicFlow and a user providing the input."
    - `max_rounds` (int): The maximum number of rounds the flow can run for. Default: None, which means that there is no limit on the number of rounds.
    - `early_exit_key` (str): The key that is used to exit the flow. Default: "end_of_interaction"
    - `regex_first_occurrence_extractor` (Dict[str,Any]): Keyword arguments passed to the `RegexFirstOccurrenceExtractor` constructor.
    - `end_of_interaction` (Dict[str,Any]): Keyword arguments passed to the `EndOfInteraction` constructor.
    - `subflows_config` (Dict[str,Any]): A dictionary of subflows configurations. Default:
        - `Assistant Flow`: The configuration of the Assistant Flow. By default, it is a ChatAtomicFlow. Its default parameters are defined in ChatAtomicFlowModule.
        - `User Flow`: The configuration of the User Flow. By default, it is a HumanStandardInputFlow. Its default parameters are defined in HumanStandardInputFlowModule.
    - `topology` (str): (List[Dict[str,Any]]): The topology of the flow which is "circular".
      By default, the topology is the one shown in the illustration above (the topology is also described in ChatHumanFlowModule.yaml).

    *Input Interface*:

    - None. By default, the input interface doesn't expect any input.

    *Output Interface*:

    - `user_inputs` (list): The `query` values collected from the user's messages during the interaction.
    - `assistant_outputs` (list): The `api_output` values collected from the assistant's messages during the interaction.
    - `end_of_interaction` (bool): Whether the interaction is finished or not.

    :param \**kwargs: Arguments to be passed to the parent class CompositeFlow constructor.
    :type \**kwargs: Dict[str, Any]
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # Both transformations are built from their sections of the flow config.
        self.regex_extractor = RegexFirstOccurrenceExtractor(
            **self.flow_config["regex_first_occurrence_extractor"]
        )
        self.end_of_interaction = EndOfInteraction(
            **self.flow_config["end_of_interaction"]
        )

        # Interface applied to messages on their way to the Assistant: it is
        # configured to rename "human_input" to "query" and to run the two
        # transformations above. `call_to_assistant` later reads an
        # "end_of_interaction" entry from the result (presumably produced by
        # the EndOfInteraction transformation -- confirm against its config).
        self.input_interface_assistant = KeyInterface(
            keys_to_rename={"human_input": "query"},
            additional_transformations=[self.regex_extractor, self.end_of_interaction],
        )

        # Interface applied to messages on their way to the User; constructed
        # with no arguments.
        self.input_interface_user = KeyInterface()

    def set_up_flow_state(self):
        """ This method sets up the flow state. It is called when the flow is executed."""
        super().set_up_flow_state()
        # Which subflow was called last: None (nothing yet), "User" or "Assistant".
        self.flow_state["last_state"] = None
        # Incremented once per `call_to_user` invocation.
        self.flow_state["current_round"] = 0
        # Conversation history returned by `generate_reply`.
        self.flow_state["user_inputs"] = []
        self.flow_state["assistant_outputs"] = []
        # The very first message received; the final reply is packaged against it.
        self.flow_state["input_message"] = None
        self.flow_state["end_of_interaction"] = False

    @classmethod
    def type(cls) -> str:
        """ This method returns the type of the flow."""
        return "OpenAIChatHumanFlowModule"

    def max_rounds_reached(self) -> bool:
        """ This method checks if the maximum number of rounds has been reached.

        :return: True if `max_rounds` is configured (not None) and `current_round` has reached it, False otherwise.
        :rtype: bool
        """
        max_rounds = self.flow_config["max_rounds"]
        return max_rounds is not None and self.flow_state["current_round"] >= max_rounds

    def generate_reply(self) -> None:
        """ This method generates the reply message and sends it. It is called when the interaction is finished.

        The reply is packaged against the input message stored in the flow state and contains
        the collected `user_inputs`, `assistant_outputs` and the `end_of_interaction` flag.
        """
        reply = self.package_output_message(
            input_message=self.flow_state["input_message"],
            response={
                "user_inputs": self.flow_state["user_inputs"],
                "assistant_outputs": self.flow_state["assistant_outputs"],
                "end_of_interaction": self.flow_state["end_of_interaction"],
            },
        )
        self.send_message(reply)

    def call_to_user(self, input_message: FlowMessage) -> None:
        """ This method calls the User Flow. (Human)

        It records the assistant's `api_output`, then either ends the interaction
        (if the maximum number of rounds has been reached) or forwards the message to the User Flow.

        :param input_message: The input message to the flow (here, the Assistant's answer).
        :type input_message: FlowMessage
        """
        msg = self.input_interface_user(input_message)

        self.flow_state["assistant_outputs"].append(msg.data["api_output"])

        message = self.package_input_message(data=msg.data)

        # NOTE(review): the nesting below was reconstructed from a copy whose
        # indentation was lost -- confirm that only the `last_state` update
        # belongs to the else branch and the round counter always advances.
        if self.max_rounds_reached():
            self.generate_reply()
        else:
            self.subflows["User"].get_reply(message)
            self.flow_state["last_state"] = "User"

        self.flow_state["current_round"] += 1

    def call_to_assistant(self, input_message: FlowMessage) -> None:
        """ This method calls the Assistant Flow.

        On the first call the incoming message is stored so the final reply can be packaged
        against it; on later calls the user's `query` is recorded. The interaction then either
        ends (if the transformed message flags `end_of_interaction`) or the message is forwarded
        to the Assistant Flow.

        :param input_message: The input message to the flow.
        :type input_message: FlowMessage
        """
        message = self.input_interface_assistant(input_message)
        message = self.package_input_message(data=message.data)

        if self.flow_state["last_state"] is None:
            # First message of the interaction: remember it for the final reply.
            self.flow_state["input_message"] = input_message
        else:
            # NOTE(review): "query" is read from the original `input_message`,
            # not from the transformed `message`; this presumably relies on the
            # interface renaming "human_input" in place -- confirm.
            self.flow_state["user_inputs"].append(input_message.data["query"])

        # NOTE(review): nesting reconstructed from a copy whose indentation was
        # lost -- confirm that the `last_state` update belongs to the else branch.
        if message.data["end_of_interaction"]:
            self.flow_state["end_of_interaction"] = True
            self.generate_reply()
        else:
            self.subflows["Assistant"].get_reply(message)
            self.flow_state["last_state"] = "Assistant"

    def run(self, input_message: FlowMessage):
        """ This method runs the flow. It is the main method of the flow and it is called when the flow is executed.

        It alternates between the two subflows based on `last_state`: the Assistant is called
        first (and after every User turn), the User is called after every Assistant turn.

        :param input_message: The input message to the flow.
        :type input_message: FlowMessage
        """
        last_state = self.flow_state["last_state"]

        if last_state is None or last_state == "User":
            self.call_to_assistant(input_message=input_message)
        elif last_state == "Assistant":
            self.call_to_user(input_message=input_message)