File size: 7,258 Bytes
6211d74 035821c 6211d74 035821c f2030ec 6211d74 f2030ec 6211d74 6f93da3 6211d74 0b2483c 6211d74 f2030ec 6211d74 0b2483c 6211d74 0b2483c bd514cd 6211d74 bd514cd 1c54ca4 6211d74 0b2483c 6211d74 0b2483c 6f93da3 6211d74 bd514cd b6b3916 bd514cd 0b2483c 6211d74 0b2483c 6211d74 b6b3916 6211d74 0b2483c 6211d74 bd514cd 0b2483c 6211d74 0b2483c 6211d74 0b2483c 6211d74 0b2483c 6211d74 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
from aiflows.base_flows import CompositeFlow
from aiflows.utils import logging
from aiflows.messages import FlowMessage
from aiflows.interfaces import KeyInterface
from aiflows.data_transformations import RegexFirstOccurrenceExtractor,EndOfInteraction
# Module-level logger, namespaced as "aiflows." followed by this module's name.
log = logging.get_logger(f"aiflows.{__name__}")
class ChatHumanFlowModule(CompositeFlow):
    r""" This class implements a Chat Human Flow Module. It is a flow that consists of two sub-flows that are executed circularly. It contains the following subflows:

    - A User Flow: A flow that makes queries to the Assistant Flow. E.g. the user asks the assistant (LLM) a question.
    - An Assistant Flow: A flow that responds to queries made by the User Flow. E.g. the assistant (LLM) answers the user's question.

    To end the interaction, the user must type "\<END\>"

    An illustration of the flow is as follows:

            |------> User Flow -----------> |
            ^                               |
            |                               |
            |                               v
            |<------ Assistant Flow <-------|

    *Configuration Parameters*:

    - `name` (str): The name of the flow. Default: "ChatHumanFlowModule"
    - `description` (str): A description of the flow. This description is used to generate the help message of the flow.
    Default: "Flow that enables chatting between a ChatAtomicFlow and a user providing the input."
    - `max_rounds` (int): The maximum number of rounds the flow can run for. Default: None, which means that there is no limit on the number of rounds.
    - `early_exit_key` (str): The key that is used to exit the flow. Default: "end_of_interaction"
    - `regex_first_occurrence_extractor` (Dict[str,Any]): Keyword arguments passed to `RegexFirstOccurrenceExtractor` in the constructor.
    - `end_of_interaction` (Dict[str,Any]): Keyword arguments passed to `EndOfInteraction` in the constructor.
    - `subflows_config` (Dict[str,Any]): A dictionary of subflows configurations. Default:
        - `Assistant Flow`: The configuration of the Assistant Flow. By default, it is a ChatAtomicFlow. Its default parameters are defined in ChatAtomicFlowModule.
        - `User Flow`: The configuration of the User Flow. By default, it is a HumanStandardInputFlow. Its default parameters are defined in HumanStandardInputFlowModule.
    - `topology` (List[Dict[str,Any]]): The topology of the flow which is "circular".
    By default, the topology is the one shown in the illustration above (the topology is also described in ChatHumanFlowModule.yaml).

    *Input Interface*:

    - None. By default, the input interface doesn't expect any input.

    *Output Interface*:

    - `user_inputs` (list): The queries collected from the User Flow during the interaction.
    - `assistant_outputs` (list): The `api_output` values collected from the Assistant Flow during the interaction.
    - `end_of_interaction` (bool): Whether the interaction is finished or not.

    :param \**kwargs: Arguments to be passed to the parent class CompositeFlow constructor.
    :type \**kwargs: Dict[str, Any]
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # Both transformations are configured entirely from the flow config.
        self.regex_extractor = RegexFirstOccurrenceExtractor(
            **self.flow_config["regex_first_occurrence_extractor"]
        )
        self.end_of_interaction = EndOfInteraction(
            **self.flow_config["end_of_interaction"]
        )

        # Messages going to the Assistant have "human_input" renamed to "query"
        # and are then passed through the two transformations above.
        self.input_interface_assistant = KeyInterface(
            keys_to_rename={"human_input": "query"},
            additional_transformations=[self.regex_extractor, self.end_of_interaction],
        )

        # Messages going to the User pass through an interface with no transformations configured.
        self.input_interface_user = KeyInterface()

    def set_up_flow_state(self):
        """ This method sets up the flow state: no previous sub-flow, round counter at zero,
        empty conversation histories, no stored input message and the interaction not finished.
        """
        super().set_up_flow_state()
        self.flow_state["last_state"] = None
        self.flow_state["current_round"] = 0
        self.flow_state["user_inputs"] = []
        self.flow_state["assistant_outputs"] = []
        self.flow_state["input_message"] = None
        self.flow_state["end_of_interaction"] = False

    @classmethod
    def type(cls):
        """ This method returns the type of the flow.

        :return: The type identifier of the flow.
        :rtype: str
        """
        return "OpenAIChatHumanFlowModule"

    def max_rounds_reached(self):
        """ This method checks if the maximum number of rounds has been reached.

        :return: True if `max_rounds` is set and the current round has reached it, False otherwise
            (in particular, always False when `max_rounds` is None).
        :rtype: bool
        """
        max_rounds = self.flow_config["max_rounds"]
        if max_rounds is None:
            # No limit configured.
            return False
        return self.flow_state["current_round"] >= max_rounds

    def generate_reply(self):
        """ This method generates the reply message and sends it. It is called when the interaction is finished.

        The reply is packaged against the input message stored in the flow state and carries
        the collected user inputs, the collected assistant outputs and the end-of-interaction flag.
        """
        reply = self.package_output_message(
            input_message=self.flow_state["input_message"],
            response={
                "user_inputs": self.flow_state["user_inputs"],
                "assistant_outputs": self.flow_state["assistant_outputs"],
                "end_of_interaction": self.flow_state["end_of_interaction"],
            },
        )
        self.send_message(reply)

    def call_to_user(self, input_message: FlowMessage):
        """ This method calls the User Flow. (Human)

        It records the assistant's `api_output`, then either ends the interaction
        (if the maximum number of rounds has been reached) or forwards the message to the User Flow.

        :param input_message: The input message to the flow.
        :type input_message: FlowMessage
        """
        user_message = self.input_interface_user(input_message)
        self.flow_state["assistant_outputs"].append(user_message.data["api_output"])
        message = self.package_input_message(data=user_message.data)

        if self.max_rounds_reached():
            self.generate_reply()
        else:
            self.subflows["User"].get_reply(message)
            self.flow_state["last_state"] = "User"

        # NOTE(review): the round counter is incremented on both branches; the flattened
        # source does not show the nesting of this line, so confirm against the upstream module.
        self.flow_state["current_round"] += 1

    def call_to_assistant(self, input_message: FlowMessage):
        """ This method calls the Assistant Flow.

        On the first call (no previous sub-flow) the incoming message is stored so the final reply
        can be packaged against it; on later calls the user's query is recorded. The interaction
        ends if the transformed message flags `end_of_interaction`; otherwise the message is
        forwarded to the Assistant Flow.

        :param input_message: The input message to the flow.
        :type input_message: FlowMessage
        """
        transformed = self.input_interface_assistant(input_message)
        message = self.package_input_message(data=transformed.data)

        if self.flow_state["last_state"] is None:
            self.flow_state["input_message"] = input_message
        else:
            # Read "query" from the interface's output: that key is produced by the
            # "human_input" -> "query" rename, so this does not depend on the interface
            # mutating `input_message` in place.
            self.flow_state["user_inputs"].append(transformed.data["query"])

        if message.data["end_of_interaction"]:
            self.flow_state["end_of_interaction"] = True
            self.generate_reply()
        else:
            self.subflows["Assistant"].get_reply(message)
            # NOTE(review): assumed to belong to the else branch, as in call_to_user; the
            # flattened source does not show the nesting, so confirm against the upstream module.
            self.flow_state["last_state"] = "Assistant"

    def run(self, input_message: FlowMessage):
        """ This method runs the flow. It is the main method of the flow and it is called when the flow is executed.

        It alternates between the two sub-flows based on which one was called last:
        the Assistant is called first and after every User turn, the User after every Assistant turn.

        :param input_message: The input message to the flow.
        :type input_message: FlowMessage
        """
        last_state = self.flow_state["last_state"]

        if last_state is None or last_state == "User":
            self.call_to_assistant(input_message=input_message)
        elif last_state == "Assistant":
            self.call_to_user(input_message=input_message)
|