orgtalk / camel /utils.py
ennet's picture
Duplicate from sp12138sp/ChatDev
594c559
# =========== Copyright 2023 @ CAMEL-AI.org. All Rights Reserved. ===========
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =========== Copyright 2023 @ CAMEL-AI.org. All Rights Reserved. ===========
import os
import re
import zipfile
from functools import wraps
from typing import Any, Callable, List, Optional, Set, TypeVar
import requests
import tiktoken
from camel.messages import OpenAIMessage
from camel.typing import ModelType, TaskType
F = TypeVar('F', bound=Callable[..., Any])
import time
def count_tokens_openai_chat_models(
messages: List[OpenAIMessage],
encoding: Any,
) -> int:
r"""Counts the number of tokens required to generate an OpenAI chat based
on a given list of messages.
Args:
messages (List[OpenAIMessage]): The list of messages.
encoding (Any): The encoding method to use.
Returns:
int: The number of tokens required.
"""
num_tokens = 0
for message in messages:
# message follows <im_start>{role/name}\n{content}<im_end>\n
num_tokens += 4
for key, value in message.items():
num_tokens += len(encoding.encode(value))
if key == "name": # if there's a name, the role is omitted
num_tokens += -1 # role is always 1 token
num_tokens += 2 # every reply is primed with <im_start>assistant
return num_tokens
def num_tokens_from_messages(
messages: List[OpenAIMessage],
model: ModelType,
) -> int:
r"""Returns the number of tokens used by a list of messages.
Args:
messages (List[OpenAIMessage]): The list of messages to count the
number of tokens for.
model (ModelType): The OpenAI model used to encode the messages.
Returns:
int: The total number of tokens used by the messages.
Raises:
NotImplementedError: If the specified `model` is not implemented.
References:
- https://github.com/openai/openai-python/blob/main/chatml.md
- https://platform.openai.com/docs/models/gpt-4
- https://platform.openai.com/docs/models/gpt-3-5
"""
try:
value_for_tiktoken = model.value_for_tiktoken
encoding = tiktoken.encoding_for_model(value_for_tiktoken)
except KeyError:
encoding = tiktoken.get_encoding("cl100k_base")
if model in {
ModelType.GPT_3_5_TURBO, ModelType.GPT_4, ModelType.GPT_4_32k,
ModelType.STUB
}:
return count_tokens_openai_chat_models(messages, encoding)
else:
raise NotImplementedError(
f"`num_tokens_from_messages`` is not presently implemented "
f"for model {model}. "
f"See https://github.com/openai/openai-python/blob/main/chatml.md "
f"for information on how messages are converted to tokens. "
f"See https://platform.openai.com/docs/models/gpt-4"
f"or https://platform.openai.com/docs/models/gpt-3-5"
f"for information about openai chat models.")
def get_model_token_limit(model: ModelType) -> int:
r"""Returns the maximum token limit for a given model.
Args:
model (ModelType): The type of the model.
Returns:
int: The maximum token limit for the given model.
"""
if model == ModelType.GPT_3_5_TURBO:
return 16384
elif model == ModelType.GPT_4:
return 8192
elif model == ModelType.GPT_4_32k:
return 32768
elif model == ModelType.STUB:
return 4096
else:
raise ValueError("Unknown model type")
def openai_api_key_required(func: F) -> F:
r"""Decorator that checks if the OpenAI API key is available in the
environment variables.
Args:
func (callable): The function to be wrapped.
Returns:
callable: The decorated function.
Raises:
ValueError: If the OpenAI API key is not found in the environment
variables.
"""
@wraps(func)
def wrapper(self, *args, **kwargs):
from camel.agents.chat_agent import ChatAgent
if not isinstance(self, ChatAgent):
raise ValueError("Expected ChatAgent")
if self.model == ModelType.STUB:
return func(self, *args, **kwargs)
elif 'OPENAI_API_KEY' in os.environ:
return func(self, *args, **kwargs)
else:
raise ValueError('OpenAI API key not found.')
return wrapper
def print_text_animated(text, delay: float = 0.005, end: str = ""):
r"""Prints the given text with an animated effect.
Args:
text (str): The text to print.
delay (float, optional): The delay between each character printed.
(default: :obj:`0.02`)
end (str, optional): The end character to print after the text.
(default: :obj:`""`)
"""
for char in text:
print(char, end=end, flush=True)
time.sleep(delay)
print('\n')
def get_prompt_template_key_words(template: str) -> Set[str]:
r"""Given a string template containing curly braces {}, return a set of
the words inside the braces.
Args:
template (str): A string containing curly braces.
Returns:
List[str]: A list of the words inside the curly braces.
Example:
>>> get_prompt_template_key_words('Hi, {name}! How are you {status}?')
{'name', 'status'}
"""
return set(re.findall(r'{([^}]*)}', template))
def get_first_int(string: str) -> Optional[int]:
r"""Returns the first integer number found in the given string.
If no integer number is found, returns None.
Args:
string (str): The input string.
Returns:
int or None: The first integer number found in the string, or None if
no integer number is found.
"""
match = re.search(r'\d+', string)
if match:
return int(match.group())
else:
return None
def download_tasks(task: TaskType, folder_path: str) -> None:
# Define the path to save the zip file
zip_file_path = os.path.join(folder_path, "tasks.zip")
# Download the zip file from the Google Drive link
response = requests.get("https://huggingface.co/datasets/camel-ai/"
f"metadata/resolve/main/{task.value}_tasks.zip")
# Save the zip file
with open(zip_file_path, "wb") as f:
f.write(response.content)
with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
zip_ref.extractall(folder_path)
# Delete the zip file
os.remove(zip_file_path)