# Google utils: https://cloud.google.com/storage/docs/reference/libraries import os import platform import subprocess import time from pathlib import Path import requests import torch def attempt_download_from_hub(repo_id, hf_token=None): # https://github.com/fcakyon/yolov5-pip/blob/main/yolov5/utils/downloads.py from huggingface_hub import hf_hub_download, list_repo_files from huggingface_hub.utils._errors import RepositoryNotFoundError from huggingface_hub.utils._validators import HFValidationError try: repo_files = list_repo_files(repo_id=repo_id, repo_type='model', token=hf_token) model_file = [f for f in repo_files if f.endswith('.pt')][0] file = hf_hub_download( repo_id=repo_id, filename=model_file, repo_type='model', token=hf_token, ) return file except (RepositoryNotFoundError, HFValidationError): return None def gsutil_getsize(url=''): # gs://bucket/file size https://cloud.google.com/storage/docs/gsutil/commands/du s = subprocess.check_output(f'gsutil du {url}', shell=True).decode('utf-8') return eval(s.split(' ')[0]) if len(s) else 0 # bytes """ The attempt_download function has been fixed by @kadirnar. tag = subprocess.check_output('git tag', shell=True).decode().split()[-1] IndexError: list index out of range """ def attempt_download(file, repo="WongKinYiu/yolov7"): # Attempt file download if does not exist file = Path(str(file).strip().replace("'", "").lower()) if not file.exists(): # Set default assets assets = [ "yolov7.pt", "yolov7-tiny.pt", "yolov7x.pt", "yolov7-d6.pt", "yolov7-e6.pt", "yolov7-e6e.pt", "yolov7-w6.pt", ] try: response = requests.get(f"https://api.github.com/repos/{repo}/releases").json() # github api if len(response) > 0: release_assets = response[0] # get dictionary of assets # Get names of assets if it rleases exists assets = [release_asset["name"] for release_asset in release_assets["assets"]] # Get first tag which is the latest tag tag = release_assets.get("tag_name") except KeyError: # fallback plan tag = subprocess.check_output("git tag", shell=True).decode().split()[-1] except subprocess.CalledProcessError: # fallback to default release if can't get tag tag = "v0.1" name = file.name if name in assets: msg = f"{file} missing, try downloading from https://github.com/{repo}/releases/" redundant = False # second download option try: # GitHub url = f"https://github.com/{repo}/releases/download/{tag}/{name}" print(f"Downloading {url} to {file}...") torch.hub.download_url_to_file(url, file) assert file.exists() and file.stat().st_size > 1e6 # check except Exception as e: # GCP print(f"Download error: {e}") assert redundant, "No secondary mirror" url = f"https://storage.googleapis.com/{repo}/ckpt/{name}" print(f"Downloading {url} to {file}...") # torch.hub.download_url_to_file(url, weights) os.system(f"curl -L {url} -o {file}") finally: if not file.exists() or file.stat().st_size < 1e6: # check file.unlink(missing_ok=True) # remove partial downloads print(f"ERROR: Download failure: {msg}") print("") return file return str(file) # return file def gdrive_download(id='', file='tmp.zip'): # Downloads a file from Google Drive. from yolov7.utils.google_utils import *; gdrive_download() t = time.time() file = Path(file) cookie = Path('cookie') # gdrive cookie print(f'Downloading https://drive.google.com/uc?export=download&id={id} as {file}... ', end='') file.unlink(missing_ok=True) # remove existing file cookie.unlink(missing_ok=True) # remove existing cookie # Attempt file download out = "NUL" if platform.system() == "Windows" else "/dev/null" os.system(f'curl -c ./cookie -s -L "drive.google.com/uc?export=download&id={id}" > {out}') if os.path.exists('cookie'): # large file s = f'curl -Lb ./cookie "drive.google.com/uc?export=download&confirm={get_token()}&id={id}" -o {file}' else: # small file s = f'curl -s -L -o {file} "drive.google.com/uc?export=download&id={id}"' r = os.system(s) # execute, capture return cookie.unlink(missing_ok=True) # remove existing cookie # Error check if r != 0: file.unlink(missing_ok=True) # remove partial print('Download error ') # raise Exception('Download error') return r # Unzip if archive if file.suffix == '.zip': print('unzipping... ', end='') os.system(f'unzip -q {file}') # unzip file.unlink() # remove zip to free space print(f'Done ({time.time() - t:.1f}s)') return r def get_token(cookie="./cookie"): with open(cookie) as f: for line in f: if "download" in line: return line.split()[-1] return "" # def upload_blob(bucket_name, source_file_name, destination_blob_name): # # Uploads a file to a bucket # # https://cloud.google.com/storage/docs/uploading-objects#storage-upload-object-python # # storage_client = storage.Client() # bucket = storage_client.get_bucket(bucket_name) # blob = bucket.blob(destination_blob_name) # # blob.upload_from_filename(source_file_name) # # print('File {} uploaded to {}.'.format( # source_file_name, # destination_blob_name)) # # # def download_blob(bucket_name, source_blob_name, destination_file_name): # # Uploads a blob from a bucket # storage_client = storage.Client() # bucket = storage_client.get_bucket(bucket_name) # blob = bucket.blob(source_blob_name) # # blob.download_to_filename(destination_file_name) # # print('Blob {} downloaded to {}.'.format( # source_blob_name, # destination_file_name))