Spaces:
Running
Running
File size: 8,381 Bytes
4d1746c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
import json
import os
from typing import Any

from anthropic import Anthropic
from anthropic.types import TextBlock, ToolUseBlock

from base_handler import BaseHandler
from constant import GORILLA_TO_OPENAPI
from model_style import ModelStyle
from utils import (
    ast_parse,
    combine_consecutive_user_prompts,
    convert_system_prompt_into_user_prompt,
    convert_to_function_call,
    convert_to_tool,
    extract_system_prompt,
    format_execution_results_prompting,
    func_doc_language_specific_pre_processing,
    system_prompt_pre_processing_chat_model,
)
class ClaudeHandler(BaseHandler):
    """Handler that sends function-calling test entries to Anthropic Claude models.

    Model names containing ``"FC"`` are treated as tool-use ("function
    calling") variants: their responses arrive as structured tool calls.
    Any other model name is treated as returning plain text that has to be
    parsed with ``ast_parse``.
    """

    def __init__(self, model_name, temperature) -> None:
        """Create the handler and an Anthropic client.

        The API key is read from the ``ANTHROPIC_API_KEY`` environment variable.
        """
        super().__init__(model_name, temperature)
        self.model_style = ModelStyle.Anthropic
        self.client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

    @staticmethod
    def _as_bracketed_list(text: str) -> str:
        """Return ``text`` without one leading space and wrapped in ``[`` ... ``]``.

        Brackets are only added on the side where they are missing. An empty
        string yields ``"[]"`` instead of raising ``IndexError``.
        """
        if text.startswith(" "):
            text = text[1:]
        if not text.startswith("["):
            text = "[" + text
        if not text.endswith("]"):
            text = text + "]"
        return text

    @staticmethod
    def _wrap_text_content(messages: list[dict]) -> list[dict]:
        """Replace each message's ``content`` with a single text block, in place.

        The same list object is returned for convenience.
        """
        for message in messages:
            message["content"] = [{"type": "text", "text": message["content"]}]
        return messages

    def decode_ast(self, result, language="Python"):
        """Decode a model response into a list of ``{function_name: params}`` items.

        Args:
            result: For non-FC models, the raw response text. For FC models,
                an iterable of single-entry dicts mapping a function name to
                a JSON-encoded argument string.
            language: Forwarded to ``ast_parse`` for non-FC models.

        Returns:
            For non-FC models, whatever ``ast_parse`` returns. For FC models,
            a list of ``{name: params}`` dicts with the JSON arguments decoded.
        """
        if "FC" not in self.model_name:
            return ast_parse(self._as_bracketed_list(result), language)

        decoded_output = []
        for invoked_function in result:
            # Only the first key of each entry is used as the function name.
            name = list(invoked_function)[0]
            decoded_output.append({name: json.loads(invoked_function[name])})
        return decoded_output

    def decode_execute(self, result):
        """Decode a model response into executable call representations.

        Args:
            result: Raw response text (non-FC) or the structured tool-call
                output (FC).

        Returns:
            For non-FC models, a list of strings of the form
            ``name(arg=repr(value),...)``. For FC models, the result of
            ``convert_to_function_call(result)``.
        """
        if "FC" in self.model_name:
            return convert_to_function_call(result)

        execution_list = []
        for function_call in ast_parse(self._as_bracketed_list(result)):
            for key, value in function_call.items():
                arguments = ",".join(f"{k}={repr(v)}" for k, v in value.items())
                execution_list.append(f"{key}({arguments})")
        return execution_list

    #### FC methods ####

    def _query_FC(self, inference_data: dict):
        """Send the accumulated conversation and tools to the Anthropic API.

        Records the request under ``inference_data["inference_input_log"]``
        and, when caching is enabled, moves the ``cache_control`` markers so
        that only the two most recent user messages carry one.

        Returns:
            The response object from the prompt-caching messages endpoint.
        """
        inference_data["inference_input_log"] = {
            "message": repr(inference_data["message"]),
            "tools": inference_data["tools"],
        }
        messages = inference_data["message"]

        if inference_data["caching_enabled"]:
            # Walk backwards so the newest user messages are seen first: the
            # first two get a cache marker, every older one has its marker removed.
            user_messages_seen = 0
            for message in reversed(messages):
                if message["role"] != "user":
                    continue
                first_block = message["content"][0]
                if user_messages_seen < 2:
                    first_block["cache_control"] = {"type": "ephemeral"}
                else:
                    first_block.pop("cache_control", None)
                user_messages_seen += 1

        # NOTE(review): the temperature given to __init__ is not forwarded to
        # this call -- confirm whether that is intentional.
        return self.client.beta.prompt_caching.messages.create(
            # removesuffix drops exactly the trailing "-FC"; str.strip("-FC")
            # would strip any of the characters '-', 'F', 'C' from both ends.
            model=self.model_name.removesuffix("-FC"),
            max_tokens=(
                8192 if "claude-3-5" in self.model_name else 4096
            ),  # 3.5 Sonnet has a higher max token limit
            tools=inference_data["tools"],
            messages=messages,
        )

    def _pre_query_processing_FC(self, inference_data: dict, test_entry: dict) -> dict:
        """Normalise the test entry's questions and initialise ``inference_data``.

        Every round in ``test_entry["question"]`` is rewritten in place: system
        prompts are converted to user prompts, then consecutive user prompts
        are combined. The message history is reset to an empty list and the
        ``caching_enabled`` flag is set.
        """
        questions = test_entry["question"]
        for round_idx, round_messages in enumerate(questions):
            round_messages = convert_system_prompt_into_user_prompt(round_messages)
            questions[round_idx] = combine_consecutive_user_prompts(round_messages)

        inference_data["message"] = []
        # Caching is enabled for every model whose name does not contain
        # "claude-3-sonnet"; the test category is not consulted here.
        inference_data["caching_enabled"] = "claude-3-sonnet" not in self.model_name
        return inference_data

    def _compile_tools(self, inference_data: dict, test_entry: dict) -> dict:
        """Convert the test entry's function docs to tools and store them.

        When caching is enabled, the last tool is flagged with
        ``cache_control``. If tools had already been compiled earlier (more
        tools are being added to an existing set), the tool at the position
        of the previous set's last entry is flagged as well. An empty tool
        list is stored unchanged rather than raising ``IndexError``.
        """
        functions: list = test_entry["function"]
        test_category: str = test_entry["id"].rsplit("_", 1)[0]
        functions = func_doc_language_specific_pre_processing(functions, test_category)
        tools = convert_to_tool(functions, GORILLA_TO_OPENAPI, self.model_style)

        if inference_data["caching_enabled"] and tools:
            if "tools" in inference_data:
                # Tools were compiled before: also mark where the previous
                # tool list ended so that prefix can still hit the cache.
                previous_tool_count = len(inference_data["tools"])
                tools[previous_tool_count - 1]["cache_control"] = {"type": "ephemeral"}
            tools[-1]["cache_control"] = {"type": "ephemeral"}

        inference_data["tools"] = tools
        return inference_data

    def _parse_query_response_FC(self, api_response: Any) -> dict:
        """Extract text, tool calls and token usage from an API response.

        Returns:
            A dict with:
            ``model_responses`` -- the tool calls as ``{name: json_args}``
            dicts if any were made, otherwise the list of text outputs;
            ``model_responses_message_for_chat_history`` -- the unmodified
            response content;
            ``tool_call_ids`` -- ids of the tool calls, in order;
            ``input_token`` / ``output_token`` -- usage counts.
        """
        text_outputs = []
        tool_call_outputs = []
        tool_call_ids = []

        for content in api_response.content:
            if isinstance(content, TextBlock):
                text_outputs.append(content.text)
            elif isinstance(content, ToolUseBlock):
                tool_call_outputs.append({content.name: json.dumps(content.input)})
                tool_call_ids.append(content.id)

        return {
            # Tool calls take precedence over plain text when both are present.
            "model_responses": tool_call_outputs if tool_call_outputs else text_outputs,
            "model_responses_message_for_chat_history": api_response.content,
            "tool_call_ids": tool_call_ids,
            "input_token": api_response.usage.input_tokens,
            "output_token": api_response.usage.output_tokens,
        }

    def add_first_turn_message_FC(
        self, inference_data: dict, first_turn_message: list[dict]
    ) -> dict:
        """Append the first turn's messages to the history as text blocks.

        Each message's ``content`` is rewritten in place.
        """
        inference_data["message"].extend(self._wrap_text_content(first_turn_message))
        return inference_data

    def _add_next_turn_user_message_FC(
        self, inference_data: dict, user_message: list[dict]
    ) -> dict:
        """Append a later turn's user messages to the history as text blocks.

        Each message's ``content`` is rewritten in place.
        """
        inference_data["message"].extend(self._wrap_text_content(user_message))
        return inference_data

    def _add_assistant_message_FC(
        self, inference_data: dict, model_response_data: dict
    ) -> dict:
        """Append the model's response content to the history as an assistant message."""
        inference_data["message"].append(
            {
                "role": "assistant",
                "content": model_response_data["model_responses_message_for_chat_history"],
            }
        )
        return inference_data

    def _add_execution_results_FC(
        self,
        inference_data: dict,
        execution_results: list[str],
        model_response_data: dict,
    ) -> dict:
        """Append tool execution results to the history.

        Results are paired positionally with
        ``model_response_data["tool_call_ids"]``; if the two sequences differ
        in length the surplus entries are ignored.
        """
        # Claude doesn't use the tool role; it uses the user role to send the tool output
        tool_message = {
            "role": "user",
            "content": [
                {
                    "type": "tool_result",
                    "content": execution_result,
                    "tool_use_id": tool_call_id,
                }
                for execution_result, tool_call_id in zip(
                    execution_results, model_response_data["tool_call_ids"]
                )
            ],
        }
        inference_data["message"].append(tool_message)
        return inference_data
|