tokenizer-arena / utils /compression_util.py
xu-song's picture
update
f331792
raw
history blame
7.27 kB
"""
中文数据:clue superclue
英文数据:glue cnn_dailymail gigaword
代码数据:
数字:
"""
import json
import os
import sys
import pandas as pd
from datasets import load_dataset
from utils.log_util import logger
from vocab import load_tokener
from vocab import all_tokenizers
from typing import List, Optional, Union, Literal
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
common_units = ["g_bytes/b_tokens", "b_tokens/g_bytes", "t_bytes/t_tokens", "t_tokens/t_bytes", "n_chars/n_tokens", ]
common_corpuses = sorted(["cc100-en", "cc100-zh-Hans", "cc100-es", "cc100-fr", "cc100-de", "cc100-ko",
"cc100-fa", "cc100-ar", "cc100-ja"])
VALID_CODES_CC100 = [
"am", "ar", "as", "az", "be", "bg", "bn", "bn_rom", "br", "bs", "ca", "cs", "cy", "da", "de",
"el", "en", "eo", "es", "et", "eu", "fa", "ff", "fi", "fr", "fy", "ga", "gd", "gl", "gn", "gu",
"ha", "he", "hi", "hi_rom", "hr", "ht", "hu", "hy", "id", "ig", "is", "it", "ja", "jv", "ka",
"kk", "km", "kn", "ko", "ku", "ky", "la", "lg", "li", "ln", "lo", "lt", "lv", "mg", "mk", "ml",
"mn", "mr", "ms", "my", "my_zaw", "ne", "nl", "no", "ns", "om", "or", "pa", "pl", "ps", "pt",
"qu", "rm", "ro", "ru", "sa", "si", "sc", "sd", "sk", "sl", "so", "sq", "sr", "ss", "su", "sv",
"sw", "ta", "ta_rom", "te", "te_rom", "th", "tl", "tn", "tr", "ug", "uk", "ur", "ur_rom", "uz",
"vi", "wo", "xh", "yi", "yo", "zh-Hans", "zh-Hant", "zu",
]
# code: https://huggingface.co/datasets/codeparrot/github-code-clean python java c sql html
# math:
def get_n_bytes_of_string(string_text):
n_bytes = len(string_text.encode("utf-8"))
return n_bytes
def unit_convertor(stat, unit):
n_tokens = stat["n_tokens"]
n_chars = stat["n_chars"]
n_bytes = stat["n_bytes"]
n_tokens_in_billion = n_tokens / (1000 * 1000 * 1000)
n_tokens_in_trillion = n_tokens / (1000 * 1000 * 1000 * 1000)
n_bytes_in_mb = n_bytes / (1024 * 1024)
n_bytes_in_gb = n_bytes_in_mb / 1024
n_bytes_in_tb = n_bytes_in_gb / 1024
# n_chars_in_billion = n_chars / (1000 * 1000 * 1000)
if unit == "n_tokens/n_bytes":
value = n_tokens / n_bytes
# the average number of characters per token
elif unit in ["n_chars/n_tokens", "chars_per_token"]: # 重要:平均一个token包含多少个字符。
value = n_chars / n_tokens
elif unit == "n_tokens/n_chars": # 一个中文汉字需要几个token?
value = n_tokens / n_chars
elif unit == "g_bytes/b_tokens":
value = n_bytes_in_gb / n_tokens_in_billion
elif unit == "b_tokens/g_bytes":
value = n_tokens_in_billion / n_bytes_in_gb
elif unit == "t_bytes/t_tokens": # 重要:
value = n_bytes_in_tb / n_tokens_in_trillion
elif unit == "t_tokens/t_bytes":
value = n_tokens_in_trillion / n_bytes_in_tb
else:
raise "measure not support"
return round(value, 3)
def to_dataframe(stats, units=None):
if units is None:
units = common_units
elif not isinstance(units, list):
units = [units]
table = []
for tokenizer_name, stat in stats.items():
columns = {"tokenizer": tokenizer_name, "vocab_size": stat["vocab_size"]}
for unit in units:
if unit not in stat:
columns[unit] = unit_convertor(stat, unit)
else:
logger.error(f"unit {unit} not support")
table.append(columns)
df = pd.DataFrame(table)
return df
cache = {}
def tokenize_corpus(
tokenizer_name: str,
corpuses: List[str],
cache_path: str = "stats/compress_rate.json"
) -> dict:
"""
这个要独立的cache,因为速度慢。
:param tokenizer_name:
:param corpuses:
:param cache_path:
:return:
"""
def _tokenize(tokenizer, datasets):
n_tokens = 0
n_chars = 0
n_bytes = 0
for dataset in datasets:
for item in dataset:
text = item["text"]
n_bytes += get_n_bytes_of_string(text)
n_chars += len(text)
encodings = tokenizer.encode(text)
n_tokens += len(encodings)
stat = {
# "vocab_size": len(tokenizer.vocab_size,
"vocab_size": len(tokenizer),
"n_bytes": n_bytes,
"n_tokens": n_tokens,
"n_chars": n_chars,
}
return stat
# load from cache
cache_id = f"{tokenizer_name}.{'.'.join(corpuses)}"
if not cache and os.path.exists(cache_path):
with open(cache_path, "r", encoding="utf-8") as f_tmp:
cache.update(json.load(f_tmp))
if cache_id in cache:
logger.info(f"loading {cache_id} from in-memory cache")
return cache[cache_id]
# tokenize corpus
tokenizer = load_tokener(tokenizer_name)
datasets = [load_dataset("eson/cc100-samples", corpus.replace("cc100-", ""), split="train") for corpus in corpuses]
stat = _tokenize(tokenizer, datasets)
# save to cache
len_before = len(cache)
cache[cache_id] = stat
len_after = len(cache)
logger.info(f"saving {cache_id} to in-memory and file cache: {len_before}->{len_after}")
with open(cache_path, "w", encoding="utf-8") as f_tmp:
json.dump(cache, f_tmp, indent=2)
return stat
def get_compression_leaderboard(
corpuses: List[str] = ['cc100-en'],
unit: str = "b_tokens/g_bytes",
tokenizer_filter: Optional[str] = None,
return_type: Optional[Literal["dict", "dataframe"]] = "dataframe"
) -> Union[pd.DataFrame, dict]:
"""
## TODO
- search by organization,
"""
logger.info(f"corpuses: {corpuses}; unit: {unit}; tokenizer_filter: {tokenizer_filter}")
stats = {}
if tokenizer_filter is not None:
tokenizers = [tokenizer_name for tokenizer_name in all_tokenizers if tokenizer_filter in tokenizer_name]
else:
tokenizers = all_tokenizers
for lang in corpuses:
for tokenizer_name in tokenizers:
stat = tokenize_corpus(tokenizer_name, [lang])
stats[tokenizer_name] = stat
if return_type == "dataframe":
token_number_unit, file_size_unit = unit.split("/")
reverse_unit = f"{file_size_unit}/{token_number_unit}"
stats = to_dataframe(stats, [unit, reverse_unit, "n_chars/n_tokens"])
stats = stats.sort_values(unit)
stats = stats.rename(columns={unit: f' ⬆️{unit}'})
return stats
def update_compress_rate():
pass
def test():
tokenizer_name = "gpt_4"
tokenizer = load_tokener(tokenizer_name)
stats = {tokenizer_name: tokenize_corpus(tokenizer, ["cc100-en", "cc100-zh-Hans"])}
df = to_dataframe(stats)
# print(df.to_markdown(index=False, tablefmt='fancy_grid'))
logger.info(f"\n{df.to_markdown(index=False)}")
def main():
if len(sys.argv) == 3:
tokenizer_filter = [sys.argv[1]]
corpuses = [sys.argv[2]]
else:
tokenizer_filter = None
corpuses = common_corpuses
df = get_compression_leaderboard(corpuses)
# print(df.to_markdown(index=False, tablefmt='fancy_grid'))
logger.info(f"\n{df.to_markdown(index=False)}")
if __name__ == "__main__":
main()
# test()