File size: 13,748 Bytes
29f7f08
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
from typing import List
import json


# System-prompt template describing the available tools and the expected tool-call syntax.
# It is rendered with `str.format`: `{tool_text}` is the only placeholder, and the doubled
# braces (`{{` / `}}`) are escapes that come out as literal braces in the JSON example.
TOOL_SYSTEM_PROMPT_RUBRA = (
    "You have access to the following tools: {tool_text}\n"
    "You can choose to respond with one or more tool calls at once, or with a chat message back to the user. "
    "Ensure you have all necessary details before making tool calls. If additional information is needed, "
    "ask the user appropriately. Any tool call you make must correspond to the functions listed above.\n"
    "If you decide to call a tool, format it like this: "
    'starttoolcall{{"name": "<function_name>", "arguments": {{"<arg1_name>": "<arg1_value>", "<arg2_name>": "<arg2_value>", ...}}}}endtoolcall '
    "where the JSON wrapped between starttoolcall and endtoolcall represents the function call.\n"
)

def json_schema_to_typescript_type(schema, param_name):
    """Translate one JSON-schema fragment into a TypeScript type and JSDoc lines.

    Args:
        schema: JSON-schema fragment for a single parameter. Anything that is
            not a dict (for example a boolean schema or ``None``) carries no
            usable constraints and is mapped to ``any``.
        param_name: Parameter name used inside the generated comment lines and
            as the interface name when the fragment is an object.

    Returns:
        A 4-tuple ``(ts_type, enum_comment, integer_comment,
        description_comment)``. Each comment is either an empty string or a
        single `` * ...`` JSDoc line. For ``"type": "object"`` fragments all
        three comments are empty so nested objects do not duplicate comments.
    """
    # Non-dict schemas cannot be inspected for "type"/"enum"/"description";
    # membership tests on them would raise or give meaningless results.
    if not isinstance(schema, dict):
        return "any", "", "", ""

    json_type = schema.get("type")
    ts_type = "any"  # fallback for missing or unrecognised types
    integer_comment = ""

    if json_type == "array":
        if "items" in schema:
            item_type = json_schema_to_typescript_type(schema["items"], param_name)[0]
        else:
            item_type = "any"
        ts_type = f"{item_type}[]"
    elif json_type == "integer":
        # TypeScript has no integer type, so the constraint goes in a comment.
        ts_type = "number"
        integer_comment = f" * @param {param_name} - Integer"
    elif json_type == "object":
        ts_type, _ = generate_typescript_interface(schema, param_name)
    elif json_type in ("number", "boolean", "null", "string"):
        # These JSON-schema type names are also the TypeScript type names.
        # A tuple is used so an unhashable "type" (e.g. a list) stays harmless.
        ts_type = json_type

    enum_comment = ""
    if "enum" in schema:
        allowed_values = ", ".join(f'"{enum_value}"' for enum_value in schema["enum"])
        enum_comment = f" * @enum {param_name} - Possible values: " + allowed_values
        # Enumerations are always rendered as strings, whatever "type" said.
        ts_type = "string"

    description_comment = ""
    if "description" in schema:
        description_comment = f' * @param {param_name} - {schema["description"]}'

    # Return only the type for nested objects to avoid duplicating comments
    if json_type == "object":
        return ts_type, "", "", ""

    return ts_type, enum_comment, integer_comment, description_comment


def generate_typescript_interface(schema, interface_name):
    """Render an object schema as a TypeScript ``interface`` declaration.

    Args:
        schema: Dict that may carry ``"properties"`` (name -> sub-schema) and
            ``"required"`` (collection of mandatory property names).
        interface_name: Identifier placed after the ``interface`` keyword.

    Returns:
        A pair ``(interface_definition, comments)``. ``comments`` joins the
        JSDoc lines of all properties with newlines; it is empty when no
        property produced a comment.
    """
    required_names = schema.get("required", [])
    member_lines = []
    doc_lines = []

    for name, sub_schema in schema.get("properties", {}).items():
        ts_type, enum_doc, integer_doc, description_doc = json_schema_to_typescript_type(
            sub_schema, name
        )
        # Properties not listed as required get the optional marker.
        optional_marker = "" if name in required_names else "?"
        member_lines.append(f"    {name}{optional_marker}: {ts_type};")
        # Comment order per property: description, then enum, then integer.
        doc_lines.extend(doc for doc in (description_doc, enum_doc, integer_doc) if doc)

    members = "\n".join(member_lines)
    return f"interface {interface_name} {{\n{members}\n}}", "\n".join(doc_lines)


