nbaldwin committed
Commit 1ed169b · 1 Parent(s): 593a9f3

Compatible with new version
ChatWithDemonstrationsFlow.py CHANGED
@@ -53,26 +53,30 @@ class ChatWithDemonstrationsFlow(CompositeFlow):
         super().set_up_flow_state()
         self.flow_state["last_flow_called"] = None
 
-    def run(self, input_data):
+    def run(self, input_message):
 
-        # ~~~~~~~~~~~ Solution 1 - Blocking ~~~~~~~
-        future = self.ask_subflow("demonstration_flow", input_data)
+        # # ~~~~~~~~~~~ Solution 1 - Blocking ~~~~~~~
+        # future = self.subflows["demonstration_flow"].send_message_blocking(input_message)
 
-        answer = self.ask_subflow("chat_flow", future.get_data())
-        return self.output_interface(answer.get_data())
+        # answer = self.subflows["chat_flow"].send_message_blocking(future.get_message())
+        # self.reply_to_message(reply=self.output_interface(answer.get_message()), to=input_message)
 
         # # ~~~~~~~~~~~ Solution 2 - Non-Blocking ~~~~~~~
-        # if self.flow_state["last_flow_called"] is None:
-        #     self.ask_pipe_subflow("demonstration_flow", input_data)
-        #     self.flow_state["last_flow_called"] = "demonstration_flow"
-        #     return {"answer": "Just Called the Demonstration Flow"}
+        if self.flow_state["last_flow_called"] is None:
+            self.flow_state["input_message"] = input_message
+            self.subflows["demonstration_flow"].send_message_async(input_message, pipe_to=self.flow_config["flow_ref"])
+            self.flow_state["last_flow_called"] = "demonstration_flow"
 
-        # elif self.flow_state["last_flow_called"] == "demonstration_flow":
-        #     self.ask_pipe_subflow("chat_flow", input_data)
-        #     self.flow_state["last_flow_called"] = "chat_flow"
-        #     return {"answer": "Just Called the Demonstration Flow"}
-
-        # self.flow_state["last_flow_called"] = None
-        return self.output_interface(input_data)
+        elif self.flow_state["last_flow_called"] == "demonstration_flow":
+            self.subflows["chat_flow"].send_message_async(input_message, pipe_to=self.flow_config["flow_ref"])
+            self.flow_state["last_flow_called"] = "chat_flow"
+
+        else:
+            self.flow_state["last_flow_called"] = None
+            self.reply_to_message(
+                reply=self.output_interface(input_message),
+                to=self.flow_state["input_message"],
+            )
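Note: the new run() above turns the composite flow into a message-driven state machine. Each invocation handles one incoming message, forwards work to a subflow with send_message_async(..., pipe_to=self.flow_config["flow_ref"]) so the subflow's reply is piped back into this same flow, and only the final invocation answers the original request via reply_to_message. A minimal sketch of the same pattern with a single hypothetical subflow ("echo_flow" and the import path are illustrative assumptions; only API calls appearing in this diff are used):

# Sketch only: a one-subflow composite using the non-blocking pattern above.
from aiflows.base_flows import CompositeFlow  # assumed import path

class EchoThenReply(CompositeFlow):
    def set_up_flow_state(self):
        super().set_up_flow_state()
        self.flow_state["last_flow_called"] = None

    def run(self, input_message):
        if self.flow_state["last_flow_called"] is None:
            # First call: remember the original message and forward it;
            # pipe_to routes the subflow's reply back into this flow's run().
            self.flow_state["input_message"] = input_message
            self.subflows["echo_flow"].send_message_async(
                input_message, pipe_to=self.flow_config["flow_ref"]
            )
            self.flow_state["last_flow_called"] = "echo_flow"
        else:
            # Second call: input_message is now the subflow's reply; answer
            # the request stored on the first call and reset the state.
            self.flow_state["last_flow_called"] = None
            self.reply_to_message(
                reply=input_message,  # the subflow's reply becomes our answer
                to=self.flow_state["input_message"],
            )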
DemonstrationsAtomicFlow.py CHANGED
@@ -182,7 +182,7 @@ class DemonstrationsAtomicFlow(AtomicFlow):
         log.info("Loaded the demonstrations for %d datapoints from %s", len(self.data), self.params["data_dir"])
 
     def run(self,
-            input_data: Dict[str, Any]) -> Dict[str, Any]:
+            input_message):
         """ This method runs the flow. It returns the input data of the flow with the demonstrations added to it.
 
         :param input_data: The input data of the flow.
@@ -190,4 +190,9 @@ class DemonstrationsAtomicFlow(AtomicFlow):
         :return: The input data of the flow with the demonstrations added to it.
         :rtype: Dict[str, Any]
         """
-        return {**input_data, **{"demonstrations": self._get_io_pairs(input_data=input_data)}}
+        input_data = input_message.data
+        reply = self._package_output_message(
+            input_message=input_message,
+            response={**{"demonstrations": self._get_io_pairs(input_data=input_data)}, **input_data},
+        )
+        self.reply_to_message(reply=reply, to=input_message)
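The atomic flow follows the same migration: run() no longer returns a dict; it unpacks input_message.data, packages the response with _package_output_message, and sends it back with reply_to_message. A minimal sketch of that contract for a hypothetical pass-through flow (class name and import path are assumptions; only calls used in the diff above appear):

# Sketch only: minimal atomic flow showing the new reply contract.
from aiflows.base_flows import AtomicFlow  # assumed import path

class PassThrough(AtomicFlow):
    def run(self, input_message):
        input_data = input_message.data  # payload dict of the incoming message
        reply = self._package_output_message(
            input_message=input_message,
            response={**input_data, "passed_through": True},
        )
        self.reply_to_message(reply=reply, to=input_message)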
__init__.py CHANGED
@@ -2,7 +2,7 @@
 
 
 dependencies = [
-    {"url": "aiflows/ChatFlowModule", "revision": "main"},
+    {"url": "aiflows/ChatFlowModule", "revision": "coflows"},
 ]
 from aiflows import flow_verse
 flow_verse.sync_dependencies(dependencies)
run.py CHANGED
@@ -9,82 +9,115 @@ from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys
 
 from aiflows import logging
 from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache
+
 from aiflows.utils import serve_utils
 from aiflows.workers import run_dispatch_worker_thread
 from aiflows.messages import FlowMessage
 from aiflows.interfaces import KeyInterface
+from aiflows.utils.colink_utils import start_colink_server
+from aiflows.workers import run_dispatch_worker_thread
+
 CACHING_PARAMETERS.do_caching = False  # Set to True to enable caching
 # clear_cache()  # Uncomment this line to clear the cache
 
-logging.set_verbosity_debug()  # Uncomment this line to see verbose logs
-
-from aiflows import flow_verse
+logging.set_verbosity_debug()
 
 
 dependencies = [
     {"url": "aiflows/ChatWithDemonstrationsFlowModule", "revision": os.getcwd()}
 ]
 
+from aiflows import flow_verse
 flow_verse.sync_dependencies(dependencies)
 if __name__ == "__main__":
-    # ~~~ Set the API information ~~~
-    # OpenAI backend
-
-    api_information = [ApiInfo(backend_used="openai", api_key=os.getenv("OPENAI_API_KEY"))]
+
+    # 1. ~~~~~ Set up a colink server ~~~~
+    FLOW_MODULES_PATH = "./"
+
+    cl = start_colink_server()
 
 
-    FLOW_MODULES_PATH = "./"
+    # 2. ~~~~~ Load flow config ~~~~~~
+    root_dir = "."
+    cfg_path = os.path.join(root_dir, "demo.yaml")
+    cfg = read_yaml_file(cfg_path)
 
-    jwt = os.getenv("COLINK_JWT")
-    addr = os.getenv("LOCAL_COLINK_ADDRESS")
+    # 2.1 ~~~ Set the API information ~~~
+    # OpenAI backend
+    api_information = [ApiInfo(backend_used="openai",
+                               api_key=os.getenv("OPENAI_API_KEY"))]
+    # # Azure backend
+    # api_information = ApiInfo(backend_used = "azure",
+    #                           api_base = os.getenv("AZURE_API_BASE"),
+    #                           api_key = os.getenv("AZURE_OPENAI_KEY"),
+    #                           api_version = os.getenv("AZURE_API_VERSION") )
 
-    cl = serve_utils.start_colink_component(
-        "Reverse Number Demo",
-        {"jwt": jwt, "addr": addr}
-    )
 
+    # 2.1 ~~~ Set the API information ~~~
+    # OpenAI backend
+    api_information = [ApiInfo(backend_used="openai",
+                               api_key=os.getenv("OPENAI_API_KEY"))]
     # # Azure backend
     # api_information = ApiInfo(backend_used = "azure",
     #                           api_base = os.getenv("AZURE_API_BASE"),
     #                           api_key = os.getenv("AZURE_OPENAI_KEY"),
     #                           api_version = os.getenv("AZURE_API_VERSION") )
+
+    quick_load_api_keys(cfg, api_information, key="api_infos")
 
-    root_dir = "."
-    cfg_path = os.path.join(root_dir, "demo.yaml")
-    cfg = read_yaml_file(cfg_path)
-
+
+    # 3. ~~~~ Serve The Flow ~~~~
     serve_utils.recursive_serve_flow(
         cl=cl,
-        flow_type="simpleDemonstrationQA",
+        flow_type="ChatWithDemonstrationFlowModule",
         default_config=cfg,
         default_state=None,
-        default_dispatch_point="coflows_dispatch",
+        default_dispatch_point="coflows_dispatch"
     )
 
-    quick_load_api_keys(cfg, api_information, key="api_infos")
-
+    # 4. ~~~~~ Start A Worker Thread ~~~~~
+    run_dispatch_worker_thread(cl, dispatch_point="coflows_dispatch", flow_modules_base_path=FLOW_MODULES_PATH)
+
+    # 5. ~~~~~ Mount the flow and get its proxy ~~~~~~
     proxy_flow = serve_utils.recursive_mount(
         cl=cl,
         client_id="local",
-        flow_type="simpleDemonstrationQA",
-        config_overrides=cfg,
+        flow_type="ChatWithDemonstrationFlowModule",
+        config_overrides=None,
         initial_state=None,
         dispatch_point_override=None,
     )
 
-
-    # ~~~ Get the data ~~~
-    data = {"id": 0, "question": "What's the capital of France?"}  # This can be a list of samples
+    # 6. ~~~ Get the data ~~~
+    data = {"id": 0, "question": "What is the capital of France?"}  # This can be a list of samples
     # data = {"id": 0, "question": "Who was the NBA champion in 2023?"}  # This can be a list of samples
-    # ~~~ Run inference ~~~
-
+
+    # option 1: use the FlowMessage class
     input_message = FlowMessage(
-        data=data,
-        src_flow="Coflows team",
-        dst_flow=proxy_flow,
-        is_input_msg=True
+        data=data,
     )
+
+    # option 2: use the proxy_flow
+    # input_message = proxy_flow._package_input_message(data=data)
+
+    # 7. ~~~ Run inference ~~~
+    future = proxy_flow.send_message_blocking(input_message)
+
+    # uncomment this line if you would like to get the full message back
+    # reply_message = future.get_message()
+    reply_data = future.get_data()
+
+    # ~~~ Print the output ~~~
+    print("~~~~~~Reply~~~~~~")
+    print(reply_data)
+
+
+    # 8. ~~~~ (Optional) apply output interface on reply ~~~~
+    # output_interface = KeyInterface(
+    #     keys_to_rename={"api_output": "answer"},
+    # )
+    # print("Output: ", output_interface(reply_data))
 
-    future = proxy_flow.ask(input_message)
 
-    print(future.get_data())
+    # 9. ~~~~~ Optional: Unserve Flow ~~~~~~
+    # serve_utils.delete_served_flow(cl, "ChatWithDemonstrationFlowModule")
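For reference, the serving lifecycle that the rewritten run.py sets up reduces to five steps: start a CoLink server, serve the flow type, start a dispatch worker, mount a proxy, and send a message. A condensed sketch using only calls from the script above (the "demo.yaml" path and flow_type string are taken from this commit; API-key loading via quick_load_api_keys is elided):

# Condensed sketch of the serve -> worker -> mount -> call lifecycle above.
from aiflows.utils import serve_utils
from aiflows.utils.colink_utils import start_colink_server
from aiflows.utils.general_helpers import read_yaml_file
from aiflows.workers import run_dispatch_worker_thread
from aiflows.messages import FlowMessage

cl = start_colink_server()                 # 1. local CoLink server
cfg = read_yaml_file("demo.yaml")          # 2. flow config (full script also loads API keys into it)
serve_utils.recursive_serve_flow(          # 3. register the flow type
    cl=cl,
    flow_type="ChatWithDemonstrationFlowModule",
    default_config=cfg,
    default_state=None,
    default_dispatch_point="coflows_dispatch",
)
run_dispatch_worker_thread(                # 4. worker that executes dispatched flows
    cl, dispatch_point="coflows_dispatch", flow_modules_base_path="./"
)
proxy_flow = serve_utils.recursive_mount(  # 5. mount an instance and get its proxy
    cl=cl,
    client_id="local",
    flow_type="ChatWithDemonstrationFlowModule",
    config_overrides=None,
    initial_state=None,
    dispatch_point_override=None,
)
future = proxy_flow.send_message_blocking(
    FlowMessage(data={"id": 0, "question": "What is the capital of France?"})
)
print(future.get_data())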