Spaces:
Sleeping
Sleeping
# important classes | |
import json | |
import hnswlib | |
import numpy as np | |
from openai import OpenAI | |
from sentence_transformers import SentenceTransformer, CrossEncoder | |
import streamlit as st | |
# NOTE: the `color` class defined further below formats console output by printing it in color.
# ---------------------------------------------------defined arguments-------------------------------------------------------------------
# OpenAI API key, read from the Streamlit secrets store at import time.
API_KEY=st.secrets["API_KEY"]
# Chat model passed to client_conn() by the example-generation and tool-retrieval code in this file.
model_name_tr = "gpt-3.5-turbo" # model name
# Second chat model name; not referenced in the visible part of this file (presumably used by callers - confirm).
model_name_ta = "gpt-4-1106-preview"
tool_list_path = './tools.json' # path of the JSON file holding the list of tools
example_path = './examples.json' # path of the JSON file holding the list of examples
# Flag not referenced in the visible part of this file; presumably selects zero-shot prompting - TODO confirm.
zero_shot = 0
# Default number of examples requested by create_prompt_for_new_example().
no_of_examples = 2
# ---------------------------------------------------some constants ----------------------------------------------------------------------
EF = 100 # `ef` value applied to the HNSW index at query time (see find_nearest_neighbors)
K = 3 # top k number of neighbours requested from the HNSW index
COSINE_THRESHOLD = 0.3 # neighbours whose (1 - distance) falls below this value are discarded
# Bi-encoder used to embed queries (see create_query_embedding); loaded on CPU at import time.
biencoder = SentenceTransformer("BAAI/bge-large-en-v1.5", device="cpu")
# Cross-encoder used to rerank retrieved queries (see rerank_queries_with_cross_encoder); loaded on CPU at import time.
cross_encoder = CrossEncoder(
    "cross-encoder/ms-marco-MiniLM-L-12-v2", max_length=512, device="cpu"
)
# Module-level OpenAI client. NOTE(review): client_conn() below builds its own client with a
# longer timeout, so this instance is not used in the visible part of this file.
client = OpenAI(api_key=st.secrets["API_KEY"], timeout=60, max_retries=2)
class color:
    """ANSI escape sequences used to colorize console messages.

    Typical use is ``color.GREEN + "text" + color.END``: the named attribute
    switches the style on and ``END`` resets it.
    """

    # Foreground colors (bold variants).
    PURPLE = "\x1b[1;35;48m"
    CYAN = "\x1b[1;36;48m"
    BOLD = "\x1b[1;37;48m"
    BLUE = "\x1b[1;34;48m"
    GREEN = "\x1b[1;32;48m"
    YELLOW = "\x1b[1;33;48m"
    RED = "\x1b[1;31;48m"
    BLACK = "\x1b[1;30;48m"
    # Text decoration and reset.
    UNDERLINE = "\x1b[4;37;48m"
    END = "\x1b[1;37;0m"
