import os
from pathlib import Path
from typing import Dict, List, Optional, Union
import torch
import datasets
from datasets import load_dataset, Dataset
from transformers import AutoTokenizer, PreTrainedTokenizer
from huggingface_hub import Repository, create_repo, HfApi
from optimum.onnxruntime import (
AutoOptimizationConfig,
ORTModelForFeatureExtraction,
ORTOptimizer,
)

os.environ["TOKENIZERS_PARALLELISM"] = "false"

opt_configs = {
"O2": AutoOptimizationConfig.O2(),
"O3": AutoOptimizationConfig.O3(),
"O4": AutoOptimizationConfig.O4(),
}


def get_batch_size(device_name: str, model_name: str, opt_level: str):
"""
    Choose a heuristic batch size based on the model size, GPU, and optimization level.
    TODO: run actual tests
    T4 has 16GB
    A10 has 24GB
Args:
device_name (`str`):
The name of the GPU device in use.
model_name (`str`):
The name of the model in use.
opt_level (`str`):
The optimization level in use.
Returns:
`int`:
The batch size to use.
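
    Example (illustrative sketch; the device and model names are just plausible values):

        >>> get_batch_size("Tesla T4", "intfloat/e5-base-v2", "O2")
        64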
"""
if "small" in model_name:
bs = 128
elif "base" in model_name:
bs = 64
elif "large" in model_name:
bs = 32
else:
bs = 16
if "A10" in device_name:
bs *= 2
if opt_level == "O4":
bs *= 2
return bs


def mean_pooling(last_hidden_state: torch.Tensor, attention_mask: torch.Tensor):
"""
Mean pool the token embeddings.
Args:
        last_hidden_state (`torch.Tensor`):
            The last hidden state output by the model.
attention_mask (`torch.Tensor`):
The attention mask.
Returns:
`torch.Tensor`:
The mean pooled embeddings.
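
    Example (minimal sketch with toy tensors, not part of the original pipeline):

        >>> hidden = torch.ones(2, 4, 8)
        >>> mask = torch.tensor([[1, 1, 0, 0], [1, 1, 1, 1]])
        >>> mean_pooling(hidden, mask).shape
        torch.Size([2, 8])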
"""
input_mask_expanded = (
attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
)
return torch.sum(last_hidden_state * input_mask_expanded, 1) / torch.clamp(
input_mask_expanded.sum(1), min=1e-9
)


def load_hf_dataset(
    ds_name: str, ds_config: Optional[str] = None, ds_split: str = "train"
):
"""
    Load a dataset from the Hugging Face Hub in streaming mode, so the full
    dataset does not have to be downloaded to local storage.
Args:
ds_name (`str`):
The name of the dataset to load.
ds_config (`str`, *optional*, Defaults to `None`):
The configuration of the dataset to load.
ds_split (`str`, *optional*, Defaults to `"train"`):
The split of the dataset to load.
Returns:
ds (`datasets.IterableDataset`):
The loaded dataset.
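
    Example (illustrative; the dataset id is just an assumption):

        >>> ds = load_hf_dataset("imdb", ds_split="train")
        >>> isinstance(ds, datasets.IterableDataset)
        True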
"""
if ds_config == "":
ds_config = None
ds = load_dataset(ds_name, ds_config, split=ds_split, streaming=True)
return ds


def get_model_and_tokenizer(model_name: str, optimization_level: str):
"""
Load the model and tokenizer from the HuggingFace Hub.
If the model is not already optimized, optimize it and save it to the local directory.
Args:
model_name (`str`):
The name of the model to load.
optimization_level (`str`):
The optimization level to use. Should be one of `"O2"`, `"O3"`, or `"O4"`.
Returns:
model (`ORTModelForFeatureExtraction`):
The optimized model.
tokenizer (`PreTrainedTokenizer`):
The tokenizer.
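
    Example (a sketch assuming a CUDA-capable machine with onnxruntime-gpu and an
    arbitrary checkpoint; the first call exports and optimizes the ONNX model,
    later calls reuse the local copy):

        >>> model, tokenizer = get_model_and_tokenizer(
        ...     "sentence-transformers/all-MiniLM-L6-v2", "O3"
        ... )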
"""
optimized_model_name = "model_optimized.onnx"
model_dir = Path(model_name.replace("/", "_"))
if not (model_dir / optimized_model_name).exists():
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.save_pretrained(model_dir)
model = ORTModelForFeatureExtraction.from_pretrained(model_name, export=True)
model.save_pretrained(model_dir)
optimizer = ORTOptimizer.from_pretrained(model)
optimization_config = opt_configs[optimization_level]
optimizer.optimize(save_dir=model_dir, optimization_config=optimization_config)
else:
tokenizer = AutoTokenizer.from_pretrained(model_dir)
return (
ORTModelForFeatureExtraction.from_pretrained(
model_dir,
file_name=optimized_model_name,
provider="CUDAExecutionProvider",
),
tokenizer,
)


def tokenize(
examples: Dict[str, List[str]],
tokenizer: PreTrainedTokenizer,
column_name: str = "text",
padding: Union[bool, str] = True,
max_length: int = 512,
):
"""
Tokenize the examples using the tokenizer.
Args:
examples (`Dict[str, List[str]]`):
examples to tokenize
tokenizer (`PreTrainedTokenizer`):
tokenizer to use
column_name (`str`, *optional*, defaults to `text`):
column name to use for tokenization. Defaults to `text`
padding (`bool`, *optional*, defaults to `True`):
whether to pad the examples. Defaults to `True`
Use `"max_length"` if using `O4` optimization level
If `True`, the batch will be padded to the longest in the batch.
max_length (`int`, *optional*, Defaults to `512`):
max length to use for the model. Defaults to `512`.
Any sequences longer will be truncated.
If padding is `"max_length"`, the padding will be added until the sequence
is of length `max_length`.
Returns:
`Dict[str, List[List[int]]]`:
tokenized examples
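
    Example (minimal sketch with an arbitrary BERT checkpoint; other models may
    return a different set of keys):

        >>> tok = AutoTokenizer.from_pretrained("bert-base-uncased")
        >>> out = tokenize({"text": ["hello world", "bulk embeddings"]}, tok)
        >>> sorted(out.keys())
        ['attention_mask', 'input_ids', 'token_type_ids']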
"""
# TODO: add lengths, sort by length, use dynamic padding
# TODO: option for controlling length for models that can go shorter/longer than 512
return tokenizer(
examples[column_name], truncation=True, padding=padding, max_length=max_length
)


@torch.inference_mode()
def batch_embed(
ds: datasets.IterableDataset,
model: ORTModelForFeatureExtraction,
tokenizer: PreTrainedTokenizer,
model_name: str,
column_name: str,
new_dataset_id: str,
opt_level: str,
upload_batch_size: int = 10_000,
map_batch_size: int = 2000,
# progress,
):
"""
Run the model on the dataset and upload the embeddings to the hub.
Args:
        ds (`datasets.IterableDataset`):
            dataset to embed. From `load_hf_dataset`
        model (`ORTModelForFeatureExtraction`):
            model to use for embedding. From `get_model_and_tokenizer`
        tokenizer (`PreTrainedTokenizer`):
            tokenizer to use for embedding. From `get_model_and_tokenizer`
model_name (`str`):
name of the model to use. Used to determine batch size.
column_name (`str`):
column name to use for embedding. Default option in gradio app is `text`
new_dataset_id (`str`):
id of the new dataset to create. Should include username or organization.
e.g. nbroad/new-embeddings
opt_level (`str`):
optimization level to use. Should be one of `O2`, `O3`, `O4`
See here for more details on optimization levels:
https://huggingface.co/docs/optimum/onnxruntime/usage_guides/optimization#optimization-configuration
upload_batch_size (`int`, *optional*, defaults to `10_000`):
number of embeddings to upload at once. Defaults to 10,000.
map_batch_size (`int`, *optional*, defaults to `2000`):
number of examples to tokenize at once. Defaults to 2000.
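
    Example (end-to-end sketch wiring the helpers in this module together; the
    model, dataset, and repo ids are placeholders, and a write token is expected
    in the `HF_TOKEN` environment variable):

        >>> model, tokenizer = get_model_and_tokenizer("intfloat/e5-base-v2", "O3")
        >>> ds = load_hf_dataset("imdb", ds_split="train")
        >>> batch_embed(
        ...     ds, model, tokenizer,
        ...     model_name="intfloat/e5-base-v2",
        ...     column_name="text",
        ...     new_dataset_id="my-username/imdb-embeddings",
        ...     opt_level="O3",
        ... )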
"""
api = HfApi(
token=os.environ["HF_TOKEN"],
)
repo = init_git_repo(new_dataset_id)
iterator = iter(
ds.map(
tokenize,
batched=True,
batch_size=map_batch_size,
fn_kwargs={
"tokenizer": tokenizer,
"column_name": column_name,
"padding": "max_length" if opt_level == "O4" else True,
},
remove_columns=ds.column_names,
)
)
# progress.tqdm(iterator)
embeds = []
last_count = 0
current_count = 0
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
inference_bs = get_batch_size(torch.cuda.get_device_name(0), model_name, opt_level)
loop = True
while loop:
        batch = [next(iterator, None) for _ in range(inference_bs)]
        # batch will have None values when iterator runs out
        if batch[-1] is None:
            batch = [x for x in batch if x is not None]
            loop = False
            # the final batch is empty when the dataset size is an exact
            # multiple of the inference batch size
            if len(batch) == 0:
                break
ids = torch.tensor([b["input_ids"] for b in batch], device=device)
mask = torch.tensor([b["attention_mask"] for b in batch], device=device)
t_ids = torch.zeros_like(ids)
outputs = model(input_ids=ids, attention_mask=mask, token_type_ids=t_ids)
embeds.extend(mean_pooling(outputs[0], mask).cpu().tolist())
current_count += len(batch)
if len(embeds) > upload_batch_size:
push_to_repo(repo, last_count, current_count, embeds)
embeds = []
last_count = current_count
if len(embeds) > 0:
push_to_repo(repo, last_count, current_count, embeds)
return


def init_git_repo(repo_id: str):
"""
Initialize a git repo for the new dataset.
Args:
repo_id (`str`):
id of the new dataset to create. Should include username or organization.
            e.g. nbroad/new-embeddings
    Returns:
        repo (`huggingface_hub.Repository` or `None`):
            The cloned dataset repo, or `None` if cloning failed.
    """
local_dir = repo_id.replace("/", "_")
    create_repo(
        repo_id,
        token=os.environ["HF_TOKEN"],
        private=True,
        exist_ok=True,
        repo_type="dataset",
    )
try:
repo = Repository(
local_dir=local_dir,
clone_from=repo_id,
repo_type="dataset",
token=os.environ["HF_TOKEN"],
skip_lfs_files=True,
)
except Exception as e:
print(e)
repo = None
if repo is not None:
repo.git_pull()
return repo


def push_to_repo(
    repo: Repository, last_count: int, current_count: int, embeds: List[List[float]]
):
"""
Push embeddings to the repo.
Args:
repo (`huggingface_hub.Repository`):
repo to push to
last_count (`int`):
last count of embeddings.
This is the number of embeddings that have already been pushed.
current_count (`int`):
current count of embeddings.
This is the number of embeddings that have been pushed after this batch.
embeds (`List[List[float]]`):
list of embeddings to push to the repo
"""
temp_ds = Dataset.from_dict({"embeddings": embeds})
data_dir = Path(repo.local_dir) / "data"
data_dir.mkdir(exist_ok=True, parents=True)
temp_ds.to_parquet(
str(data_dir / f"embeddings_{last_count}_{current_count}.parquet")
)
repo.push_to_hub(
commit_message=f"Embedded examples {last_count} thru {current_count}",
blocking=False,
auto_lfs_prune=True,
)