Spaces:
Paused
Paused
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
import logging
import os
import os.path as osp
import sys
from typing import Callable, Optional, Union

import torch
from mmengine.dist import master_only
from mmengine.hooks import Hook
from mmengine.logging import print_log
from mmengine.registry import HOOKS
def check_kineto() -> bool:  # noqa
    """Return True if the Kineto profiler backend is available in torch."""
    try:
        available = bool(torch.autograd.kineto_available())
    except AttributeError:
        # Very old torch builds do not expose ``kineto_available`` at all.
        print_log('NO KINETO', logger='current', level=logging.WARNING)
        available = False
    return available
class ProfilerHook(Hook):
    """A hook to analyze performance during training and inference.

    PyTorch Profiler is a tool that allows the collection of the performance
    metrics during the training. More details on Profiler can be found at
    `official docs <https://pytorch.org/docs/stable/profiler.html
    #torch.profiler.profile>`_

    Args:
        by_epoch (bool): Profile performance by epoch or by iteration.
            Defaults to True.
        profile_times (int): The period (epoch/iter) recorded by the profiler.
            Defaults to 1. For example, profile_times=10 and by_epoch=False
            indicate that iterations 0-10 are recorded.
        activity_with_cpu (bool): Activities to be used in the analysis (CPU).
        activity_with_cuda (bool): Activities to be used in the analysis
            (CUDA).
        schedule (dict, optional): Key-word arguments passed to
            `torch.profiler.schedule <https://pytorch.org/docs/stable/
            profiler.html#torch.profiler.schedule>`_.
            Defaults to None, which means profiling without a schedule.
        on_trace_ready (callable, dict, optional): Either a handler or a dict
            for generating a handler. Defaults to None, which means profiling
            without an ``on_trace_ready``. A Callable needs to be able to
            handle a ``torch.autograd.profiler.profile`` object.
            Two officially recommended ways are provided:

            - ``on_trace_ready=dict(type='log_trace')``: Print the profiling
              result in the terminal. See more details in the
              `PyTorch official tutorial`_. The configurable arguments are
              the same as ``prof.key_averages().table``.
            - ``on_trace_ready=dict(type='tb_trace')``: Profile the
              performance with tensorboard. See more details in the tutorial
              `profile with tensorboard`_.
        record_shapes (bool): Save information about operator's input shapes.
            Defaults to False.
        profile_memory (bool): Track tensor memory allocation/deallocation.
            Defaults to False.
        with_stack (bool): Record source information (file and line number)
            for the ops. Defaults to False.
        with_flops (bool): Use formula to estimate the FLOPS of specific
            operators (matrix multiplication and 2D convolution).
            Defaults to False.
        json_trace_path (str, optional): Exports the collected trace in
            Chrome JSON format. Chrome uses 'chrome://tracing' to view json
            files. Defaults to None, which means profiling does not store
            json files.

    Warnings:
        The profiler will be closed after ``profile_times`` iterations
        automatically. Please make sure the configuration of your scheduler
        will not close the profiler before the iteration reaches the value
        of ``profile_times``.

    Examples:
        >>> # tensorboard trace
        >>> trace_config = dict(type='tb_trace')
        >>> profiler_hook_cfg = dict(on_trace_ready=trace_config)

    .. _PyTorch official tutorial: https://pytorch.org/tutorials/recipes/recipes/profiler_recipe.html#using-profiler-to-analyze-execution-time
    .. _profile with tensorboard: https://pytorch.org/tutorials/intermediate/tensorboard_profiler_tutorial.html#pytorch-profiler-with-tensorboard
    """  # noqa: E501

    priority = 'VERY_LOW'

    def __init__(self,
                 *,
                 by_epoch: bool = True,
                 profile_times: int = 1,
                 activity_with_cpu: bool = True,
                 activity_with_cuda: bool = False,
                 schedule: Optional[dict] = None,
                 on_trace_ready: Union[Callable, dict, None] = None,
                 record_shapes: bool = False,
                 profile_memory: bool = False,
                 with_stack: bool = False,
                 with_flops: bool = False,
                 json_trace_path: Optional[str] = None) -> None:
        # ``torch.profiler`` only exists from torch 1.8.1 onwards.
        try:
            from torch import profiler
        except ImportError:
            raise ImportError('please upgrade torch above 1.8.1')
        if not check_kineto():
            raise ImportError('Due to Kineto support issues, please upgrade '
                              'pytorch above 1.8.1(windows users above 1.9.1)')

        assert isinstance(by_epoch, bool), '``by_epoch`` should be a boolean.'
        self.by_epoch = by_epoch

        if profile_times < 1:
            # Fixed: the message previously referred to a non-existent
            # ``profile_iters`` parameter.
            raise ValueError('profile_times should be greater than 0, '
                             f'but got {profile_times}')
        if by_epoch and profile_times > 1:
            raise ValueError(
                f'Profiler will profile 0-{profile_times} epochs.\n'
                'Since profiler will slow down the training, it is recommended'
                ' to train 1 epoch with ProfilerHook and adjust your setting '
                'according to the profiler summary.\n'
                'During normal training(epoch > 1), '
                'you may disable the ProfilerHook.')
        self.profile_times = profile_times

        assert isinstance(activity_with_cpu, bool), \
            '``activity_with_cpu`` should be a boolean.'
        assert isinstance(activity_with_cuda, bool), \
            '``activity_with_cuda`` should be a boolean.'
        self.activities = []
        if activity_with_cpu:
            self.activities.append(profiler.ProfilerActivity.CPU)
        if activity_with_cuda:
            self.activities.append(profiler.ProfilerActivity.CUDA)

        if schedule is not None:
            assert isinstance(schedule, dict), '``schedule`` should be a dict.'
            self.schedule = profiler.schedule(**schedule)
        else:
            self.schedule = None

        self.on_trace_ready = on_trace_ready
        self.record_shapes = record_shapes
        self.profile_memory = profile_memory
        self.with_stack = with_stack
        self.with_flops = with_flops
        self.json_trace_path = json_trace_path
        # Set to True once the profiler context is exited, so the epoch/iter
        # hooks do not try to close it twice.
        self._closed = False

    def before_run(self, runner):
        """Initialize the profiler.

        Through the runner parameter, the validity of the parameter is
        further determined.
        """
        max_times = runner.max_epochs if self.by_epoch else runner.max_iters
        if max_times < self.profile_times:
            raise ValueError(
                f'``profile_times`` should not be greater than {max_times}')

        on_trace_ready = self._parse_trace_config(runner)

        self.profiler = torch.profiler.profile(  # noqa
            activities=self.activities,
            schedule=self.schedule,
            on_trace_ready=on_trace_ready,
            record_shapes=self.record_shapes,
            profile_memory=self.profile_memory,
            with_stack=self.with_stack,
            with_flops=self.with_flops)

        self.profiler.__enter__()
        runner.logger.info('profiler is profiling...')

    def _parse_trace_config(self, runner):
        """Used to parse the parameter 'on_trace_ready'."""
        if self.on_trace_ready is None:
            _on_trace_ready = None
        elif callable(self.on_trace_ready):
            _on_trace_ready = self.on_trace_ready
        elif isinstance(self.on_trace_ready, dict):
            trace_cfg = self.on_trace_ready.copy()
            trace_type = trace_cfg.pop('type')

            # Build a log printing handler.
            if trace_type == 'log_trace':

                def _log_handler(_profile):
                    print(_profile.key_averages().table(**trace_cfg))

                _on_trace_ready = _log_handler

            elif trace_type == 'tb_trace':  # tensorboard_trace handler
                try:
                    import torch_tb_profiler  # noqa: F401
                except ImportError:
                    raise ImportError(
                        'please run ``pip install torch-tb-profiler``')

                # Relative (or missing) trace directories are resolved under
                # the runner's log directory.
                if 'dir_name' not in trace_cfg:
                    trace_cfg['dir_name'] = osp.join(runner.log_dir,
                                                     'tf_tracing_logs')
                elif not osp.isabs(trace_cfg['dir_name']):
                    trace_cfg['dir_name'] = osp.join(runner.log_dir,
                                                     trace_cfg['dir_name'])
                runner.logger.info('trace_files of ProfilerHook will be '
                                   f'saved to {trace_cfg["dir_name"]}.')

                if self.json_trace_path is not None:
                    # ``Logger.warn`` is a deprecated alias; use ``warning``.
                    runner.logger.warning(
                        'When using tensorboard_trace, it is recommended to '
                        'save json files by setting ``worker_name`` instead of'
                        ' setting ``json_trace_path``')
                _on_trace_ready = torch.profiler.tensorboard_trace_handler(
                    **trace_cfg)
            else:
                raise ValueError('trace_type should be "log_trace" or '
                                 f'"tb_trace", but got {trace_type}')
        else:
            raise ValueError(
                '``on_trace_ready`` should be a handler, or dict, or None, '
                f'but got {self.on_trace_ready}')
        return _on_trace_ready

    def after_train_epoch(self, runner):
        """Determine if the content is exported."""
        # `after_train_epoch` will also be called in IterBasedTrainLoop.
        # Here we check `self._closed` to avoid exiting twice.
        if not self._closed:
            self._export_chrome_trace(runner)

    def after_train_iter(self, runner, batch_idx, data_batch=None,
                         outputs=None):
        """Profiler will call `step` method if it is not closed."""
        if not self._closed:
            self.profiler.step()
        if runner.iter == self.profile_times - 1 and not self.by_epoch:
            self._export_chrome_trace(runner)

    def _export_chrome_trace(self, runner):
        """Exporting content."""
        self._closed = True
        runner.logger.info('profiler may take a few minutes...')
        self.profiler.__exit__(None, None, None)
        if self.json_trace_path is not None:
            self.profiler.export_chrome_trace(self.json_trace_path)