class Tools:
    '''
    Registry of tools together with the few-shot examples that demonstrate them.

    Attributes
    ----------
    - tools: dict
        maps a tool name to its tool definition (a dict that contains at least 'tool_name').
    - examples: list
        list of examples; each example is a dict with a 'query' and an 'answer', where
        'answer' is a list of tool calls (dicts that contain 'tool_name').
    - index: dict
        {'<tool_name>': {'num_examples': <int>, 'indices': <list of example positions>}}
    - th: int
        minimum number of examples wanted per tool.
    - query_embeddings, queries, search_index
        embeddings of the example queries, the queries themselves and the HNSW index
        built over the embeddings (None while there are no examples).

    Methods
    -------
    - check_json()
        checks whether a tool follows the expected JSON schema.
    - build_index()
        rebuilds self.index from self.tools and self.examples.
    - add_tool()
        adds a new tool (or updates it when it already exists).
    - add_example()
        asks the LLM for new examples of a tool and adds them to the example pool.
    - modify_example()
        asks the LLM to rewrite the examples that use a modified tool.
    - update_tool()
        updates an existing tool (or adds it when it does not exist).
    - replenish_examples()
        creates examples for tools that have fewer than self.th examples.
    - delete_tool()
        deletes a tool together with every example that uses it.
    - similarity_retriever()
        rebuilds the HNSW index over the example queries.
    '''

    def __init__(self, tools, examples):
        # self.tools is a dict of tools with tool name as the key and tool info as the value.
        self.tools = tools
        self.examples = examples
        self.th = 2
        self.index = {}
        self.build_index()
        self.query_embeddings = []
        self.queries = []
        self.search_index = None
        self.similarity_retriever()

    def check_json(self, tool_json):
        '''
        Validates that a tool definition has exactly the expected keys.
        Parameters
        ----------
        - tool_json : dict
        Returns
        ----------
        None
        Raises
        ----------
        Exception
            when the tool, its argument list or one of its arguments has the wrong
            type or the wrong set of keys.
        '''
        keys = ['argument_list', 'title', 'tool_description', 'tool_name']
        argument_keys = ['argument_description', 'argument_name', 'argument_type', 'example']
        if not isinstance(tool_json, dict):
            raise Exception('Given tool json is not a dictionary')
        given_keys = sorted(tool_json)
        if keys != given_keys:
            raise Exception(
                "Keys don't match\nExpected Keys: {}\nGiven Keys: {}\n".format(keys, given_keys)
            )
        if not isinstance(tool_json['argument_list'], list):
            raise Exception('Given argument list is not a list')
        for idx, arg in enumerate(tool_json['argument_list']):
            if not isinstance(arg, dict):
                raise Exception(f'Argument at the index: {idx} is not a dictionary')
            given_arg_keys = sorted(arg)
            if argument_keys != given_arg_keys:
                raise Exception(
                    "Keys don't match at {}\nExpected Keys: {}\nGiven Keys: {}\n".format(
                        idx, argument_keys, given_arg_keys
                    )
                )

    def build_index(self):
        '''
        Indexes all the examples with their respective tool calls.
        For example: {'<tool_name>': {'num_examples': <no. of examples containing tool_name>, 'indices': <list of example indices containing respective tool_name>}}
        The index is rebuilt from scratch, so calling this method repeatedly is safe.
        Parameters
        ----------
        None
        Returns
        ----------
        None
        Raises
        ----------
        KeyError
            when an example calls a tool that is not present in self.tools.
        Modifies
        ----------
        self.index
        '''
        self.index = {name: {'num_examples': 0, 'indices': []} for name in self.tools}
        for position, example in enumerate(self.examples):
            # An example that calls the same tool several times still counts once;
            # duplicate positions would corrupt the deletion logic in delete_tool().
            seen = set()
            for call in example['answer']:
                name = call['tool_name']
                if name in seen:
                    continue
                seen.add(name)
                entry = self.index[name]
                entry['num_examples'] += 1
                entry['indices'].append(position)

    def _example_is_indexable(self, example):
        '''Returns True when `example` has a query and only calls tools known to self.index.'''
        if not isinstance(example, dict) or 'query' not in example:
            return False
        answer = example.get('answer')
        if not isinstance(answer, list):
            return False
        return all(
            isinstance(call, dict) and call.get('tool_name') in self.index
            for call in answer
        )

    @staticmethod
    def _parse_llm_examples(content):
        '''Parses the LLM reply into a list of examples (empty list when nothing usable is found).'''
        content = content or ""
        try:
            parsed = json.loads(content)
        except ValueError:
            # The reply is not bare JSON; look for fenced ```json blocks instead.
            parsed = get_parsed_json(content)
        if isinstance(parsed, dict):
            parsed = [parsed]
        return parsed if isinstance(parsed, list) else []

    def add_tool(self, tool):
        '''
        On addition of any new tool, this function gets called to add the new tool in self.index.
        A tool that already exists with a different definition is updated instead.
        Parameters
        ----------
        - tool : dict (tool definition)
        Returns
        ----------
        None
        Modifies
        ----------
        self.tools
        self.index
        self.examples
        '''
        # self.check_json(tool)
        tool_name = tool['tool_name']
        if tool_name in self.tools:
            if tool == self.tools[tool_name]:
                print(color.YELLOW+'Already Exists!'+color.END)
            else:
                self.update_tool(tool)
                print(color.YELLOW+f"[WARNING] You tried adding a tool that already exists, so updating the tool '{tool_name}'"+color.END)
        else:
            self.tools[tool_name] = tool
            self.index[tool_name] = {'num_examples': 0, 'indices': []}
            self.add_example(tool, self.th)
            print(color.GREEN+f"[SUCCESS] Added the tool '{tool_name}'"+color.END)

    def add_example(self, tool, no_of_eg):
        '''
        On addition of new tool or replenishing examples on deletion, this function gets called to add the new example in self.examples and modifies index list accordingly.
        Generated examples that are malformed or that call unknown tools are skipped
        with a warning instead of leaving the index half-updated.
        Parameters
        ----------
        - tool : dict (tool definition)
        - no_of_eg : int
            number of examples to request from the LLM.
        Returns
        ----------
        None
        Modifies
        ----------
        self.examples
        self.index
        '''
        # At most three existing examples are shown to the LLM as a style reference.
        seed_examples = self.examples[:3]
        message = create_prompt_for_new_example(list(self.tools.values()), tool, seed_examples, n=no_of_eg)
        res = client_conn(message, model_name_tr)
        new_examples = self._parse_llm_examples(res.choices[0].message.content)
        for example in new_examples:
            if not self._example_is_indexable(example):
                print(color.YELLOW+"[WARNING] Skipped a generated example that is malformed or uses an unknown tool."+color.END)
                continue
            position = len(self.examples)
            for name in {call['tool_name'] for call in example['answer']}:
                self.index[name]['num_examples'] += 1
                self.index[name]['indices'].append(position)
            self.examples.append(example)
        self.similarity_retriever()

    def modify_example(self, tool):
        '''
        On modification of a tool, this function gets called to modify the examples where the tool is used.
        Parameters
        ----------
        - tool : dict (the modified tool definition)
        Returns
        ----------
        None
        Modifies
        ----------
        self.examples
        self.index
        '''
        tool_name = tool["tool_name"]
        indices = list(self.index[tool_name]["indices"])
        if not indices:
            # No example uses this tool, so there is nothing to rewrite (and no API call to pay for).
            return
        relevant_examples = [self.examples[ind] for ind in indices]
        message = create_prompt_for_modified_example(
            list(self.tools.values()), tool, relevant_examples
        )
        res = client_conn(message, model_name_tr)
        examples = self._parse_llm_examples(res.choices[0].message.content)
        # zip() stops at the shorter sequence, so a reply with too many (or too few)
        # examples can never index past the end of `indices`.
        for ind, example in zip(indices, examples):
            if self._example_is_indexable(example):
                self.examples[ind] = example
        # The rewritten examples may call a different set of tools.
        self.build_index()
        print(color.GREEN + f"[SUCCESS] Modified Examples!" + color.END)
        self.similarity_retriever()

    def update_tool(self, tool):
        '''
        On modification of a tool, this function gets called update the tool.
        A tool that does not exist yet is added instead.
        Parameters
        ----------
        - tool : dict (tool definition)
        Returns
        -------
        None
        Modifies
        --------
        self.tools
        '''
        # self.check_json(tool)
        tool_name = tool['tool_name']
        if tool_name in self.tools:
            # The examples are rewritten first, while self.tools still holds the old definition.
            self.modify_example(tool)
            self.tools[tool_name] = tool
            print(color.GREEN+f"[SUCCESS] Updated the tool '{tool_name}'"+color.END)
        else:
            print(color.YELLOW+f"[WARNING] You tried updating a tool that does not exist, so added the tool '{tool_name}'"+color.END)
            self.add_tool(tool)

    def replenish_examples(self):
        '''
        On deletion of any tool if number of examples for other tools goes below threshold, this function gets called to create examples.
        Nothing is generated unless more than four tools are registered.
        Parameters
        ----------
        None
        Returns
        -------
        None
        Modifies
        --------
        self.examples
        self.index
        '''
        # Iterate over a snapshot of the names; counts are re-read on every step because
        # examples generated for one tool may also cover another one.
        for tool_name in list(self.index):
            missing = self.th - self.index[tool_name]['num_examples']
            if missing > 0 and len(self.tools) > 4:
                self.add_example(self.tools[tool_name], missing)

    def delete_tool(self, tool_name):
        '''
        On deletion of any tool, this function gets called to delete the tool in self.index and remove the respective examples.
        Parameters
        ----------
        - tool_name : str
        Returns
        -------
        None
        Modifies
        --------
        self.tools
        self.index
        self.examples
        '''
        if tool_name not in self.tools:
            print(color.RED+"[ERROR] You tried deleting a tool that does not exist."+color.END)
            return
        entry = self.index.pop(tool_name, {'indices': []})
        del self.tools[tool_name]
        # Delete from the back so the remaining positions stay valid; the set guards
        # against a position being listed more than once.
        for position in sorted(set(entry['indices']), reverse=True):
            del self.examples[position]
        self.build_index()
        print(color.GREEN+f"[SUCCESS] Deleted the tool '{tool_name}'"+color.END)
        self.replenish_examples()
        self.similarity_retriever()

    def similarity_retriever(self):
        '''
        On addition of any new example, this function gets called to create the new hnsw index for queries.
        With an empty example pool the search index is set to None.
        Parameters
        ----------
        None
        Returns
        -------
        None
        Modifies
        --------
        self.query_embeddings
        self.queries
        self.search_index
        '''
        self.query_embeddings, self.queries = create_example_query_embeddings(self.examples)
        if len(self.query_embeddings) == 0:
            # An empty array has no second dimension to size the index with.
            self.search_index = None
            return
        self.search_index = create_hnsw_index(np.array(self.query_embeddings))
