import collections
import importlib
import logging
import os
import time
from collections import OrderedDict
from collections.abc import Sequence
from itertools import repeat

import numpy as np
import torch
import torch.distributed as dist

def print_rank(var_name, var_value, rank=0):
    """Print a named value from the given rank only."""
    if dist.get_rank() == rank:
        print(f"[Rank {rank}] {var_name}: {var_value}")


def print_0(*args, **kwargs):
    """Print from rank 0 only."""
    if dist.get_rank() == 0:
        print(*args, **kwargs)

def requires_grad(model: torch.nn.Module, flag: bool = True) -> None:
    """
    Set requires_grad flag for all parameters in a model.
    """
    for p in model.parameters():
        p.requires_grad = flag

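# Example (a sketch): freeze every parameter of a model, e.g. before using it
# as a fixed feature extractor:
#   requires_grad(model, False)
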
def format_numel_str(numel: int) -> str:
    B = 1024**3
    M = 1024**2
    K = 1024
    if numel >= B:
        return f"{numel / B:.2f} B"
    elif numel >= M:
        return f"{numel / M:.2f} M"
    elif numel >= K:
        return f"{numel / K:.2f} K"
    else:
        return f"{numel}"

def all_reduce_mean(tensor: torch.Tensor) -> torch.Tensor:
    """All-reduce a tensor across ranks and divide by world size, in place."""
    dist.all_reduce(tensor=tensor, op=dist.ReduceOp.SUM)
    tensor.div_(dist.get_world_size())
    return tensor

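# Example (a minimal sketch; assumes torch.distributed has been initialized,
# e.g. via dist.init_process_group, and that `loss` lives on this rank's
# device). The reduction mutates its argument, so clone if the original
# per-rank value is still needed:
#   loss_avg = all_reduce_mean(loss.detach().clone())
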
def get_model_numel(model: torch.nn.Module) -> tuple[int, int]:
    num_params = 0
    num_params_trainable = 0
    for p in model.parameters():
        num_params += p.numel()
        if p.requires_grad:
            num_params_trainable += p.numel()
    return num_params, num_params_trainable

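# Example (a sketch; `model` is any torch.nn.Module):
#   num_params, num_trainable = get_model_numel(model)
#   print_0(f"params: {format_numel_str(num_params)}, "
#           f"trainable: {format_numel_str(num_trainable)}")
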
def try_import(name):
    """Try to import a module.

    Args:
        name (str): The module to import, in absolute or relative terms
            (e.g. either ``pkg.mod`` or ``..mod``).

    Returns:
        ModuleType or None: The imported module if the import succeeds,
        otherwise None.
    """
    try:
        return importlib.import_module(name)
    except ImportError:
        return None

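# Example (a sketch; `wandb` is just an illustrative optional dependency):
#   wandb = try_import("wandb")
#   if wandb is not None:
#       wandb.init(project="demo")  # hypothetical usage of the optional module
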
def transpose(x):
    """Transpose a list of lists.

    Args:
        x (list[list]): The nested list to transpose.
    """
    ret = list(map(list, zip(*x)))
    return ret

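# Example: transpose([[1, 2, 3], [4, 5, 6]]) -> [[1, 4], [2, 5], [3, 6]]
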
def get_timestamp():
    timestamp = time.strftime("%Y%m%d-%H%M%S", time.localtime(time.time()))
    return timestamp

def format_time(seconds):
    """Format a duration in seconds, keeping at most the two most
    significant nonzero units (e.g. "1D2h", "3m20s", "250ms")."""
    days = int(seconds / 3600 / 24)
    seconds = seconds - days * 3600 * 24
    hours = int(seconds / 3600)
    seconds = seconds - hours * 3600
    minutes = int(seconds / 60)
    seconds = seconds - minutes * 60
    secondsf = int(seconds)
    seconds = seconds - secondsf
    millis = int(seconds * 1000)

    f = ""
    i = 1
    if days > 0:
        f += str(days) + "D"
        i += 1
    if hours > 0 and i <= 2:
        f += str(hours) + "h"
        i += 1
    if minutes > 0 and i <= 2:
        f += str(minutes) + "m"
        i += 1
    if secondsf > 0 and i <= 2:
        f += str(secondsf) + "s"
        i += 1
    if millis > 0 and i <= 2:
        f += str(millis) + "ms"
        i += 1
    if f == "":
        f = "0ms"
    return f

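# Examples (a sketch; only the two most significant nonzero units survive):
#   format_time(90061.5)  # -> "1D1h"  (minutes and below are dropped)
#   format_time(125)      # -> "2m5s"
#   format_time(0.25)     # -> "250ms"
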
def to_tensor(data):
    """Convert objects of various Python types to :obj:`torch.Tensor`.

    Supported types are: :class:`numpy.ndarray`, :class:`torch.Tensor`,
    :class:`Sequence`, :class:`int` and :class:`float`.

    Args:
        data (torch.Tensor | numpy.ndarray | Sequence | int | float): Data to
            be converted.
    """
    if isinstance(data, torch.Tensor):
        return data
    elif isinstance(data, np.ndarray):
        return torch.from_numpy(data)
    elif isinstance(data, Sequence) and not isinstance(data, str):
        return torch.tensor(data)
    elif isinstance(data, int):
        return torch.LongTensor([data])
    elif isinstance(data, float):
        return torch.FloatTensor([data])
    else:
        raise TypeError(f"type {type(data)} cannot be converted to tensor.")

def to_ndarray(data):
    if isinstance(data, torch.Tensor):
        # detach from the graph and move off GPU before converting
        return data.detach().cpu().numpy()
    elif isinstance(data, np.ndarray):
        return data
    elif isinstance(data, Sequence):
        return np.array(data)
    elif isinstance(data, int):
        return np.array([data], dtype=int)
    elif isinstance(data, float):
        return np.array([data], dtype=float)
    else:
        raise TypeError(f"type {type(data)} cannot be converted to ndarray.")

def to_torch_dtype(dtype):
    if isinstance(dtype, torch.dtype):
        return dtype
    elif isinstance(dtype, str):
        dtype_mapping = {
            "float64": torch.float64,
            "float32": torch.float32,
            "float16": torch.float16,
            "bfloat16": torch.bfloat16,
            "fp32": torch.float32,
            "fp16": torch.float16,
            "half": torch.float16,
            "bf16": torch.bfloat16,
        }
        if dtype not in dtype_mapping:
            raise ValueError(f"Unsupported dtype string: {dtype!r}")
        return dtype_mapping[dtype]
    else:
        raise ValueError(f"Cannot convert {type(dtype)} to a torch dtype.")

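# Example (a sketch): resolve a config string into a torch dtype:
#   dtype = to_torch_dtype("bf16")  # torch.bfloat16
#   model = model.to(dtype=dtype)   # hypothetical model cast
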
def count_params(model):
    """Count the trainable parameters of a model."""
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

def _ntuple(n):
    def parse(x):
        if isinstance(x, collections.abc.Iterable) and not isinstance(x, str):
            return x
        return tuple(repeat(x, n))

    return parse


to_1tuple = _ntuple(1)
to_2tuple = _ntuple(2)
to_3tuple = _ntuple(3)
to_4tuple = _ntuple(4)
to_ntuple = _ntuple

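# Examples (a sketch): scalars are repeated, non-string iterables pass
# through unchanged:
#   to_2tuple(3)       # -> (3, 3)
#   to_2tuple((3, 5))  # -> (3, 5)
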
def convert_SyncBN_to_BN2d(model_cfg):
    """Recursively rewrite ``norm_cfg`` entries from SyncBN to BN2d, in place."""
    for k in model_cfg:
        v = model_cfg[k]
        if k == "norm_cfg" and v.get("type") == "SyncBN":
            v["type"] = "BN2d"
        elif isinstance(v, dict):
            convert_SyncBN_to_BN2d(v)

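# Example (a sketch; the config dict is rewritten in place):
#   cfg = {"backbone": {"norm_cfg": {"type": "SyncBN"}}}
#   convert_SyncBN_to_BN2d(cfg)
#   cfg["backbone"]["norm_cfg"]["type"]  # -> "BN2d"
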
def get_topk(x, dim=4, k=5):
    """Keep the k rows of x with the largest values in column ``dim``."""
    x = to_tensor(x)
    inds = x[..., dim].topk(k)[1]
    return x[inds]

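# Example (a sketch): keep the 5 detections with the highest score, assuming
# rows laid out as (x1, y1, x2, y2, score, ...) so column 4 is the score:
#   dets_top5 = get_topk(dets, dim=4, k=5)  # `dets` is hypothetical
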
def param_sigmoid(x, alpha):
    """Sigmoid with a steepness parameter: 1 / (1 + exp(-alpha * x))."""
    ret = 1 / (1 + (-alpha * x).exp())
    return ret

def inverse_param_sigmoid(x, alpha, eps=1e-5):
    """Inverse of ``param_sigmoid``; eps avoids log(0) at the boundaries."""
    x = x.clamp(min=0, max=1)
    x1 = x.clamp(min=eps)
    x2 = (1 - x).clamp(min=eps)
    return torch.log(x1 / x2) / alpha

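# Sanity check (a sketch): inverse_param_sigmoid inverts param_sigmoid for
# the same alpha, away from the eps-clamped boundaries:
#   x = torch.tensor([0.25])
#   inverse_param_sigmoid(param_sigmoid(x, alpha=2.0), alpha=2.0)  # ~0.25
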
def inverse_sigmoid(x, eps=1e-5):
    """Inverse of the sigmoid function.

    Args:
        x (Tensor): The tensor to invert.
        eps (float): Epsilon to avoid numerical overflow at the boundaries.
            Defaults to 1e-5.

    Returns:
        Tensor: The inverse-sigmoid of ``x``, with the same shape as the
        input.
    """
    x = x.clamp(min=0, max=1)
    x1 = x.clamp(min=eps)
    x2 = (1 - x).clamp(min=eps)
    return torch.log(x1 / x2)

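# Sanity check (a sketch): torch.sigmoid undoes inverse_sigmoid away from the
# clamped boundaries:
#   p = torch.tensor([0.1, 0.5, 0.9])
#   torch.allclose(torch.sigmoid(inverse_sigmoid(p)), p, atol=1e-4)  # True
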
def count_columns(df, columns):
    """Count value frequencies per column of a pandas DataFrame.

    Returns an OrderedDict mapping each column name to a dict of
    value -> (count, fraction of all rows).
    """
    cnt_dict = OrderedDict()
    num_samples = len(df)
    for col in columns:
        d_i = df[col].value_counts().to_dict()
        for k in d_i:
            d_i[k] = (d_i[k], d_i[k] / num_samples)
        cnt_dict[col] = d_i
    return cnt_dict

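# Example (a sketch; assumes pandas is available):
#   import pandas as pd
#   df = pd.DataFrame({"split": ["train", "train", "val"]})
#   count_columns(df, ["split"])
#   # -> OrderedDict with {'train': (2, 0.67), 'val': (1, 0.33)} under
#   #    'split' (fractions shown rounded)
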
def build_logger(work_dir, cfgname):
    log_file = cfgname + ".log"
    log_path = os.path.join(work_dir, log_file)

    logger = logging.getLogger(cfgname)
    logger.setLevel(logging.INFO)

    # formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    formatter = logging.Formatter("%(asctime)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S")

    handler1 = logging.FileHandler(log_path)
    handler1.setFormatter(formatter)
    handler2 = logging.StreamHandler()
    handler2.setFormatter(formatter)
    logger.addHandler(handler1)
    logger.addHandler(handler2)
    logger.propagate = False
    return logger
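
# Example (a sketch; assumes `work_dir` already exists, since build_logger
# does not create it):
#   logger = build_logger("./logs", "train")
#   logger.info("step 100 | loss 0.42")  # goes to ./logs/train.log and stderr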