# Copyright (c) Facebook, Inc. and its affiliates. import datetime import json import logging import os import time from collections import defaultdict from contextlib import contextmanager from functools import cached_property from typing import Optional import torch from fvcore.common.history_buffer import HistoryBuffer from detectron2.utils.file_io import PathManager __all__ = [ "get_event_storage", "has_event_storage", "JSONWriter", "TensorboardXWriter", "CommonMetricPrinter", "EventStorage", ] _CURRENT_STORAGE_STACK = [] def get_event_storage(): """ Returns: The :class:`EventStorage` object that's currently being used. Throws an error if no :class:`EventStorage` is currently enabled. """ assert len( _CURRENT_STORAGE_STACK ), "get_event_storage() has to be called inside a 'with EventStorage(...)' context!" return _CURRENT_STORAGE_STACK[-1] def has_event_storage(): """ Returns: Check if there are EventStorage() context existed. """ return len(_CURRENT_STORAGE_STACK) > 0 class EventWriter: """ Base class for writers that obtain events from :class:`EventStorage` and process them. """ def write(self): raise NotImplementedError def close(self): pass class JSONWriter(EventWriter): """ Write scalars to a json file. It saves scalars as one json per line (instead of a big json) for easy parsing. Examples parsing such a json file: :: $ cat metrics.json | jq -s '.[0:2]' [ { "data_time": 0.008433341979980469, "iteration": 19, "loss": 1.9228371381759644, "loss_box_reg": 0.050025828182697296, "loss_classifier": 0.5316952466964722, "loss_mask": 0.7236229181289673, "loss_rpn_box": 0.0856662318110466, "loss_rpn_cls": 0.48198649287223816, "lr": 0.007173333333333333, "time": 0.25401854515075684 }, { "data_time": 0.007216215133666992, "iteration": 39, "loss": 1.282649278640747, "loss_box_reg": 0.06222952902317047, "loss_classifier": 0.30682939291000366, "loss_mask": 0.6970193982124329, "loss_rpn_box": 0.038663312792778015, "loss_rpn_cls": 0.1471673548221588, "lr": 0.007706666666666667, "time": 0.2490077018737793 } ] $ cat metrics.json | jq '.loss_mask' 0.7126231789588928 0.689423680305481 0.6776131987571716 ... """ def __init__(self, json_file, window_size=20): """ Args: json_file (str): path to the json file. New data will be appended if the file exists. window_size (int): the window size of median smoothing for the scalars whose `smoothing_hint` are True. """ self._file_handle = PathManager.open(json_file, "a") self._window_size = window_size self._last_write = -1 def write(self): storage = get_event_storage() to_save = defaultdict(dict) for k, (v, iter) in storage.latest_with_smoothing_hint(self._window_size).items(): # keep scalars that have not been written if iter <= self._last_write: continue to_save[iter][k] = v if len(to_save): all_iters = sorted(to_save.keys()) self._last_write = max(all_iters) for itr, scalars_per_iter in to_save.items(): scalars_per_iter["iteration"] = itr self._file_handle.write(json.dumps(scalars_per_iter, sort_keys=True) + "\n") self._file_handle.flush() try: os.fsync(self._file_handle.fileno()) except AttributeError: pass def close(self): self._file_handle.close() class TensorboardXWriter(EventWriter): """ Write all scalars to a tensorboard file. """ def __init__(self, log_dir: str, window_size: int = 20, **kwargs): """ Args: log_dir (str): the directory to save the output events window_size (int): the scalars will be median-smoothed by this window size kwargs: other arguments passed to `torch.utils.tensorboard.SummaryWriter(...)` """ self._window_size = window_size self._writer_args = {"log_dir": log_dir, **kwargs} self._last_write = -1 @cached_property def _writer(self): from torch.utils.tensorboard import SummaryWriter return SummaryWriter(**self._writer_args) def write(self): storage = get_event_storage() new_last_write = self._last_write for k, (v, iter) in storage.latest_with_smoothing_hint(self._window_size).items(): if iter > self._last_write: self._writer.add_scalar(k, v, iter) new_last_write = max(new_last_write, iter) self._last_write = new_last_write # storage.put_{image,histogram} is only meant to be used by # tensorboard writer. So we access its internal fields directly from here. if len(storage._vis_data) >= 1: for img_name, img, step_num in storage._vis_data: self._writer.add_image(img_name, img, step_num) # Storage stores all image data and rely on this writer to clear them. # As a result it assumes only one writer will use its image data. # An alternative design is to let storage store limited recent # data (e.g. only the most recent image) that all writers can access. # In that case a writer may not see all image data if its period is long. storage.clear_images() if len(storage._histograms) >= 1: for params in storage._histograms: self._writer.add_histogram_raw(**params) storage.clear_histograms() def close(self): if "_writer" in self.__dict__: self._writer.close() class CommonMetricPrinter(EventWriter): """ Print **common** metrics to the terminal, including iteration time, ETA, memory, all losses, and the learning rate. It also applies smoothing using a window of 20 elements. It's meant to print common metrics in common ways. To print something in more customized ways, please implement a similar printer by yourself. """ def __init__(self, max_iter: Optional[int] = None, window_size: int = 20): """ Args: max_iter: the maximum number of iterations to train. Used to compute ETA. If not given, ETA will not be printed. window_size (int): the losses will be median-smoothed by this window size """ self.logger = logging.getLogger("detectron2.utils.events") self._max_iter = max_iter self._window_size = window_size self._last_write = None # (step, time) of last call to write(). Used to compute ETA def _get_eta(self, storage) -> Optional[str]: if self._max_iter is None: return "" iteration = storage.iter try: eta_seconds = storage.history("time").median(1000) * (self._max_iter - iteration - 1) storage.put_scalar("eta_seconds", eta_seconds, smoothing_hint=False) return str(datetime.timedelta(seconds=int(eta_seconds))) except KeyError: # estimate eta on our own - more noisy eta_string = None if self._last_write is not None: estimate_iter_time = (time.perf_counter() - self._last_write[1]) / ( iteration - self._last_write[0] ) eta_seconds = estimate_iter_time * (self._max_iter - iteration - 1) eta_string = str(datetime.timedelta(seconds=int(eta_seconds))) self._last_write = (iteration, time.perf_counter()) return eta_string def write(self): storage = get_event_storage() iteration = storage.iter if iteration == self._max_iter: # This hook only reports training progress (loss, ETA, etc) but not other data, # therefore do not write anything after training succeeds, even if this method # is called. return try: avg_data_time = storage.history("data_time").avg( storage.count_samples("data_time", self._window_size) ) last_data_time = storage.history("data_time").latest() except KeyError: # they may not exist in the first few iterations (due to warmup) # or when SimpleTrainer is not used avg_data_time = None last_data_time = None try: avg_iter_time = storage.history("time").global_avg() last_iter_time = storage.history("time").latest() except KeyError: avg_iter_time = None last_iter_time = None try: lr = "{:.5g}".format(storage.history("lr").latest()) except KeyError: lr = "N/A" eta_string = self._get_eta(storage) if torch.cuda.is_available(): max_mem_mb = torch.cuda.max_memory_allocated() / 1024.0 / 1024.0 else: max_mem_mb = None # NOTE: max_mem is parsed by grep in "dev/parse_results.sh" self.logger.info( str.format( " {eta}iter: {iter} {losses} {non_losses} {avg_time}{last_time}" + "{avg_data_time}{last_data_time} lr: {lr} {memory}", eta=f"eta: {eta_string} " if eta_string else "", iter=iteration, losses=" ".join( [ "{}: {:.4g}".format( k, v.median(storage.count_samples(k, self._window_size)) ) for k, v in storage.histories().items() if "loss" in k ] ), non_losses=" ".join( [ "{}: {:.4g}".format( k, v.median(storage.count_samples(k, self._window_size)) ) for k, v in storage.histories().items() if "[metric]" in k ] ), avg_time="time: {:.4f} ".format(avg_iter_time) if avg_iter_time is not None else "", last_time="last_time: {:.4f} ".format(last_iter_time) if last_iter_time is not None else "", avg_data_time="data_time: {:.4f} ".format(avg_data_time) if avg_data_time is not None else "", last_data_time="last_data_time: {:.4f} ".format(last_data_time) if last_data_time is not None else "", lr=lr, memory="max_mem: {:.0f}M".format(max_mem_mb) if max_mem_mb is not None else "", ) ) class EventStorage: """ The user-facing class that provides metric storage functionalities. In the future we may add support for storing / logging other types of data if needed. """ def __init__(self, start_iter=0): """ Args: start_iter (int): the iteration number to start with """ self._history = defaultdict(HistoryBuffer) self._smoothing_hints = {} self._latest_scalars = {} self._iter = start_iter self._current_prefix = "" self._vis_data = [] self._histograms = [] def put_image(self, img_name, img_tensor): """ Add an `img_tensor` associated with `img_name`, to be shown on tensorboard. Args: img_name (str): The name of the image to put into tensorboard. img_tensor (torch.Tensor or numpy.array): An `uint8` or `float` Tensor of shape `[channel, height, width]` where `channel` is 3. The image format should be RGB. The elements in img_tensor can either have values in [0, 1] (float32) or [0, 255] (uint8). The `img_tensor` will be visualized in tensorboard. """ self._vis_data.append((img_name, img_tensor, self._iter)) def put_scalar(self, name, value, smoothing_hint=True, cur_iter=None): """ Add a scalar `value` to the `HistoryBuffer` associated with `name`. Args: smoothing_hint (bool): a 'hint' on whether this scalar is noisy and should be smoothed when logged. The hint will be accessible through :meth:`EventStorage.smoothing_hints`. A writer may ignore the hint and apply custom smoothing rule. It defaults to True because most scalars we save need to be smoothed to provide any useful signal. cur_iter (int): an iteration number to set explicitly instead of current iteration """ name = self._current_prefix + name cur_iter = self._iter if cur_iter is None else cur_iter history = self._history[name] value = float(value) history.update(value, cur_iter) self._latest_scalars[name] = (value, cur_iter) existing_hint = self._smoothing_hints.get(name) if existing_hint is not None: assert ( existing_hint == smoothing_hint ), "Scalar {} was put with a different smoothing_hint!".format(name) else: self._smoothing_hints[name] = smoothing_hint def put_scalars(self, *, smoothing_hint=True, cur_iter=None, **kwargs): """ Put multiple scalars from keyword arguments. Examples: storage.put_scalars(loss=my_loss, accuracy=my_accuracy, smoothing_hint=True) """ for k, v in kwargs.items(): self.put_scalar(k, v, smoothing_hint=smoothing_hint, cur_iter=cur_iter) def put_histogram(self, hist_name, hist_tensor, bins=1000): """ Create a histogram from a tensor. Args: hist_name (str): The name of the histogram to put into tensorboard. hist_tensor (torch.Tensor): A Tensor of arbitrary shape to be converted into a histogram. bins (int): Number of histogram bins. """ ht_min, ht_max = hist_tensor.min().item(), hist_tensor.max().item() # Create a histogram with PyTorch hist_counts = torch.histc(hist_tensor, bins=bins) hist_edges = torch.linspace(start=ht_min, end=ht_max, steps=bins + 1, dtype=torch.float32) # Parameter for the add_histogram_raw function of SummaryWriter hist_params = dict( tag=hist_name, min=ht_min, max=ht_max, num=len(hist_tensor), sum=float(hist_tensor.sum()), sum_squares=float(torch.sum(hist_tensor**2)), bucket_limits=hist_edges[1:].tolist(), bucket_counts=hist_counts.tolist(), global_step=self._iter, ) self._histograms.append(hist_params) def history(self, name): """ Returns: HistoryBuffer: the scalar history for name """ ret = self._history.get(name, None) if ret is None: raise KeyError("No history metric available for {}!".format(name)) return ret def histories(self): """ Returns: dict[name -> HistoryBuffer]: the HistoryBuffer for all scalars """ return self._history def latest(self): """ Returns: dict[str -> (float, int)]: mapping from the name of each scalar to the most recent value and the iteration number its added. """ return self._latest_scalars def latest_with_smoothing_hint(self, window_size=20): """ Similar to :meth:`latest`, but the returned values are either the un-smoothed original latest value, or a median of the given window_size, depend on whether the smoothing_hint is True. This provides a default behavior that other writers can use. Note: All scalars saved in the past `window_size` iterations are used for smoothing. This is different from the `window_size` definition in HistoryBuffer. Use :meth:`get_history_window_size` to get the `window_size` used in HistoryBuffer. """ result = {} for k, (v, itr) in self._latest_scalars.items(): result[k] = ( self._history[k].median(self.count_samples(k, window_size)) if self._smoothing_hints[k] else v, itr, ) return result def count_samples(self, name, window_size=20): """ Return the number of samples logged in the past `window_size` iterations. """ samples = 0 data = self._history[name].values() for _, iter_ in reversed(data): if iter_ > data[-1][1] - window_size: samples += 1 else: break return samples def smoothing_hints(self): """ Returns: dict[name -> bool]: the user-provided hint on whether the scalar is noisy and needs smoothing. """ return self._smoothing_hints def step(self): """ User should either: (1) Call this function to increment storage.iter when needed. Or (2) Set `storage.iter` to the correct iteration number before each iteration. The storage will then be able to associate the new data with an iteration number. """ self._iter += 1 @property def iter(self): """ Returns: int: The current iteration number. When used together with a trainer, this is ensured to be the same as trainer.iter. """ return self._iter @iter.setter def iter(self, val): self._iter = int(val) @property def iteration(self): # for backward compatibility return self._iter def __enter__(self): _CURRENT_STORAGE_STACK.append(self) return self def __exit__(self, exc_type, exc_val, exc_tb): assert _CURRENT_STORAGE_STACK[-1] == self _CURRENT_STORAGE_STACK.pop() @contextmanager def name_scope(self, name): """ Yields: A context within which all the events added to this storage will be prefixed by the name scope. """ old_prefix = self._current_prefix self._current_prefix = name.rstrip("/") + "/" yield self._current_prefix = old_prefix def clear_images(self): """ Delete all the stored images for visualization. This should be called after images are written to tensorboard. """ self._vis_data = [] def clear_histograms(self): """ Delete all the stored histograms for visualization. This should be called after histograms are written to tensorboard. """ self._histograms = []