# ------------------------------------------important functions------------------------------------------------------------ | |
def read_file(path):
    """
    Reads a JSON file and returns its parsed content.
    parameters
    ----------
    - path: str
    path of the file to read in json.
    returns
    ---------
    - file: object
    the parsed json content (dict, list, ...).
    """
    # JSON files are UTF-8 by specification; do not depend on the platform default encoding.
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)
def create_query_embedding(query):
    """
    Embeds a single query with the module-level bi-encoder.
    parameters
    ---------
    - query: str
    returns
    ---------
    - embedding: numpy array
    normalized embedding of the query.
    """
    # The encoder works on batches, so wrap the query and unwrap the single result.
    batch = biencoder.encode([query], normalize_embeddings=True)
    return batch[0]
def create_example_query_embeddings(examples):
    """
    Creates the embeddings of all example queries and saves them to "query_embeddings.npy".
    parameters
    ----------
    - examples: list
    List of examples in json format; only the "query" field of each example is read.
    returns
    ---------
    - query_embeddings: list
    a list of query embeddings, one per example.
    - queries: list
    a list of queries, in the same order as the embeddings.
    """
    queries = [example["query"] for example in examples]
    query_embeddings = [create_query_embedding(query) for query in queries]
    # Persisted on every call so the file always mirrors the current example pool.
    np.save("query_embeddings.npy", query_embeddings)
    return query_embeddings, queries
def create_hnsw_index(embedding, M=16, efC=100):
    """
    Builds an inner-product HNSW index over the given embeddings.
    parameters
    ----------
    - embedding: numpy array
    2-D array with one embedding per row.
    - M: int
    default = 16
    - efC: int
    construction-time ef, default = 100
    returns
    ----------
    - index: object
    the populated hnswlib index; row i of `embedding` is stored under id i.
    """
    num_items, num_dim = embedding.shape[0], embedding.shape[1]
    index = hnswlib.Index(space="ip", dim=num_dim)
    index.init_index(max_elements=num_items, ef_construction=efC, M=M)
    index.add_items(embedding, np.arange(num_items))
    return index
def find_nearest_neighbors(query_embedding, queries, search_index):
    """
    Finds up to K nearest example queries whose similarity reaches COSINE_THRESHOLD.
    The similarity is computed as (1 - distance) on the distances reported by the index.
    parameters
    ----------
    - query_embedding: list
    the query embedding whose similarity check has to be made.
    - queries: list
    a list of queries in the examples.
    - search_index: object
    the HNSW index built over the example queries (may be None when there are no examples).
    returns
    ----------
    - query_list: list
    a list of similar queries (empty when nothing is indexed or nothing is similar enough).
    """
    if search_index is None:
        return []
    # Asking the index for more neighbours than it holds makes knn_query fail,
    # so never request more than what is actually stored.
    k = min(K, len(queries), search_index.get_current_count())
    if k <= 0:
        return []
    search_index.set_ef(EF)
    labels, distances = search_index.knn_query(query_embedding, k=k)
    return [
        queries[label]
        for label, distance in zip(labels[0], distances[0])
        if (1 - distance) >= COSINE_THRESHOLD
    ]
def rerank_queries_with_cross_encoder(query, chunks):
    """
    Sorts the chunks based on their cross-encoder scores in descending order.
    parameters
    ---------
    - query: str
    - chunks: list
    the list of similar chunks
    returns
    ---------
    - sorted_chunks: list
    the chunks after reranking (empty list for empty input).
    """
    if len(chunks) == 0:
        # Nothing to score; avoid calling the model with an empty batch.
        return []
    pairs = [(query, chunk) for chunk in chunks]
    scores = cross_encoder.predict(pairs)
    # Sort on the score only, so ties never fall through to comparing the chunks themselves.
    ranked = sorted(zip(scores, chunks), key=lambda pair: pair[0], reverse=True)
    return [chunk for _, chunk in ranked]
# -------------------------------------------fetch topk examples-------------------------------------------------------------- | |
def get_index(queries, topk_queries):
    """
    Maps each top-k query to its (first) position in the list of all queries.
    parameters:
    - queries: list
    A list of all queries.
    - topk_queries: list
    A list of topk queries.
    returns:
    - index: list of int
    raises:
    - ValueError when a top-k query is not present in `queries`.
    """
    return [queries.index(candidate) for candidate in topk_queries]
def get_topk_examples(topk_index, examples):
    """
    Picks the examples located at the given positions, preserving the given order.
    parameters
    ----------
    - topk_index: list of int
    positions of the topk examples.
    - examples: list
    list of all examples.
    returns
    ----------
    - res: list
    list of topk examples.
    """
    return [examples[position] for position in topk_index]
def get_topk_given_query(query, queries, search_index, examples):
    '''
    Retrieves the examples most relevant to a user query.
    The query is embedded, its nearest example queries are looked up in the
    search index, reranked with the cross-encoder and mapped back to examples.
    parameters
    ----------
    - query: str
    user query
    - queries: list
    a list of all queries.
    - search_index:
    index object
    - examples: list
    a list of all examples.
    returns
    -----------
    - topk_examples: list
    a list of topk examples, best match first.
    '''
    neighbours = find_nearest_neighbors(create_query_embedding(query), queries, search_index)
    reranked = rerank_queries_with_cross_encoder(query, neighbours)
    return get_topk_examples(get_index(queries, reranked), examples)
# ---------------------------------------------------prompt generation functions------------------------------------------------- | |
def client_conn(message, model_name):
    '''
    Sends a chat-completion request and returns the completion.
    parameters
    ----------
    - message: list
    A list of conversation between ChatGPT and user.
    - model_name: str
    name of the chat model to call.
    returns
    ----------
    - completion: object
    the chat completion returned by the API.
    '''
    # please don't change the timeout as example generation is time consuming
    # Reuse the module-level client (and its connection pool) instead of building
    # a new client per call; only the per-request options are overridden.
    request_client = client.with_options(timeout=120, max_retries=2)
    return request_client.chat.completions.create(
        model=model_name,
        messages=message,
        temperature=0.2,
    )
def create_prompt_for_query(user_query, examples, tools):
    '''
    Creates the few-shot prompt for a user query.
    parameters
    ----------
    - user_query: str
    the user query.
    - examples: list
    few-shot examples; each has a 'query' and an 'answer'.
    - tools: list or dict
    the tool definitions (must be JSON serializable).
    returns
    ----------
    - message: list
    the chat messages: system prompt, one user/assistant pair per example,
    the instructions, the user query and a closing system reminder.
    '''
    message = [
        {
            "role": "system",
            "content": "The following is a friendly conversation between a human and an AI. The AI is professional and parses user input to several tasks. If the AI does not know the answer to a question, it truthfully says it does not know. The AI will be provided with a set of tools their descriptions and the argument in them. Here is the list of tools:" + json.dumps(tools) + " \n Provide the answer in the exact format as given in the following examples. \nExamples"
        }
    ]
    for example in examples:
        answer = example['answer']
        # The model is asked for JSON with double-quoted strings, so the examples must be
        # shown as JSON too (str() would produce Python repr with single quotes / True / None).
        answer_text = answer if isinstance(answer, str) else json.dumps(answer)
        message.append({"role": "user", "content": "Query: " + str(example['query'])})
        message.append({"role": "assistant", "content": answer_text})
    message.extend([
        {
            "role": "user",
            "content": "Use the above tools to learn how to use the tool on any query. Analyse how to parse the query and extract the correct information and place in the argument name and value. Use all the required tools and arguments in correct order of its calling based on the query and your learning from all the examples. Do not assume any value, you can take the value from query or the previous called tool as shown in the examples. Also focus on the allowed values argument present in tool definition."
        },
        {
            "role": "user",
            "content": "Now its your task to respond to the user queries in the same format as that in the above examples which is json."
        },
        {
            "role": "user",
            "content": "Query: " + user_query
        },
        {
            "role": "system",
            "content": "Generate the answer in a json format only. Enclose the strings in double quotes"
        },
    ])
    return message
