File size: 8,071 Bytes
09321b6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
import copy
from typing import Union

from ..llm.base import LLM

from .raw_prompt_builder import build_raw_prompt

KNOWLEDGE_PROMPT = '# 知识库'
KNOWLEDGE_INTRODUCTION_PROMPT = '以下是我上传的文件“<file_name>”的内容:'
KNOWLEDGE_CONTENT_PROMPT = """```
<knowledge_content>
```"""

DEFAULT_PROMPT_INPUT_LENGTH_MAX = 999999999999


class LengthConstraint:
    """Holds the length limits consulted while assembling a prompt.

    Attributes:
        knowledge: maximum length allowed for the knowledge section.
        input: maximum length allowed for the user input.
        prompt_max_length: hard cap on the total assembled prompt length.
    """

    def __init__(self):
        # Knowledge/input start out effectively unlimited; a config passed
        # to update() can narrow them down later.
        self.knowledge = DEFAULT_PROMPT_INPUT_LENGTH_MAX
        self.input = DEFAULT_PROMPT_INPUT_LENGTH_MAX
        self.prompt_max_length = 10000

    def update(self, config: dict):
        """Overwrite each limit that appears in *config*.

        Keys absent from *config* keep their current value; a ``None``
        config is a no-op.
        """
        if config is None:
            return
        for key in ('knowledge', 'input', 'prompt_max_length'):
            setattr(self, key, config.get(key, getattr(self, key)))


