# Google utils: https://cloud.google.com/storage/docs/reference/libraries
import os
import platform
import subprocess
import time
from pathlib import Path
import requests
import torch
def attempt_download_from_hub(repo_id, hf_token=None):
    """Download the first .pt checkpoint found in a Hugging Face Hub model repo.

    Args:
        repo_id: Hub repository id, e.g. "user/model".
        hf_token: optional access token for private/gated repos.

    Returns:
        Local filesystem path of the downloaded .pt file, or None when the repo
        does not exist, the id is invalid, or the repo contains no .pt file.
    """
    # https://github.com/fcakyon/yolov5-pip/blob/main/yolov5/utils/downloads.py
    from huggingface_hub import hf_hub_download, list_repo_files
    try:
        # Public error location (newer huggingface_hub releases) ...
        from huggingface_hub.errors import HFValidationError, RepositoryNotFoundError
    except ImportError:
        # ... falling back to the old private modules for older versions.
        from huggingface_hub.utils._errors import RepositoryNotFoundError
        from huggingface_hub.utils._validators import HFValidationError
    try:
        repo_files = list_repo_files(repo_id=repo_id, repo_type='model', token=hf_token)
        # First checkpoint in the file listing; None (not IndexError) when absent.
        model_file = next((f for f in repo_files if f.endswith('.pt')), None)
        if model_file is None:
            return None
        return hf_hub_download(
            repo_id=repo_id,
            filename=model_file,
            repo_type='model',
            token=hf_token,
        )
    except (RepositoryNotFoundError, HFValidationError):
        return None
def gsutil_getsize(url=''):
    """Return the size in bytes of a gs:// object, 0 if `gsutil du` prints nothing.

    gs://bucket/file size https://cloud.google.com/storage/docs/gsutil/commands/du
    """
    s = subprocess.check_output(f'gsutil du {url}', shell=True).decode('utf-8')
    # int() instead of eval(): the first field of `gsutil du` output is a plain
    # byte count, and eval() on subprocess output is a code-execution hazard.
    return int(s.split(' ')[0]) if len(s) else 0  # bytes
""" The attempt_download function has been fixed by @kadirnar.
tag = subprocess.check_output('git tag', shell=True).decode().split()[-1]
IndexError: list index out of range
"""
def attempt_download(file, repo="WongKinYiu/yolov7"):
    """Download `file` from the repo's GitHub release assets if it is missing.

    Args:
        file: desired weights path; surrounding quotes are stripped and the
            path is lower-cased before the existence check.
        repo: GitHub "owner/name" repository whose releases host the assets.

    Returns:
        str path of the file. If the file already exists locally, nothing is
        downloaded. On download failure the partial file is removed and an
        error message is printed.
    """
    file = Path(str(file).strip().replace("'", "").lower())
    if not file.exists():
        # Defaults, used when the GitHub API call below fails or returns nothing.
        assets = [
            "yolov7.pt",
            "yolov7-tiny.pt",
            "yolov7x.pt",
            "yolov7-d6.pt",
            "yolov7-e6.pt",
            "yolov7-e6e.pt",
            "yolov7-w6.pt",
        ]
        # Pre-seed the tag: the original code left `tag` unbound (NameError)
        # when the API returned an empty release list.
        tag = "v0.1"
        try:
            response = requests.get(f"https://api.github.com/repos/{repo}/releases").json()  # github api
            if len(response) > 0:
                release_assets = response[0]  # latest release dictionary
                # Names of the assets attached to that release
                assets = [release_asset["name"] for release_asset in release_assets["assets"]]
                # `or tag` guards against a missing/None "tag_name" leaking into the URL
                tag = release_assets.get("tag_name") or tag
        except KeyError:  # fallback plan: ask git for the newest local tag
            # The nested try is required: exceptions raised inside an except
            # handler are NOT caught by sibling except clauses, so the original
            # CalledProcessError handler was unreachable, and `split()[-1]`
            # raised IndexError in a repo with no tags.
            try:
                tag = subprocess.check_output("git tag", shell=True).decode().split()[-1]
            except (subprocess.CalledProcessError, IndexError):
                tag = "v0.1"  # no git available or no tags in the repo

        name = file.name
        if name in assets:
            msg = f"{file} missing, try downloading from https://github.com/{repo}/releases/"
            redundant = False  # second download option (no mirror configured)
            try:  # GitHub
                url = f"https://github.com/{repo}/releases/download/{tag}/{name}"
                print(f"Downloading {url} to {file}...")
                torch.hub.download_url_to_file(url, file)
                assert file.exists() and file.stat().st_size > 1e6  # sanity check
            except Exception as e:  # GCP mirror
                print(f"Download error: {e}")
                assert redundant, "No secondary mirror"
                url = f"https://storage.googleapis.com/{repo}/ckpt/{name}"
                print(f"Downloading {url} to {file}...")
                # torch.hub.download_url_to_file(url, weights)
                os.system(f"curl -L {url} -o {file}")
            finally:
                if not file.exists() or file.stat().st_size < 1e6:  # check
                    file.unlink(missing_ok=True)  # remove partial downloads
                    print(f"ERROR: Download failure: {msg}")
                print("")
                # Returning from `finally` deliberately swallows the
                # AssertionError raised above when no mirror is available;
                # str() keeps the return type consistent with the path below.
                return str(file)
    return str(file)  # return file
