"""
Compute tokenizer compression-rate statistics over sample corpora.

Chinese data: clue, superclue
English data: glue, cnn_dailymail, gigaword
Code data:
Numbers:
"""

import json
import os
import sys
from typing import List, Optional, Union, Literal

import pandas as pd
from datasets import load_dataset

from utils.log_util import logger
from vocab import load_tokener
from vocab import all_tokenizers

CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))

common_units = ["g_bytes/b_tokens", "b_tokens/g_bytes", "t_bytes/t_tokens", "t_tokens/t_bytes", "n_chars/n_tokens"]
common_corpuses = sorted(["cc100-nl", "cc100-en", "cc100-es", "cc100-fr", "cc100-de", "cc100-ko",
                          "cc100-fa", "cc100-ar", "cc100-ja"])

VALID_CODES_CC100 = [
    "am", "ar", "as", "az", "be", "bg", "bn", "bn_rom", "br", "bs", "ca", "cs", "cy", "da", "de",
    "el", "en", "eo", "es", "et", "eu", "fa", "ff", "fi", "fr", "fy", "ga", "gd", "gl", "gn", "gu",
    "ha", "he", "hi", "hi_rom", "hr", "ht", "hu", "hy", "id", "ig", "is", "it", "ja", "jv", "ka",
    "kk", "km", "kn", "ko", "ku", "ky", "la", "lg", "li", "ln", "lo", "lt", "lv", "mg", "mk", "ml",
    "mn", "mr", "ms", "my", "my_zaw", "ne", "nl", "no", "ns", "om", "or", "pa", "pl", "ps", "pt",
    "qu", "rm", "ro", "ru", "sa", "si", "sc", "sd", "sk", "sl", "so", "sq", "sr", "ss", "su", "sv",
    "sw", "ta", "ta_rom", "te", "te_rom", "th", "tl", "tn", "tr", "ug", "uk", "ur", "ur_rom", "uz",
    "vi", "wo", "xh", "yi", "yo", "zh-Hans", "zh-Hant", "zu",
]


def get_n_bytes_of_string(string_text):
    n_bytes = len(string_text.encode("utf-8"))
    return n_bytes


def unit_convertor(stat, unit):
    n_tokens = stat["n_tokens"]
    n_chars = stat["n_chars"]
    n_bytes = stat["n_bytes"]

    n_tokens_in_billion = n_tokens / (1000 * 1000 * 1000)
    n_tokens_in_trillion = n_tokens / (1000 * 1000 * 1000 * 1000)
    n_bytes_in_mb = n_bytes / (1024 * 1024)
    n_bytes_in_gb = n_bytes_in_mb / 1024
    n_bytes_in_tb = n_bytes_in_gb / 1024

    if unit == "n_tokens/n_bytes":
        value = n_tokens / n_bytes
    elif unit in ["n_chars/n_tokens", "chars_per_token"]:
        value = n_chars / n_tokens
    elif unit == "n_tokens/n_chars":
        value = n_tokens / n_chars
    elif unit == "g_bytes/b_tokens":
        value = n_bytes_in_gb / n_tokens_in_billion
    elif unit == "b_tokens/g_bytes":
        value = n_tokens_in_billion / n_bytes_in_gb
    elif unit == "t_bytes/t_tokens":
        value = n_bytes_in_tb / n_tokens_in_trillion
    elif unit == "t_tokens/t_bytes":
        value = n_tokens_in_trillion / n_bytes_in_tb
    else:
        raise ValueError(f"unit {unit} is not supported")
    return round(value, 3)
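

# A minimal sanity check for unit_convertor (hypothetical numbers, not taken from any
# real corpus): 1 GiB of raw text that tokenizes into 0.25 billion tokens gives
# "g_bytes/b_tokens" == 4.0 and the reverse unit "b_tokens/g_bytes" == 0.25.
def _example_unit_convertor():
    example_stat = {
        "n_tokens": 250_000_000,  # 0.25 billion tokens
        "n_chars": 500_000_000,   # 2 chars per token on average
        "n_bytes": 1024 ** 3,     # exactly 1 GiB
    }
    assert unit_convertor(example_stat, "g_bytes/b_tokens") == 4.0
    assert unit_convertor(example_stat, "b_tokens/g_bytes") == 0.25
    assert unit_convertor(example_stat, "n_chars/n_tokens") == 2.0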


def to_dataframe(stats, units=None):
    if units is None:
        units = common_units
    elif not isinstance(units, list):
        units = [units]
    table = []
    for tokenizer_name, stat in stats.items():
        columns = {"tokenizer": tokenizer_name, "vocab_size": stat["vocab_size"]}
        for unit in units:
            if unit in stat:
                # raw statistics (n_tokens, n_chars, n_bytes) can be copied directly
                columns[unit] = stat[unit]
            else:
                # derived units are computed from the raw statistics
                columns[unit] = unit_convertor(stat, unit)
        table.append(columns)
    df = pd.DataFrame(table)
    return df
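

# Illustrative sketch of to_dataframe on a single, made-up tokenizer entry (the name,
# vocab_size, and counts are hypothetical). The resulting frame has one row per
# tokenizer with columns: tokenizer, vocab_size, plus one column per requested unit.
def _example_to_dataframe():
    example_stat = {
        "vocab_size": 100_000,   # hypothetical vocabulary size
        "n_tokens": 250_000_000,
        "n_chars": 500_000_000,
        "n_bytes": 1024 ** 3,
    }
    df = to_dataframe({"example_tokenizer": example_stat}, ["g_bytes/b_tokens", "b_tokens/g_bytes"])
    return df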


# in-memory cache of corpus statistics, keyed by "{tokenizer_name}.{corpus1}.{corpus2}..."
cache = {}


def tokenize_corpus(
        tokenizer_name: str,
        corpuses: List[str],
        cache_path: str = "stats/compress_rate.json"
) -> dict:
    """
    Tokenize the given corpora and collect byte/char/token counts.
    Results get their own cache (in memory and on disk) because tokenization is slow.
    :param tokenizer_name: name understood by vocab.load_tokener
    :param corpuses: list of corpus names, e.g. ["cc100-en"]
    :param cache_path: json file used as the on-disk cache
    :return: dict with vocab_size, n_bytes, n_tokens, n_chars
    """

    def _tokenize(tokenizer, datasets):
        n_tokens = 0
        n_chars = 0
        n_bytes = 0
        for dataset in datasets:
            for item in dataset:
                text = item["text"]
                n_bytes += get_n_bytes_of_string(text)
                n_chars += len(text)
                encodings = tokenizer.encode(text)
                n_tokens += len(encodings)
        stat = {
            "vocab_size": len(tokenizer),
            "n_bytes": n_bytes,
            "n_tokens": n_tokens,
            "n_chars": n_chars,
        }
        return stat

    # populate the in-memory cache from the file cache on first use
    cache_id = f"{tokenizer_name}.{'.'.join(corpuses)}"
    if not cache and os.path.exists(cache_path):
        with open(cache_path, "r", encoding="utf-8") as f_tmp:
            cache.update(json.load(f_tmp))
    if cache_id in cache:
        logger.info(f"loading {cache_id} from in-memory cache")
        return cache[cache_id]

    tokenizer = load_tokener(tokenizer_name)
    datasets = [load_dataset("eson/cc100-samples", corpus.replace("cc100-", ""), split="train") for corpus in corpuses]
    stat = _tokenize(tokenizer, datasets)

    len_before = len(cache)
    cache[cache_id] = stat
    len_after = len(cache)
    logger.info(f"saving {cache_id} to in-memory and file cache: {len_before}->{len_after}")
    os.makedirs(os.path.dirname(cache_path) or ".", exist_ok=True)
    with open(cache_path, "w", encoding="utf-8") as f_tmp:
        json.dump(cache, f_tmp, indent=2)
    return stat
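

# Hedged usage sketch: compute the raw statistics for one tokenizer/corpus pair.
# "gpt_4" and "cc100-en" are names this module already uses elsewhere; running this
# requires the vocab registry and network access to the eson/cc100-samples dataset.
def _example_tokenize_corpus():
    stat = tokenize_corpus("gpt_4", ["cc100-en"])
    logger.info(f"tokens per byte: {stat['n_tokens'] / stat['n_bytes']:.3f}")
    return stat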


def get_compression_leaderboard(
        corpuses: List[str] = ["cc100-nl"],
        unit: str = "b_tokens/g_bytes",
        tokenizer_filter: Optional[str] = None,
        return_type: Optional[Literal["dict", "dataframe"]] = "dataframe"
) -> Union[pd.DataFrame, dict]:
    """
    ## TODO
    - search by organization
    """
    logger.info(f"corpuses: {corpuses}; unit: {unit}; tokenizer_filter: {tokenizer_filter}")
    stats = {}
    if tokenizer_filter is not None:
        tokenizers = [tokenizer_name for tokenizer_name in all_tokenizers if tokenizer_filter in tokenizer_name]
    else:
        tokenizers = all_tokenizers
    for tokenizer_name in tokenizers:
        # aggregate statistics over all requested corpora for each tokenizer
        stats[tokenizer_name] = tokenize_corpus(tokenizer_name, corpuses)

    if return_type == "dataframe":
        token_number_unit, file_size_unit = unit.split("/")
        reverse_unit = f"{file_size_unit}/{token_number_unit}"
        stats = to_dataframe(stats, [unit, reverse_unit, "n_chars/n_tokens"])
        stats = stats.sort_values(unit)
        stats = stats.rename(columns={unit: f' ⬆️{unit}'})
    return stats
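

# Hedged usage sketch: build a leaderboard for a single corpus, restricted to tokenizer
# names containing a substring. The filter string "llama" is illustrative only; any
# substring of a tokenizer name registered in vocab.all_tokenizers works.
def _example_leaderboard():
    df = get_compression_leaderboard(
        corpuses=["cc100-en"],
        unit="b_tokens/g_bytes",
        tokenizer_filter="llama",
    )
    logger.info(f"\n{df.to_markdown(index=False)}")
    return df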


def update_compress_rate():
    pass


def test():
    tokenizer_name = "gpt_4"
    # tokenize_corpus expects the tokenizer *name*; it loads the tokenizer itself
    stats = {tokenizer_name: tokenize_corpus(tokenizer_name, ["cc100-en", "cc100-zh-Hans"])}
    df = to_dataframe(stats)
    logger.info(f"\n{df.to_markdown(index=False)}")


def main():
    if len(sys.argv) == 3:
        # usage: <script> <tokenizer_filter> <corpus>, e.g. a substring filter plus one corpus name
        tokenizer_filter = sys.argv[1]
        corpuses = [sys.argv[2]]
    else:
        tokenizer_filter = None
        corpuses = common_corpuses
    df = get_compression_leaderboard(corpuses, tokenizer_filter=tokenizer_filter)
    logger.info(f"\n{df.to_markdown(index=False)}")


if __name__ == "__main__":
    main()