"""A simple script to run a Flow that can be used for development and debugging.""" import os import hydra import aiflows from aiflows.backends.api_info import ApiInfo from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys from aiflows import logging from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache from aiflows.utils import serving from aiflows.workers import run_dispatch_worker_thread from aiflows.messages import FlowMessage from aiflows.interfaces import KeyInterface from aiflows.utils.colink_utils import start_colink_server from aiflows.workers import run_dispatch_worker_thread CACHING_PARAMETERS.do_caching = False # Set to True in order to disable caching # clear_cache() # Uncomment this line to clear the cache dependencies = [ {"url": "aiflows/VectorStoreFlowModule", "revision": os.getcwd()} ] from aiflows import flow_verse flow_verse.sync_dependencies(dependencies) if __name__ == "__main__": #1. ~~~~~ Set up a colink server ~~~~ cl = start_colink_server() #2. ~~~~~Load flow config~~~~~~ root_dir = "." cfg_path = os.path.join(root_dir, "demo.yaml") cfg = read_yaml_file(cfg_path) #2.1 ~~~ Set the API information ~~~ # OpenAI backend api_information = [ApiInfo(backend_used="openai", api_key = os.getenv("OPENAI_API_KEY"))] # # Azure backend # api_information = ApiInfo(backend_used = "azure", # api_base = os.getenv("AZURE_API_BASE"), # api_key = os.getenv("AZURE_OPENAI_KEY"), # api_version = os.getenv("AZURE_API_VERSION") ) quick_load_api_keys(cfg, api_information, key="api_infos") #3. ~~~~ Serve The Flow ~~~~ serving.serve_flow( cl = cl, flow_class_name="flow_modules.aiflows.VectorStoreFlowModule.ChromaDBFlow", flow_endpoint="ChromaDBFlow", ) #4. ~~~~~Start A Worker Thread~~~~~ run_dispatch_worker_thread(cl) #5 ~~~~~Mount the flow and get its proxy~~~~~~ proxy_flow_cdb= serving.get_flow_instance( cl=cl, flow_endpoint="ChromaDBFlow", user_id="local", config_overrides = cfg["chroma_demo_flow"] ) #3.(2) ~~~~ Serve The Flow ~~~~ serving.serve_flow( cl = cl, flow_class_name="flow_modules.aiflows.VectorStoreFlowModule.VectorStoreFlow", flow_endpoint="VectorStoreFlow", ) #4.(2) ~~~~~Start A Worker Thread~~~~~ run_dispatch_worker_thread(cl) #5.(2) ~~~~~Mount the flow and get its proxy~~~~~~ proxy_flow_vs= serving.get_flow_instance( cl=cl, flow_endpoint="VectorStoreFlow", user_id="local", config_overrides = cfg["vector_store_demo_flow"], ) #6. ~~~ Get the data ~~~ data_write = {"id": 1, "operation": "write", "content": "The Capital of Switzerland is Bern"} data_read1 = {"id": 1, "operation": "read", "content": "Switzerland"} data_read2 = {"id": 3, "operation": "read", "content": "What did the author do growing up?"} # Add your data here # Add your data here data = [data_read2,data_write,data_read1] #option1: use the FlowMessage class futures = [] #7. ~~~ Run inference ~~~ print("##########CHROMA DB DEMO###############") for dp in data: input_message = FlowMessage( data=data_write, ) futures.append(proxy_flow_cdb.get_reply_future(input_message)) replies = [ft.get_data() for ft in futures] for dp,rp in zip(data, replies): print("~~~~~ Message Sent~~~~~") print(dp) print("~~~~~ Replies ~~~~~") print(rp) #7. ~~~ Run inference ~~~ print("##########VECTOR STORE DEMO##############") for dp in data: input_message = FlowMessage( data=data_write, ) futures.append(proxy_flow_vs.get_reply_future(input_message)) replies = [ft.get_data() for ft in futures] for dp,rp in zip(data, replies): print("~~~~~ Message Sent~~~~~") print(dp) print("~~~~~ Replies ~~~~~") print(rp) #8. ~~~~ (Optional) apply output interface on reply ~~~~ # output_interface = KeyInterface( # keys_to_rename={"api_output": "answer"}, # ) # print("Output: ", output_interface(reply_data)) #9. ~~~~~Optional: Unserve Flow~~~~~~ # serving.delete_served_flow(cl, "FlowModule")