def create_prompt_zero_shot(user_query, tools):
    '''
    Creates the zero-shot prompt (no examples) for a user query.
    parameters
    ----------
    - user_query: str
    the user query.
    - tools: dict
    the dictionary of tools (must be JSON serializable).
    returns
    ----------
    - message: list
    the chat messages: system prompt with the tools, the output-format
    instructions and the user query.
    '''
    system_prompt = "You are a intelligent AI agent specialized in giving the tool responses given a dictionary of tools. Here is the dictionary of tools: " + json.dumps(tools)
    format_instructions = (
        '\n'
        'Now its your task to respond to the user queries in the format given below\n'
        'FORMAT:[{"tool_name": "...", "arguments": [{"argument_name": "...", "argument_value": ... (depending on the argument_type)}, ...]}, ...]\n'
        'To reference the value of the ith tool in the chain, use $$PREV[i] as argument value. i = 0, 1, .. j-1; j = current tool’s index in the array If the query could not be answered with the given set of tools, output an empty list instead.\n'
        'Output in the JSON format\n'
    )
    return [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": format_instructions},
        {"role": "user", "content": "Query: " + user_query},
    ]
def create_prompt_for_modified_example(old_tools, modified_tool, relevant_examples):
    '''
    Creates the prompt that asks the LLM to rewrite examples after a tool was modified.
    parameters
    ----------
    - old_tools: list
    the tool definitions before the modification.
    - modified_tool: dict
    the new definition of the modified tool (must contain 'tool_name').
    - relevant_examples: list
    the examples that use the modified tool.
    returns
    ----------
    - message: list
    the chat messages (system, user, system).
    '''
    # Explicit spaces around the spliced-in JSON and tool name keep the sentence readable.
    request = (
        "Given a list of old tools : " + json.dumps(old_tools)
        + " Let us say that I modified the tool '" + modified_tool['tool_name'] + "' to be "
        + json.dumps(modified_tool)
        + "\nNow your task is to modify the following examples where this tool was used according to its new definition keeping in mind the new schema of json mentioned above.\n"
        + json.dumps(relevant_examples)
    )
    return [
        {
            "role": "system",
            "content": "You are an intelligent AI Agent specialized in modifying the old data and generating the new relevant data."
        },
        {
            "role": "user",
            "content": request
        },
        {
            "role": "system",
            "content": "Your response should be in json format only with the strings enclosed in double quotes ready to go in json.loads"
        },
    ]
# --------------------------------------------------------------------------------------------------------------------------- | |
def create_prompt_for_new_example(old_tools, new_tool, examples, n=no_of_examples):
    '''
    Creates the prompt that asks the LLM to generate usage examples for a new tool.
    parameters
    ----------
    - old_tools: list
    the existing tool definitions.
    - new_tool: dict
    the definition of the new tool (must contain 'tool_name').
    - examples: list
    existing examples shown to the LLM as a format reference.
    - n: int
    number of examples to request (defaults to the module-level no_of_examples).
    returns
    ----------
    - message: list
    the chat messages (system, user, system).
    '''
    # Explicit spaces around the spliced-in JSON and tool name keep the sentence readable.
    request = (
        "Given a list of old tools : " + json.dumps(old_tools)
        + " and a new tool '" + new_tool['tool_name'] + "' to be " + json.dumps(new_tool)
        + "\nNow your task is to create " + str(n)
        + " examples of the usage of the new tool along with any of the tools from the old tool list, similar to the following examples:\n"
        + json.dumps(examples)
    )
    return [
        {
            "role": "system",
            "content": "You are an intelligent AI Agent specialized in generating the new relevant data."
        },
        {
            "role": "user",
            "content": request
        },
        {
            "role": "system",
            "content": "Your response should be in json format with the strings enclosed in double quotes. Note that To reference the value of the ith tool in the chain, use $$PREV[i] as argument value. i = 0, 1, .. j-1; j = current tool’s index in the array If the query could not be answered with the given set of tools, output an empty list instead."
        },
    ]
# -------------------------------------------------retriever--------------------------------------------------------------------- | |
def create_tool_str(tools):
    '''
    Renders the tools as one "Tool: <name>, Desc: <description>" line per tool.
    parameters
    ----------
    - tools: iterable of dict
    tool definitions with 'tool_name' and 'tool_description'.
    returns
    ----------
    - tool_str: str
    the rendered lines, each terminated by a newline ('' for no tools).
    '''
    return ''.join(
        f"Tool: {tool['tool_name']}, Desc: {tool['tool_description']}\n"
        for tool in tools
    )
def tool_retriever_prompt(tool_str,user_query):
    '''
    Creates the prompt that asks the LLM which tools a query needs.
    parameters
    ----------
    - tool_str: str
    the rendered list of tools (see create_tool_str).
    - user_query: str
    the user query.
    returns
    ----------
    - message: list
    the chat messages (system, user).
    '''
    system_turn = {"role": "system", "content": "You are an intelligent assistant. Please help the user below."}
    request = f'''You are given the following set of tools:\n {tool_str} \n Can you please figure out which tools the query "{user_query}" will require to solve, out of these tools? Please return only the tool names inside []. If it does not need any tool, return an empty list'''
    return [system_turn, {"role": "user", "content": request}]
def tool_retriever(tools, user_query):
    '''
    Asks the LLM which tools are needed for the query and parses the tool names from its reply.
    parameters
    ----------
    - tools: iterable of dict
    tool definitions with 'tool_name' and 'tool_description'.
    - user_query: str
    the user query.
    returns
    ----------
    - tools_retrieved: set of str
    the tool names found inside the first [...] of the reply
    (empty when the reply contains no bracketed list).
    '''
    message = tool_retriever_prompt(create_tool_str(tools), user_query)
    output = client_conn(message, model_name_tr).choices[0].message.content or ""
    start = output.find('[')
    end = output.find(']', start + 1) if start != -1 else -1
    if start == -1 or end == -1:
        # No bracketed list in the reply: slicing with -1 would return arbitrary text.
        return set()
    tools_retrieved = set()
    for raw_name in output[start + 1:end].split(','):
        name = raw_name.replace('\'', '').replace('\"', '').strip()
        if name:
            tools_retrieved.add(name)
    return tools_retrieved
