pszemraj commited on
Commit
b9ece17
1 Parent(s): c0ffe03

add fp32 model

Browse files
Files changed (3) hide show
  1. latest +1 -0
  2. pytorch_model.bin +2 -2
  3. zero_to_fp32.py +468 -0
latest ADDED
@@ -0,0 +1 @@
 
 
1
+ global_step861
pytorch_model.bin CHANGED
@@ -1,3 +1,3 @@
1
  version https://git-lfs.github.com/spec/v1
2
- oid sha256:cde81870796fb56ea3a0b583362438368eb1e0033898750b368d188c8df5ba83
3
- size 5571155658
 
1
  version https://git-lfs.github.com/spec/v1
2
+ oid sha256:e6b513945c01dd13fa0f7cbc4c202c9989766e350818030341147d26521c2d24
3
+ size 11142172078
zero_to_fp32.py ADDED
@@ -0,0 +1,468 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python
2
+
3
+ # This script extracts fp32 consolidated weights from a zero 2 and 3 DeepSpeed checkpoints. It gets
4
+ # copied into the top level checkpoint dir, so the user can easily do the conversion at any point in
5
+ # the future. Once extracted, the weights don't require DeepSpeed and can be used in any
6
+ # application.
7
+ #
8
+ # example: python zero_to_fp32.py . pytorch_model.bin
9
+
10
+ import argparse
11
+ import torch
12
+ import glob
13
+ import math
14
+ import os
15
+ from collections import OrderedDict
16
+
17
+ # while this script doesn't use deepspeed to recover data, since the checkpoints are pickled with
18
+ # DeepSpeed data structures it has to be available in the current python environment.
19
+ import deepspeed
20
+ from deepspeed.utils import logger
21
+ from deepspeed.checkpoint.constants import (DS_VERSION,
22
+ OPTIMIZER_STATE_DICT,
23
+ PARAM_SHAPES,
24
+ SINGLE_PARTITION_OF_FP32_GROUPS,
25
+ FP32_FLAT_GROUPS,
26
+ ZERO_STAGE,
27
+ PARTITION_COUNT,
28
+ PARAM_SHAPES,
29
+ BUFFER_NAMES)
30
+
31
+ debug = 0
32
+
33
+ # load to cpu
34
+ device = torch.device('cpu')
35
+
36
+
37
+ def get_model_state_file(checkpoint_dir, zero_stage):
38
+ if not os.path.isdir(checkpoint_dir):
39
+ raise FileNotFoundError(f"Directory '{checkpoint_dir}' doesn't exist")
40
+
41
+ # there should be only one file
42
+ if zero_stage == 2:
43
+ file = os.path.join(checkpoint_dir, "mp_rank_00_model_states.pt")
44
+ elif zero_stage == 3:
45
+ file = os.path.join(checkpoint_dir, "zero_pp_rank_0_mp_rank_00_model_states.pt")
46
+
47
+ if not os.path.exists(file):
48
+ raise FileNotFoundError(f"can't find model states file at '{file}'")
49
+
50
+ return file
51
+
52
+
53
+ def get_optim_files(checkpoint_dir):
54
+ # XXX: need to test that this simple glob rule works for multi-node setup too
55
+ optim_files = sorted(glob.glob(os.path.join(checkpoint_dir, "*_optim_states.pt")))
56
+
57
+ if len(optim_files) == 0:
58
+ raise FileNotFoundError(
59
+ f"can't find '*_optim_states.pt' files in directory '{checkpoint_dir}'")
60
+
61
+ return optim_files
62
+
63
+
64
+ def parse_model_state(file):
65
+ state_dict = torch.load(file, map_location=device)
66
+
67
+ if BUFFER_NAMES not in state_dict:
68
+ raise ValueError(f"{file} is not a model state checkpoint")
69
+ buffer_names = state_dict[BUFFER_NAMES]
70
+ if debug:
71
+ print("Found buffers:", buffer_names)
72
+
73
+ # recover just the buffers while restoring them to fp32 if they were saved in fp16
74
+ buffers = {
75
+ k: v.float()
76
+ for k,
77
+ v in state_dict["module"].items() if k in buffer_names
78
+ }
79
+ param_shapes = state_dict[PARAM_SHAPES]
80
+
81
+ ds_version = state_dict.get(DS_VERSION, None)
82
+
83
+ return buffers, param_shapes, ds_version
84
+
85
+
86
+ def parse_optim_states(files, ds_checkpoint_dir):
87
+
88
+ total_files = len(files)
89
+ state_dicts = []
90
+ for f in files:
91
+ state_dicts.append(torch.load(f, map_location=device))
92
+
93
+ if not ZERO_STAGE in state_dicts[0][OPTIMIZER_STATE_DICT]:
94
+ raise ValueError(f"{files[0]} is not a zero checkpoint")
95
+ zero_stage = state_dicts[0][OPTIMIZER_STATE_DICT][ZERO_STAGE]
96
+ world_size = state_dicts[0][OPTIMIZER_STATE_DICT][PARTITION_COUNT]
97
+
98
+ # For ZeRO-2 each param group can have different partition_count as data parallelism for expert
99
+ # parameters can be different from data parallelism for non-expert parameters. So we can just
100
+ # use the max of the partition_count to get the dp world_size.
101
+
102
+ if type(world_size) is list:
103
+ world_size = max(world_size)
104
+
105
+ if world_size != total_files:
106
+ raise ValueError(
107
+ f"Expected {world_size} of '*_optim_states.pt' under '{ds_checkpoint_dir}' but found {total_files} files. "
108
+ "Possibly due to an overwrite of an old checkpoint, or a checkpoint didn't get saved by one or more processes."
109
+ )
110
+
111
+ # the groups are named differently in each stage
112
+ if zero_stage == 2:
113
+ fp32_groups_key = SINGLE_PARTITION_OF_FP32_GROUPS
114
+ elif zero_stage == 3:
115
+ fp32_groups_key = FP32_FLAT_GROUPS
116
+ else:
117
+ raise ValueError(f"unknown zero stage {zero_stage}")
118
+
119
+ if zero_stage == 2:
120
+ fp32_flat_groups = [
121
+ state_dicts[i][OPTIMIZER_STATE_DICT][fp32_groups_key]
122
+ for i in range(len(state_dicts))
123
+ ]
124
+ elif zero_stage == 3:
125
+ # if there is more than one param group, there will be multiple flattened tensors - one
126
+ # flattened tensor per group - for simplicity merge them into a single tensor
127
+ #
128
+ # XXX: could make the script more memory efficient for when there are multiple groups - it
129
+ # will require matching the sub-lists of param_shapes for each param group flattened tensor
130
+
131
+ fp32_flat_groups = [
132
+ torch.cat(state_dicts[i][OPTIMIZER_STATE_DICT][fp32_groups_key],
133
+ 0) for i in range(len(state_dicts))
134
+ ]
135
+
136
+ return zero_stage, world_size, fp32_flat_groups
137
+
138
+
139
+ def _get_fp32_state_dict_from_zero_checkpoint(ds_checkpoint_dir):
140
+ """
141
+ Returns fp32 state_dict reconstructed from ds checkpoint
142
+
143
+ Args:
144
+ - ``ds_checkpoint_dir``: path to the deepspeed checkpoint folder (where the optimizer files are)
145
+
146
+ """
147
+ print(f"Processing zero checkpoint '{ds_checkpoint_dir}'")
148
+
149
+ optim_files = get_optim_files(ds_checkpoint_dir)
150
+ zero_stage, world_size, fp32_flat_groups = parse_optim_states(optim_files, ds_checkpoint_dir)
151
+ print(
152
+ f"Detected checkpoint of type zero stage {zero_stage}, world_size: {world_size}")
153
+
154
+ model_file = get_model_state_file(ds_checkpoint_dir, zero_stage)
155
+ buffers, param_shapes, ds_version = parse_model_state(model_file)
156
+ print(f'Parsing checkpoint created by deepspeed=={ds_version}')
157
+
158
+ if zero_stage == 2:
159
+ return _get_fp32_state_dict_from_zero2_checkpoint(world_size,
160
+ param_shapes,
161
+ fp32_flat_groups,
162
+ buffers)
163
+ elif zero_stage == 3:
164
+ return _get_fp32_state_dict_from_zero3_checkpoint(world_size,
165
+ param_shapes,
166
+ fp32_flat_groups,
167
+ buffers)
168
+
169
+
170
+ def _get_fp32_state_dict_from_zero2_checkpoint(world_size,
171
+ param_shapes,
172
+ fp32_flat_groups,
173
+ buffers):
174
+
175
+ # Reconstruction protocol:
176
+ #
177
+ # XXX: document this
178
+
179
+ if debug:
180
+ for i in range(world_size):
181
+ for j in range(len(fp32_flat_groups[0])):
182
+ print(
183
+ f"{FP32_FLAT_GROUPS}[{i}][{j}].shape={fp32_flat_groups[i][j].shape}")
184
+
185
+ # XXX: memory usage doubles here (zero2)
186
+ num_param_groups = len(fp32_flat_groups[0])
187
+ merged_single_partition_of_fp32_groups = []
188
+ for i in range(num_param_groups):
189
+ merged_partitions = [sd[i] for sd in fp32_flat_groups]
190
+ full_single_fp32_vector = torch.cat(merged_partitions, 0)
191
+ merged_single_partition_of_fp32_groups.append(full_single_fp32_vector)
192
+ avail_numel = sum([
193
+ full_single_fp32_vector.numel()
194
+ for full_single_fp32_vector in merged_single_partition_of_fp32_groups
195
+ ])
196
+
197
+ if debug:
198
+ wanted_params = sum([len(shapes) for shapes in param_shapes])
199
+ wanted_numel = sum(
200
+ [sum(shape.numel() for shape in shapes.values()) for shapes in param_shapes])
201
+ # not asserting if there is a mismatch due to possible padding
202
+ print(f"Have {avail_numel} numels to process.")
203
+ print(f"Need {wanted_numel} numels in {wanted_params} params.")
204
+
205
+ state_dict = OrderedDict()
206
+
207
+ # buffers
208
+ state_dict.update(buffers)
209
+ if debug:
210
+ print(f"added {len(buffers)} buffers")
211
+
212
+ # params
213
+ # XXX: for huge models that can't fit into the host's RAM we will have to recode this to support
214
+ # out-of-core computing solution
215
+ total_numel = 0
216
+ total_params = 0
217
+ for shapes, full_single_fp32_vector in zip(param_shapes, merged_single_partition_of_fp32_groups):
218
+ offset = 0
219
+ avail_numel = full_single_fp32_vector.numel()
220
+ for name, shape in shapes.items():
221
+
222
+ unpartitioned_numel = shape.numel()
223
+ total_numel += unpartitioned_numel
224
+ total_params += 1
225
+
226
+ if debug:
227
+ print(
228
+ f"{name} full shape: {shape} unpartitioned numel {unpartitioned_numel} "
229
+ )
230
+ state_dict[name] = full_single_fp32_vector.narrow(
231
+ 0,
232
+ offset,
233
+ unpartitioned_numel).view(shape)
234
+ offset += unpartitioned_numel
235
+
236
+ # Z2 started to align to 2*world_size to improve nccl performance. Therefore both offset and
237
+ # avail_numel can differ by anywhere between 0..2*world_size. Due to two unrelated complex
238
+ # paddings performed in the code it's almost impossible to predict the exact numbers w/o the
239
+ # live optimizer object, so we are checking that the numbers are within the right range
240
+ align_to = 2 * world_size
241
+
242
+ def zero2_align(x):
243
+ return align_to * math.ceil(x / align_to)
244
+
245
+ if debug:
246
+ print(f"original offset={offset}, avail_numel={avail_numel}")
247
+
248
+ offset = zero2_align(offset)
249
+ avail_numel = zero2_align(avail_numel)
250
+
251
+ if debug:
252
+ print(f"aligned offset={offset}, avail_numel={avail_numel}")
253
+
254
+ # Sanity check
255
+ if offset != avail_numel:
256
+ raise ValueError(
257
+ f"consumed {offset} numels out of {avail_numel} - something is wrong")
258
+
259
+ print(
260
+ f"Reconstructed fp32 state dict with {total_params} params {total_numel} elements"
261
+ )
262
+
263
+ return state_dict
264
+
265
+
266
+ def zero3_partitioned_param_info(unpartitioned_numel, world_size):
267
+ remainder = unpartitioned_numel % world_size
268
+ padding_numel = (world_size - remainder) if remainder else 0
269
+ partitioned_numel = math.ceil(unpartitioned_numel / world_size)
270
+ return partitioned_numel, padding_numel
271
+
272
+
273
+ def _get_fp32_state_dict_from_zero3_checkpoint(world_size,
274
+ param_shapes,
275
+ fp32_flat_groups,
276
+ buffers):
277
+
278
+ # Reconstruction protocol: For zero3 we need to zip the partitions together at boundary of each
279
+ # param, re-consolidating each param, while dealing with padding if any
280
+
281
+ avail_numel = fp32_flat_groups[0].numel() * world_size
282
+ # merge list of dicts, preserving order
283
+ param_shapes = {k: v for d in param_shapes for k, v in d.items()}
284
+
285
+ if debug:
286
+ for i in range(world_size):
287
+ print(f"{FP32_FLAT_GROUPS}[{i}].shape={fp32_flat_groups[i].shape}")
288
+
289
+ wanted_params = len(param_shapes)
290
+ wanted_numel = sum(shape.numel() for shape in param_shapes.values())
291
+ # not asserting if there is a mismatch due to possible padding
292
+ print(f"Have {avail_numel} numels to process.")
293
+ print(f"Need {wanted_numel} numels in {wanted_params} params.")
294
+
295
+ state_dict = OrderedDict()
296
+
297
+ # buffers
298
+ state_dict.update(buffers)
299
+ if debug:
300
+ print(f"added {len(buffers)} buffers")
301
+
302
+ # params
303
+ # XXX: for huge models that can't fit into the host's RAM we will have to recode this to support
304
+ # out-of-core computing solution
305
+ offset = 0
306
+ total_numel = 0
307
+ total_params = 0
308
+ for name, shape in param_shapes.items():
309
+
310
+ unpartitioned_numel = shape.numel()
311
+ total_numel += unpartitioned_numel
312
+ total_params += 1
313
+
314
+ partitioned_numel, partitioned_padding_numel = zero3_partitioned_param_info(unpartitioned_numel, world_size)
315
+
316
+ if debug:
317
+ print(
318
+ f"{total_params} {name} full shape: {shape} partition0 numel={partitioned_numel} partitioned_padding_numel={partitioned_padding_numel}"
319
+ )
320
+
321
+ # XXX: memory usage doubles here
322
+ state_dict[name] = torch.cat(
323
+ tuple(fp32_flat_groups[i].narrow(0,
324
+ offset,
325
+ partitioned_numel)
326
+ for i in range(world_size)),
327
+ 0).narrow(0,
328
+ 0,
329
+ unpartitioned_numel).view(shape)
330
+ offset += partitioned_numel
331
+
332
+ offset *= world_size
333
+
334
+ # Sanity check
335
+ if offset != avail_numel:
336
+ raise ValueError(
337
+ f"consumed {offset} numels out of {avail_numel} - something is wrong")
338
+
339
+ print(
340
+ f"Reconstructed fp32 state dict with {total_params} params {total_numel} elements"
341
+ )
342
+
343
+ return state_dict
344
+
345
+
346
+ def get_fp32_state_dict_from_zero_checkpoint(checkpoint_dir, tag=None):
347
+ """
348
+ Convert ZeRO 2 or 3 checkpoint into a single fp32 consolidated state_dict that can be loaded with
349
+ ``load_state_dict()`` and used for training without DeepSpeed or shared with others, for example
350
+ via a model hub.
351
+
352
+ Args:
353
+ - ``checkpoint_dir``: path to the desired checkpoint folder
354
+ - ``tag``: checkpoint tag used as a unique identifier for checkpoint. If not provided will attempt to load tag in 'latest' file. e.g., ``global_step14``
355
+
356
+ Returns:
357
+ - pytorch ``state_dict``
358
+
359
+ Note: this approach may not work if your application doesn't have sufficient free CPU memory and
360
+ you may need to use the offline approach using the ``zero_to_fp32.py`` script that is saved with
361
+ the checkpoint.
362
+
363
+ A typical usage might be ::
364
+
365
+ from deepspeed.utils.zero_to_fp32 import get_fp32_state_dict_from_zero_checkpoint
366
+ # do the training and checkpoint saving
367
+ state_dict = get_fp32_state_dict_from_zero_checkpoint(checkpoint_dir) # already on cpu
368
+ model = model.cpu() # move to cpu
369
+ model.load_state_dict(state_dict)
370
+ # submit to model hub or save the model to share with others
371
+
372
+ In this example the ``model`` will no longer be usable in the deepspeed context of the same
373
+ application. i.e. you will need to re-initialize the deepspeed engine, since
374
+ ``model.load_state_dict(state_dict)`` will remove all the deepspeed magic from it.
375
+
376
+ If you want it all done for you, use ``load_state_dict_from_zero_checkpoint`` instead.
377
+
378
+ """
379
+ if tag is None:
380
+ latest_path = os.path.join(checkpoint_dir, 'latest')
381
+ if os.path.isfile(latest_path):
382
+ with open(latest_path, 'r') as fd:
383
+ tag = fd.read().strip()
384
+ else:
385
+ raise ValueError(f"Unable to find 'latest' file at {latest_path}")
386
+
387
+ ds_checkpoint_dir = os.path.join(checkpoint_dir, tag)
388
+
389
+ if not os.path.isdir(ds_checkpoint_dir):
390
+ raise FileNotFoundError(f"Directory '{ds_checkpoint_dir}' doesn't exist")
391
+
392
+ return _get_fp32_state_dict_from_zero_checkpoint(ds_checkpoint_dir)
393
+
394
+
395
+ def convert_zero_checkpoint_to_fp32_state_dict(checkpoint_dir, output_file, tag=None):
396
+ """
397
+ Convert ZeRO 2 or 3 checkpoint into a single fp32 consolidated ``state_dict`` file that can be
398
+ loaded with ``torch.load(file)`` + ``load_state_dict()`` and used for training without DeepSpeed.
399
+
400
+ Args:
401
+ - ``checkpoint_dir``: path to the desired checkpoint folder. (one that contains the tag-folder, like ``global_step14``)
402
+ - ``output_file``: path to the pytorch fp32 state_dict output file (e.g. path/pytorch_model.bin)
403
+ - ``tag``: checkpoint tag used as a unique identifier for checkpoint. If not provided will attempt to load tag in the file named ``latest`` in the checkpoint folder, e.g., ``global_step14``
404
+ """
405
+
406
+ state_dict = get_fp32_state_dict_from_zero_checkpoint(checkpoint_dir, tag)
407
+ print(f"Saving fp32 state dict to {output_file}")
408
+ torch.save(state_dict, output_file)
409
+
410
+
411
+ def load_state_dict_from_zero_checkpoint(model, checkpoint_dir, tag=None):
412
+ """
413
+ 1. Put the provided model to cpu
414
+ 2. Convert ZeRO 2 or 3 checkpoint into a single fp32 consolidated ``state_dict``
415
+ 3. Load it into the provided model
416
+
417
+ Args:
418
+ - ``model``: the model object to update
419
+ - ``checkpoint_dir``: path to the desired checkpoint folder. (one that contains the tag-folder, like ``global_step14``)
420
+ - ``tag``: checkpoint tag used as a unique identifier for checkpoint. If not provided will attempt to load tag in the file named ``latest`` in the checkpoint folder, e.g., ``global_step14``
421
+
422
+ Returns:
423
+ - ``model`: modified model
424
+
425
+ Make sure you have plenty of CPU memory available before you call this function. If you don't
426
+ have enough use the ``zero_to_fp32.py`` utility to do the conversion. You will find it
427
+ conveniently placed for you in the checkpoint folder.
428
+
429
+ A typical usage might be ::
430
+
431
+ from deepspeed.utils.zero_to_fp32 import load_state_dict_from_zero_checkpoint
432
+ model = load_state_dict_from_zero_checkpoint(trainer.model, checkpoint_dir)
433
+ # submit to model hub or save the model to share with others
434
+
435
+ Note, that once this was run, the ``model`` will no longer be usable in the deepspeed context
436
+ of the same application. i.e. you will need to re-initialize the deepspeed engine, since
437
+ ``model.load_state_dict(state_dict)`` will remove all the deepspeed magic from it.
438
+
439
+ """
440
+ logger.info(f"Extracting fp32 weights")
441
+ state_dict = get_fp32_state_dict_from_zero_checkpoint(checkpoint_dir, tag)
442
+
443
+ logger.info(f"Overwriting model with fp32 weights")
444
+ model = model.cpu()
445
+ model.load_state_dict(state_dict, strict=False)
446
+
447
+ return model
448
+
449
+
450
+ if __name__ == "__main__":
451
+
452
+ parser = argparse.ArgumentParser()
453
+ parser.add_argument(
454
+ "checkpoint_dir",
455
+ type=str,
456
+ help="path to the desired checkpoint folder, e.g., path/checkpoint-12")
457
+ parser.add_argument(
458
+ "output_file",
459
+ type=str,
460
+ help=
461
+ "path to the pytorch fp32 state_dict output file (e.g. path/checkpoint-12/pytorch_model.bin)"
462
+ )
463
+ parser.add_argument("-d", "--debug", action='store_true', help="enable debug")
464
+ args = parser.parse_args()
465
+
466
+ debug = args.debug
467
+
468
+ convert_zero_checkpoint_to_fp32_state_dict(args.checkpoint_dir, args.output_file)