def convert_parameters_list_to_dict(parameters):
    """Convert a list of parameter dicts into a ``properties``/``required`` mapping.

    Each entry is stored under its ``"name"`` key. An entry counts as required
    when it has no ``"default"`` key.

    Args:
        parameters: Iterable of dicts, each with at least a ``"name"`` key.

    Returns:
        ``{"properties": {name: entry, ...}, "required": [name, ...]}``.
    """
    converted = {"properties": {}, "required": []}
    for entry in parameters:
        key = entry["name"]
        converted["properties"][key] = entry
        if "default" in entry:
            continue
        converted["required"].append(key)
    return converted


def generate_typescript_function(function_schema) -> str:
    """Render a tool definition as a commented TypeScript function stub.

    Args:
        function_schema: Dict with a ``"name"`` and optionally a
            ``"description"`` and ``"parameters"``. ``"parameters"`` may be a
            schema dict, a list of parameter dicts, or ``None``.

    Returns:
        A string made of a ``/** ... */`` comment block, any interface
        declarations generated for object-typed parameters, and a
        ``function name(args): any {}`` line.
    """
    func_name = function_schema["name"]
    description = function_schema.get("description", "")

    # Normalise the three accepted shapes of "parameters" to a dict.
    params = function_schema.get("parameters", {})
    if isinstance(params, list):
        params = convert_parameters_list_to_dict(params)
    elif params is None:
        params = {}

    required_names = params.get("required", [])
    signature_parts = []
    doc_lines = []
    interface_blocks = []

    for name, sub_schema in params.get("properties", {}).items():
        ts_type, enum_doc, integer_doc, description_doc = json_schema_to_typescript_type(
            sub_schema, name
        )
        if ts_type.startswith("interface"):
            # Object parameters get a dedicated, function-scoped interface name.
            interface_name = f"{func_name}_{name.capitalize()}Params"
            declaration, nested_docs = generate_typescript_interface(
                sub_schema, interface_name
            )
            interface_blocks.append(declaration)
            doc_lines.append(nested_docs)
            ts_type = interface_name
        else:
            doc_lines.extend(
                doc for doc in (description_doc, enum_doc, integer_doc) if doc
            )
        optional_marker = "" if name in required_names else "?"
        signature_parts.append(f"{name}{optional_marker}: {ts_type}")

    # Assemble the output piece by piece; optional sections are skipped when empty.
    pieces = ["/**\n"]
    if description:
        pieces.append(f" * {description}\n")
    docs = "\n".join(doc_lines)
    if docs:
        pieces.append(docs + "\n")
    pieces.append(" */\n")
    interfaces = "\n\n".join(interface_blocks)
    if interfaces:
        pieces.append(interfaces + "\n\n")
    pieces.append(f"function {func_name}({', '.join(signature_parts)}): any {{}}")
    return "".join(pieces)



def format_tools(tools: List[dict]) -> str:
    """Build the tool system prompt for a list of tool definitions.

    Each tool may be wrapped as ``{"function": {...}}`` or given as the bare
    function schema. The tools are rendered as TypeScript stubs, joined by
    blank lines, and substituted into ``TOOL_SYSTEM_PROMPT_RUBRA``.
    """
    declarations = [
        generate_typescript_function(tool["function"] if "function" in tool else tool)
        for tool in tools
    ]
    return TOOL_SYSTEM_PROMPT_RUBRA.format(tool_text="\n\n".join(declarations))



def preprocess_input(msgs: List[dict], tools: List[dict]):
    """Return ``msgs`` rewritten by ``process_messages`` using the prompt built from ``tools``."""
    return process_messages(msgs, format_tools(tools))


