# Copyright (c) Meta Platforms, Inc. and affiliates. # All rights reserved. # # This source code is licensed under the license found in the # LICENSE file in the root directory of this source tree. import typing from collections import defaultdict from typing import Any, Counter, DefaultDict, Dict, Optional, Tuple, Union import torch.nn as nn from rich import box from rich.console import Console from rich.table import Table from torch import Tensor from .jit_analysis import JitModelAnalysis from .jit_handles import (Handle, addmm_flop_jit, batchnorm_flop_jit, bmm_flop_jit, conv_flop_jit, einsum_flop_jit, elementwise_flop_counter, generic_activation_jit, linear_flop_jit, matmul_flop_jit, norm_flop_counter) # A dictionary that maps supported operations to their flop count jit handles. _DEFAULT_SUPPORTED_FLOP_OPS: Dict[str, Handle] = { 'aten::addmm': addmm_flop_jit, 'aten::bmm': bmm_flop_jit, 'aten::_convolution': conv_flop_jit, 'aten::einsum': einsum_flop_jit, 'aten::matmul': matmul_flop_jit, 'aten::mm': matmul_flop_jit, 'aten::linear': linear_flop_jit, # You might want to ignore BN flops due to inference-time fusion. # Use `set_op_handle("aten::batch_norm", None) 'aten::batch_norm': batchnorm_flop_jit, 'aten::group_norm': norm_flop_counter(2), 'aten::layer_norm': norm_flop_counter(2), 'aten::instance_norm': norm_flop_counter(1), 'aten::upsample_nearest2d': elementwise_flop_counter(0, 1), 'aten::upsample_bilinear2d': elementwise_flop_counter(0, 4), 'aten::adaptive_avg_pool2d': elementwise_flop_counter(1, 0), 'aten::grid_sampler': elementwise_flop_counter(0, 4), # assume bilinear } # A dictionary that maps supported operations to # their activation count handles. _DEFAULT_SUPPORTED_ACT_OPS: Dict[str, Handle] = { 'aten::_convolution': generic_activation_jit('conv'), 'aten::addmm': generic_activation_jit(), 'aten::bmm': generic_activation_jit(), 'aten::einsum': generic_activation_jit(), 'aten::matmul': generic_activation_jit(), 'aten::linear': generic_activation_jit(), } class FlopAnalyzer(JitModelAnalysis): """Provides access to per-submodule model flop count obtained by tracing a model with pytorch's jit tracing functionality. By default, comes with standard flop counters for a few common operators. Note: - Flop is not a well-defined concept. We just produce our best estimate. - We count one fused multiply-add as one flop. Handles for additional operators may be added, or the default ones overwritten, using the ``.set_op_handle(name, func)`` method. See the method documentation for details. Flop counts can be obtained as: - ``.total(module_name="")``: total flop count for the module - ``.by_operator(module_name="")``: flop counts for the module, as a Counter over different operator types - ``.by_module()``: Counter of flop counts for all submodules - ``.by_module_and_operator()``: dictionary indexed by descendant of Counters over different operator types An operator is treated as within a module if it is executed inside the module's ``__call__`` method. Note that this does not include calls to other methods of the module or explicit calls to ``module.forward(...)``. Modified from https://github.com/facebookresearch/fvcore/blob/main/fvcore/nn/flop_count.py Args: model (nn.Module): The model to analyze. inputs (Union[Tensor, Tuple[Tensor, ...]]): The input to the model. Examples: >>> import torch.nn as nn >>> import torch >>> class TestModel(nn.Module): ... def __init__(self): ... super().__init__() ... self.fc = nn.Linear(in_features=1000, out_features=10) ... self.conv = nn.Conv2d( ... in_channels=3, out_channels=10, kernel_size=1 ... ) ... self.act = nn.ReLU() ... def forward(self, x): ... return self.fc(self.act(self.conv(x)).flatten(1)) >>> model = TestModel() >>> inputs = (torch.randn((1,3,10,10)),) >>> flops = FlopAnalyzer(model, inputs) >>> flops.total() 13000 >>> flops.total("fc") 10000 >>> flops.by_operator() Counter({"addmm" : 10000, "conv" : 3000}) >>> flops.by_module() Counter({"" : 13000, "fc" : 10000, "conv" : 3000, "act" : 0}) >>> flops.by_module_and_operator() {"" : Counter({"addmm" : 10000, "conv" : 3000}), "fc" : Counter({"addmm" : 10000}), "conv" : Counter({"conv" : 3000}), "act" : Counter() } """ def __init__( self, model: nn.Module, inputs: Union[Tensor, Tuple[Tensor, ...]], ) -> None: super().__init__(model=model, inputs=inputs) self.set_op_handle(**_DEFAULT_SUPPORTED_FLOP_OPS) __init__.__doc__ = JitModelAnalysis.__init__.__doc__ class ActivationAnalyzer(JitModelAnalysis): """Provides access to per-submodule model activation count obtained by tracing a model with pytorch's jit tracing functionality. By default, comes with standard activation counters for convolutional and dot-product operators. Handles for additional operators may be added, or the default ones overwritten, using the ``.set_op_handle(name, func)`` method. See the method documentation for details. Activation counts can be obtained as: - ``.total(module_name="")``: total activation count for a module - ``.by_operator(module_name="")``: activation counts for the module, as a Counter over different operator types - ``.by_module()``: Counter of activation counts for all submodules - ``.by_module_and_operator()``: dictionary indexed by descendant of Counters over different operator types An operator is treated as within a module if it is executed inside the module's ``__call__`` method. Note that this does not include calls to other methods of the module or explicit calls to ``module.forward(...)``. Modified from https://github.com/facebookresearch/fvcore/blob/main/fvcore/nn/activation_count.py Args: model (nn.Module): The model to analyze. inputs (Union[Tensor, Tuple[Tensor, ...]]): The input to the model. Examples: >>> import torch.nn as nn >>> import torch >>> class TestModel(nn.Module): ... def __init__(self): ... super().__init__() ... self.fc = nn.Linear(in_features=1000, out_features=10) ... self.conv = nn.Conv2d( ... in_channels=3, out_channels=10, kernel_size=1 ... ) ... self.act = nn.ReLU() ... def forward(self, x): ... return self.fc(self.act(self.conv(x)).flatten(1)) >>> model = TestModel() >>> inputs = (torch.randn((1,3,10,10)),) >>> acts = ActivationAnalyzer(model, inputs) >>> acts.total() 1010 >>> acts.total("fc") 10 >>> acts.by_operator() Counter({"conv" : 1000, "addmm" : 10}) >>> acts.by_module() Counter({"" : 1010, "fc" : 10, "conv" : 1000, "act" : 0}) >>> acts.by_module_and_operator() {"" : Counter({"conv" : 1000, "addmm" : 10}), "fc" : Counter({"addmm" : 10}), "conv" : Counter({"conv" : 1000}), "act" : Counter() } """ def __init__( self, model: nn.Module, inputs: Union[Tensor, Tuple[Tensor, ...]], ) -> None: super().__init__(model=model, inputs=inputs) self.set_op_handle(**_DEFAULT_SUPPORTED_ACT_OPS) __init__.__doc__ = JitModelAnalysis.__init__.__doc__ def flop_count( model: nn.Module, inputs: Tuple[Any, ...], supported_ops: Optional[Dict[str, Handle]] = None, ) -> Tuple[DefaultDict[str, float], Counter[str]]: """Given a model and an input to the model, compute the per-operator Gflops of the given model. Adopted from https://github.com/facebookresearch/fvcore/blob/main/fvcore/nn/flop_count.py Args: model (nn.Module): The model to compute flop counts. inputs (tuple): Inputs that are passed to `model` to count flops. Inputs need to be in a tuple. supported_ops (dict(str,Callable) or None) : provide additional handlers for extra ops, or overwrite the existing handlers for convolution and matmul and einsum. The key is operator name and the value is a function that takes (inputs, outputs) of the op. We count one Multiply-Add as one FLOP. Returns: tuple[defaultdict, Counter]: A dictionary that records the number of gflops for each operation and a Counter that records the number of unsupported operations. """ if supported_ops is None: supported_ops = {} flop_counter = FlopAnalyzer(model, inputs).set_op_handle(**supported_ops) giga_flops = defaultdict(float) for op, flop in flop_counter.by_operator().items(): giga_flops[op] = flop / 1e9 return giga_flops, flop_counter.unsupported_ops() def activation_count( model: nn.Module, inputs: Tuple[Any, ...], supported_ops: Optional[Dict[str, Handle]] = None, ) -> Tuple[DefaultDict[str, float], Counter[str]]: """Given a model and an input to the model, compute the total number of activations of the model. Adopted from https://github.com/facebookresearch/fvcore/blob/main/fvcore/nn/activation_count.py Args: model (nn.Module): The model to compute activation counts. inputs (tuple): Inputs that are passed to `model` to count activations. Inputs need to be in a tuple. supported_ops (dict(str,Callable) or None) : provide additional handlers for extra ops, or overwrite the existing handlers for convolution and matmul. The key is operator name and the value is a function that takes (inputs, outputs) of the op. Returns: tuple[defaultdict, Counter]: A dictionary that records the number of activation (mega) for each operation and a Counter that records the number of unsupported operations. """ if supported_ops is None: supported_ops = {} act_counter = ActivationAnalyzer(model, inputs).set_op_handle(**supported_ops) mega_acts = defaultdict(float) for op, act in act_counter.by_operator().items(): mega_acts[op] = act / 1e6 return mega_acts, act_counter.unsupported_ops() def parameter_count(model: nn.Module) -> typing.DefaultDict[str, int]: """Count parameters of a model and its submodules. Adopted from https://github.com/facebookresearch/fvcore/blob/main/fvcore/nn/parameter_count.py Args: model (nn.Module): the model to count parameters. Returns: dict[str, int]: the key is either a parameter name or a module name. The value is the number of elements in the parameter, or in all parameters of the module. The key "" corresponds to the total number of parameters of the model. """ count = defaultdict(int) # type: typing.DefaultDict[str, int] for name, param in model.named_parameters(): size = param.numel() name = name.split('.') for k in range(0, len(name) + 1): prefix = '.'.join(name[:k]) count[prefix] += size return count def parameter_count_table(model: nn.Module, max_depth: int = 3) -> str: """Format the parameter count of the model (and its submodules or parameters) Adopted from https://github.com/facebookresearch/fvcore/blob/main/fvcore/nn/parameter_count.py Args: model (nn.Module): the model to count parameters. max_depth (int): maximum depth to recursively print submodules or parameters Returns: str: the table to be printed """ count: typing.DefaultDict[str, int] = parameter_count(model) # pyre-fixme[24]: Generic type `tuple` expects at least 1 type parameter. param_shape: typing.Dict[str, typing.Tuple] = { k: tuple(v.shape) for k, v in model.named_parameters() } # pyre-fixme[24]: Generic type `tuple` expects at least 1 type parameter. rows: typing.List[typing.Tuple] = [] def format_size(x: int) -> str: if x > 1e8: return f'{x / 1e9:.1f}G' if x > 1e5: return f'{x / 1e6:.1f}M' if x > 1e2: return f'{x / 1e3:.1f}K' return str(x) def fill(lvl: int, prefix: str) -> None: if lvl >= max_depth: return for name, v in count.items(): if name.count('.') == lvl and name.startswith(prefix): indent = ' ' * (lvl + 1) if name in param_shape: rows.append( (indent + name, indent + str(param_shape[name]))) else: rows.append((indent + name, indent + format_size(v))) fill(lvl + 1, name + '.') rows.append(('model', format_size(count.pop('')))) fill(0, '') table = Table( title=f'parameter count of {model.__class__.__name__}', box=box.ASCII2) table.add_column('name') table.add_column('#elements or shape') for row in rows: table.add_row(*row) console = Console() with console.capture() as capture: console.print(table, end='') return capture.get()