nbroad (HF staff) committed
Commit 4408779
1 parent: 1cdbc66

Update utils.py

Files changed (1):
  utils.py +238 -42
utils.py CHANGED
@@ -1,10 +1,12 @@
 import os
 from pathlib import Path
+from typing import Union, Dict, List

 import torch
-from datasets import load_dataset
-from transformers import AutoTokenizer
-from huggingface_hub import Repository, create_repo
+import datasets
+from datasets import load_dataset, Dataset
+from transformers import AutoTokenizer, PreTrainedTokenizer
+from huggingface_hub import Repository, create_repo, HfApi
 from optimum.onnxruntime import (
     AutoOptimizationConfig,
     ORTModelForFeatureExtraction,
@@ -21,28 +23,110 @@ opt_configs = {
 }


-def mean_pooling(model_output, attention_mask):
-    token_embeddings = model_output[
-        0
-    ]  # First element of model_output contains all token embeddings
+def get_batch_size(device_name: str, model_name: str, opt_level: str):
+    """
+    TODO: run actual tests
+
+    T4 has 16GB
+    A10 has 24GB
+
+    Args:
+        device_name (`str`):
+            The name of the GPU device in use.
+        model_name (`str`):
+            The name of the model in use.
+        opt_level (`str`):
+            The optimization level in use.
+
+    Returns:
+        `int`:
+            The batch size to use.
+    """
+
+    if "small" in model_name:
+        bs = 128
+    elif "base" in model_name:
+        bs = 64
+    elif "large" in model_name:
+        bs = 32
+    else:
+        bs = 16
+
+    if "A10" in device_name:
+        bs *= 2
+
+    if opt_level == "O4":
+        bs *= 2
+
+    return bs
+
+
+def mean_pooling(last_hidden_state: torch.Tensor, attention_mask: torch.Tensor):
+    """
+    Mean pool the token embeddings.
+
+    Args:
+        last_hidden_state (`torch.Tensor`):
+            The last hidden state output by the model.
+        attention_mask (`torch.Tensor`):
+            The attention mask.
+
+    Returns:
+        `torch.Tensor`:
+            The mean pooled embeddings.
+    """
     input_mask_expanded = (
-        attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
+        attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
     )
-    return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
+    return torch.sum(last_hidden_state * input_mask_expanded, 1) / torch.clamp(
         input_mask_expanded.sum(1), min=1e-9
     )


-def load_hf_dataset(ds_name, ds_config, column_name, ds_split):
+def load_hf_dataset(ds_name: str, ds_config: str = None, ds_split: str = "train"):
+    """
+    Load a dataset from the HuggingFace Hub. Streaming is used so that
+    the whole dataset does not have to be downloaded to local storage.
+
+    Args:
+        ds_name (`str`):
+            The name of the dataset to load.
+        ds_config (`str`, *optional*, defaults to `None`):
+            The configuration of the dataset to load.
+        ds_split (`str`, *optional*, defaults to `"train"`):
+            The split of the dataset to load.
+
+    Returns:
+        ds (`datasets.IterableDataset`):
+            The loaded dataset.
+    """
+
     if ds_config == "":
         ds_config = None

-    ds = load_dataset(ds_name, config=ds_config, split=ds_split, streaming=True)
+    ds = load_dataset(ds_name, ds_config, split=ds_split, streaming=True)

-    return ds[column_name]
+    return ds


-def get_model_and_tokenizer(model_name, optimization_level):
+def get_model_and_tokenizer(model_name: str, optimization_level: str):
+    """
+    Load the model and tokenizer from the HuggingFace Hub.
+
+    If the model is not already optimized, optimize it and save it to the local directory.
+
+    Args:
+        model_name (`str`):
+            The name of the model to load.
+        optimization_level (`str`):
+            The optimization level to use. Should be one of `"O2"`, `"O3"`, or `"O4"`.
+
+    Returns:
+        model (`ORTModelForFeatureExtraction`):
+            The optimized model.
+        tokenizer (`PreTrainedTokenizer`):
+            The tokenizer.
+    """
     optimized_model_name = "model_optimized.onnx"

     model_dir = Path(model_name.replace("/", "_"))
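
A quick sanity check of the `mean_pooling` added above: padded positions contribute nothing to the average, because the expanded attention mask zeroes them out before the sum. A minimal, model-free sketch with dummy tensors:

import torch

def mean_pooling(last_hidden_state: torch.Tensor, attention_mask: torch.Tensor):
    # same logic as in the hunk above: zero out padding, sum, divide by real token count
    input_mask_expanded = (
        attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
    )
    return torch.sum(last_hidden_state * input_mask_expanded, 1) / torch.clamp(
        input_mask_expanded.sum(1), min=1e-9
    )

# one sequence, 3 tokens, hidden size 2; the third token is padding
hidden = torch.tensor([[[1.0, 2.0], [3.0, 4.0], [100.0, 100.0]]])
mask = torch.tensor([[1, 1, 0]])
print(mean_pooling(hidden, mask))  # tensor([[2., 3.]]): the padded token is ignored

The `get_batch_size` heuristic is just as easy to spot-check: a "base" model on an A10 at `O4` gives 64 * 2 * 2 = 256.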
@@ -58,6 +142,9 @@ def get_model_and_tokenizer(model_name, optimization_level):

         optimizer.optimize(save_dir=model_dir, optimization_config=optimization_config)

+    else:
+        tokenizer = AutoTokenizer.from_pretrained(model_dir)
+
     return (
         ORTModelForFeatureExtraction.from_pretrained(
             model_dir,
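
For context, the `else:` branch added above covers the case where an optimized model already exists locally; the elided body around it presumably follows the standard Optimum export-and-optimize pattern. A rough sketch, not the exact elided code (the `ORTOptimizer` usage and the `BAAI/bge-small-en` model id are assumptions for illustration):

from pathlib import Path

from transformers import AutoTokenizer
from optimum.onnxruntime import (
    AutoOptimizationConfig,
    ORTModelForFeatureExtraction,
    ORTOptimizer,
)

model_name = "BAAI/bge-small-en"  # placeholder model id
model_dir = Path(model_name.replace("/", "_"))

# export the model to ONNX, then apply a preset graph-optimization level
model = ORTModelForFeatureExtraction.from_pretrained(model_name, export=True)
optimizer = ORTOptimizer.from_pretrained(model)
optimization_config = AutoOptimizationConfig.with_optimization_level("O3")
optimizer.optimize(save_dir=model_dir, optimization_config=optimization_config)

tokenizer = AutoTokenizer.from_pretrained(model_name)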
@@ -68,31 +155,95 @@ def get_model_and_tokenizer(model_name, optimization_level):
     )


-def tokenize(examples, tokenizer, column_name, padding=True):
+def tokenize(
+    examples: Dict[str, List[str]],
+    tokenizer: PreTrainedTokenizer,
+    column_name: str = "text",
+    padding: Union[bool, str] = True,
+    max_length: int = 512,
+):
+    """
+    Tokenize the examples using the tokenizer.
+
+    Args:
+        examples (`Dict[str, List[str]]`):
+            examples to tokenize
+        tokenizer (`PreTrainedTokenizer`):
+            tokenizer to use
+        column_name (`str`, *optional*, defaults to `"text"`):
+            column name to use for tokenization.
+        padding (`bool` or `str`, *optional*, defaults to `True`):
+            whether to pad the examples. If `True`, the batch will be padded to
+            the longest sequence in the batch. Use `"max_length"` if using the
+            `O4` optimization level.
+        max_length (`int`, *optional*, defaults to `512`):
+            max length to use for the model. Any longer sequences will be
+            truncated. If padding is `"max_length"`, sequences are padded to
+            exactly `max_length` tokens.
+
+    Returns:
+        `Dict[str, List[List[int]]]`:
+            tokenized examples
+    """
     # TODO: add lengths, sort by length, use dynamic padding
-    return tokenizer(examples[column_name], truncation=True, padding=padding)
+    # TODO: option for controlling length for models that can go shorter/longer than 512
+    return tokenizer(
+        examples[column_name], truncation=True, padding=padding, max_length=max_length
+    )


 @torch.inference_mode()
 def batch_embed(
-    ds,
-    model,
-    tokenizer,
-    column_name,
-    inference_batch_size,
-    upload_batch_size,
-    new_dataset_id,
-    opt_level,
-    num_proc,
+    ds: datasets.IterableDataset,
+    model: ORTModelForFeatureExtraction,
+    tokenizer: PreTrainedTokenizer,
+    model_name: str,
+    column_name: str,
+    new_dataset_id: str,
+    opt_level: str,
+    upload_batch_size: int = 10_000,
+    map_batch_size: int = 2000,
+    # progress,
 ):
+    """
+    Run the model on the dataset and upload the embeddings to the hub.
+
+    Args:
+        ds (`datasets.IterableDataset`):
+            dataset to embed. From `load_hf_dataset`
+        model (`ORTModelForFeatureExtraction`):
+            model to use for embedding. From `get_model_and_tokenizer`
+        tokenizer (`PreTrainedTokenizer`):
+            tokenizer to use for embedding. From `get_model_and_tokenizer`
+        model_name (`str`):
+            name of the model in use. Used to determine the batch size.
+        column_name (`str`):
+            column name to use for embedding. Default option in the gradio app is `text`
+        new_dataset_id (`str`):
+            id of the new dataset to create. Should include username or organization,
+            e.g. nbroad/new-embeddings
+        opt_level (`str`):
+            optimization level to use. Should be one of `O2`, `O3`, `O4`.
+            See here for more details on optimization levels:
+            https://huggingface.co/docs/optimum/onnxruntime/usage_guides/optimization#optimization-configuration
+        upload_batch_size (`int`, *optional*, defaults to `10_000`):
+            number of embeddings to upload at once.
+        map_batch_size (`int`, *optional*, defaults to `2000`):
+            number of examples to tokenize at once.
+    """
+
+    api = HfApi(
+        token=os.environ["HF_TOKEN"],
+    )
+
     repo = init_git_repo(new_dataset_id)

     iterator = iter(
         ds.map(
             tokenize,
             batched=True,
-            num_proc=num_proc,
-            batch_size=2000,
+            batch_size=map_batch_size,
             fn_kwargs={
                 "tokenizer": tokenizer,
                 "column_name": column_name,
@@ -102,41 +253,58 @@ def batch_embed(
         )
     )

+    # progress.tqdm(iterator)
+
     embeds = []

-    count = 0
+    last_count = 0
+    current_count = 0
+
+    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+
+    inference_bs = get_batch_size(torch.cuda.get_device_name(0), model_name, opt_level)

     loop = True
     while loop:
-        batch = [next(iterator, None) for _ in range(inference_batch_size)]
+        batch = [next(iterator, None) for _ in range(inference_bs)]

+        # batch will have None values when iterator runs out
         if batch[-1] is None:
            batch = [x for x in batch if x is not None]
            loop = False

-        ids = torch.tensor([b["input_ids"] for b in batch])
-        mask = torch.tensor([b["attention_mask"] for b in batch])
+        ids = torch.tensor([b["input_ids"] for b in batch], device=device)
+        mask = torch.tensor([b["attention_mask"] for b in batch], device=device)
         t_ids = torch.zeros_like(ids)

         outputs = model(input_ids=ids, attention_mask=mask, token_type_ids=t_ids)

-        embeds.extend(mean_pooling(outputs, mask).cpu().tolist())
+        embeds.extend(mean_pooling(outputs[0], mask).cpu().tolist())

-        count += len(batch)
+        current_count += len(batch)

         if len(embeds) > upload_batch_size:
-            push_to_repo(repo, count)
+            push_to_repo(repo, last_count, current_count, embeds)
             embeds = []
+            last_count = current_count

-        print(count)
+    if len(embeds) > 0:
+        push_to_repo(repo, last_count, current_count, embeds)

-    print("finished")
+    return


-def init_git_repo(repo_id):
+def init_git_repo(repo_id: str):
+    """
+    Initialize a git repo for the new dataset.
+
+    Args:
+        repo_id (`str`):
+            id of the new dataset to create. Should include username or organization,
+            e.g. nbroad/new-embeddings
+    """
     local_dir = repo_id.replace("/", "_")

-    # Make sure the repo exists.
     create_repo(
         repo_id,
         token=os.environ["HF_TOKEN"],
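
The `while` loop in the hunk above drains the streaming iterator in fixed-size chunks: `next(iterator, None)` fills the tail of the last chunk with `None`, which is then filtered out and ends the loop. The same pattern in isolation:

it = iter(range(7))
batch_size = 3

loop = True
while loop:
    batch = [next(it, None) for _ in range(batch_size)]
    if batch[-1] is None:  # the iterator ran out inside this chunk
        batch = [x for x in batch if x is not None]
        loop = False
    print(batch)

# prints [0, 1, 2], then [3, 4, 5], then [6]
# note: if the length divides evenly by batch_size, the final chunk is empty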
@@ -151,17 +319,45 @@ def init_git_repo(repo_id):
             token=os.environ["HF_TOKEN"],
             skip_lfs_files=True,
         )
-    except EnvironmentError:
-        pass
+    except Exception as e:
+        print(e)
+        repo = None

-    repo.git_pull()
+    if repo is not None:
+        repo.git_pull()

     return repo


-def push_to_repo(repo, batch_num):
+def push_to_repo(
+    repo: Repository, last_count: int, current_count: int, embeds: List[List[float]]
+):
+    """
+    Push embeddings to the repo.
+
+    Args:
+        repo (`huggingface_hub.Repository`):
+            repo to push to
+        last_count (`int`):
+            the number of embeddings that had been pushed before this batch.
+        current_count (`int`):
+            the total number of embeddings pushed once this batch is included.
+        embeds (`List[List[float]]`):
+            list of embeddings to push to the repo
+    """
+    temp_ds = Dataset.from_dict({"embeddings": embeds})
+
+    data_dir = Path(repo.local_dir) / "data"
+    data_dir.mkdir(exist_ok=True, parents=True)
+
+    temp_ds.to_parquet(
+        str(data_dir / f"embeddings_{last_count}_{current_count}.parquet")
+    )
+
     repo.push_to_hub(
-        commit_message=f"Finished {batch_num} rows",
+        commit_message=f"Embedded examples {last_count} thru {current_count}",
         blocking=False,
         auto_lfs_prune=True,
     )
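
Putting the new pieces together, the intended call sequence appears to be the one below. This is a sketch under stated assumptions: the dataset and model ids are placeholders, `HF_TOKEN` must be set in the environment for the Hub calls, and a CUDA device must be visible because `batch_embed` calls `torch.cuda.get_device_name(0)`.

# assuming these are imported from the utils.py shown above
# from utils import load_hf_dataset, get_model_and_tokenizer, batch_embed

model_name = "BAAI/bge-small-en"  # placeholder model id

# stream the dataset and load the ONNX-optimized model plus its tokenizer
ds = load_hf_dataset("imdb", ds_config=None, ds_split="train")
model, tokenizer = get_model_and_tokenizer(model_name, optimization_level="O3")

# embed every row and push parquet shards to the new dataset repo
batch_embed(
    ds,
    model,
    tokenizer,
    model_name=model_name,
    column_name="text",
    new_dataset_id="nbroad/new-embeddings",  # example id from the docstring
    opt_level="O3",
    upload_batch_size=10_000,
    map_batch_size=2000,
)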