def final_tools(tools, tool_names):
    '''
    Collects the definitions of the retrieved tools plus every tool that takes no arguments.
    parameters
    ----------
    - tools: dict
    maps tool name -> tool definition (with an 'argument_list').
    - tool_names: iterable of str
    the retrieved tool names; names that are not in `tools` are ignored.
    returns
    ----------
    - tool: list
    the selected tool definitions, each tool at most once.
    '''
    selected = []
    seen = set()
    for name in tool_names:
        # Retrieved names come from an LLM and may not exist; skip those and duplicates.
        if name in tools and name not in seen:
            seen.add(name)
            selected.append(tools[name])
    # Argument-less tools are always offered, unless they were already selected above.
    for name, definition in tools.items():
        if not definition["argument_list"] and name not in seen:
            seen.add(name)
            selected.append(definition)
    return selected
# -------------------------------------------------------bonus section prompts--------------------------------------------------------------- | |
def create_prompt_for_query_bonus(user_query, examples, tools):
    '''
    Creates the few-shot prompt for a user query, including the `lambda` tool demonstration.
    parameters
    ----------
    - user_query: str
    the user query.
    - examples: list
    few-shot examples; each has a 'query' and an 'answer'.
    - tools: list or dict
    the tool definitions (must be JSON serializable).
    returns
    ----------
    - message: list
    the chat messages: system prompt, one user/assistant pair per example,
    the instructions, one worked lambda example, the user query and a closing system reminder.
    '''
    # Worked example showing how tool outputs are combined with the lambda tool.
    lambda_example_answer = [
        {
            "tool_name": "search_object_by_name",
            "arguments": [
                {"argument_name": "query", "argument_value": "USER-321"}
            ]
        },
        {
            "tool_name": "works_list",
            "arguments": [
                {"argument_name": "created_by", "argument_value": ["$$PREV[0]"]},
                {"argument_name": "type", "argument_value": ["task"]}
            ]
        },
        {
            "tool_name": "lambda",
            "arguments": [
                {
                    "argument_name": "expression",
                    "argument_value": "lambda $$PREV[1]: True if len($$PREV[1]) > 10 else False"
                }
            ]
        }
    ]
    message = [
        {
            "role": "system",
            "content": "The following is a friendly conversation between a human and an AI. The AI is professional and parses user input to several tasks. If the AI does not know the answer to a question, it truthfully says it does not know. The AI will be provided with a set of tools their descriptions and the argument in them. Here is the list of tools:" + json.dumps(tools) + " \n Provide the answer in the exact format as given in the following examples. \nExamples"
        }
    ]
    for example in examples:
        answer = example['answer']
        # Show the examples as JSON, which is the format the model is asked to produce.
        answer_text = answer if isinstance(answer, str) else json.dumps(answer)
        message.append({"role": "user", "content": "Query: " + str(example['query'])})
        message.append({"role": "assistant", "content": answer_text})
    message.extend([
        {
            "role": "user",
            "content": "Use the above tools to learn how to use the tool on any query. Analyse how to parse the query and extract the correct information and place in the argument name and value. Use all the required tools and arguments in correct order of its calling based on the query and your learning from all the examples. Do not assume any value, you can take the value from query or the previous called tool as shown in the examples. Also focus on the allowed values argument present in tool definition."
        },
        {
            "role": "user",
            "content": "After producing the list of tools, analyze the query and figure out whether it requires the combination of tool outputs via mathematical operations, iterations, conditional logic etc. or not. In case it does, use the lambda function to produce the required results. Examples of such queries are given below: \n "
        },
        {
            "role": "user",
            "content": "Find all tasks created by user 'USER-321' and check if there are more than 10 such tasks"
        },
        {
            "role": "assistant",
            "content": json.dumps(lambda_example_answer)
        },
        {
            "role": "user",
            "content": "Now its your task to respond to the user queries in the same format as that in the above examples which is json. Use the lambda function only when necessary."
        },
        {
            "role": "user",
            "content": "Query: " + user_query
        },
        {
            "role": "system",
            "content": "Generate the answer in a json format only. Enclose the strings in double quotes"
        },
    ])
    return message
# -------------------------------------------------miscellaneous functions------------------------------------------------------------ | |
def create_tool_dict(tools):
    '''
    Turns a list of tool definitions into a dict keyed by tool name.
    parameters
    ----------
    - tools: iterable of dict
    tool definitions with a 'tool_name'.
    returns
    ----------
    - tool_dict: dict
    maps tool name -> tool definition; a later duplicate name overrides an earlier one.
    '''
    return {tool["tool_name"]: tool for tool in tools}
def get_parsed_json(text_parsed):
    '''
    Extracts and parses every ```json ... ``` fenced block of a text.
    parameters
    ----------
    - text_parsed: str
    text (typically an LLM reply) that may contain fenced json blocks.
    returns
    ----------
    - ans_list: list
    the parsed blocks flattened into one list: list blocks are extended,
    any other JSON value is appended. Empty when no closed block is found.
    raises
    ----------
    json.JSONDecodeError when a fenced block does not contain valid JSON.
    '''
    ans_list = []
    # Element 0 is whatever precedes the first ```json marker; it is never JSON,
    # even when it happens to contain a plain ``` fence of its own.
    for segment in text_parsed.split('```json')[1:]:
        if "```" not in segment:
            # Unterminated block.
            continue
        json_data = segment.split("```")[0].strip()
        if not json_data:
            continue
        json_obj = json.loads(json_data)
        if isinstance(json_obj, list):
            ans_list.extend(json_obj)
        else:
            ans_list.append(json_obj)
    return ans_list
# -------------------------------------------------postprocessing--------------------------------------------------------------------- | |
def _outer_bracket_span(text):
    '''Returns the text from the first '[' to the last ']' (inclusive), or None when absent.'''
    start = text.find('[')
    end = text.rfind(']')
    if start == -1 or end <= start:
        return None
    return text[start:end + 1]


def get_json(pred):
    '''
    Extracts the JSON answer from the text predicted by the LLM.
    The following candidates are tried in order and the first one that parses is returned:
    1. the content of the first ```json ... ``` fenced block,
    2. the text between the first '[' and the last ']',
    3. the same span after replacing single quotes with double quotes,
    4. the same span after additionally turning True/False into true/false.
    parameters
    -----------------
    pred: str
    Answer predicted by the LLM as a string
    returns
    -----------------
    json_pred: list
    The parsed JSON (normally a list of dictionaries; a fenced block may also hold a dictionary)
    raises
    -----------------
    json.JSONDecodeError when none of the candidates is valid JSON.
    '''
    candidates = []
    marker = '```json'
    fence_start = pred.find(marker)
    if fence_start != -1:
        body_start = fence_start + len(marker)
        fence_end = pred.find('```', body_start)
        if fence_end != -1:
            # Only the text between the markers is JSON; the markers themselves are not.
            candidates.append(pred[body_start:fence_end])
    double_quoted = pred.replace('\'', '\"')
    lowercase_bools = double_quoted.replace("True", "true").replace("False", "false")
    for text in (pred, double_quoted, lowercase_bools):
        span = _outer_bracket_span(text)
        if span is not None:
            candidates.append(span)
    last_error = None
    for candidate in candidates:
        try:
            return json.loads(candidate)
        except ValueError as error:
            last_error = error
    if last_error is None:
        raise json.JSONDecodeError("No JSON content found", pred, 0)
    raise last_error