def _build_observation_message(func_observation_map):
    """Pack the collected tool results into one ``user`` observation message.

    Results are listed in the map's iteration order; an empty-string result is
    reported as ``"done"``. The map is cleared so the next batch starts fresh.
    """
    observations = [
        f"{result}" if result != "" else "done"
        for result in func_observation_map.values()
    ]
    func_observation_map.clear()
    return {
        "role": "user",
        "content": "start observation " + json.dumps(observations) + " end observation",
    }


def process_messages(messages: List[dict], function_str: str):
    """Rewrite a chat history so tool calls and tool results become plain messages.

    * The tool prompt ``function_str`` is appended to a leading system message,
      or a default system message carrying it is inserted before the first
      message.
    * Assistant messages with a non-empty ``"tool_calls"`` list are replaced by
      an assistant message whose content is the serialized tool calls.
    * ``tool`` messages are not copied through. Their contents are collected
      per tool-call id and emitted as a single ``user`` observation message
      before the next non-tool message, or at the end of the conversation.

    Args:
        messages: Chat messages as dicts with at least a ``"role"`` key.
        function_str: Tool description text to place in the system message.

    Returns:
        A new list of message dicts. Messages that need no rewriting are the
        same dict objects as in ``messages``.
    """
    func_observation_map = {}
    processed_msg = []

    for index, message in enumerate(messages):
        role = message["role"]

        # A non-tool message ends a run of tool results: flush them first.
        if role != "tool" and func_observation_map:
            processed_msg.append(_build_observation_message(func_observation_map))

        if index == 0:
            if role == "system":
                processed_msg.append(
                    {"role": "system", "content": message["content"] + "\n" + function_str}
                )
            else:
                # Insert a system message of tool definition before the first message
                processed_msg.append(
                    {"role": "system", "content": "You are a helpful assistant.\n" + function_str}
                )
                processed_msg.append(message)  # first message is always either system or user msg

        elif role == "assistant" and message.get("tool_calls"):
            # Convert OpenAI function call format to Rubra format.
            # ``tool_calls`` that is None or empty means "no calls"; such messages
            # fall through to the pass-through branch so their content is kept.
            tool_call_str = construct_tool_call_str(message["tool_calls"], func_observation_map)
            processed_msg.append({"role": "assistant", "content": tool_call_str})

        elif role == "tool":
            tool_call_id = message["tool_call_id"]
            if tool_call_id in func_observation_map:
                func_observation_map[tool_call_id] = message["content"]
            else:
                print(func_observation_map)
                print(f"Tool call id not found in the map: {tool_call_id}")
                # TODO: the input is not valid in this case, should return an error

        else:
            processed_msg.append(message)

    # Tool results at the very end of the conversation still need to be emitted.
    if func_observation_map:
        processed_msg.append(_build_observation_message(func_observation_map))

    return processed_msg


def construct_tool_call_str(tool_calls, func_observation_map) -> str:
    """Serialize tool calls as concatenated ``starttoolcall...endtoolcall`` segments.

    Every call id is registered in ``func_observation_map`` with an empty
    string; the entry is filled in later from the matching ``tool`` message.
    String-valued ``arguments`` are parsed as JSON before rendering. The
    payload between the markers is ``str()`` of the function dict.

    Args:
        tool_calls: Iterable of dicts with an ``"id"`` and a ``"function"``
            dict that has an ``"arguments"`` entry (JSON string or parsed value).
        func_observation_map: Dict updated in place with the call ids.

    Returns:
        The segments joined without a separator (empty string for no calls).

    Raises:
        json.JSONDecodeError: If string ``arguments`` are not valid JSON.
    """
    segments = []
    for tool_call in tool_calls:
        # Initialize with empty value, updated later from the message with tool role
        func_observation_map[tool_call["id"]] = ""

        function = tool_call["function"]
        arguments = function["arguments"]
        if isinstance(arguments, str):
            # Render from a shallow copy so the caller's message keeps its
            # original JSON-string arguments; key order is preserved.
            function = {**function, "arguments": json.loads(arguments)}
        segments.append("starttoolcall" + str(function) + "endtoolcall")

    return "".join(segments)


