nbaldwin commited on
Commit
a5ead65
·
1 Parent(s): 80c0c76

coflow version

Browse files
Files changed (3) hide show
  1. LCToolFlow.py +10 -3
  2. demo.yaml +3 -4
  3. run.py +70 -29
LCToolFlow.py CHANGED
@@ -6,6 +6,7 @@ import hydra
6
  from langchain.tools import BaseTool
7
 
8
  from aiflows.base_flows import AtomicFlow
 
9
 
10
 
11
  class LCToolFlow(AtomicFlow):
@@ -81,7 +82,7 @@ class LCToolFlow(AtomicFlow):
81
  # ~~~ Instantiate flow ~~~
82
  return cls(**kwargs)
83
 
84
- def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
85
  """ This method runs the flow. It runs the backend on the input data.
86
 
87
  :param input_data: The input data of the flow.
@@ -89,7 +90,13 @@ class LCToolFlow(AtomicFlow):
89
  :return: The output data of the flow.
90
  :rtype: Dict[str, Any]
91
  """
 
92
  observation = self.backend.run(tool_input=input_data)
93
-
94
- return {"observation": observation}
 
 
 
 
 
95
 
 
6
  from langchain.tools import BaseTool
7
 
8
  from aiflows.base_flows import AtomicFlow
9
+ from aiflows.messages import FlowMessage
10
 
11
 
12
  class LCToolFlow(AtomicFlow):
 
82
  # ~~~ Instantiate flow ~~~
83
  return cls(**kwargs)
84
 
85
+ def run(self, input_message: FlowMessage):
86
  """ This method runs the flow. It runs the backend on the input data.
87
 
88
  :param input_data: The input data of the flow.
 
90
  :return: The output data of the flow.
91
  :rtype: Dict[str, Any]
92
  """
93
+ input_data = input_message.data
94
  observation = self.backend.run(tool_input=input_data)
95
+
96
+ reply = self._package_output_message(
97
+ input_message=input_message,
98
+ response = {"observation": observation}
99
+ )
100
+
101
+ self.reply_to_message(reply=reply, to=input_message)
102
 
demo.yaml CHANGED
@@ -1,4 +1,3 @@
1
- flow:
2
- _target_: flow_modules.aiflows.LCToolFlowModule.LCToolFlow.instantiate_from_default_config
3
- name: "demoLCToolFlow"
4
- description: "An example flow that uses the LCToolFlowModule."
 
1
+ _target_: flow_modules.aiflows.LCToolFlowModule.LCToolFlow.instantiate_from_default_config
2
+ name: "demoLCToolFlow"
3
+ description: "An example flow that uses the LCToolFlowModule."
 
run.py CHANGED
@@ -6,12 +6,20 @@ import hydra
6
 
7
  import aiflows
8
  from aiflows.flow_launchers import FlowLauncher
9
- from aiflows.utils.general_helpers import read_yaml_file
 
10
 
11
  from aiflows import logging
12
  from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache
13
 
14
- CACHING_PARAMETERS.do_caching = False # Set to True to enable caching
 
 
 
 
 
 
 
15
  # clear_cache() # Uncomment this line to clear the cache
16
 
17
  logging.set_verbosity_debug()
@@ -23,38 +31,71 @@ from aiflows import flow_verse
23
  flow_verse.sync_dependencies(dependencies)
24
 
25
  if __name__ == "__main__":
 
 
 
 
 
 
26
 
 
27
  root_dir = "."
28
  cfg_path = os.path.join(root_dir, "demo.yaml")
29
  cfg = read_yaml_file(cfg_path)
30
- flow_with_interfaces = {
31
- "flow": hydra.utils.instantiate(cfg['flow'], _recursive_=False, _convert_="partial"),
32
- "input_interface": (
33
- None
34
- if getattr(cfg, "input_interface", None) is None
35
- else hydra.utils.instantiate(cfg['input_interface'], _recursive_=False)
36
- ),
37
- "output_interface": (
38
- None
39
- if getattr(cfg, "output_interface", None) is None
40
- else hydra.utils.instantiate(cfg['output_interface'], _recursive_=False)
41
- ),
42
- }
43
-
44
- # ~~~ Get the data ~~~
45
- # This can be a list of samples
46
- data = {"id": 0, "query": "Michael Jordan age"} # Add your data here
47
-
48
- # ~~~ Run inference ~~~
49
- path_to_output_file = None
50
- # path_to_output_file = "output.jsonl" # Uncomment this line to save the output to disk
51
-
52
- _, outputs = FlowLauncher.launch(
53
- flow_with_interfaces=flow_with_interfaces,
 
 
 
 
 
 
54
  data=data,
55
- path_to_output_file=path_to_output_file,
56
  )