def dict_unwrap(json_pred):
    '''
    Unwraps a dictionary if GPT-4 outputs one instead of a list.
    parameters
    -----------------
    json_pred: object
    The parsed prediction
    returns
    -----------------
    json_pred: object
    For a dictionary: its first value that is a list, or [] when it has none.
    Anything else is returned unchanged.
    '''
    if not isinstance(json_pred, dict):
        return json_pred
    for value in json_pred.values():
        if isinstance(value, list):
            return value
    return []
def list_in_str_handler(json_pred):
    '''
    Strips a wrapping "[...]" from string argument values, in place.
    parameters
    -----------------
    json_pred: list
    List of dictionaries that represents the tool call sequence
    returns
    -----------------
    json_pred: list
    The same list, with string arguments like '[arg_val]' turned into 'arg_val'
    (both for plain string values and for string items of list values)
    '''
    def _is_bracketed(value):
        # True only for real strings of the form "[...]".
        return type(value) is str and value.startswith('[') and value.endswith(']')

    for tool_call in json_pred:
        for argument in tool_call["arguments"]:
            value = argument["argument_value"]
            if _is_bracketed(value):
                argument["argument_value"] = value[1:-1]
            elif type(value) is list:
                # Same clean-up for every string item; the list is edited in place.
                for position, item in enumerate(value):
                    if _is_bracketed(item):
                        value[position] = item[1:-1]
    return json_pred
def func_name_handler(json_pred,tools,no_arg_tool_list):
    '''
    Replaces argument values of the form "$$<tool_name>..." (that are not "$$PREV[i]")
    by an explicit call to the matching argument-less tool: the tool call is inserted
    in front of the current tool and the argument is rewritten to reference it via
    "$$PREV[i]". json_pred is modified in place.

    NOTE(review): the nesting below was reconstructed from a source whose indentation
    was lost - confirm it against the original file before relying on it.

    parameters
    -----------------
    json_pred: list
    List of dictionaries that represents the tool call sequence
    tools: dict
    Dict representing tools (presumably tool name -> dict with an "args" container
    keyed by argument name - TODO confirm against the caller)
    no_arg_tool_list: list
    List of tool names that do not have arguments
    returns
    -----------------
    json_pred: list
    Tool call sequence with arguments with $${function_name} errors removed
    '''
    # Iteration variable
    i=0
    # WHILE loop is required, len(range()) does not work as the loop condition is kept static while items are inserted into the loop.
    while(i<len(json_pred)):
        tool=json_pred[i]
        for j, arg in enumerate(tool["arguments"]):
            # Only arguments that the tool definition knows about are inspected.
            if arg["argument_name"] in tools[tool["tool_name"]]["args"] :
                # Check argument value
                temp = arg["argument_value"]
                if type(temp)==str:
                    # If argument value starts with $$ but is not $$PREV[i]
                    if (temp.startswith('$$')) and not temp.startswith('$$PREV['):
                        # Remove the $$
                        temp_lowercase_call=temp.lower()[2:]
                        # Iterate over tools with no arguments to figure the appropriate tool to call
                        for no_arg_tool in no_arg_tool_list:
                            if temp_lowercase_call.startswith(no_arg_tool):
                                # Create tool call
                                tool_ins = {}
                                tool_ins['arguments']=[]
                                tool_ins['tool_name']=no_arg_tool
                                # Insert tool into the list
                                json_pred.insert(i,tool_ins)
                                # The current tool moved one position to the right.
                                i+=1
                                # Iterate over the tools, starting from the tool under consideration
                                for i_n, tool_n in enumerate(json_pred[i:]):
                                    for j_n, arg_n in enumerate(tool_n["arguments"]):
                                        # Check the argument value
                                        prevset = arg_n["argument_value"]
                                        # If the argument value is a string
                                        if (type(prevset)==str):
                                            # If the argument is of $$PREV[i] type, and it referenced the returned value of the tool that
                                            # came at the same position as, or after the inserted tool, increment i for it,
                                            if prevset.startswith("$$PREV["):
                                                try:
                                                    n=int(prevset[7:-1])
                                                except:
                                                    # Handles values such as "$$PREV[1][0]": keep only the first index.
                                                    indexing_pos=prevset.find('][')
                                                    try:
                                                        n=int(prevset[7:indexing_pos])
                                                    except:
                                                        # NOTE(review): if both conversions fail, `n` keeps a value from an
                                                        # earlier iteration or is unbound on the next line.
                                                        pass
                                                # NOTE(review): the value is written back with the same n (no +1), unlike
                                                # the list-argument branch further below - looks unintended, confirm.
                                                if n>=i-1:
                                                    arg_n["argument_value"]=f"$$PREV[{n}]"
                                        # If the argument value is a list, iterate over the values and perform the same process as above.
                                        elif type(prevset)==list:
                                            for list_num,prev_val in enumerate(prevset):
                                                # NOTE(review): raises AttributeError when a list item is not a string.
                                                if prev_val.startswith("$$PREV["):
                                                    n=0
                                                    # NOTE(review): the slices below use `prevset` (the list) instead of
                                                    # `prev_val`; int() of a list raises TypeError, and `prevset.find` in
                                                    # the handler then raises an uncaught AttributeError.
                                                    try:
                                                        n=int(prevset[7:-1])
                                                    except:
                                                        indexing_pos=prevset.find('][')
                                                        try:
                                                            n=int(prevset[7:indexing_pos])
                                                        except:
                                                            pass
                                                    if n>=i-1:
                                                        arg_n["argument_value"][list_num]=f"$$PREV[{n}]"
                                # Point the original argument at the tool call that was just inserted.
                                arg["argument_value"]=f"$$PREV[{i-1}]"
                # otherwise the argument value is a list, iterate over this list, and perform the same operations as above.
                elif (type(temp)==list):
                    for num_arg,temp_el in enumerate(temp):
                        if type(temp_el)==str:
                            if (temp_el.startswith('$$')) and not temp_el.startswith('$$PREV'):
                                temp_lowercase_call=temp_el.lower()[2:]
                                for no_arg_tool in no_arg_tool_list:
                                    if temp_lowercase_call.startswith(no_arg_tool):
                                        tool_ins = {}
                                        tool_ins['arguments']=[]
                                        tool_ins['tool_name']=no_arg_tool
                                        json_pred.insert(i,tool_ins)
                                        i+=1
                                        # Shift the $$PREV references of the current and later tools by one.
                                        for i_n, tool_n in enumerate(json_pred[i:]):
                                            for j_n, arg_n in enumerate(tool_n["arguments"]):
                                                prevset = arg_n["argument_value"]
                                                # NOTE(review): any value that is not list/bool/float is treated as a
                                                # string here (e.g. an int would fail on startswith); "$$PREV[1][0]"
                                                # makes int() raise ValueError.
                                                if (type(prevset) not in [list,bool,float]):
                                                    if prevset.startswith("$$PREV[") and int(prevset[7:-1])>=i-1:
                                                        n=int(prevset[7:-1])+1
                                                        arg_n["argument_value"]=f"$$PREV[{n}]"
                                                elif type(prevset) not in [bool,float]:
                                                    for list_num,prev_val in enumerate(prevset):
                                                        try:
                                                            if prev_val.startswith("$$PREV[") and int(prev_val[7:-1])>=i-1:
                                                                # NOTE(review): slices `prevset` (the list), not `prev_val`;
                                                                # the resulting TypeError is swallowed by the bare except,
                                                                # so list items are never shifted.
                                                                n=int(prevset[7:-1])+1
                                                                arg_n["argument_value"][list_num]=f"$$PREV[{n}]"
                                                        except:
                                                            pass
                                        # Point the original list item at the tool call that was just inserted.
                                        arg["argument_value"][num_arg]=f"$$PREV[{i-1}]"
        i+=1
    return json_pred
