import os import sys import warnings from importlib.util import find_spec from pathlib import Path from typing import Any, Callable, Dict, Tuple import gdown import matplotlib.pyplot as plt import numpy as np import torch import wget from omegaconf import DictConfig from pflow.utils import pylogger, rich_utils log = pylogger.get_pylogger(__name__) def extras(cfg: DictConfig) -> None: """Applies optional utilities before the task is started. Utilities: - Ignoring python warnings - Setting tags from command line - Rich config printing :param cfg: A DictConfig object containing the config tree. """ # return if no `extras` config if not cfg.get("extras"): log.warning("Extras config not found! ") return # disable python warnings if cfg.extras.get("ignore_warnings"): log.info("Disabling python warnings! ") warnings.filterwarnings("ignore") # prompt user to input tags from command line if none are provided in the config if cfg.extras.get("enforce_tags"): log.info("Enforcing tags! ") rich_utils.enforce_tags(cfg, save_to_file=True) # pretty print config tree using Rich library if cfg.extras.get("print_config"): log.info("Printing config tree with Rich! ") rich_utils.print_config_tree(cfg, resolve=True, save_to_file=True) def task_wrapper(task_func: Callable) -> Callable: """Optional decorator that controls the failure behavior when executing the task function. This wrapper can be used to: - make sure loggers are closed even if the task function raises an exception (prevents multirun failure) - save the exception to a `.log` file - mark the run as failed with a dedicated file in the `logs/` folder (so we can find and rerun it later) - etc. (adjust depending on your needs) Example: ``` @utils.task_wrapper def train(cfg: DictConfig) -> Tuple[Dict[str, Any], Dict[str, Any]]: ... return metric_dict, object_dict ``` :param task_func: The task function to be wrapped. :return: The wrapped task function. """ def wrap(cfg: DictConfig) -> Tuple[Dict[str, Any], Dict[str, Any]]: # execute the task try: metric_dict, object_dict = task_func(cfg=cfg) # things to do if exception occurs except Exception as ex: # save exception to `.log` file log.exception("") # some hyperparameter combinations might be invalid or cause out-of-memory errors # so when using hparam search plugins like Optuna, you might want to disable # raising the below exception to avoid multirun failure raise ex # things to always do after either success or exception finally: # display output dir path in terminal log.info(f"Output dir: {cfg.paths.output_dir}") # always close wandb run (even if exception occurs so multirun won't fail) if find_spec("wandb"): # check if wandb is installed import wandb if wandb.run: log.info("Closing wandb!") wandb.finish() return metric_dict, object_dict return wrap def get_metric_value(metric_dict: Dict[str, Any], metric_name: str) -> float: """Safely retrieves value of the metric logged in LightningModule. :param metric_dict: A dict containing metric values. :param metric_name: The name of the metric to retrieve. :return: The value of the metric. """ if not metric_name: log.info("Metric name is None! Skipping metric value retrieval...") return None if metric_name not in metric_dict: raise Exception( f"Metric value not found! \n" "Make sure metric name logged in LightningModule is correct!\n" "Make sure `optimized_metric` name in `hparams_search` config is correct!" ) metric_value = metric_dict[metric_name].item() log.info(f"Retrieved metric value! <{metric_name}={metric_value}>") return metric_value def intersperse(lst, item): # Adds blank symbol result = [item] * (len(lst) * 2 + 1) result[1::2] = lst return result def save_figure_to_numpy(fig): data = np.fromstring(fig.canvas.tostring_rgb(), dtype=np.uint8, sep="") data = data.reshape(fig.canvas.get_width_height()[::-1] + (3,)) return data def plot_tensor(tensor): plt.style.use("default") fig, ax = plt.subplots(figsize=(12, 3)) im = ax.imshow(tensor, aspect="auto", origin="lower", interpolation="none") plt.colorbar(im, ax=ax) plt.tight_layout() fig.canvas.draw() data = save_figure_to_numpy(fig) plt.close() return data def save_plot(tensor, savepath): plt.style.use("default") fig, ax = plt.subplots(figsize=(12, 3)) im = ax.imshow(tensor, aspect="auto", origin="lower", interpolation="none") plt.colorbar(im, ax=ax) plt.tight_layout() fig.canvas.draw() plt.savefig(savepath) plt.close() def to_numpy(tensor): if isinstance(tensor, np.ndarray): return tensor elif isinstance(tensor, torch.Tensor): return tensor.detach().cpu().numpy() elif isinstance(tensor, list): return np.array(tensor) else: raise TypeError("Unsupported type for conversion to numpy array") def get_user_data_dir(appname="pflow_tts"): """ Args: appname (str): Name of application Returns: Path: path to user data directory """ PFLOW_HOME = os.environ.get("PFLOW_HOME") if PFLOW_HOME is not None: ans = Path(PFLOW_HOME).expanduser().resolve(strict=False) elif sys.platform == "win32": import winreg # pylint: disable=import-outside-toplevel key = winreg.OpenKey( winreg.HKEY_CURRENT_USER, r"Software\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders", ) dir_, _ = winreg.QueryValueEx(key, "Local AppData") ans = Path(dir_).resolve(strict=False) elif sys.platform == "darwin": ans = Path("~/Library/Application Support/").expanduser() else: ans = Path.home().joinpath(".local/share") final_path = ans.joinpath(appname) final_path.mkdir(parents=True, exist_ok=True) return final_path def assert_model_downloaded(checkpoint_path, url, use_wget=False): if Path(checkpoint_path).exists(): log.debug(f"[+] Model already present at {checkpoint_path}!") return log.info(f"[-] Model not found at {checkpoint_path}! Will download it") checkpoint_path = str(checkpoint_path) if not use_wget: gdown.download(url=url, output=checkpoint_path, quiet=False, fuzzy=True) else: wget.download(url=url, out=checkpoint_path)