if __name__ == "__main__":
    # Manual smoke test: push a sample conversation through preprocess_input and
    # print the rewritten messages as indented JSON.
    # Three sample tools: one with `parameters` set to None, two with object schemas.
    tools = [{
            "type": "function",
            "function": {
                "name": "dummy",
                "description": "just to say hi",
                "parameters": None,
            }
        },{"type": "function","function":{"name":"calculate_distance","description":"Calculate the distance between two locations","parameters":{"type":"object","properties":{"origin":{"type":"string","description":"The starting location"},"destination":{"type":"string","description":"The destination location"},"mode":{"type":"string","description":"The mode of transportation"}},"required":["origin","destination","mode"]}}},{"type": "function","function":{"name":"generate_password","description":"Generate a random password","parameters":{"type":"object","properties":{"length":{"type":"integer","description":"The length of the password"}},"required":["length"]}}}]
    # Alternative sample (disabled): two parallel tool calls in one assistant turn, the second with an empty tool result.
    # msgs = [{'role': 'system', 'content': 'You are a helpful assistant.'}, {'role': 'user', 'content': 'What is the distance between San Francisco and Cupertino by driving and by air from both directions?'}, {'role': 'assistant', 'tool_calls': [{'id': '0', 'function': {'name': 'calculate_distance', 'arguments': '{"origin":"San Francisco","destination":"Cupertino","mode":"drive"}'}, 'type': 'function'},{'id': '1', 'function': {'name': 'calculate_distance', 'arguments': '{"origin":"San Francisco","destination":"Cupertino","mode":"air"}'}, 'type': 'function'}]}, {'role': 'tool', 'tool_call_id': '0', 'name': 'calculate_distance', 'content': 'Distance is 50 miles.'}, {'role': 'tool', 'tool_call_id': '1', 'name': 'calculate_distance', 'content': ''}]
    # Active sample: no system message, three assistant tool-call turns each followed by a tool result.
    msgs = [{'role': 'user', 'content': "\nYou are task oriented system.\nYou receive input from a user, process the input from the given instructions, and then output the result.\nYour objective is to provide consistent and correct results.\nYou do not need to explain the steps taken, only provide the result to the given instructions.\nYou are referred to as a tool.\nYou don't move to the next step until you have a result.\n\nDownload https://www.sqlitetutorial.net/wp-content/uploads/2018/03/chinook.zip to a\nrandom file. Then expand the archive to a temporary location as there is a sqlite\ndatabase in it.\n\nFirst inspect the schema of the database to understand the table structure.\n\nForm and run a SQL query to find the artist with the most number of albums and output\nthe result of that.\n\nWhen done remove the database file and the downloaded content."}, {'role': 'assistant', 'content': '', 'tool_calls': [{'id': 'cdf7eb2e', 'type': 'function', 'function': {'name': 'download', 'arguments': '{"url":"https://www.sqlitetutorial.net/wp-content/uploads/2018/03/chinook.zip"}'}}]}, {'role': 'tool', 'content': '/tmp/gpt-download2815959664.zip', 'name': 'download', 'tool_call_id': 'cdf7eb2e'}, {'role': 'assistant', 'content': '', 'tool_calls': [{'id': '07cdee86', 'type': 'function', 'function': {'name': 'exec', 'arguments': '{"command":"unzip /tmp/gpt-download2815959664.zip","directory":"."}'}}]}, {'role': 'tool', 'content': 'Archive:  /tmp/gpt-download2815959664.zip\n  inflating: chinook.db              \n', 'name': 'exec', 'tool_call_id': '07cdee86'}, {'role': 'assistant', 'content': '', 'tool_calls': [{'id': 'e8042027', 'type': 'function', 'function': {'name': 'download', 'arguments': '{"url":"https://www.sqlitetutorial.net/wp-content/uploads/2018/03/chinook.zip"}'}}]}, {'role': 'tool', 'content': '/tmp/gpt-download3736408077.zip', 'name': 'download', 'tool_call_id': 'e8042027'}]
    new_msgs = preprocess_input(msgs, tools)
    print(json.dumps(new_msgs, indent=2))