def unknown_tool_remover(json_pred, tools):
    '''
    Report whether the predicted sequence calls a tool that is not in `tools`.
    Despite its name, this function removes nothing; it only inspects the calls.
    parameters
    -----------------
    json_pred: list
        List of dictionaries that represents the tool call sequence
    tools: dict
        Dict representing tools, keyed by tool name
    returns
    -----------------
    bool
        True if at least one call names an unknown tool, False otherwise
    '''
    return any(tool_call["tool_name"] not in tools for tool_call in json_pred)
def _is_placeholder(value):
    # "$$..." strings (e.g. $$PREV[i]) refer to other tool outputs and must not be coerced.
    return isinstance(value, str) and value.startswith("$$")


def _is_plain_number(value):
    # Exact type check on purpose: bool is a subclass of int but is not a number here.
    return type(value) in (int, float)


def _coerce_to_list(value, wants_string, wants_number, wants_bool, string_to_boolean):
    # Wrap a scalar in a one-element list matching the declared element type.
    if wants_string and not _is_placeholder(value) and (
        isinstance(value, str) or _is_plain_number(value)
    ):
        return [str(value)]
    if wants_number:
        if _is_plain_number(value):
            return [value]
        try:
            return [int(value)]
        except (TypeError, ValueError):
            # Not convertible (for instance a $$PREV reference): leave it untouched.
            return value
    if wants_bool:
        if isinstance(value, str):
            return [string_to_boolean.get(value, value)]
        return [bool(value)]
    return value


def _coerce_to_string(value):
    # Turn a number, or the first element of a non-empty list, into a string.
    if isinstance(value, list) and value:
        first = value[0]
        if not _is_placeholder(first) and (
            isinstance(first, str) or _is_plain_number(first)
        ):
            return str(first)
        return value
    if _is_plain_number(value):
        return str(value)
    return value


def _coerce_to_bool(value, string_to_boolean):
    # Turn a string, or the first element of a non-empty list, into a boolean.
    if isinstance(value, list) and value:
        first = value[0]
        if isinstance(first, str):
            if first.startswith("$$"):
                return value
            # Unknown spellings keep the original list, as before.
            return string_to_boolean.get(first, value)
        if isinstance(first, bool):
            return first
        if _is_plain_number(first):
            return bool(first)
        return value
    if isinstance(value, str):
        return string_to_boolean.get(value, value)
    return value


def type_handler(json_pred,tools,array_check,string_check,num_check,bool_check,string_to_boolean):
    '''
    Coerce argument values so that they respect the argument types declared by the tools.
    The declared type string is lower-cased and split on spaces; the resulting words
    are matched against the keyword lists below. Values starting with "$$" are
    references to other tool outputs and are never converted to another scalar type.
    An argument declared as an array is only ever wrapped into a list; a value that
    is already a list is left as it is.
    parameters
    -----------------
    json_pred: list
        List of dictionaries that represents the tool call sequence
    tools: dict
        Dict representing tools
    array_check: list
        List of keywords to check for array return types
    string_check: list
        List of keywords to check for string return types
    num_check: list
        List of keywords to check for numeral return types
    bool_check: list
        List of keywords to check for boolean return types
    string_to_boolean: dict
        Dictionary mapping common representations of True/False values to booleans
    returns
    -----------------
    json_pred: list
        Tool call sequence with tool inputs respecting tool input type requirements
        (the input list is modified in place and returned)
    '''
    for tool in json_pred:
        for arg in tool["arguments"]:
            declared_args = tools[tool["tool_name"]]["args"]
            if arg["argument_name"] not in declared_args:
                print("arg name not match for",arg)
                continue
            arg_type = declared_args[arg["argument_name"]]["argument_type"]
            # Split the argument type by spaces for easier checks later
            words = set(arg_type.lower().split(' '))
            wants_array = bool(words.intersection(array_check))
            wants_string = bool(words.intersection(string_check))
            wants_number = bool(words.intersection(num_check))
            wants_bool = bool(words.intersection(bool_check))
            value = arg["argument_value"]
            if wants_array:
                # "array of strings" also contains a string keyword, so array arguments
                # must never reach the scalar branches below: they would collapse a
                # valid list to its first element.
                if not isinstance(value, list):
                    value = _coerce_to_list(
                        value, wants_string, wants_number, wants_bool, string_to_boolean
                    )
            elif wants_string and not isinstance(value, str):
                value = _coerce_to_string(value)
            elif wants_bool and not isinstance(value, bool):
                value = _coerce_to_bool(value, string_to_boolean)
            arg["argument_value"] = value
    return json_pred
def _referenced_call_returns_array(reference, json_pred, tools, array_check):
    # Resolve a plain "$$PREV[i]" reference and tell whether call i returns an array.
    # Returns None when the string is not a resolvable reference (indexed form such as
    # "$$PREV[0][1]", non-numeric index, or index outside the call sequence).
    if not reference.startswith('$$PREV'):
        return None
    try:
        index = int(reference[7:-1])
    except ValueError:
        return None
    if not 0 <= index < len(json_pred):
        return None
    return_type = tools[json_pred[index]["tool_name"]]["return_type"]
    return bool(set(return_type.lower().split(' ')).intersection(array_check))


