File size: 7,258 Bytes
6211d74
035821c
6211d74
 
 
035821c
f2030ec
 
6211d74
f2030ec
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6211d74
 
 
 
 
 
 
 
 
 
 
 
6f93da3
 
6211d74
 
 
0b2483c
6211d74
 
 
 
 
f2030ec
 
 
 
 
6211d74
 
0b2483c
6211d74
 
 
0b2483c
 
 
 
bd514cd
6211d74
 
 
 
 
 
 
 
bd514cd
1c54ca4
6211d74
 
 
 
 
0b2483c
6211d74
0b2483c
 
 
6f93da3
 
 
6211d74
 
 
 
bd514cd
b6b3916
bd514cd
0b2483c
6211d74
 
 
 
 
 
0b2483c
 
 
 
 
6211d74
b6b3916
6211d74
0b2483c
6211d74
 
 
 
 
 
 
 
 
 
bd514cd
 
 
0b2483c
6211d74
 
 
 
 
 
 
0b2483c
6211d74
0b2483c
6211d74
 
 
0b2483c
6211d74
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
from aiflows.base_flows import CompositeFlow
from aiflows.utils import logging
from aiflows.messages import FlowMessage
from aiflows.interfaces import KeyInterface
from aiflows.data_transformations import RegexFirstOccurrenceExtractor,EndOfInteraction
log = logging.get_logger(f"aiflows.{__name__}")


class ChatHumanFlowModule(CompositeFlow):
    """ This class implements a Chat Human Flow Module. It is a flow that consists of two sub-flows that are executed circularly. It Contains the following subflows:
    
    - A User Flow: A flow makes queries to the Assistant Flow. E.g. The user asks the assistant (LLM) a question.
    - A Assistant Flow: A flow that responds to queries made by the User Flow. E.g. The assistant (LLM) answers the user's question.
    
    To end the interaction, the user must type "\<END\>"
    
    An illustration of the flow is as follows:
    
            |------> User Flow -----------> |
            ^                               |
            |                               |
            |                               v
            |<------ Assistant Flow <-------|
    
    *Configuration Parameters*:
    
    - `name` (str): The name of the flow. Default: "ChatHumanFlowModule"
    - `description` (str): A description of the flow. This description is used to generate the help message of the flow.
    Default: "Flow that enables chatting between a ChatAtomicFlow and a user providing the input."
    - `max_rounds` (int): The maximum number of rounds the flow can run for. Default: None, which means that there is no limit on the number of rounds.
    - `early_exit_key` (str): The key that is used to exit the flow. Default: "end_of_interaction"
    - `subflows_config` (Dict[str,Any]): A dictionary of subflows configurations. Default:
        - `Assistant Flow`: The configuration of the Assistant Flow. By default, it a ChatAtomicFlow. It default parmaters are defined in ChatAtomicFlowModule.
        - `User Flow`: The configuration of the User Flow. By default, it a HumanStandardInputFlow. It default parmaters are defined in HumanStandardInputFlowModule.
    - `topology` (str): (List[Dict[str,Any]]): The topology of the flow which is "circular".
    By default, the topology is the one shown in the illustration above (the topology is also described in ChatHumanFlowModule.yaml).

    *Input Interface*:
    
    - None. By default, the input interface doesn't expect any input.
        
    *Output Interface*:

    - `end_of_interaction` (bool): Whether the interaction is finished or not.
    
    :param \**kwargs: Arguments to be passed to the parent class CircularFlow constructor.
    :type \**kwargs: Dict[str, Any]
    """
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        
        
        self.regex_extractor = RegexFirstOccurrenceExtractor(**self.flow_config["regex_first_occurrence_extractor"])
        
        
        self.end_of_interaction = EndOfInteraction(**self.flow_config["end_of_interaction"])
        
        self.input_interface_assistant = KeyInterface(
            keys_to_rename = {"human_input": "query"},
            additional_transformations = [self.regex_extractor, self.end_of_interaction]
        )
        
        self.input_interface_user = KeyInterface()
        
    def set_up_flow_state(self):
        """ This method sets up the flow state. It is called when the flow is executed."""
        super().set_up_flow_state()
        self.flow_state["last_state"] = None
        self.flow_state["current_round"] = 0
        self.flow_state["user_inputs"] = []
        self.flow_state["assistant_outputs"] = []
        self.flow_state["input_message"] = None
        self.flow_state["end_of_interaction"] = False

    @classmethod
    def type(cls):
        """ This method returns the type of the flow."""
        return "OpenAIChatHumanFlowModule"
    
    def max_rounds_reached(self):
        """ This method checks if the maximum number of rounds has been reached. If the maximum number of rounds has been reached, it returns True. Otherwise, it returns False."""
        return self.flow_config["max_rounds"] is not None and self.flow_state["current_round"] >= self.flow_config["max_rounds"]
    
    def generate_reply(self):
        """ This method generates the reply message. It is called when the interaction is finished.
        :param input_message: The input message to the flow.
        :type input_message: FlowMessage
        """
        reply = self.package_output_message(
            input_message = self.flow_state["input_message"],
            response = {
                "user_inputs": self.flow_state["user_inputs"],
                "assistant_outputs":  self.flow_state["assistant_outputs"],
                "end_of_interaction": self.flow_state["end_of_interaction"]
            },
        )
        
        self.send_message(
            reply
        )
        
            
    
    def call_to_user(self,input_message):
        """ This method calls the User Flow. (Human)
        
        :param input_message: The input message to the flow.
        :type input_message: FlowMessage
        """
        msg = self.input_interface_user(input_message)###I ADDED THIS
        self.flow_state["assistant_outputs"].append(msg.data["api_output"])###I
        message = self.package_input_message(data=msg.data)

        if self.max_rounds_reached():
            self.generate_reply()
        else:
            self.subflows["User"].get_reply(
                message,
            )
            self.flow_state["last_state"] = "User"
        
        self.flow_state["current_round"] += 1
        
        
            
    def call_to_assistant(self,input_message):
        """ This method calls the Assistant Flow.

        :param input_message: The input message to the flow.
        :type input_message: FlowMessage
        """
        message = self.input_interface_assistant(input_message)
        message = self.package_input_message(data=message.data)
        
        if  self.flow_state["last_state"] is None:
            self.flow_state["input_message"] = input_message
        
        else:
            self.flow_state["user_inputs"].append(input_message.data["query"])
        
        if message.data["end_of_interaction"]:
            self.flow_state["end_of_interaction"] = True
            self.generate_reply()
            
        else:
            self.subflows["Assistant"].get_reply(
                message,
            )
            self.flow_state["last_state"] = "Assistant"    
    
    def run(self,input_message: FlowMessage):
        """ This method runs the flow. It is the main method of the flow and it is called when the flow is executed.
        
        :param input_message: The input message to the flow.
        :type input_message: FlowMessage
        """
        last_state = self.flow_state["last_state"]
        
        if last_state is None or last_state == "User":
            self.call_to_assistant(input_message=input_message)
            
            
        elif last_state == "Assistant":
            self.call_to_user(input_message=input_message)