nbaldwin committed
Commit db419e8 · 1 Parent(s): bdd8d48

coflows compatible

Files changed (3):
  1. HumanStandardInputFlow.py +8 -4
  2. demo.yaml +14 -16
  3. run.py +71 -31
HumanStandardInputFlow.py CHANGED
@@ -8,7 +8,7 @@ from aiflows.base_flows import AtomicFlow
 from aiflows.messages import UpdateMessage_Generic
 
 from aiflows.utils import logging
-
+from aiflows.messages import FlowMessage
 # logging.set_verbosity_debug() # ToDo: Has no effect on the logger for __name__. Level is warn, and info is not printed
 log = logging.get_logger(f"aiflows.{__name__}")  # ToDo: Is there a better fix?
 
@@ -142,7 +142,7 @@ class HumanStandardInputFlow(AtomicFlow):
         return human_input
 
     def run(self,
-            input_data: Dict[str, Any]) -> Dict[str, Any]:
+            input_message: FlowMessage):
         """ Runs the HumanStandardInputFlow. It's used to read input from the user/human's standard input.
 
         :param input_data: The input data dictionary
@@ -150,8 +150,10 @@ class HumanStandardInputFlow(AtomicFlow):
         :return: The output data dictionary
         :rtype: Dict[str, Any]
         """
-
+        input_data = input_message.data
+
         query_message = self._get_message(self.query_message_prompt_template, input_data)
+
         state_update_message = UpdateMessage_Generic(
             created_by=self.flow_config['name'],
             updated_flow=self.flow_config["name"],
@@ -162,4 +164,6 @@ class HumanStandardInputFlow(AtomicFlow):
         log.info(query_message)
         human_input = self._read_input()
 
-        return {"human_input": human_input}
+        reply_message = self._package_output_message(input_message = input_message, response = {"human_input": human_input})
+
+        self.reply_to_message(reply = reply_message, to = input_message)
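Under coflows, run receives a FlowMessage and answers by replying to the message's sender instead of returning a dict. A minimal sketch of the same request/reply pattern, assuming only the AtomicFlow helpers visible in the diff above (the EchoFlow class is hypothetical, for illustration):

from typing import Any, Dict

from aiflows.base_flows import AtomicFlow
from aiflows.messages import FlowMessage


class EchoFlow(AtomicFlow):  # hypothetical flow, for illustration only
    def run(self, input_message: FlowMessage):
        # The payload now arrives wrapped in a FlowMessage
        input_data: Dict[str, Any] = input_message.data

        # ... compute a response ...
        response = {"echo": input_data}

        # Package the response and reply to the sender; run returns nothing
        reply = self._package_output_message(input_message=input_message, response=response)
        self.reply_to_message(reply=reply, to=input_message)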
demo.yaml CHANGED
@@ -1,17 +1,15 @@
-input_interface:
-  _target_: aiflows.interfaces.KeyInterface
-flow:
-  _target_: flow_modules.aiflows.HumanStandardInputFlowModule.HumanStandardInputFlow.instantiate_from_default_config
-  name: "HumanStandardInputFlow"
-  description: "A demo of the HumanStandardInputFlow."
-  request_multi_line_input_flag: True
-  end_of_input_string: EOI
-  query_message_prompt_template:
-    _target_: aiflows.prompt_template.JinjaPrompt
-    template: |2-
-      Please enter your input.
 
-      What's opinion on this statement?
-      {{statement}}
-    input_variables: []
-    partial_variables: {"statement": "This Flow works well."}
+_target_: flow_modules.aiflows.HumanStandardInputFlowModule.HumanStandardInputFlow.instantiate_from_default_config
+name: "HumanStandardInputFlow"
+description: "A demo of the HumanStandardInputFlow."
+request_multi_line_input_flag: True
+end_of_input_string: EOI
+query_message_prompt_template:
+  _target_: aiflows.prompt_template.JinjaPrompt
+  template: |2-
+    Please enter your input.
+
+    What's opinion on this statement?
+    {{statement}}
+  input_variables: [statement]
+  partial_variables: {}
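Note what this change implies for callers: statement is no longer baked into the config as a partial variable but declared as an input variable, so it must now be supplied with each request. A minimal sketch of the payload, mirroring the data dict that run.py builds below:

# "statement" moved from partial_variables to input_variables,
# so the caller supplies it with every request:
data = {"id": 0, "statement": "This flow works well"}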
run.py CHANGED
@@ -2,14 +2,22 @@ import os
 
 import hydra
 
-
+import aiflows
 from aiflows.flow_launchers import FlowLauncher
+from aiflows.backends.api_info import ApiInfo
 from aiflows.utils.general_helpers import read_yaml_file
 
 from aiflows import logging
-from aiflows.flow_cache import CACHING_PARAMETERS
+from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache
+
+from aiflows.utils import serve_utils
+from aiflows.workers import run_dispatch_worker_thread
+from aiflows.messages import FlowMessage
+from aiflows.interfaces import KeyInterface
+from aiflows.utils.colink_utils import start_colink_server
+from aiflows.workers import run_dispatch_worker_thread
 
-CACHING_PARAMETERS.do_caching = False # Set to True to enable caching
+CACHING_PARAMETERS.do_caching = False # Set to True in order to disable caching
 # clear_cache() # Uncomment this line to clear the cache
 
 logging.set_verbosity_debug()
@@ -21,38 +29,70 @@ from aiflows import flow_verse
 flow_verse.sync_dependencies(dependencies)
 
 if __name__ == "__main__":
-
+
+    #1. ~~~~~ Set up a colink server ~~~~
+    FLOW_MODULES_PATH = "./"
+
+    cl = start_colink_server()
+
+    #2. ~~~~~Load flow config~~~~~~
     root_dir = "."
     cfg_path = os.path.join(root_dir, "demo.yaml")
     cfg = read_yaml_file(cfg_path)
-    flow_with_interfaces = {
-        "flow": hydra.utils.instantiate(cfg['flow'], _recursive_=False, _convert_="partial"),
-        "input_interface": (
-            None
-            if getattr(cfg, "input_interface", None) is None
-            else hydra.utils.instantiate(cfg['input_interface'], _recursive_=False)
-        ),
-        "output_interface": (
-            None
-            if getattr(cfg, "output_interface", None) is None
-            else hydra.utils.instantiate(cfg['output_interface'], _recursive_=False)
-        ),
-    }
-
-    # ~~~ Get the data ~~~
-    # This can be a list of samples
-    data = {"id": 0}  # Add your data here
-
-    # ~~~ Run inference ~~~
-    path_to_output_file = None
-    # path_to_output_file = "output.jsonl" # Uncomment this line to save the output to disk
-
-    _, outputs = FlowLauncher.launch(
-        flow_with_interfaces=flow_with_interfaces,
+
+    #3. ~~~~ Serve The Flow ~~~~
+    serve_utils.serve_flow(
+        cl = cl,
+        flow_type="HumanStandardInputFlowModule",
+        default_config=cfg,
+        default_state=None,
+        default_dispatch_point="coflows_dispatch"
+    )
+
+    #4. ~~~~~Start A Worker Thread~~~~~
+    run_dispatch_worker_thread(cl, dispatch_point="coflows_dispatch", flow_modules_base_path=FLOW_MODULES_PATH)
+
+    #5. ~~~~~Mount the flow and get its proxy~~~~~~
+    proxy_flow = serve_utils.recursive_mount(
+        cl=cl,
+        client_id="local",
+        flow_type="HumanStandardInputFlowModule",
+        config_overrides=None,
+        initial_state=None,
+        dispatch_point_override=None,
+    )
+
+
+    #6. ~~~ Get the data ~~~
+    data = {"id": 0, "statement": "This flow works well"}  # This can be a list of samples
+    # data = {"id": 0, "question": "Who was the NBA champion in 2023?"} # This can be a list of samples
+
+    #option1: use the FlowMessage class
+    input_message = FlowMessage(
         data=data,
-        path_to_output_file=path_to_output_file,
     )
 
+    #option2: use the proxy_flow
+    #input_message = proxy_flow._package_input_message(data = data)
+
+    #7. ~~~ Run inference ~~~
+    future = proxy_flow.send_message_blocking(input_message)
+
+    #uncomment this line if you would like to get the full message back
+    #reply_message = future.get_message()
+    reply_data = future.get_data()
+
     # ~~~ Print the output ~~~
-    flow_output_data = outputs[0]
-    print(flow_output_data)
+    print("~~~~~~Reply~~~~~~")
+    print(reply_data)
+
+
+    #8. ~~~~ (Optional) apply output interface on reply ~~~~
+    # output_interface = KeyInterface(
+    #     keys_to_rename={"api_output": "answer"},
+    # )
+    # print("Output: ", output_interface(reply_data))
+
+
+    #9. ~~~~~Optional: Unserve Flow~~~~~~
+    # serve_utils.delete_served_flow(cl, "ReverseNumberAtomicFlow_served")
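Step 8 is left commented out above. A minimal sketch of applying the output interface to the reply, assuming the KeyInterface call style shown in the commented code; the key names are illustrative (this flow's reply carries human_input, whereas the commented example renames api_output):

from aiflows.interfaces import KeyInterface

# Illustrative only: expose the reply's "human_input" key as "answer"
output_interface = KeyInterface(keys_to_rename={"human_input": "answer"})
print("Output: ", output_interface(reply_data))  # reply_data as obtained in step 7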