def prev_ret_type_handler(json_pred,tools,array_check,string_check,num_check,bool_check):
    '''
    Reconcile $$PREV[i] arguments with the return type of the call they refer to.
    A reference to a call that does not return an array is wrapped in a list when the
    argument expects an array. A reference found inside a list replaces the whole
    list when the referenced return type and the argument type agree (both arrays or
    both non-arrays). References that cannot be resolved are left untouched.
    parameters
    -----------------
    json_pred: list
        List of dictionaries that represents the tool call sequence
    tools: dict
        Dict representing tools
    array_check: list
        List of keywords to check for array return types
    string_check: list
        List of keywords to check for string return types (unused, kept for the interface)
    num_check: list
        List of keywords to check for numeral return types (unused, kept for the interface)
    bool_check: list
        List of keywords to check for boolean return types (unused, kept for the interface)
    returns
    -----------------
    json_pred: list
        Tool call sequence with $$PREV[i] type arguments respecting tool input type requirements
        (the input list is modified in place and returned)
    '''
    # Iterate over list of dictionaries
    for tool in json_pred:
        for arg in tool["arguments"]:
            declared_args = tools[tool["tool_name"]]["args"]
            if arg["argument_name"] not in declared_args:
                continue
            # Lower-cased so that matching agrees with type_handler
            arg_words = set(declared_args[arg["argument_name"]]["argument_type"].lower().split(' '))
            arg_is_array = bool(arg_words.intersection(array_check))
            value = arg["argument_value"]
            if isinstance(value, str):
                ref_is_array = _referenced_call_returns_array(value, json_pred, tools, array_check)
                # The referenced tool returns a scalar, but the argument expects an array
                if ref_is_array is False and arg_is_array:
                    arg["argument_value"] = [value]
            elif isinstance(value, list):
                for element in value:
                    if not isinstance(element, str):
                        continue
                    ref_is_array = _referenced_call_returns_array(element, json_pred, tools, array_check)
                    # Shapes agree, so the surrounding list is redundant: unwrap it
                    if ref_is_array is not None and ref_is_array == arg_is_array:
                        arg["argument_value"] = element
    return json_pred
def postprocess(json_pred, tool_data):
    '''
    Repair the tool call sequence predicted by the LLM on a best-effort basis.
    Every repair step is independent: when one fails, the failure is reported on
    stdout and the remaining steps still run on whatever state the sequence is in.
    parameters
    -----------------
    json_pred: list or dict
        Answer predicted by the LLM, already parsed by the caller (main() passes the
        result of json.loads); dict_unwrap is applied to it first
    tool_data: list
        list of dictionaries representing the given tools; it is not modified
    returns
    -----------------
    json_pred: list
        List of dictionaries that represents the final answer, or an empty list when
        the prediction calls a tool that is not part of tool_data
    '''
    json_pred = dict_unwrap(json_pred)
    # Get dictionary of tools to simplify work. Each entry is a shallow copy so that
    # the lookup key "args" is not written into the caller's tool objects.
    tools = {}
    for tool in tool_data:
        entry = dict(tool)
        entry["return_type"] = tool["return_type"]
        entry["args"] = {arg["argument_name"]: arg for arg in tool["argument_list"]}
        tools[tool["tool_name"]] = entry
    # Calls with a missing or empty argument list get a proper empty list
    for tool_call in json_pred:
        if not tool_call.get('arguments',None):
            tool_call["arguments"] = []
    # Lists holding keywords to search for in the argument types/ return types
    array_check = ["array","list","arrays","lists"]
    string_check=["string","str","strings"]
    num_check=["integer","int32","number","float","double","float32"]
    bool_check=["bool","boolean","true","false"]
    # Dictionary mapping common strings to boolean values
    string_to_boolean = {"True": True, "False": False, "1": True, "0": False, "yes": True, "no": False, "true" : True, "false": False}
    # Get tools for which no argument is required, to fix $${function_name} errors encountered
    no_arg_tool_list = [tool["tool_name"] for tool in tool_data if not tool["argument_list"]]
    try:
        if unknown_tool_remover(json_pred, tools):
            return []
    except Exception as exc:
        print("postprocess: unknown tool check failed:", repr(exc))
    # Repair steps, in the order they must run
    steps = (
        ("list_in_str_handler", lambda pred: list_in_str_handler(pred)),
        # Fix type errors
        ("type_handler", lambda pred: type_handler(pred,tools,array_check,string_check,num_check,bool_check,string_to_boolean)),
        # Fix $${function_name} errors
        ("func_name_handler", lambda pred: func_name_handler(pred,tools,no_arg_tool_list)),
        # Fix return types for $$PREV[i] type arguments
        ("prev_ret_type_handler", lambda pred: prev_ret_type_handler(pred,tools,array_check,string_check,num_check,bool_check)),
    )
    for label, step in steps:
        try:
            json_pred = step(json_pred)
        except Exception as exc:
            # Best effort: keep the current sequence and carry on with the next step
            print(f"postprocess: {label} failed:", repr(exc))
    return json_pred
# Module-level setup, executed once when the file is imported.
# Load the tool catalogue and the example pool from the paths configured above.
tools = read_file(tool_list_path)
examples = read_file(example_path)
# The tool file keeps its entries under the top-level "tools" key.
tool_dict = create_tool_dict(tools["tools"])
# Shared Tools instance that main() consults for tools, examples and the search index.
tool_obj = Tools(tool_dict, examples)
def _parse_llm_answer(res):
    # Parse the JSON content of a chat completion and postprocess it.
    # Returns [] when the content is not valid JSON, and the raw parsed JSON when
    # only the postprocessing step fails.
    answer = []
    try:
        answer = json.loads(res.choices[0].message.content)
        answer = postprocess(answer, list(tool_obj.tools.values())) # postprocessing
    except Exception as exc:
        print("Could not parse or postprocess the model response:", repr(exc))
    return answer


def main(query):
    '''
    Answer a user query with a sequence of tool calls.
    With fewer than 5 stored examples a zero-shot prompt is used; otherwise the
    prompt is built from the top-k examples retrieved for the query.
    parameters
    -----------------
    query: str
        The user query
    returns
    -----------------
    answer: list
        The postprocessed tool call sequence; an empty list when no tool is
        retrieved for the query or the model response is not valid JSON
    '''
    min_examples_for_few_shot = 5
    if len(tool_obj.examples) < min_examples_for_few_shot:
        message = create_prompt_zero_shot(query,tool_obj.tools)
        return _parse_llm_answer(client_conn(message, model_name_tr))
    topk_examples = get_topk_given_query(query, tool_obj.queries, tool_obj.search_index, tool_obj.examples)
    reduced_tools = tool_retriever(list(tool_obj.tools.values()), query)
    if not reduced_tools:
        return []
    # NOTE(review): the prompt is built from ALL tools; reduced_tools is only used for
    # the emptiness check above and the 'lambda' check below. Confirm this is intended.
    tool_list = final_tools(tool_obj.tools, list(tool_obj.tools.keys()))
    if 'lambda' in reduced_tools:
        message = create_prompt_for_query_bonus(query, topk_examples, tool_list)
    else:
        message = create_prompt_for_query(query, topk_examples, tool_list)
    res = client_conn(message, model_name_tr)
    # NOTE(review): the label says GPT4 but the request is sent with model_name_tr.
    print("GPT4 Response: ",res.choices[0].message.content)
    return _parse_llm_answer(res)