# What is this? ## Helper utilities for token counting import base64 import io import struct from typing import Callable, List, Literal, Optional, Tuple, Union import tiktoken import litellm from litellm import verbose_logger from litellm.constants import ( DEFAULT_IMAGE_HEIGHT, DEFAULT_IMAGE_TOKEN_COUNT, DEFAULT_IMAGE_WIDTH, MAX_LONG_SIDE_FOR_IMAGE_HIGH_RES, MAX_SHORT_SIDE_FOR_IMAGE_HIGH_RES, MAX_TILE_HEIGHT, MAX_TILE_WIDTH, ) from litellm.litellm_core_utils.default_encoding import encoding as default_encoding from litellm.llms.custom_httpx.http_handler import _get_httpx_client from litellm.types.llms.openai import ( AllMessageValues, ChatCompletionNamedToolChoiceParam, ChatCompletionToolParam, OpenAIMessageContent, ) from litellm.types.utils import SelectTokenizerResponse def get_modified_max_tokens( model: str, base_model: str, messages: Optional[List[AllMessageValues]], user_max_tokens: Optional[int], buffer_perc: Optional[float], buffer_num: Optional[float], ) -> Optional[int]: """ Params: Returns the user's max output tokens, adjusted for: - the size of input - for models where input + output can't exceed X - model max output tokens - for models where there is a separate output token limit """ try: if user_max_tokens is None: return None ## MODEL INFO _model_info = litellm.get_model_info(model=model) max_output_tokens = litellm.get_max_tokens( model=base_model ) # assume min context window is 4k tokens ## UNKNOWN MAX OUTPUT TOKENS - return user defined amount if max_output_tokens is None: return user_max_tokens input_tokens = litellm.token_counter(model=base_model, messages=messages) # token buffer if buffer_perc is None: buffer_perc = 0.1 if buffer_num is None: buffer_num = 10 token_buffer = max( buffer_perc * input_tokens, buffer_num ) # give at least a 10 token buffer. token counting can be imprecise. input_tokens += int(token_buffer) verbose_logger.debug( f"max_output_tokens: {max_output_tokens}, user_max_tokens: {user_max_tokens}" ) ## CASE 1: model input + output can't exceed X - happens when max input = max output, e.g. gpt-3.5-turbo if _model_info["max_input_tokens"] == max_output_tokens: verbose_logger.debug( f"input_tokens: {input_tokens}, max_output_tokens: {max_output_tokens}" ) if input_tokens > max_output_tokens: pass # allow call to fail normally - don't set max_tokens to negative. elif ( user_max_tokens + input_tokens > max_output_tokens ): # we can still modify to keep it positive but below the limit verbose_logger.debug( f"MODIFYING MAX TOKENS - user_max_tokens={user_max_tokens}, input_tokens={input_tokens}, max_output_tokens={max_output_tokens}" ) user_max_tokens = int(max_output_tokens - input_tokens) ## CASE 2: user_max_tokens> model max output tokens elif user_max_tokens > max_output_tokens: user_max_tokens = max_output_tokens verbose_logger.debug( f"litellm.litellm_core_utils.token_counter.py::get_modified_max_tokens() - user_max_tokens: {user_max_tokens}" ) return user_max_tokens except Exception as e: verbose_logger.error( "litellm.litellm_core_utils.token_counter.py::get_modified_max_tokens() - Error while checking max token limit: {}\nmodel={}, base_model={}".format( str(e), model, base_model ) ) return user_max_tokens def resize_image_high_res( width: int, height: int, ) -> Tuple[int, int]: # Maximum dimensions for high res mode max_short_side = MAX_SHORT_SIDE_FOR_IMAGE_HIGH_RES max_long_side = MAX_LONG_SIDE_FOR_IMAGE_HIGH_RES # Return early if no resizing is needed if ( width <= MAX_SHORT_SIDE_FOR_IMAGE_HIGH_RES and height <= MAX_SHORT_SIDE_FOR_IMAGE_HIGH_RES ): return width, height # Determine the longer and shorter sides longer_side = max(width, height) shorter_side = min(width, height) # Calculate the aspect ratio aspect_ratio = longer_side / shorter_side # Resize based on the short side being 768px if width <= height: # Portrait or square resized_width = max_short_side resized_height = int(resized_width * aspect_ratio) # if the long side exceeds the limit after resizing, adjust both sides accordingly if resized_height > max_long_side: resized_height = max_long_side resized_width = int(resized_height / aspect_ratio) else: # Landscape resized_height = max_short_side resized_width = int(resized_height * aspect_ratio) # if the long side exceeds the limit after resizing, adjust both sides accordingly if resized_width > max_long_side: resized_width = max_long_side resized_height = int(resized_width / aspect_ratio) return resized_width, resized_height # Test the function with the given example def calculate_tiles_needed( resized_width, resized_height, tile_width=MAX_TILE_WIDTH, tile_height=MAX_TILE_HEIGHT, ): tiles_across = (resized_width + tile_width - 1) // tile_width tiles_down = (resized_height + tile_height - 1) // tile_height total_tiles = tiles_across * tiles_down return total_tiles def get_image_type(image_data: bytes) -> Union[str, None]: """take an image (really only the first ~100 bytes max are needed) and return 'png' 'gif' 'jpeg' 'webp' 'heic' or None. method added to allow deprecation of imghdr in 3.13""" if image_data[0:8] == b"\x89\x50\x4e\x47\x0d\x0a\x1a\x0a": return "png" if image_data[0:4] == b"GIF8" and image_data[5:6] == b"a": return "gif" if image_data[0:3] == b"\xff\xd8\xff": return "jpeg" if image_data[4:8] == b"ftyp": return "heic" if image_data[0:4] == b"RIFF" and image_data[8:12] == b"WEBP": return "webp" return None def get_image_dimensions( data: str, ) -> Tuple[int, int]: """ Async Function to get the dimensions of an image from a URL or base64 encoded string. Args: data (str): The URL or base64 encoded string of the image. Returns: Tuple[int, int]: The width and height of the image. """ img_data = None try: # Try to open as URL client = _get_httpx_client() response = client.get(data) img_data = response.read() except Exception: # If not URL, assume it's base64 _header, encoded = data.split(",", 1) img_data = base64.b64decode(encoded) img_type = get_image_type(img_data) if img_type == "png": w, h = struct.unpack(">LL", img_data[16:24]) return w, h elif img_type == "gif": w, h = struct.unpack("H", fhandle.read(2))[0] - 2 fhandle.seek(1, 1) h, w = struct.unpack(">HH", fhandle.read(4)) return w, h elif img_type == "webp": # For WebP, the dimensions are stored at different offsets depending on the format # Check for VP8X (extended format) if img_data[12:16] == b"VP8X": w = struct.unpack("> 14) & 0x3FFF) + 1 return w, h # return sensible default image dimensions if unable to get dimensions return DEFAULT_IMAGE_WIDTH, DEFAULT_IMAGE_HEIGHT def calculate_img_tokens( data, mode: Literal["low", "high", "auto"] = "auto", base_tokens: int = 85, # openai default - https://openai.com/pricing use_default_image_token_count: bool = False, ): """ Calculate the number of tokens for an image. Args: data (str): The URL or base64 encoded string of the image. mode (Literal["low", "high", "auto"]): The mode to use for calculating the number of tokens. base_tokens (int): The base number of tokens for an image. use_default_image_token_count (bool): When True, will NOT make a GET request to the image URL and instead return the default image dimensions. Returns: int: The number of tokens for the image. """ if use_default_image_token_count: verbose_logger.debug( "Using default image token count: {}".format(DEFAULT_IMAGE_TOKEN_COUNT) ) return DEFAULT_IMAGE_TOKEN_COUNT if mode == "low" or mode == "auto": return base_tokens elif mode == "high": # Run the async function using the helper width, height = get_image_dimensions( data=data, ) resized_width, resized_height = resize_image_high_res( width=width, height=height ) tiles_needed_high_res = calculate_tiles_needed( resized_width=resized_width, resized_height=resized_height ) tile_tokens = (base_tokens * 2) * tiles_needed_high_res total_tokens = base_tokens + tile_tokens return total_tokens TokenCounterFunction = Callable[[str], int] """ Type for a function that counts tokens in a string. """ class _MessageCountParams: """ A class to hold the parameters for counting tokens in messages. """ def __init__( self, model: str, custom_tokenizer: Optional[Union[dict, SelectTokenizerResponse]], ): from litellm.utils import print_verbose actual_model = _fix_model_name(model) if actual_model == "gpt-3.5-turbo-0301": self.tokens_per_message = ( 4 # every message follows <|start|>{role/name}\n{content}<|end|>\n ) self.tokens_per_name = -1 # if there's a name, the role is omitted elif actual_model in litellm.open_ai_chat_completion_models: self.tokens_per_message = 3 self.tokens_per_name = 1 elif actual_model in litellm.azure_llms: self.tokens_per_message = 3 self.tokens_per_name = 1 else: print_verbose(f"Warning: unknown model {model}. Using default token params.") self.tokens_per_message = 3 self.tokens_per_name = 1 self.count_function = _get_count_function(model, custom_tokenizer) def token_counter( model="", custom_tokenizer: Optional[Union[dict, SelectTokenizerResponse]] = None, text: Optional[Union[str, List[str]]] = None, messages: Optional[List[AllMessageValues]] = None, count_response_tokens: Optional[bool] = False, tools: Optional[List[ChatCompletionToolParam]] = None, tool_choice: Optional[ChatCompletionNamedToolChoiceParam] = None, use_default_image_token_count: Optional[bool] = False, default_token_count: Optional[int] = None, ) -> int: """ Count the number of tokens in a given text using a specified model. Args: model (str): The name of the model to use for tokenization. Default is an empty string. custom_tokenizer (Optional[dict]): A custom tokenizer created with the `create_pretrained_tokenizer` or `create_tokenizer` method. Must be a dictionary with a string value for `type` and Tokenizer for `tokenizer`. Default is None. text (str): The raw text string to be passed to the model. Default is None. messages (Optional[List[AllMessageValues]]): Alternative to passing in text. A list of dictionaries representing messages with "role" and "content" keys. Default is None. count_response_tokens (Optional[bool]): set to True to indicate we are processing a stream response. tools (Optional[List[ChatCompletionToolParam]]): The available tools. Default is None. tool_choice (Optional[ChatCompletionNamedToolChoiceParam]): The tool choice. Default is None. use_default_image_token_count (Optional[bool]): When True, will NOT make a GET request to the image URL and instead return the default image dimensions. Default is False. default_token_count (Optional[int]): The default number of tokens to return for a message block, if an error occurs. Default is None. Returns: int: The number of tokens in the text. """ if text is not None and messages is not None: raise ValueError("text and messages cannot both be set") if use_default_image_token_count is None: use_default_image_token_count = False if text is not None: if tools or tool_choice: raise ValueError("tools or tool_choice cannot be set if using text") if isinstance(text, List): text_to_count = "".join(t for t in text if isinstance(t, str)) elif isinstance(text, str): text_to_count = text count_function = _get_count_function(model, custom_tokenizer) num_tokens = count_function(text_to_count) elif messages is not None: params = _MessageCountParams(model, custom_tokenizer) num_tokens = _count_messages( params, messages, use_default_image_token_count, default_token_count ) if count_response_tokens is False: includes_system_message = any( [message.get("role", None) == "system" for message in messages] ) num_tokens += _count_extra( params.count_function, tools, tool_choice, includes_system_message ) else: raise ValueError("Either text or messages must be provided") return num_tokens def _count_messages( params: _MessageCountParams, messages: List[AllMessageValues], use_default_image_token_count: bool, default_token_count: Optional[int], ) -> int: """ Count the number of tokens in a list of messages. Args: params (_MessageCountParams): The parameters for counting tokens. messages (List[AllMessageValues]): The list of messages to count tokens in. use_default_image_token_count (bool): When True, will NOT make a GET request to the image URL and instead return the default image dimensions. default_token_count (Optional[int]): The default number of tokens to return for a message block, if an error occurs. """ num_tokens = 0 for message in messages: num_tokens += params.tokens_per_message for key, value in message.items(): if value is None: pass elif key == "tool_calls": if isinstance(value, List): for tool_call in value: if "function" in tool_call: function_arguments = tool_call["function"].get( "arguments", [] ) num_tokens += params.count_function(str(function_arguments)) else: raise ValueError( f"Unsupported tool call {tool_call} must contain a function key" ) else: raise ValueError( f"Unsupported type {type(value)} for key tool_calls in message {message}" ) elif isinstance(value, str): num_tokens += params.count_function(value) if key == "name": num_tokens += params.tokens_per_name elif key == 'content' and isinstance(value, List): num_tokens += _count_content_list( params.count_function, value, use_default_image_token_count, default_token_count, ) else: raise ValueError( f"Unsupported type {type(value)} for key {key} in message {message}" ) return num_tokens def _count_extra( count_function: TokenCounterFunction, tools: Optional[List[ChatCompletionToolParam]], tool_choice: Optional[ChatCompletionNamedToolChoiceParam], includes_system_message: bool, ) -> int: """Count extra tokens for function definitions and tool choices. Args: count_function (TokenCounterFunction): The function to count tokens. tools (Optional[List[ChatCompletionToolParam]]): The available tools. tool_choice (Optional[ChatCompletionNamedToolChoiceParam]): The tool choice. includes_system_message (bool): Whether the messages include a system message. """ num_tokens = 3 # every reply is primed with <|start|>assistant<|message|> if tools: num_tokens += count_function(_format_function_definitions(tools)) num_tokens += 9 # Additional tokens for function definition of tools # If there's a system message and tools are present, subtract four tokens if tools and includes_system_message: num_tokens -= 4 # If tool_choice is 'none', add one token. # If it's an object, add 4 + the number of tokens in the function name. # If it's undefined or 'auto', don't add anything. if tool_choice == "none": num_tokens += 1 elif isinstance(tool_choice, dict): num_tokens += 7 num_tokens += count_function(str(tool_choice["function"]["name"])) return num_tokens def _get_count_function( model: Optional[str], custom_tokenizer: Optional[Union[dict, SelectTokenizerResponse]] = None, ) -> TokenCounterFunction: """ Get the function to count tokens based on the model and custom tokenizer.""" from litellm.utils import _select_tokenizer, print_verbose if model is not None or custom_tokenizer is not None: tokenizer_json = custom_tokenizer or _select_tokenizer(model) # type: ignore if tokenizer_json["type"] == "huggingface_tokenizer": def count_tokens(text: str) -> int: enc = tokenizer_json["tokenizer"].encode(text) return len(enc.ids) elif tokenizer_json["type"] == "openai_tokenizer": model_to_use = _fix_model_name(model) # type: ignore try: if "gpt-4o" in model_to_use: encoding = tiktoken.get_encoding("o200k_base") else: encoding = tiktoken.encoding_for_model(model_to_use) except KeyError: print_verbose("Warning: model not found. Using cl100k_base encoding.") encoding = tiktoken.get_encoding("cl100k_base") def count_tokens(text: str) -> int: return len(encoding.encode(text)) else: raise ValueError("Unsupported tokenizer type") else: def count_tokens(text: str) -> int: return len(default_encoding.encode(text, disallowed_special=())) return count_tokens def _fix_model_name(model: str) -> str: """We normalize some model names to others""" if model in litellm.azure_llms: # azure llms use gpt-35-turbo instead of gpt-3.5-turbo 🙃 return model.replace("-35", "-3.5") elif model in litellm.open_ai_chat_completion_models: return model # type: ignore else: return "gpt-3.5-turbo" def _count_content_list( count_function: TokenCounterFunction, content_list: OpenAIMessageContent, use_default_image_token_count: bool, default_token_count: Optional[int], ) -> int: """ Get the number of tokens from a list of content. """ try: num_tokens = 0 for c in content_list: if isinstance(c, str): num_tokens += count_function(c) elif c["type"] == "text": num_tokens += count_function(c["text"]) elif c["type"] == "image_url": if isinstance(c["image_url"], dict): image_url_dict = c["image_url"] detail = image_url_dict.get("detail", "auto") if detail not in ["low", "high", "auto"]: raise ValueError( f"Invalid detail value: {detail}. Expected 'low', 'high', or 'auto'." ) url = image_url_dict.get("url") num_tokens += calculate_img_tokens( data=url, mode=detail, # type: ignore use_default_image_token_count=use_default_image_token_count, ) elif isinstance(c["image_url"], str): image_url_str = c["image_url"] num_tokens += calculate_img_tokens( data=image_url_str, mode="auto", use_default_image_token_count=use_default_image_token_count, ) else: raise ValueError( f"Invalid image_url type: {type(c['image_url'])}. Expected str or dict." ) else: raise ValueError( f"Invalid content type: {type(c)}. Expected str or dict." ) return num_tokens except Exception as e: if default_token_count is not None: return default_token_count raise ValueError( f"Error getting number of tokens from content list: {e}, default_token_count={default_token_count}" ) def _format_function_definitions(tools): """Formats tool definitions in the format that OpenAI appears to use. Based on https://github.com/forestwanglin/openai-java/blob/main/jtokkit/src/main/java/xyz/felh/openai/jtokkit/utils/TikTokenUtils.java """ lines = [] lines.append("namespace functions {") lines.append("") for tool in tools: function = tool.get("function") if function_description := function.get("description"): lines.append(f"// {function_description}") function_name = function.get("name") parameters = function.get("parameters", {}) properties = parameters.get("properties") if properties and properties.keys(): lines.append(f"type {function_name} = (_: {{") lines.append(_format_object_parameters(parameters, 0)) lines.append("}) => any;") else: lines.append(f"type {function_name} = () => any;") lines.append("") lines.append("} // namespace functions") return "\n".join(lines) def _format_object_parameters(parameters, indent): properties = parameters.get("properties") if not properties: return "" required_params = parameters.get("required", []) lines = [] for key, props in properties.items(): description = props.get("description") if description: lines.append(f"// {description}") question = "?" if required_params and key in required_params: question = "" lines.append(f"{key}{question}: {_format_type(props, indent)},") return "\n".join([" " * max(0, indent) + line for line in lines]) def _format_type(props, indent): type = props.get("type") if type == "string": if "enum" in props: return " | ".join([f'"{item}"' for item in props["enum"]]) return "string" elif type == "array": # items is required, OpenAI throws an error if it's missing return f"{_format_type(props['items'], indent)}[]" elif type == "object": return f"{{\n{_format_object_parameters(props, indent + 2)}\n}}" elif type in ["integer", "number"]: if "enum" in props: return " | ".join([f'"{item}"' for item in props["enum"]]) return "number" elif type == "boolean": return "boolean" elif type == "null": return "null" else: # This is a guess, as an empty string doesn't yield the expected token count return "any"