|
import json |
|
from copy import deepcopy |
|
from typing import Any, Dict, List |
|
from flow_modules.aiflows.ChatFlowModule import ChatAtomicFlow |
|
|
|
from dataclasses import dataclass |
|
class PlanGeneratorAtomicFlow(ChatAtomicFlow): |
|
def __init__( |
|
self, |
|
code_file_location: str, |
|
**kwargs): |
|
super().__init__(**kwargs) |
|
self.hint_for_model = """ |
|
Make sure your response is in the following format: |
|
Response Format: |
|
{ |
|
"plan": "Python printable string of the plan corresponding to the goal", |
|
} |
|
""" |
|
self.code_file_location = code_file_location |
|
self.original_system_template = self.system_message_prompt_template.template |
|
|
|
@classmethod |
|
def instantiate_from_config(cls, config): |
|
flow_config = deepcopy(config) |
|
|
|
kwargs = {"flow_config": flow_config} |
|
|
|
|
|
kwargs.update(cls._set_up_prompts(flow_config)) |
|
kwargs.update(cls._set_up_backend(flow_config)) |
|
|
|
|
|
code_file_location = flow_config["code_file_location"] |
|
kwargs.update({"code_file_location": code_file_location}) |
|
|
|
|
|
return cls(**kwargs) |
|
|
|
def _get_library_function_signatures(self): |
|
import importlib.util |
|
try: |
|
spec = importlib.util.spec_from_file_location("code_library", self.code_file_location) |
|
module = importlib.util.module_from_spec(spec) |
|
spec.loader.exec_module(module) |
|
ret = '' |
|
import inspect |
|
for name, obj in inspect.getmembers(module): |
|
if inspect.isfunction(obj): |
|
ret += f"{name}: {inspect.signature(obj)}\n" |
|
return ret |
|
except FileNotFoundError: |
|
return 'There is no function available yet.' |
|
|
|
def _update_prompt_and_input(self, input_data: Dict[str, Any]): |
|
if 'goal' in input_data: |
|
input_data['goal'] += self.hint_for_model |
|
|
|
function_signatures_to_append = self._get_library_function_signatures() |
|
self.system_message_prompt_template.template = \ |
|
self.original_system_template + "\n\n" + f"Here are the available functions at {self.code_file_location}\n" \ |
|
+ function_signatures_to_append + "\n" |
|
|
|
def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]: |
|
self._update_prompt_and_input(input_data) |
|
api_output = super().run(input_data)["api_output"].strip() |
|
try: |
|
response = json.loads(api_output) |
|
return response |
|
except json.decoder.JSONDecodeError: |
|
new_input_data = input_data.copy() |
|
new_input_data['goal'] += ("The previous respond cannot be parsed with json.loads, Make sure your next " |
|
"response is solely in JSON format.") |
|
new_api_output = super().run(new_input_data)["api_output"].strip() |
|
return json.loads(new_api_output) |
|
|
|
|
|
|