57
 
 
 
 
 
 
 
 
 
 
 
58
  # ~~~ Print the output ~~~
59
- flow_output_data = outputs[0]
60
- print(flow_output_data)
 
 
 
 
 
 
 
 
 
 
 
 
6
 
7
  import aiflows
8
  from aiflows.flow_launchers import FlowLauncher
9
+ from aiflows.backends.api_info import ApiInfo
10
+ from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys
11
 
12
  from aiflows import logging
13
  from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache
14
 
15
+ from aiflows.utils import serve_utils
16
+ from aiflows.workers import run_dispatch_worker_thread
17
+ from aiflows.messages import FlowMessage
18
+ from aiflows.interfaces import KeyInterface
19
+ from aiflows.utils.colink_utils import start_colink_server
20
+ from aiflows.workers import run_dispatch_worker_thread
21
+
22
+ CACHING_PARAMETERS.do_caching = False # Set to True in order to disable caching
23
  # clear_cache() # Uncomment this line to clear the cache
24
 
25
  logging.set_verbosity_debug()
 
31
  flow_verse.sync_dependencies(dependencies)
32
 
33
  if __name__ == "__main__":
34
+
35
+ #1. ~~~~~ Set up a colink server ~~~~
36
+ FLOW_MODULES_PATH = "./"
37
+
38
+ cl = start_colink_server()
39
+
40
 
41
+ #2. ~~~~~Load flow config~~~~~~
42
  root_dir = "."
43
  cfg_path = os.path.join(root_dir, "demo.yaml")
44
  cfg = read_yaml_file(cfg_path)
45
+
46
+ #3. ~~~~ Serve The Flow ~~~~
47
+ serve_utils.serve_flow(
48
+ cl = cl,
49
+ flow_type="LCToolFlowModule",
50
+ default_config=cfg,
51
+ default_state=None,
52
+ default_dispatch_point="coflows_dispatch"
53
+ )
54
+
55
+ #4. ~~~~~Start A Worker Thread~~~~~
56
+ run_dispatch_worker_thread(cl, dispatch_point="coflows_dispatch", flow_modules_base_path=FLOW_MODULES_PATH)
57
+
58
+ #5. ~~~~~Mount the flow and get its proxy~~~~~~
59
+ proxy_flow = serve_utils.recursive_mount(
60
+ cl=cl,
61
+ client_id="local",
62
+ flow_type="LCToolFlowModule",
63
+ config_overrides=None,
64
+ initial_state=None,
65
+ dispatch_point_override=None,
66
+ )
67
+
68
+
69
+ #6. ~~~ Get the data ~~~
70
+ data = {"id": 0, "query": "Obama's first name?"} # Add your data here
71
+ # data = {"id": 0, "question": "Who was the NBA champion in 2023?"} # This can be a list of samples
72
+
73
+ #option1: use the FlowMessage class
74
+ input_message = FlowMessage(
75
  data=data,
 
76
  )
77
 
78
+ #option2: use the proxy_flow
79
+ #input_message = proxy_flow._package_input_message(data = data)
80
+
81
+ #7. ~~~ Run inference ~~~
82
+ future = proxy_flow.send_message_blocking(input_message)
83
+
84
+ #uncomment this line if you would like to get the full message back
85
+ #reply_message = future.get_message()
86
+ reply_data = future.get_data()
87
+
88
  # ~~~ Print the output ~~~
89
+ print("~~~~~~Reply~~~~~~")
90
+ print(reply_data)
91
+
92
+
93
+ #8. ~~~~ (Optional) apply output interface on reply ~~~~
94
+ # output_interface = KeyInterface(
95
+ # keys_to_rename={"api_output": "answer"},
96
+ # )
97
+ # print("Output: ", output_interface(reply_data))
98
+
99
+
100
+ #9. ~~~~~Optional: Unserve Flow~~~~~~
101
+ # serve_utils.delete_served_flow(cl, "ReverseNumberAtomicFlow_served")