class NPUProfilerHook(Hook):
    """NPUProfiler to analyze performance during training.

    NPU Profiling is used to count the device execution time of all
    operators. The torch_npu.npu.profile interface is used to complete the
    profiling data collection at each stage of the project, and the data is
    analyzed by the msprof tool and the data can be dumped to further
    manually analyze the key performance bottlenecks. For more details on
    the torch_npu.npu.profile interface, please visit
    https://gitee.com/ascend/pytorch/blob/master/torch_npu/npu/profiler.py#profile

    Args:
        begin (int): Number of start iterations for profiling. Defaults to 0.
        end (int): Number of end iterations for profiling. Defaults to 1.
        result_path (str): The path to save the profiling results file.
            Defaults to 'cann_profiling'.
        exit_after_profiling (bool): Whether to exit the program after
            profiling. Defaults to True.
        use_e2e_profiler (bool): Turn on E2E profiling. E2E profiling
            combines performance data at the PyTorch level and the NPU level
            to analyze the bottlenecks of model performance end-to-end, and
            cannot show detailed content, and only as an auxiliary analysis.
            Defaults to False.
        ge_profiling_to_std_out (bool): Turn on GE profiling. GE is used to
            collect the profiling data of the host-side scheduling of the
            Ascend device. Defaults to False.

    Examples:
        >>> cfg = ...
        >>> profiler_config = dict(type='NPUProfilerHook', end=2)
        >>> cfg.merge_from_dict({'custom_hooks': [profiler_config]})
        >>> runner = Runner.from_cfg(cfg)
        >>> runner.train()
    """

    priority = 'VERY_LOW'

    def __init__(self,
                 *,
                 begin: int = 0,
                 end: int = 1,
                 result_path: str = 'cann_profiling',
                 exit_after_profiling: bool = True,
                 use_e2e_profiler: bool = False,
                 ge_profiling_to_std_out: bool = False):
        try:
            import torch_npu
        except ImportError:
            raise ImportError('Failed to import torch_npu module')

        if begin >= end:
            # Fixed: the two literals previously concatenated without a
            # space ("greaterthan").
            raise ValueError(
                'The iteration to start profiling should not be greater '
                'than or equal to profile end')

        self.begin = begin
        self.end = end
        self.result_path = result_path
        self.exit_after_profiling = exit_after_profiling

        if ge_profiling_to_std_out:
            os.environ['GE_PROFILING_TO_STD_OUT'] = '1'

        # ``exist_ok=True`` already tolerates a pre-existing directory, so
        # the previous ``osp.exists`` pre-check was redundant.
        os.makedirs(self.result_path, exist_ok=True)

        self.profiler = torch_npu.npu.profile(
            self.result_path, use_e2e_profiler=use_e2e_profiler)

    def before_run(self, runner):
        """Check that the profiling window fits inside the training run."""
        if self.end > runner.max_iters:
            # Fixed: missing space between the concatenated literals.
            raise ValueError(
                'The profiling end iteration should not be greater '
                'than the max iteration')

    def before_train_iter(self, runner, batch_idx, data_batch=None):
        """Enter the NPU profiler context at the starting iteration."""
        if runner.iter == self.begin:
            self.profiler.__enter__()
            runner.logger.info('NPUProfiler starts profiling...')

    def after_train_iter(self,
                         runner,
                         batch_idx,
                         data_batch=None,
                         outputs=None):
        """Exit the profiler after the last profiled iteration; optionally
        terminate the program so the results can be collected."""
        if runner.iter == self.end - 1:
            runner.logger.info('profiler may take a few minutes to'
                               ' save the profiling result.')
            self.profiler.__exit__(None, None, None)
            if self.exit_after_profiling:
                sys.exit()