class PromptGenerator:
    """Assembles and incrementally grows the text prompt fed to an LLM.

    The generator keeps synchronized views of the conversation:
    ``self.prompt`` (one flat string) and ``self.history`` (a list of
    role/content dicts); ``self.messages`` is reserved. Templates supplied at
    construction time are stitched together with ``sep``, and placeholder
    tokens such as ``<knowledge>``, ``<history>``, ``<tool_list>``,
    ``<user_input>`` and ``<exec_result>`` are substituted as the
    conversation progresses.
    """

    def __init__(self,
                 system_template: str = '',
                 instruction_template: str = '',
                 user_template: str = '<user_input>',
                 exec_template: str = '',
                 assistant_template: str = '',
                 sep='\n\n',
                 llm=None,
                 length_constraint=None):
        """
        Prompt generator.

        Args:
            system_template (str, optional): System template, normally the role of LLM.
            instruction_template (str, optional): Indicate the instruction for LLM.
            user_template (str, optional): Prefix before user input. Defaults
                to '<user_input>'.
            exec_template (str, optional): A wrapper str for exec result.
            assistant_template (str, optional): Prefix before assistant response.
                Some LLMs need to manually concat this prefix before generation.
            sep (str, optional): content separator.
            llm (LLM, optional): when given and it exposes a truthy
                ``model_id``, a model-specific raw-prompt preprocessor is
                built for it.
            length_constraint (LengthConstraint, optional): content length
                constraint. ``None`` (the default) means a fresh
                ``LengthConstraint()`` is created per instance — previously a
                single mutable instance was shared across all constructions
                (mutable-default-argument pitfall).
        """
        # Fix: never share one LengthConstraint across instances.
        if length_constraint is None:
            length_constraint = LengthConstraint()

        self.system_template = system_template
        self.instruction_template = instruction_template
        self.user_template = user_template
        self.assistant_template = assistant_template
        self.exec_template = exec_template
        self.sep = sep
        # Fix: always define the attribute so reading it later cannot raise
        # AttributeError when no usable llm was supplied.
        self.prompt_preprocessor = None
        if isinstance(llm, LLM) and llm.model_id:
            self.prompt_preprocessor = build_raw_prompt(llm.model_id)
        self.prompt_max_length = length_constraint.prompt_max_length
        self.reset()

    def reset(self):
        """Clear the accumulated prompt, history and messages."""
        self.prompt = ''
        self.history = []
        self.messages = []

    def init_prompt(self,
                    task,
                    tool_list,
                    knowledge_list,
                    llm_model=None,
                    **kwargs):
        """Initialize the prompt for a new task.

        Joins the system and instruction templates, substitutes the
        ``<knowledge>``, ``<tool_list>`` and ``<history>`` placeholders,
        appends the templated user input and the assistant prefix, records
        the user/assistant turns in ``self.history``, and caches the
        resolved system part in ``self.system_prompt``.
        """
        prompt = self.sep.join(
            [self.system_template, self.instruction_template])
        prompt += '<knowledge><history>'

        knowledge_str = self.get_knowledge_str(
            knowledge_list, file_name=kwargs.get('file_name', ''))

        # knowledge
        prompt = prompt.replace('<knowledge>', knowledge_str)

        # get tool description str; only takes effect if one of the
        # templates contains a <tool_list> placeholder
        tool_str = self.get_tool_str(tool_list)
        prompt = prompt.replace('<tool_list>', tool_str)

        history_str = self.get_history_str()

        prompt = prompt.replace('<history>', history_str)

        self.system_prompt = copy.deepcopy(prompt)

        # user input
        user_input = self.user_template.replace('<user_input>', task)
        prompt += f'{self.sep}{user_input}'

        # assistant input
        prompt += f'{self.sep}{self.assistant_template}'

        # store history
        self.history.append({'role': 'user', 'content': user_input})
        self.history.append({
            'role': 'assistant',
            'content': self.assistant_template
        })

        self.prompt = prompt

        self.function_calls = self.get_function_list(tool_list)

    # TODO change the output from single prompt to artifacts including
    #  prompt, messages, function_call
    def generate(self, llm_result, exec_result: Union[str, dict]):
        """Extend the prompt with the latest LLM output and tool result.

        Args:
            llm_result (str): raw text produced by the LLM this round.
            exec_result (Union[str, dict]): tool execution result; a dict is
                reduced to ``str(exec_result['result'])``.

        Returns:
            str: the updated prompt.
        """
        if isinstance(exec_result, dict):
            exec_result = str(exec_result['result'])
        return self._generate(llm_result, exec_result)

    def _generate(self, llm_result, exec_result: str):
        """
        Generate the next-round prompt based on the previous llm_result and
        exec_result, updating ``self.history`` in place.
        """
        if len(llm_result) != 0:
            self.prompt = f'{self.prompt}{llm_result}'
            self.history[-1]['content'] += f'{llm_result}'
        if len(exec_result) != 0:
            exec_result = self.exec_template.replace('<exec_result>',
                                                     str(exec_result))
            self.prompt = f'{self.prompt}{self.sep}{exec_result}'
            self.history[-1]['content'] += f'{self.sep}{exec_result}'

        return self.prompt

    # TODO: add Union[Text, Message] type for llm_result,
    #  add ExecResult = Text type for exec_result
    #  output would be a Union[Text, Messages]
    # In this case llm_result is Message, and exec_result is Function_call
    def _generate_messages(self, llm_result, exec_result: str):
        """
        Generate the next-round message list based on the previous
        llm_result and exec_result, updating ``self.history`` in place.
        """

        # init task: nothing to append yet, return history as-is
        if llm_result == '' and exec_result == '':
            return self.history

        # make sure content is set to '' (not null) for function calls
        function_call = llm_result.get('function_call', None)
        if function_call is not None:
            llm_result['content'] = ''
        self.history.append(llm_result)

        if exec_result is not None and function_call is not None:
            exec_message = {
                'role': 'function',
                'name': 'execute',
                'content': exec_result,
            }
            self.history.append(exec_message)

        return self.history

    def get_tool_str(self, tool_list):
        """generate tool list string

        Args:
            tool_list (List[str]): list of tools

        Returns:
            str: tools rendered as a 1-based numbered list joined by sep.
        """

        tool_str = self.sep.join(
            [f'{i + 1}. {t}' for i, t in enumerate(tool_list)])
        return tool_str

    # TODO move parse_tools_to_function from agent to here later
    def get_function_list(self, tool_list):
        """generate function call list from tools list

        Args:
            tool_list (List[Tool]): list of tools; each must provide
                ``get_function()``.

        """
        functions = [tool.get_function() for tool in tool_list]
        return functions

    def get_knowledge_str(self,
                          knowledge_list,
                          file_name='',
                          only_content=False,
                          **kwargs):
        """generate knowledge string

        Args:
            knowledge_list (List[str]): list of knowledge entries
            file_name (str): file name substituted into the introduction
            only_content (bool): when True, return just the fenced content
                block without the header/introduction

        """

        knowledge = self.sep.join(
            [f'{i + 1}. {k}' for i, k in enumerate(knowledge_list)])
        knowledge_content = KNOWLEDGE_CONTENT_PROMPT.replace(
            '<knowledge_content>', knowledge)
        if only_content:
            return knowledge_content
        else:
            knowledge_introduction = KNOWLEDGE_INTRODUCTION_PROMPT.replace(
                '<file_name>', file_name)

            # Empty knowledge_list yields an empty string (no bare header).
            knowledge_str = f'{KNOWLEDGE_PROMPT}{self.sep}{knowledge_introduction}{self.sep}{knowledge_content}' if len(
                knowledge_list) > 0 else ''
        return knowledge_str

    def get_history_str(self):
        """generate history string

        Walks history newest-to-oldest, prepending entries until adding the
        next one would push the combined prompt past prompt_max_length.
        """
        history_str = ''
        for i in range(len(self.history)):
            history_item = self.history[len(self.history) - i - 1]
            text = history_item['content']
            if len(history_str) + len(text) + len(
                    self.prompt) > self.prompt_max_length:
                break
            history_str = f'{self.sep}{text.strip()}{history_str}'

        return history_str