khalidsaifullaah committed
Commit 3f395b9
1 Parent(s): d458774

Saving weights and logs of step 2500

events.out.tfevents.1626027152.t1v-n-934dd7d5-w-0.42551.3.v2 ADDED
@@ -0,0 +1,3 @@
1
+ version https://git-lfs.github.com/spec/v1
2
+ oid sha256:6014b2439019a454ff299b5d33dd83d9f8e5dfa5508901303ac09b2ac318a8e1
3
+ size 367914
flax_model.msgpack ADDED
@@ -0,0 +1,3 @@
1
+ version https://git-lfs.github.com/spec/v1
2
+ oid sha256:064a2c476ff8b960a62039cf6ee5ad1450f4e7848dfad669dddf51d21c496847
3
+ size 497764120
run.sh ADDED
@@ -0,0 +1,21 @@
1
+ #!/usr/bin/env bash
2
+ python run_clm_flax.py \
3
+ --output_dir="${MODEL_DIR}" \
4
+ --model_type="gpt2" \
5
+ --config_name="${MODEL_DIR}" \
6
+ --tokenizer_name="${MODEL_DIR}" \
7
+ --dataset_name="mc4" \
8
+ --dataset_config_name="bn" \
9
+ --do_train --do_eval \
10
+ --block_size="512" \
11
+ --per_device_train_batch_size="64" \
12
+ --per_device_eval_batch_size="64" \
13
+ --learning_rate="5e-3" --warmup_steps="1000" \
14
+ --adam_beta1="0.9" --adam_beta2="0.98" --weight_decay="0.01" \
15
+ --overwrite_output_dir \
16
+ --num_train_epochs="50" \
17
+ --logging_steps="500" \
18
+ --save_steps="2500" \
19
+ --eval_steps="2500" \
20
+ --preprocessing_num_workers="90" \
21
+ --push_to_hub
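For reference, run.sh assumes the MODEL_DIR environment variable points at a local clone of the hub repo that already holds the GPT-2 config and tokenizer. The flags also fix the batch geometry; the sketch below works it out under the assumption of a single TPU v3-8 host (8 devices), which the script itself does not state.

# Batch-size arithmetic implied by run.sh (illustrative only; the device
# count is an assumption, not something the script pins down).
per_device_batch_size = 64   # --per_device_train_batch_size
block_size = 512             # --block_size
num_devices = 8              # assumed jax.device_count() on a TPU v3-8

global_batch_size = per_device_batch_size * num_devices
tokens_per_step = global_batch_size * block_size
print(global_batch_size)  # 512 sequences per optimizer step
print(tokens_per_step)    # 262,144 tokens per step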
run_2.sh ADDED
@@ -0,0 +1,22 @@
1
+ #!/usr/bin/env bash
2
+ python run_clm_flax_v2.py \
3
+ --output_dir="${MODEL_DIR}" \
4
+ --model_type="gpt2" \
5
+ --config_name="${MODEL_DIR}" \
6
+ --tokenizer_name="${MODEL_DIR}" \
7
+ --dataset_name="mc4" \
8
+ --dataset_config_name="bn" \
9
+ --do_train --do_eval \
10
+ --block_size="512" \
11
+ --per_device_train_batch_size="64" \
12
+ --per_device_eval_batch_size="64" \
13
+ --learning_rate="5e-3" --warmup_steps="1000" \
14
+ --adam_beta1="0.9" --adam_beta2="0.98" --weight_decay="0.01" \
15
+ --overwrite_output_dir \
16
+ --max_steps="100000" \
17
+ --decay_steps="100000" \
18
+ --logging_steps="50" \
19
+ --save_steps="50" \
20
+ --eval_steps="50" \
21
+ --max_eval_samples 100 \
22
+ --push_to_hub
run_clm_flax_v2.py ADDED
@@ -0,0 +1,823 @@
1
+ #!/usr/bin/env python
2
+ # coding=utf-8
3
+ # Copyright 2021 The HuggingFace Team All rights reserved.
4
+ #
5
+ # Licensed under the Apache License, Version 2.0 (the "License");
6
+ # you may not use this file except in compliance with the License.
7
+ # You may obtain a copy of the License at
8
+ #
9
+ # http://www.apache.org/licenses/LICENSE-2.0
10
+ #
11
+ # Unless required by applicable law or agreed to in writing, software
12
+ # distributed under the License is distributed on an "AS IS" BASIS,
13
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
+ # See the License for the specific language governing permissions and
15
+ # limitations under the License.
16
+ """
17
+ Pre-training/Fine-tuning the library models for causal language modeling (GPT, GPT-2, CTRL, ...) on a text file or a dataset.
18
+ Here is the full list of checkpoints on the hub that can be fine-tuned by this script:
19
+ https://huggingface.co/models?filter=causal-lm
20
+ """
21
+ # You can also adapt this script on your own causal language modeling task. Pointers for this are left as comments.
22
+
23
+ from ast import Str
24
+ import logging
25
+ import math
26
+ import os
27
+ import sys
28
+ import time
29
+ from dataclasses import dataclass, field
30
+ from pathlib import Path
31
+ from typing import Callable, Optional
32
+ import json
33
+ import shutil
34
+ from collections import defaultdict
35
+ from flax import training
36
+ import numpy as np
37
+ import datasets
38
+ from datasets import Dataset, load_dataset
39
+ from tqdm import tqdm
40
+
41
+ import jax
42
+ import jax.profiler
43
+ import jax.numpy as jnp
44
+ import optax
45
+ import transformers
46
+ from flax import jax_utils, traverse_util
47
+ from flax.jax_utils import unreplicate
48
+ from flax.training import train_state
49
+ from flax.training.common_utils import get_metrics, onehot, shard, shard_prng_key
50
+ from flax.training.checkpoints import save_checkpoint, restore_checkpoint
51
+ from flax.serialization import to_bytes, from_bytes
52
+ from transformers import (
53
+ CONFIG_MAPPING,
54
+ FLAX_MODEL_FOR_CAUSAL_LM_MAPPING,
55
+ AutoConfig,
56
+ AutoTokenizer,
57
+ FlaxAutoModelForCausalLM,
58
+ HfArgumentParser,
59
+ TrainingArguments,
60
+ is_tensorboard_available,
61
+ )
62
+ from transformers.testing_utils import CaptureLogger
63
+
64
+ from importlib.util import find_spec
65
+ from utils import PrefetchDataloader, make_batch
66
+
67
+ logger = logging.getLogger(__name__)
68
+
69
+ MODEL_CONFIG_CLASSES = list(FLAX_MODEL_FOR_CAUSAL_LM_MAPPING.keys())
70
+ MODEL_TYPES = tuple(conf.model_type for conf in MODEL_CONFIG_CLASSES)
71
+
72
+
73
+ @dataclass
74
+ class ModelArguments:
75
+ """
76
+ Arguments pertaining to which model/config/tokenizer we are going to fine-tune, or train from scratch.
77
+ """
78
+
79
+ model_name_or_path: Optional[str] = field(
80
+ default=None,
81
+ metadata={
82
+ "help": "The model checkpoint for weights initialization."
83
+ "Don't set if you want to train a model from scratch."
84
+ },
85
+ )
86
+ model_type: Optional[str] = field(
87
+ default=None,
88
+ metadata={"help": "If training from scratch, pass a model type from the list: " + ", ".join(MODEL_TYPES)},
89
+ )
90
+ config_name: Optional[str] = field(
91
+ default=None, metadata={"help": "Pretrained config name or path if not the same as model_name"}
92
+ )
93
+ tokenizer_name: Optional[str] = field(
94
+ default=None, metadata={"help": "Pretrained tokenizer name or path if not the same as model_name"}
95
+ )
96
+ cache_dir: Optional[str] = field(
97
+ default=None, metadata={"help": "Where do you want to store the pretrained models downloaded from s3"}
98
+ )
99
+ use_fast_tokenizer: bool = field(
100
+ default=True,
101
+ metadata={"help": "Whether to use one of the fast tokenizer (backed by the tokenizers library) or not."},
102
+ )
103
+ dtype: Optional[str] = field(
104
+ default="float32",
105
+ metadata={
106
+ "help": "Floating-point format in which the model weights should be initialized and trained. Choose one of `[float32, float16, bfloat16]`."
107
+ },
108
+ )
109
+ save_optimizer: Optional[bool] = field(
110
+ default=True,
111
+ metadata={"help": "Whether to store full train state including optimizer."},
112
+ )
113
+ repo_path_or_name: Optional[str] = field(
114
+ default=None,
115
+ metadata={"help": "Path to the modelhub repo directory"},
116
+ )
117
+ repo_url: Optional[str] = field(
118
+ default=None,
119
+ metadata={"help": "URL of the modelhub repo"},
120
+ )
121
+ decay_steps: int = field(default=None, metadata={"help":"Number of steps from peak to final learning rate"})
122
+
123
+ @dataclass
124
+ class DataTrainingArguments:
125
+ """
126
+ Arguments pertaining to what data we are going to input our model for training and eval.
127
+ """
128
+
129
+ dataset_name: Optional[str] = field(
130
+ default=None, metadata={"help": "The name of the dataset to use (via the datasets library)."}
131
+ )
132
+ dataset_config_name: Optional[str] = field(
133
+ default=None, metadata={"help": "The configuration name of the dataset to use (via the datasets library)."}
134
+ )
135
+ train_file: Optional[str] = field(default=None, metadata={"help": "The input training data file (a text file)."})
136
+ validation_file: Optional[str] = field(
137
+ default=None,
138
+ metadata={"help": "An optional input evaluation data file to evaluate the perplexity on (a text file)."},
139
+ )
140
+ data_dir: Optional[str] = field(default=None, metadata={"help": "Path to data directory."})
141
+ max_train_samples: Optional[int] = field(
142
+ default=None,
143
+ metadata={
144
+ "help": "For debugging purposes or quicker training, truncate the number of training examples to this "
145
+ "value if set."
146
+ },
147
+ )
148
+ max_eval_samples: Optional[int] = field(
149
+ default=None,
150
+ metadata={
151
+ "help": "For debugging purposes or quicker training, truncate the number of evaluation examples to this "
152
+ "value if set."
153
+ },
154
+ )
155
+ overwrite_cache: bool = field(
156
+ default=False, metadata={"help": "Overwrite the cached training and evaluation sets"}
157
+ )
158
+ validation_split_percentage: Optional[int] = field(
159
+ default=5,
160
+ metadata={
161
+ "help": "The percentage of the train set used as validation set in case there's no validation split"
162
+ },
163
+ )
164
+ block_size: Optional[int] = field(
165
+ default=None,
166
+ metadata={
167
+ "help": "Optional input sequence length after tokenization. "
168
+ "The training dataset will be truncated in block of this size for training. "
169
+ "Default to the model max input length for single sentence inputs (take into account special tokens)."
170
+ },
171
+ )
172
+ overwrite_cache: bool = field(
173
+ default=False, metadata={"help": "Overwrite the cached training and evaluation sets"}
174
+ )
175
+ preprocessing_num_workers: Optional[int] = field(
176
+ default=None,
177
+ metadata={"help": "The number of processes to use for the preprocessing."},
178
+ )
179
+ text_column_name: Optional[str] = field(
180
+ default='text',
181
+ metadata={"help": "Column containing main text data."},
182
+ )
183
+ shuffle_buffer_size: int = field(
184
+ default=10000, metadata={"help": "The number of examples to pre-load for shuffling."}
185
+ )
186
+ num_train_steps: int = field(default=50000, metadata={"help": "The number of training steps."})
187
+ num_eval_samples: int = field(default=50000, metadata={"help": "The number of samples to be used for evaluation"})
188
+ prefetch_buffer: int = field(default=8, metadata={"help": "The number of batches to prefetch for loading"})
189
+
190
+ def __post_init__(self):
191
+ if self.dataset_name is None and self.train_file is None and self.validation_file is None:
192
+ raise ValueError("Need either a dataset name or a training/validation file.")
193
+ else:
194
+ if self.train_file is not None:
195
+ extension = self.train_file.split(".")[-1]
196
+ assert extension in ["csv", "json", "txt"], "`train_file` should be a csv, a json or a txt file."
197
+ if self.validation_file is not None:
198
+ extension = self.validation_file.split(".")[-1]
199
+ assert extension in ["csv", "json", "txt"], "`validation_file` should be a csv, a json or a txt file."
200
+
201
+
202
+ class TrainState(train_state.TrainState):
203
+ dropout_rng: jnp.ndarray
204
+
205
+ def replicate(self):
206
+ return jax_utils.replicate(self).replace(dropout_rng=shard_prng_key(self.dropout_rng))
207
+
208
+ # the below functions are not used now, probably to be removed
209
+ def generate_batch_splits(samples_idx: jnp.ndarray, batch_size: int) -> jnp.ndarray:
210
+ num_samples = len(samples_idx)
211
+ samples_to_remove = num_samples % batch_size
212
+
213
+ if samples_to_remove != 0:
214
+ samples_idx = samples_idx[:-samples_to_remove]
215
+ sections_split = num_samples // batch_size
216
+ batch_idx = np.split(samples_idx, sections_split)
217
+ return batch_idx
218
+
219
+
220
+ def advance_iter_and_group_samples(train_iterator, num_samples, max_seq_length):
221
+ """
222
+ The training iterator is advanced so that after groupifying the samples,
223
+ `num_samples` of length `max_seq_length` are returned.
224
+ """
225
+ num_total_tokens = max_seq_length * num_samples
226
+ samples = defaultdict(list)
227
+
228
+ i = 0
229
+ while i < num_total_tokens:
230
+ tokenized_samples = next(train_iterator)
231
+ i += len(tokenized_samples["input_ids"])
232
+
233
+ # concatenate tokenized samples to list
234
+ samples = {k: samples[k] + tokenized_samples[k] for k in tokenized_samples.keys()}
235
+
236
+ # Concatenated tokens are split to lists of length `max_seq_length`.
237
+ # Note that the remainder modulo max_seq_length is thrown away.
238
+ def group_texts(examples):
239
+ result = {
240
+ k: [t[i : i + max_seq_length] for i in range(0, num_total_tokens, max_seq_length)]
241
+ for k, t in examples.items()
242
+ }
243
+ return result
244
+
245
+ grouped_samples = group_texts(samples)
246
+ return grouped_samples
247
+
248
+ def data_loader(rng: jax.random.PRNGKey, dataset: Dataset, batch_size: int, shuffle: bool = False):
249
+ """
250
+ Returns batches of size `batch_size` from truncated `dataset`, sharded over all local devices.
251
+ Shuffle batches if `shuffle` is `True`.
252
+ """
253
+ steps_per_epoch = len(dataset) // batch_size
254
+
255
+ if shuffle:
256
+ batch_idx = jax.random.permutation(rng, len(dataset))
257
+ else:
258
+ batch_idx = jnp.arange(len(dataset))
259
+
260
+ batch_idx = batch_idx[: steps_per_epoch * batch_size] # Skip incomplete batch.
261
+ batch_idx = batch_idx.reshape((steps_per_epoch, batch_size))
262
+
263
+ for idx in batch_idx:
264
+ batch = dataset[idx]
265
+ batch = {k: jnp.array(v) for k, v in batch.items()}
266
+
267
+ batch = shard(batch)
268
+
269
+ yield batch
270
+
271
+
272
+ def write_train_metric(summary_writer, train_metrics, train_time, step):
273
+ summary_writer.scalar("train_time", train_time, step)
274
+
275
+ train_metrics = get_metrics(train_metrics)
276
+ for key, vals in train_metrics.items():
277
+ tag = f"train_{key}"
278
+ for i, val in enumerate(vals):
279
+ summary_writer.scalar(tag, val, step - len(vals) + i + 1)
280
+
281
+
282
+ def write_eval_metric(summary_writer, eval_metrics, step):
283
+ for metric_name, value in eval_metrics.items():
284
+ summary_writer.scalar(f"eval_{metric_name}", value, step)
285
+
286
+
287
+ def create_learning_rate_fn(
288
+ num_train_steps: int, train_batch_size: int, num_warmup_steps: int, learning_rate: float
289
+ ) -> Callable[[int], jnp.array]:
290
+ """Returns a linear warmup, linear_decay learning rate function."""
291
+ warmup_fn = optax.linear_schedule(init_value=0.0, end_value=learning_rate, transition_steps=num_warmup_steps)
292
+ decay_fn = optax.linear_schedule(
293
+ init_value=learning_rate, end_value=0, transition_steps=num_train_steps - num_warmup_steps
294
+ )
295
+ schedule_fn = optax.join_schedules(schedules=[warmup_fn, decay_fn], boundaries=[num_warmup_steps])
296
+ return schedule_fn
297
+ def gpt3_schedule(warmup_steps,
298
+ total_steps,
299
+ peak_lr,
300
+ end_lr):
301
+ def sch(step):
302
+ warmup_pct = jnp.clip(step, 0, warmup_steps) / warmup_steps
303
+ anneal_pct = jnp.clip(step - warmup_steps, 0, total_steps) / total_steps
304
+
305
+ return warmup_pct * peak_lr - (peak_lr - end_lr) * (1 - jnp.cos(jnp.pi * anneal_pct)) / 2
306
+
307
+ return sch
308
+
309
+ # utils
310
+ def mb_item(x):
311
+ return x.item() if hasattr(x, "item") else x
312
+
313
+ #checkpoint functions
314
+ def save_model_checkpoint(model, save_dir, state, with_opt=True, push_to_hub=False):
315
+ """
316
+ If `push_to_hub` is True, will save to `save_dir`. Otherwise will save to `save_dir/ckpt-{step}`.
317
+ """
318
+ state = jax_utils.unreplicate(state)
319
+ logger.info(f"SAVING CHECKPOINT IN {save_dir}...")
320
+ if not push_to_hub:
321
+ save_dir = f"{save_dir}/ckpt-{mb_item(state.step)-1}"
322
+ model.save_pretrained(
323
+ save_dir,
324
+ params=state.params,
325
+ push_to_hub=push_to_hub,
326
+ commit_message=f"Saving weights and logs at step {mb_item(state.step)-1}",
327
+ )
328
+ if with_opt:
329
+ with open(os.path.join(save_dir, "opt_state.msgpack"), "wb") as f:
330
+ f.write(to_bytes(state.opt_state))
331
+ with open(os.path.join(save_dir, "training_state.json"), "w") as f:
332
+ json.dump({"step": state.step.item()}, f)
333
+ logger.info("checkpoint saved")
334
+
335
+ def restore_model_checkpoint(save_dir, state):
336
+ logger.info(f"RESTORING CHECKPOINT FROM {save_dir}...")
337
+ with open(os.path.join(save_dir, "flax_model.msgpack"), "rb") as f:
338
+ params = from_bytes(state.params, f.read())
339
+
340
+ with open(os.path.join(save_dir, "opt_state.msgpack"), "rb") as f:
341
+ opt_state = from_bytes(state.opt_state, f.read())
342
+
343
+ with open(os.path.join(save_dir, "training_state.json"), "r") as f:
344
+ training_state = json.load(f)
345
+ step = training_state["step"]
346
+
347
+ logger.info("checkpoint restored")
348
+ return state.replace(step=step, params=params, opt_state=opt_state), step
349
+
350
+ def rotate_checkpoints(ckpt_dir:str, save_total_limit:int):
351
+ "Removes older checkpoints so that `save_total_limit` checkpoints are kept"
352
+ # TODO: what to remove is decided using step number only, we might want to improve that
353
+ ckpts = [str(x) for x in Path(ckpt_dir).glob("ckpt-*")]
354
+ # sort checkpoints by step
355
+ ckpts_sorted = sorted(ckpts, key=lambda x: int(x.split('-')[-1]))
356
+ ckpts_to_delete = ckpts_sorted[:-save_total_limit]
357
+ for ckpt in ckpts_to_delete:
358
+ logger.info(f"Deleting older checkpoint [{ckpt}] due to save_total_limit ({save_total_limit})")
359
+ shutil.rmtree(ckpt)
360
+
361
+ def main():
362
+ # See all possible arguments in src/transformers/training_args.py
363
+ # or by passing the --help flag to this script.
364
+ # We now keep distinct sets of args, for a cleaner separation of concerns.
365
+
366
+ parser = HfArgumentParser((ModelArguments, DataTrainingArguments, TrainingArguments))
367
+ if len(sys.argv) == 2 and sys.argv[1].endswith(".json"):
368
+ # If we pass only one argument to the script and it's the path to a json file,
369
+ # let's parse it to get our arguments.
370
+ model_args, data_args, training_args = parser.parse_json_file(json_file=os.path.abspath(sys.argv[1]))
371
+ else:
372
+ model_args, data_args, training_args = parser.parse_args_into_dataclasses()
373
+
374
+ if (
375
+ os.path.exists(training_args.output_dir)
376
+ and os.listdir(training_args.output_dir)
377
+ and training_args.do_train
378
+ and not training_args.overwrite_output_dir
379
+ ):
380
+ raise ValueError(
381
+ f"Output directory ({training_args.output_dir}) already exists and is not empty."
382
+ "Use --overwrite_output_dir to overcome."
383
+ )
384
+
385
+ # Make one log on every process with the configuration for debugging.
386
+ logging.basicConfig(
387
+ format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
388
+ datefmt="%m/%d/%Y %H:%M:%S",
389
+ level=logging.INFO,
390
+ )
391
+ # Setup logging, we only want one process per machine to log things on the screen.
392
+ logger.setLevel(logging.INFO if jax.process_index() == 0 else logging.ERROR)
393
+ if jax.process_index() == 0:
394
+ datasets.utils.logging.set_verbosity_warning()
395
+ transformers.utils.logging.set_verbosity_info()
396
+ else:
397
+ datasets.utils.logging.set_verbosity_error()
398
+ transformers.utils.logging.set_verbosity_error()
399
+
400
+ # Set the verbosity to info of the Transformers logger (on main process only):
401
+ logger.info(f"Training/evaluation parameters {training_args}")
402
+
403
+ # Get the datasets: you can either provide your own CSV/JSON/TXT training and evaluation files (see below)
404
+ # or just provide the name of one of the public datasets available on the hub at https://huggingface.co/datasets/
405
+ # (the dataset will be downloaded automatically from the datasets Hub).
406
+ #
407
+ # For CSV/JSON files, this script will use the column called 'text' or the first column if no column called
408
+ # 'text' is found. You can easily tweak this behavior (see below).
409
+ #
410
+ # In distributed training, the load_dataset function guarantees that only one local process can concurrently
411
+ # download the dataset.
412
+ if data_args.dataset_name is not None:
413
+ # Downloading and loading a dataset from the hub.
414
+ train_dataset = load_dataset(
415
+ data_args.dataset_name,
416
+ data_args.dataset_config_name,
417
+ cache_dir=model_args.cache_dir,
418
+ streaming=True,
419
+ split="train"
420
+ )
421
+ eval_dataset = load_dataset(
422
+ data_args.dataset_name,
423
+ data_args.dataset_config_name,
424
+ cache_dir=model_args.cache_dir,
425
+ streaming=True,
426
+ split="validation"
427
+ )
428
+
429
+ # See more about loading any type of standard or custom dataset (from files, python dict, pandas DataFrame, etc) at
430
+ # https://huggingface.co/docs/datasets/loading_datasets.html.
431
+
432
+ # Load pretrained model and tokenizer
433
+
434
+ # Distributed training:
435
+ # The .from_pretrained methods guarantee that only one local process can concurrently
436
+ # download model & vocab.
437
+ if model_args.config_name:
438
+ config = AutoConfig.from_pretrained(model_args.config_name, cache_dir=model_args.cache_dir)
439
+ elif model_args.model_name_or_path:
440
+ config = AutoConfig.from_pretrained(model_args.model_name_or_path, cache_dir=model_args.cache_dir)
441
+ else:
442
+ config = CONFIG_MAPPING[model_args.model_type]()
443
+ logger.warning("You are instantiating a new config instance from scratch.")
444
+
445
+ if model_args.tokenizer_name:
446
+ tokenizer = AutoTokenizer.from_pretrained(
447
+ model_args.tokenizer_name, cache_dir=model_args.cache_dir, use_fast=model_args.use_fast_tokenizer
448
+ )
449
+ elif model_args.model_name_or_path:
450
+ tokenizer = AutoTokenizer.from_pretrained(
451
+ model_args.model_name_or_path, cache_dir=model_args.cache_dir, use_fast=model_args.use_fast_tokenizer
452
+ )
453
+ else:
454
+ raise ValueError(
455
+ "You are instantiating a new tokenizer from scratch. This is not supported by this script. "
456
+ "You can do it from another script, save it, and load it from here, using --tokenizer_name."
457
+ )
458
+
459
+ if model_args.model_name_or_path:
460
+ model = FlaxAutoModelForCausalLM.from_pretrained(
461
+ model_args.model_name_or_path, config=config, seed=training_args.seed, dtype=getattr(jnp, model_args.dtype)
462
+ )
463
+ else:
464
+ model = FlaxAutoModelForCausalLM.from_config(
465
+ config, seed=training_args.seed, dtype=getattr(jnp, model_args.dtype)
466
+ )
467
+
468
+ # Preprocessing the datasets.
469
+ # First we tokenize all the texts.
470
+ # column_names = eval_dataset.column_names
471
+ text_column_name = data_args.text_column_name # if data_args.text_column_name in column_names else column_names[0]
472
+
473
+ # since this will be pickled, force logger loading before tokenize_function to avoid a _LazyModule error in Hasher
474
+ tok_logger = transformers.utils.logging.get_logger("transformers.tokenization_utils_base")
475
+
476
+ def tokenize_function(examples):
477
+ with CaptureLogger(tok_logger) as cl:
478
+ output = tokenizer(examples[text_column_name])
479
+ # clm input could be much much longer than block_size
480
+ if "Token indices sequence length is longer than the" in cl.out:
481
+ tok_logger.warning(
482
+ "^^^^^^^^^^^^^^^^ Please ignore the warning above - this long input will be chunked into smaller bits before being passed to the model."
483
+ )
484
+ return output
485
+
486
+ tokenized_dataset = train_dataset.map(
487
+ tokenize_function,
488
+ batched=True,
489
+ )
490
+ tokenized_eval_dataset = eval_dataset.map(
491
+ tokenize_function,
492
+ batched=True,
493
+ # remove_columns=column_names,
494
+ # num_proc=data_args.preprocessing_num_workers,
495
+ # load_from_cache_file=not data_args.overwrite_cache,
496
+ )
497
+
498
+ if data_args.block_size is None:
499
+ block_size = tokenizer.model_max_length
500
+ if block_size > config.max_position_embeddings:
501
+ logger.warning(
502
+ f"The tokenizer picked seems to have a very large `model_max_length` ({tokenizer.model_max_length}). "
503
+ "Picking 1024 instead. You can change that default value by passing --block_size xxx."
504
+ )
505
+ block_size = 1024
506
+ else:
507
+ if data_args.block_size > tokenizer.model_max_length:
508
+ logger.warning(
509
+ f"The block_size passed ({data_args.block_size}) is larger than the maximum length for the model "
510
+ f"({tokenizer.model_max_length}). Using block_size={tokenizer.model_max_length}."
511
+ )
512
+ block_size = min(data_args.block_size, tokenizer.model_max_length)
513
+
514
+ # # Main data processing function that will concatenate all texts from our dataset and generate chunks of block_size.
515
+ def group_texts(examples):
516
+ # Concatenate all texts.
517
+ concatenated_examples = {k: sum(examples[k], []) for k in examples.keys()}
518
+ total_length = len(concatenated_examples[list(examples.keys())[0]])
519
+ # We drop the small remainder, we could add padding if the model supported it instead of this drop, you can
520
+ # customize this part to your needs.
521
+ total_length = (total_length // block_size) * block_size
522
+ # Split by chunks of max_len.
523
+ result = {
524
+ k: [t[i : i + block_size] for i in range(0, total_length, block_size)]
525
+ for k, t in concatenated_examples.items()
526
+ }
527
+ result["labels"] = result["input_ids"].copy()
528
+ return result
529
+
530
+ # Note that with `batched=True`, this map processes 1,000 texts together, so group_texts throws away a remainder
531
+ # for each of those groups of 1,000 texts. You can adjust that batch_size here but a higher value might be slower
532
+ # to preprocess.
533
+ #
534
+ # To speed up this part, we use multiprocessing. See the documentation of the map method for more information:
535
+ # https://huggingface.co/docs/datasets/package_reference/main_classes.html#datasets.Dataset.map
536
+
537
+ shuffle_seed = training_args.seed
538
+ # if training_args.do_train:
539
+ # if "train" not in tokenized_dataset:
540
+ # raise ValueError("--do_train requires a train dataset")
541
+ # train_dataset = tokenized_dataset
542
+ # if data_args.max_train_samples is not None:
543
+ # train_dataset = train_dataset.take(range(data_args.max_train_samples))
544
+ # train_dataset = train_dataset.shuffle(buffer_size=data_args.shuffle_buffer_size, seed=shuffle_seed)
545
+ # train_iter = iter(train_dataset)
546
+
547
+
548
+ train_loader = PrefetchDataloader(
549
+ tokenized_dataset,
550
+ training_args.max_steps * training_args.gradient_accumulation_steps,
551
+ int(training_args.per_device_train_batch_size) * jax.device_count(),
552
+ block_size,
553
+ prefetch_buffer=data_args.prefetch_buffer,
554
+ seed=shuffle_seed
555
+ )
556
+ # evaluation data is not in streaming mode
557
+ # if training_args.do_eval:
558
+ # eval_dataset = tokenized_eval_dataset.map(
559
+ # group_texts,
560
+ # batched=True,
561
+ # num_proc=data_args.preprocessing_num_workers,
562
+ # load_from_cache_file=not data_args.overwrite_cache,
563
+ # )
564
+ # if data_args.max_eval_samples is not None:
565
+ # eval_dataset = eval_dataset.select(range(data_args.max_eval_samples))
566
+
567
+ # Enable tensorboard only on the master node
568
+ has_tensorboard = is_tensorboard_available()
569
+ if has_tensorboard and jax.process_index() == 0:
570
+ try:
571
+ from flax.metrics.tensorboard import SummaryWriter
572
+
573
+ summary_writer = SummaryWriter(log_dir=Path(training_args.output_dir))
574
+ except ImportError as ie:
575
+ has_tensorboard = False
576
+ logger.warning(
577
+ f"Unable to display metrics through TensorBoard because some packages are not installed: {ie}"
578
+ )
579
+ else:
580
+ logger.warning(
581
+ "Unable to display metrics through TensorBoard because the package is not installed: "
582
+ "Please run pip install tensorboard to enable."
583
+ )
584
+
585
+ # enable wandb tracking
586
+ has_wandb = find_spec("wandb") is not None
587
+ if jax.process_index() == 0 and has_wandb and ("wandb" in training_args.report_to):
588
+ try:
589
+ import wandb
590
+ wandb.init(
591
+ name=training_args.run_name,
592
+ entity="wandb",
593
+ project="hf-flax-gpt-neo-copilot",
594
+ sync_tensorboard=True
595
+ )
596
+ wandb.config.update(training_args)
597
+ wandb.config.update(model_args)
598
+ wandb.config.update(data_args)
599
+ except ImportError as e:
600
+ print(e)
601
+ has_wandb = False
602
+
603
+
604
+ # Initialize our training
605
+ rng = jax.random.PRNGKey(training_args.seed)
606
+ rng, dropout_rng = jax.random.split(rng)
607
+
608
+ # Store some constant
609
+ num_epochs = int(training_args.num_train_epochs)
610
+ train_batch_size = int(training_args.per_device_train_batch_size) * jax.device_count() * training_args.gradient_accumulation_steps
611
+ eval_batch_size = int(training_args.per_device_eval_batch_size) * jax.device_count()
612
+ total_train_steps = training_args.max_steps * training_args.gradient_accumulation_steps
613
+
614
+ # Create learning rate schedule
615
+ gpt3_schedule_fn = gpt3_schedule(
616
+ training_args.warmup_steps,
617
+ model_args.decay_steps,
618
+ training_args.learning_rate,
619
+ training_args.learning_rate / 10.
620
+ )
621
+
622
+ # We use Optax's "masking" functionality to not apply weight decay
623
+ # to bias and LayerNorm scale parameters. decay_mask_fn returns a
624
+ # mask boolean with the same structure as the parameters.
625
+ # The mask is True for parameters that should be decayed.
626
+ # Note that this mask is specifically adapted for FlaxGPT2.
627
+ # For other models, one should correct the layer norm parameter naming
628
+ # accordingly.
629
+ def decay_mask_fn(params):
630
+ flat_params = traverse_util.flatten_dict(params)
631
+ flat_mask = {
632
+ path: (path[-1] != "bias" and path[-2:] not in [("ln_1", "scale"), ("ln_2", "scale"), ("ln_f", "scale")])
633
+ for path in flat_params
634
+ }
635
+ return traverse_util.unflatten_dict(flat_mask)
636
+
637
+ # create optimizer
638
+ if training_args.adafactor:
639
+ # We use the default parameters here to initialize adafactor,
640
+ # For more details about the parameters please check https://github.com/deepmind/optax/blob/ed02befef9bf81cbbf236be3d2b0e032e9ed4a40/optax/_src/alias.py#L74
641
+ optimizer = optax.adafactor(
642
+ learning_rate=gpt3_schedule_fn,
643
+ )
644
+ else:
645
+ optimizer = optax.adamw(
646
+ learning_rate=gpt3_schedule_fn,
647
+ b1=training_args.adam_beta1,
648
+ b2=training_args.adam_beta2,
649
+ eps=training_args.adam_epsilon,
650
+ weight_decay=training_args.weight_decay,
651
+ mask=decay_mask_fn,
652
+ )
653
+ if training_args.gradient_accumulation_steps > 1:
654
+ optimizer = optax.MultiSteps(optimizer, training_args.gradient_accumulation_steps)
655
+ grad_accum_steps = training_args.gradient_accumulation_steps
656
+
657
+ # Setup train state
658
+ state = TrainState.create(apply_fn=model.__call__, params=model.params, tx=optimizer, dropout_rng=dropout_rng)
659
+
660
+ if training_args.resume_from_checkpoint:
661
+ state = restore_checkpoint(training_args.resume_from_checkpoint, state)
662
+ resume_step = mb_item(state.step)
663
+ else:
664
+ resume_step = 0
665
+
666
+ def loss_fn(logits, labels):
667
+ shift_logits = logits[..., :-1, :]
668
+ shift_labels = labels[..., 1:]
669
+ loss = optax.softmax_cross_entropy(shift_logits, onehot(shift_labels, shift_logits.shape[-1]))
670
+ return loss.mean()
671
+
672
+ # Define gradient update step fn
673
+ def train_step(state, batch):
674
+ dropout_rng, new_dropout_rng = jax.random.split(state.dropout_rng)
675
+
676
+ def compute_loss(params):
677
+ labels = batch.pop("labels")
678
+ logits = state.apply_fn(**batch, params=params, dropout_rng=dropout_rng, train=True)[0]
679
+ loss = loss_fn(logits, labels)
680
+ return loss
681
+
682
+ grad_fn = jax.value_and_grad(compute_loss)
683
+ loss, grad = grad_fn(state.params)
684
+ grad = jax.lax.pmean(grad, "batch")
685
+
686
+ new_state = state.apply_gradients(grads=grad, dropout_rng=new_dropout_rng)
687
+
688
+ metrics = {"loss": loss, "learning_rate": gpt3_schedule_fn(state.step // grad_accum_steps)}
689
+ metrics = jax.lax.pmean(metrics, axis_name="batch")
690
+
691
+ return new_state, metrics
692
+
693
+ # Define eval fn
694
+ def eval_step(params, batch):
695
+ labels = batch.pop("labels")
696
+ logits = model(**batch, params=params, train=False)[0]
697
+ loss = loss_fn(logits, labels)
698
+
699
+ # summarize metrics
700
+ metrics = {"loss": loss}
701
+ metrics = jax.lax.pmean(metrics, axis_name="batch")
702
+ return metrics
703
+
704
+ # Create parallel version of the train and eval step
705
+ p_train_step = jax.pmap(train_step, "batch", donate_argnums=(0,))
706
+ p_eval_step = jax.pmap(eval_step, "batch")
707
+
708
+ # Replicate the train state on each device
709
+ state = state.replicate()
710
+
711
+ logger.info("***** Running training *****")
712
+ logger.info(f" Instantaneous batch size per device = {training_args.per_device_train_batch_size}")
713
+ logger.info(f" Total train batch size (w. parallel, distributed and grad_accum) = {train_batch_size}")
714
+ logger.info(f" Total optimization steps = {training_args.max_steps}")
715
+
716
+ if not training_args.skip_memory_metrics:
717
+ server = jax.profiler.start_server(9999)
718
+
719
+ train_time = 0
720
+ train_metrics = []
721
+ # TODO: figure out training duration
722
+ steps = tqdm(range(training_args.max_steps), position=0, initial=resume_step)
723
+ for step in range(total_train_steps):
724
+ # ======================== Training ================================
725
+ train_start = time.time()
726
+ rng, input_rng = jax.random.split(rng)
727
+
728
+ cur_step = step
729
+ # skip to the step from which we are resuming
730
+ if cur_step < resume_step:
731
+ continue
732
+
733
+ # using advance_iter_and_group_samples seems to make training slower
734
+ # samples = advance_iter_and_group_samples(iter(tokenized_dataset), int(training_args.per_device_train_batch_size) * jax.device_count(), block_size)
735
+ # batch = shard(make_batch(samples))
736
+ batch = shard(next(train_loader))
737
+ # logger.info(f"{batch['input_ids'].shape}")
738
+ state, train_metric = p_train_step(state, batch)
739
+ train_metrics.append(train_metric)
740
+ if step % grad_accum_steps == 0:
741
+ steps.update(1)
742
+
743
+ if cur_step % (training_args.logging_steps * grad_accum_steps)== 0 and cur_step > 0:
744
+ # Save metrics
745
+ train_metric = unreplicate(train_metric)
746
+ train_time += time.time() - train_start
747
+ if has_tensorboard and jax.process_index() == 0:
748
+ write_train_metric(summary_writer, train_metrics, train_time, cur_step)
749
+ if has_wandb and jax.process_index() == 0 and ("wandb" in training_args.report_to):
750
+ # TODO: add accumulation of metrics
751
+ _metrics = {k if k=="learning_rate" else f"train_{k}":mb_item(v.mean()) for k, v in train_metric.items()}
752
+ wandb.log({"training_step":cur_step, **_metrics}, commit=True)
753
+
754
+ steps.write(
755
+ f"Step... ({cur_step} | Loss: {train_metric['loss'].mean()}, Learning Rate: {train_metric['learning_rate'].mean()})"
756
+ )
757
+
758
+ train_metrics = []
759
+
760
+ if cur_step % (training_args.eval_steps * grad_accum_steps) == 0 and cur_step > 0 and training_args.do_eval:
761
+ # ======================== Evaluating ==============================
762
+ eval_metrics = []
763
+ eval_steps = data_args.max_eval_samples # len(eval_dataset) // eval_batch_size
764
+ # eval_loader = data_loader(input_rng, eval_dataset, eval_batch_size)
765
+ eval_loader = PrefetchDataloader(
766
+ tokenized_eval_dataset,
767
+ eval_steps,
768
+ eval_batch_size,
769
+ block_size,
770
+ prefetch_buffer=data_args.prefetch_buffer,
771
+ shuffle=False,
772
+ )
773
+ for _ in tqdm(range(eval_steps), desc="Evaluating...", position=2, leave=False):
774
+ # Model forward
775
+ batch = shard(next(eval_loader))
776
+ metrics = p_eval_step(state.params, batch)
777
+ eval_metrics.append(metrics)
778
+
779
+ # normalize eval metrics
780
+ eval_metrics = get_metrics(eval_metrics)
781
+ eval_metrics = jax.tree_map(jnp.mean, eval_metrics)
782
+
783
+ try:
784
+ eval_metrics["perplexity"] = math.exp(eval_metrics["loss"])
785
+ except OverflowError:
786
+ eval_metrics["perplexity"] = float("inf")
787
+ # TODO: this needs to be closed properly
788
+ eval_loader.terminate()
789
+ # Print metrics and update progress bar
790
+ desc = f"Step... ({cur_step} | Eval Loss: {eval_metrics['loss']} | Eval Perplexity: {eval_metrics['perplexity']})"
791
+ steps.write(desc)
792
+ steps.desc = desc
793
+
794
+ # Save metrics
795
+ if has_tensorboard and jax.process_index() == 0:
796
+ # cur_step = epoch * (len(train_dataset) // train_batch_size)
797
+ write_eval_metric(summary_writer, eval_metrics, cur_step)
798
+ if has_wandb and jax.process_index() == 0 and ("wandb" in training_args.report_to):
799
+ _metrics = {f"eval_{k}":mb_item(v) for k, v in eval_metrics.items()}
800
+ wandb.log({"eval_step":cur_step, **_metrics})
801
+
802
+ if cur_step % (training_args.save_steps * grad_accum_steps) == 0 and cur_step > 0:
803
+ # save a checkpoint every save_steps and push it to the hub
804
+ if jax.process_index() == 0:
805
+ print("*********", training_args.push_to_hub)
806
+ save_model_checkpoint(model, training_args.output_dir, state, with_opt=False,
807
+ push_to_hub=training_args.push_to_hub)
808
+ if model_args.save_optimizer:
809
+ # this saves full state including optimizer
810
+ save_checkpoint(training_args.output_dir, jax_utils.unreplicate(state), cur_step, keep=training_args.save_total_limit, overwrite=False)
811
+ if training_args.save_total_limit is not None:
812
+ rotate_checkpoints(training_args.output_dir, training_args.save_total_limit)
813
+
814
+ train_loader.terminate()
815
+ # save model after training is over
816
+ save_model_checkpoint(model, training_args.output_dir, state, with_opt=False,
817
+ push_to_hub=training_args.push_to_hub)
818
+
819
+
820
+
821
+
822
+ if __name__ == "__main__":
823
+ main()
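run_clm_flax_v2.py swaps the linear-decay schedule of the original example for gpt3_schedule: a linear warmup to the peak learning rate followed by a cosine anneal down to one tenth of the peak (the call site passes learning_rate / 10 as the end value). The snippet below is a minimal sketch that re-states that function with the run_2.sh values (warmup_steps=1000, decay_steps=100000, learning_rate=5e-3); it mirrors the code above rather than importing it.

import jax.numpy as jnp

def gpt3_schedule(warmup_steps, total_steps, peak_lr, end_lr):
    # linear warmup to peak_lr, then cosine anneal toward end_lr
    def sch(step):
        warmup_pct = jnp.clip(step, 0, warmup_steps) / warmup_steps
        anneal_pct = jnp.clip(step - warmup_steps, 0, total_steps) / total_steps
        return warmup_pct * peak_lr - (peak_lr - end_lr) * (1 - jnp.cos(jnp.pi * anneal_pct)) / 2
    return sch

# values taken from run_2.sh
lr = gpt3_schedule(warmup_steps=1000, total_steps=100_000, peak_lr=5e-3, end_lr=5e-4)
print(lr(0))        # 0.0 at the first step
print(lr(1_000))    # 0.005, the peak, once warmup ends
print(lr(101_000))  # 0.0005, fully annealed to peak / 10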
utils.py ADDED
@@ -0,0 +1,122 @@
1
+ import numpy as np
2
+ import threading
3
+ import queue
4
+ import multiprocessing
5
+ from collections import defaultdict
6
+ import jax
7
+ import jax.numpy as jnp
8
+
9
+
10
+
11
+ def make_batch(samples):
12
+ batch = {k:jnp.array(v) for k,v in samples.items()}
13
+ batch['labels'] = batch['input_ids'].copy()
14
+ return batch
15
+
16
+ class PrefetchDataloaderThread(threading.Thread):
17
+ "Prefetch dataloader for IterableDataset"
18
+ def __init__(self, dataset, max_steps, batch_size, sequence_length, prefetch_buffer=1, shuffle=True, shuffle_buffer=1000, seed=0):
19
+ super().__init__(daemon=True)
20
+ self.max_steps = max_steps
21
+ self.bs = batch_size
22
+ self.seq_len = sequence_length
23
+ self.max_length = batch_size * sequence_length
24
+ self.prefetch_buffer = prefetch_buffer
25
+ self.shuffle = shuffle
26
+ self.shuffle_buffer = shuffle_buffer
27
+ self.seed = seed
28
+ self.dataset = dataset
29
+ if shuffle:
30
+ shuffled_dataset = dataset.shuffle(shuffle_buffer, seed=self.seed)
31
+ self.seed += 1
32
+ self.ds_iter = iter(shuffled_dataset)
33
+ else:
34
+ self.ds_iter = iter(dataset)
35
+ self.queue = queue.Queue(prefetch_buffer)
36
+ self.rem = defaultdict(list)
37
+ self.start()
38
+
39
+ def __next__(self):
40
+ batch = self.queue.get()
41
+ return batch
42
+
43
+ def run(self):
44
+ i = 0
45
+ while i < self.max_steps:
46
+ i += 1
47
+ # prepare next batch
48
+ sample = self.rem.copy()
49
+ l = len(sample["input_ids"])
50
+ max_length = self.max_length
51
+ while l < max_length:
52
+ next_sample = next(self.ds_iter)
53
+ l += len(next_sample["input_ids"])
54
+ sample = {k:sample[k]+next_sample[k] for k in next_sample.keys()}
55
+
56
+ self.rem = {k:v[max_length:] for k,v in sample.items()}
57
+ sample = {k:v[:max_length] for k,v in sample.items()}
58
+ # regroup to shape [bs x seq_len]
59
+ samples = {k:np.array([v[i*self.seq_len:(i+1)*self.seq_len] for i in range(self.bs)]) for k,v in sample.items()}
60
+
61
+ self.queue.put(make_batch(samples))
62
+ self.queue.put(None)
63
+
64
+ def __iter__(self):
65
+ return self
66
+
67
+
68
+ class PrefetchDataloader(multiprocessing.Process):
69
+ "Prefetch dataloader for IterableDataset"
70
+ def __init__(self, dataset, max_steps, batch_size, sequence_length, prefetch_buffer=1, shuffle=True, shuffle_buffer=1000, seed=0):
71
+ super().__init__(daemon=True)
72
+ self.max_steps = max_steps
73
+ self.bs = batch_size
74
+ self.seq_len = sequence_length
75
+ self.max_length = batch_size * sequence_length
76
+ self.prefetch_buffer = prefetch_buffer
77
+ self.shuffle = shuffle
78
+ self.shuffle_buffer = shuffle_buffer
79
+ self.seed = seed
80
+ self.dataset = dataset
81
+ self.make_iter()
82
+ self.queue = multiprocessing.Queue(prefetch_buffer)
83
+ self.rem = defaultdict(list)
84
+ self.start()
85
+
86
+ def make_iter(self):
87
+ if self.shuffle:
88
+ shuffled_dataset = self.dataset.shuffle(self.shuffle_buffer, seed=self.seed)
89
+ self.seed += 1
90
+ self.ds_iter = iter(shuffled_dataset)
91
+ else:
92
+ self.ds_iter = iter(self.dataset)
93
+
94
+ def __next__(self):
95
+ return make_batch(self.queue.get())
96
+
97
+ def run(self):
98
+ i = 0
99
+ while i < self.max_steps:
100
+ i += 1
+ # prepare next batch
101
+ sample = self.rem.copy()
102
+ l = len(sample["input_ids"])
103
+ max_length = self.max_length
104
+ while l < max_length:
105
+ try:
106
+ next_sample = next(self.ds_iter)
107
+ except StopIteration:
108
+ # reset generator if a pass through dataset is completed
109
+ self.make_iter()
+ next_sample = next(self.ds_iter)
110
+ l += len(next_sample["input_ids"])
111
+ sample = {k:sample[k]+next_sample[k] for k in next_sample.keys()}
112
+
113
+ self.rem = {k:v[max_length:] for k,v in sample.items()}
114
+ sample = {k:v[:max_length] for k,v in sample.items()}
115
+ # regroup to shape [bs x seq_len]
116
+ samples = {k:np.array([v[i*self.seq_len:(i+1)*self.seq_len] for i in range(self.bs)]) for k,v in sample.items()}
117
+
118
+ self.queue.put(samples)
119
+ self.queue.put(None)
120
+
121
+ def __iter__(self):
122
+ return self
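Both loaders ultimately hand batches to the model through make_batch, which turns a dict of token-id lists into device arrays and copies input_ids into labels for the causal-LM loss (the shift happens later, in the training script's loss_fn). A tiny usage sketch with made-up token ids; the import assumes you run it next to the utils.py above.

from utils import make_batch

# two toy sequences of four token ids each
samples = {
    "input_ids": [[5, 6, 7, 8], [9, 10, 11, 12]],
    "attention_mask": [[1, 1, 1, 1], [1, 1, 1, 1]],
}
batch = make_batch(samples)
print(batch["input_ids"].shape)  # (2, 4)
print(bool((batch["labels"] == batch["input_ids"]).all()))  # True: labels are a copy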