Spaces: Running on Zero
| import os | |
| import os.path as osp | |
| import scipy.sparse as sp | |
| import shutil | |
| from pathlib import Path | |
| import torch | |
| import numpy as np | |
| import re | |
| import os | |
| import requests | |
def ensure_pretrained_file(hf_url: str, save_dir: str = "pretrained", filename: str = "pretrained.pkl", token: str = None):
    """
    Ensure that a pretrained file exists locally.

    If the file is not already present in ``save_dir``, download it from
    Hugging Face, streaming to disk in chunks.

    Args:
        hf_url (str): Hugging Face file URL (resolve/main/...).
        save_dir (str): Directory to store pretrained file.
        filename (str): Name of file to save.
        token (str): Optional Hugging Face token (for gated/private repos).

    Returns:
        str: Local path of the pretrained file.

    Raises:
        requests.HTTPError: If the download request returns an error status.
    """
    os.makedirs(save_dir, exist_ok=True)
    save_path = os.path.join(save_dir, filename)
    if os.path.exists(save_path):
        print(f"Found pretrained file: {save_path}")
        return save_path
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    print(f"Downloading pretrained file from {hf_url} to {save_path} ...")
    # Stream in chunks so large checkpoints never sit fully in memory.
    # timeout= prevents hanging forever on a dead connection; the `with`
    # block guarantees the HTTP connection is released even on error.
    with requests.get(hf_url, headers=headers, stream=True, timeout=60) as response:
        response.raise_for_status()
        with open(save_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:  # skip keep-alive chunks
                    f.write(chunk)
    print("Download complete.")
    return save_path
def may_create_folder(folder_path):
    """Create ``folder_path`` (mode 0o777) if it does not already exist.

    Returns:
        bool: True if the folder was created, False if it already existed.
    """
    if not osp.exists(folder_path):
        # Temporarily clear the umask so the 0o777 mode is applied verbatim.
        oldmask = os.umask(0)
        try:
            os.makedirs(folder_path, mode=0o777)
        finally:
            # Bug fix: restore the process umask even if makedirs raises,
            # otherwise the whole process is left with umask 0.
            os.umask(oldmask)
        return True
    return False
def make_clean_folder(folder_path):
    """Ensure ``folder_path`` exists and is empty.

    If the folder already exists, its previous contents are removed and the
    folder is recreated fresh.
    """
    if not may_create_folder(folder_path):
        # Folder pre-existed: wipe it and recreate.
        shutil.rmtree(folder_path)
        may_create_folder(folder_path)
def str_delta(delta):
    """Render a timedelta as a zero-padded ``HH:MM:SS`` string."""
    total_seconds = delta.total_seconds()
    hrs, rem = divmod(total_seconds, 3600)
    mins, secs = divmod(rem, 60)
    return f'{int(hrs):02}:{int(mins):02}:{int(secs):02}'
def desc_from_config(config, world_size):
    """Compose the experiment descriptor string used for run naming.

    Combines dataset name, conditioning mode, model, GPU count, batch size,
    dtype, and optional precond/desc suffixes into a single '-'-joined string.
    """
    cond_str = 'uncond'  # 'cond' if c.dataset_kwargs.use_labels else 'uncond'
    dtype_str = 'fp16' if config["perfs"]["fp16"] else 'fp32'
    name_data = config["data"]["name"]
    # A truthy "abs" flag in the data config is reflected in the dataset name.
    if config["data"].get("abs"):
        name_data += "abs"
    segments = [
        f'{name_data:s}-{cond_str:s}-{config["architecture"]["model"]:s}',
        f'gpus{world_size:d}-batch{config["hyper_params"]["batch_size"]:d}-{dtype_str:s}',
    ]
    desc = '-'.join(segments)
    if config["misc"]["precond"]:
        desc += f'-{config["misc"]["precond"]:s}'
    if config["misc"].get("desc") is not None:
        desc += f'-{config["misc"]["desc"]}'
    return desc
def get_dataset_path(config):
    """Return ``(experiment_name, dataset_path)`` derived from the config.

    The experiment name is the dataset name, optionally suffixed via the
    "add_name" config section; the path joins it onto the data root dir.
    """
    name_exp = config["data"]["name"]
    add_cfg = config.get("add_name")
    if add_cfg is not None and add_cfg["do"]:
        name_exp = name_exp + "_" + add_cfg["name"]
    return name_exp, os.path.join(config["data"]["root_dir"], name_exp)
def sparse_np_to_torch(A):
    """Convert a scipy sparse matrix to a coalesced float32 torch sparse COO tensor."""
    coo = A.tocoo()
    idx = torch.LongTensor(np.vstack((coo.row, coo.col)))
    vals = torch.FloatTensor(coo.data)
    result = torch.sparse_coo_tensor(idx, vals, torch.Size(coo.shape), dtype=torch.float32)
    return result.coalesce()
def convert_dict(np_dict, device="cpu"):
    """Convert a dict of numpy / scipy-sparse values to torch tensors on ``device``.

    Sparse matrices go through ``sparse_np_to_torch``; dense arrays through
    ``torch.from_numpy``. int32 tensors are promoted to int64 (dense ones are
    also squeezed). Any other value is passed through unchanged.
    """
    out = {}
    for key, val in np_dict.items():
        if sp.issparse(val):
            tensor = sparse_np_to_torch(val).to(device)
            if tensor.dtype == torch.int32:
                tensor = tensor.long().to(device)
            out[key] = tensor
        elif isinstance(val, np.ndarray):
            tensor = torch.from_numpy(val).to(device)
            if tensor.dtype == torch.int32:
                tensor = tensor.squeeze().long().to(device)
            out[key] = tensor
        else:
            out[key] = val
    return out
def convert_dict_torch(in_dict, device="cpu"):
    """Return a new dict with the ``torch.Tensor`` entries of ``in_dict`` moved to ``device``.

    NOTE(review): non-tensor entries are silently dropped from the result —
    confirm this filtering is intended by callers.
    """
    return {k: v.to(device) for k, v in in_dict.items() if isinstance(v, torch.Tensor)}
def batchify_dict(torch_dict):
    """In-place: prepend a batch dimension to every non-int64 tensor value.

    int64 tensors (typically index tensors) are left untouched; non-tensor
    values are ignored. Returns None (mutates ``torch_dict``).
    """
    for key in torch_dict:
        val = torch_dict[key]
        if isinstance(val, torch.Tensor) and val.dtype != torch.int64:
            torch_dict[key] = val.unsqueeze(0)