nbaldwin commited on
Commit
a8e421a
1 Parent(s): 41db545

changed naming of messaging methods

Browse files
ControllerAtomicFlow.py CHANGED
@@ -136,9 +136,10 @@ class ControllerAtomicFlow(ChatAtomicFlow):
136
 
137
  response = json.loads(api_output)
138
 
139
- reply = self._package_output_message(
140
  input_message=input_message,
141
  response=response
142
  )
143
 
144
- self.reply_to_message(reply = reply, to = input_message)
 
 
136
 
137
  response = json.loads(api_output)
138
 
139
+ reply = self.package_output_message(
140
  input_message=input_message,
141
  response=response
142
  )
143
 
144
+
145
+ self.send_message(reply, is_reply = True)
ControllerExecutorFlow.py CHANGED
@@ -89,11 +89,11 @@ class ControllerExecutorFlow(CompositeFlow):
89
  def generate_reply(self):
90
  """ This method generates the reply of the flow. It's called when the flow is finished. """
91
 
92
- reply = self._package_output_message(
93
  input_message = self.flow_state["input_message"],
94
  response = self.reply_interface(self.flow_state)
95
  )
96
- self.reply_to_message(reply,to=self.flow_state["input_message"])
97
 
98
  def get_next_flow_to_call(self):
99
 
@@ -123,23 +123,29 @@ class ControllerExecutorFlow(CompositeFlow):
123
  else:
124
  input_interface = self.input_interface_controller
125
 
126
- message = self._package_input_message(
127
  data = input_interface(self.flow_state),
128
  dst_flow = "Controller"
129
  )
130
 
131
- self.subflows["Controller"].send_message_async(message, pipe_to= self.flow_config["flow_ref"])
 
 
 
132
 
133
  def call_executor(self):
134
 
135
  #call executor
136
  executor_branch_to_call = self.flow_state["command"]
137
- message = self._package_input_message(
138
  data = self.flow_state["command_args"],
139
  dst_flow = executor_branch_to_call
140
  )
141
 
142
- self.subflows[executor_branch_to_call].send_message_async(message, pipe_to= self.flow_config["flow_ref"])
 
 
 
143
 
144
 
145
  def register_data_to_state(self, input_message):
 
89
  def generate_reply(self):
90
  """ This method generates the reply of the flow. It's called when the flow is finished. """
91
 
92
+ reply = self.package_output_message(
93
  input_message = self.flow_state["input_message"],
94
  response = self.reply_interface(self.flow_state)
95
  )
96
+ self.send_message(reply,is_reply = True)
97
 
98
  def get_next_flow_to_call(self):
99
 
 
123
  else:
124
  input_interface = self.input_interface_controller
125
 
126
+ message = self.package_input_message(
127
  data = input_interface(self.flow_state),
128
  dst_flow = "Controller"
129
  )
130
 
131
+ self.subflows["Controller"].get_reply(
132
+ message,
133
+ self.get_instance_id(),
134
+ )
135
 
136
  def call_executor(self):
137
 
138
  #call executor
139
  executor_branch_to_call = self.flow_state["command"]
140
+ message = self.package_input_message(
141
  data = self.flow_state["command_args"],
142
  dst_flow = executor_branch_to_call
143
  )
144
 
145
+ self.subflows[executor_branch_to_call].get_reply(
146
+ message,
147
+ self.get_instance_id(),
148
+ )
149
 
150
 
151
  def register_data_to_state(self, input_message):
WikiSearchAtomicFlow.py CHANGED
@@ -69,9 +69,9 @@ class WikiSearchAtomicFlow(AtomicFlow):
69
  # Log the update to the flow messages list
70
  observation = search_response["wiki_content"] if search_response["wiki_content"] else search_response["relevant_pages"]
71
 
72
- reply = self._package_output_message(
73
  input_message = input_message,
74
  response = {"wiki_content": observation},
75
  )
76
 
77
- self.reply_to_message(reply=reply, to=input_message)
 
69
  # Log the update to the flow messages list
70
  observation = search_response["wiki_content"] if search_response["wiki_content"] else search_response["relevant_pages"]
71
 
72
+ reply = self.package_output_message(
73
  input_message = input_message,
74
  response = {"wiki_content": observation},
75
  )
76
 
77
+ self.send_message(reply, is_reply = True)
run.py CHANGED
@@ -93,10 +93,10 @@ if __name__ == "__main__":
93
  )
94
 
95
  #option2: use the proxy_flow
96
- #input_message = proxy_flow._package_input_message(data = data)
97
 
98
  #7. ~~~ Run inference ~~~
99
- future = proxy_flow.send_message_blocking(input_message)
100
 
101
  #uncomment this line if you would like to get the full message back
102
  #reply_message = future.get_message()
 
93
  )
94
 
95
  #option2: use the proxy_flow
96
+ #input_message = proxy_flow.package_input_message(data = data)
97
 
98
  #7. ~~~ Run inference ~~~
99
+ future = proxy_flow.get_reply_future(input_message)
100
 
101
  #uncomment this line if you would like to get the full message back
102
  #reply_message = future.get_message()