def gdrive_download(id='', file='tmp.zip'):
    # Downloads a file from Google Drive. from yolov7.utils.google_utils import *; gdrive_download()
    start = time.time()
    target = Path(file)
    cookie = Path('cookie')  # gdrive cookie
    print(f'Downloading https://drive.google.com/uc?export=download&id={id} as {target}... ', end='')
    # Clear leftovers from any previous run before retrying the download.
    target.unlink(missing_ok=True)
    cookie.unlink(missing_ok=True)

    # First request only collects the cookie; its body is discarded.
    sink = "NUL" if platform.system() == "Windows" else "/dev/null"
    os.system(f'curl -c ./cookie -s -L "drive.google.com/uc?export=download&id={id}" > {sink}')
    if os.path.exists('cookie'):
        # Large file: a confirm token was issued, so pass it back with the cookie.
        cmd = f'curl -Lb ./cookie "drive.google.com/uc?export=download&confirm={get_token()}&id={id}" -o {target}'
    else:
        # Small file: direct download, no confirmation step needed.
        cmd = f'curl -s -L -o {target} "drive.google.com/uc?export=download&id={id}"'
    status = os.system(cmd)  # execute, capture return code
    cookie.unlink(missing_ok=True)

    # Error check: drop any partial file on a non-zero curl exit.
    if status != 0:
        target.unlink(missing_ok=True)
        print('Download error ')  # raise Exception('Download error')
        return status

    # Unzip if archive, then remove the zip to free space.
    if target.suffix == '.zip':
        print('unzipping... ', end='')
        os.system(f'unzip -q {target}')
        target.unlink()

    print(f'Done ({time.time() - start:.1f}s)')
    return status
def get_token(cookie="./cookie"):
    """Return the Google Drive confirm token from a curl cookie file, '' if absent."""
    with open(cookie) as handle:
        matches = [line.split()[-1] for line in handle if "download" in line]
    return matches[0] if matches else ""
# def upload_blob(bucket_name, source_file_name, destination_blob_name):
# # Uploads a file to a bucket
# # https://cloud.google.com/storage/docs/uploading-objects#storage-upload-object-python
#
# storage_client = storage.Client()
# bucket = storage_client.get_bucket(bucket_name)
# blob = bucket.blob(destination_blob_name)
#
# blob.upload_from_filename(source_file_name)
#
# print('File {} uploaded to {}.'.format(
# source_file_name,
# destination_blob_name))
#
#
# def download_blob(bucket_name, source_blob_name, destination_file_name):
# # Uploads a blob from a bucket
# storage_client = storage.Client()
# bucket = storage_client.get_bucket(bucket_name)
# blob = bucket.blob(source_blob_name)
#
# blob.download_to_filename(destination_file_name)
#
# print('Blob {} downloaded to {}.'.format(
# source_blob_name,
# destination_file_name))
|