import json from copy import deepcopy from typing import Any, Dict, List from flow_modules.aiflows.ChatFlowModule import ChatAtomicFlow from dataclasses import dataclass class PlanGeneratorAtomicFlow(ChatAtomicFlow): def __init__( self, code_file_location: str, **kwargs): super().__init__(**kwargs) self.hint_for_model = """ Make sure your response is in the following format: Response Format: { "plan": "Python printable string of the plan corresponding to the goal", } """ self.code_file_location = code_file_location self.original_system_template = self.system_message_prompt_template.template @classmethod def instantiate_from_config(cls, config): flow_config = deepcopy(config) kwargs = {"flow_config": flow_config} # ~~~ Set up prompts ~~~ kwargs.update(cls._set_up_prompts(flow_config)) kwargs.update(cls._set_up_backend(flow_config)) # ~~~ Set up code file location code_file_location = flow_config["code_file_location"] kwargs.update({"code_file_location": code_file_location}) # ~~~ Instantiate flow ~~~ return cls(**kwargs) def _get_library_function_signatures(self): import importlib.util try: spec = importlib.util.spec_from_file_location("code_library", self.code_file_location) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) ret = '' import inspect for name, obj in inspect.getmembers(module): if inspect.isfunction(obj): ret += f"{name}: {inspect.signature(obj)}\n" return ret except FileNotFoundError: return 'There is no function available yet.' def _update_prompt_and_input(self, input_data: Dict[str, Any]): if 'goal' in input_data: input_data['goal'] += self.hint_for_model function_signatures_to_append = self._get_library_function_signatures() self.system_message_prompt_template.template = \ self.original_system_template + "\n\n" + f"Here are the available functions at {self.code_file_location}\n" \ + function_signatures_to_append + "\n" def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]: self._update_prompt_and_input(input_data) api_output = super().run(input_data)["api_output"].strip() try: response = json.loads(api_output) return response except json.decoder.JSONDecodeError: new_input_data = input_data.copy() new_input_data['goal'] += ("The previous respond cannot be parsed with json.loads, Make sure your next " "response is solely in JSON format.") new_api_output = super().run(new_input_data)["api_output"].strip() return json.loads(new_api_output)