# secondtest/rvc/lib/tools/model_download.py
# Aitron Emper
# Upload model_download.py
# 9dea1d3 verified
# raw
# history blame
# 9.64 kB
import os
import sys
import wget
import zipfile
from bs4 import BeautifulSoup
import requests
from urllib.parse import unquote
import re
import shutil
def find_folder_parent(search_dir, folder_name):
    """Return the absolute path of the first directory that contains *folder_name*.

    Args:
        search_dir: Directory whose tree is walked top-down with ``os.walk``.
        folder_name: Name of the sub-directory to look for.

    Returns:
        The absolute path of the first visited directory that has a direct
        child directory called *folder_name*, or ``None`` when the walk ends
        without finding one.
    """
    for dirpath, dirnames, _ in os.walk(search_dir):
        if folder_name in dirnames:
            return os.path.abspath(dirpath)
    # Only reached once the whole tree has been visited without a match.
    return None
# Directory the script was started from; download_from_url changes the working
# directory while downloading and switches back to this value afterwards.
now_dir = os.getcwd()
# Presumably added so that the project-local `rvc` package imported below
# resolves when the script is launched from the repository root.
sys.path.append(now_dir)
from rvc.lib.utils import format_title
import rvc.lib.tools.gdown as gdown
# Absolute path of the first directory under now_dir that contains a "logs"
# folder, or None when no such folder exists (see find_folder_parent).
file_path = find_folder_parent(now_dir, "logs")
# Directory in which downloaded archives are stored before extraction.
zips_path = os.getcwd() + "/logs/zips"
def search_pth_index(folder):
    """Collect the ``.pth`` and ``.index`` files located directly in *folder*.

    Args:
        folder: Directory to scan (not recursive).

    Returns:
        A ``(pth_paths, index_paths)`` tuple of lists. Each entry is *folder*
        joined with the file name; directories are ignored even when their
        name carries one of the suffixes.
    """
    pth_paths = []
    index_paths = []
    # One directory listing and one isfile() check per entry instead of two.
    for name in os.listdir(folder):
        path = os.path.join(folder, name)
        if not os.path.isfile(path):
            continue
        if name.endswith(".pth"):
            pth_paths.append(path)
        elif name.endswith(".index"):
            index_paths.append(path)
    return pth_paths, index_paths
def get_mediafire_download_link(url, timeout=30):
    """Extract the direct download link from a MediaFire file page.

    Args:
        url: Address of the MediaFire page to fetch.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests.get`` can block indefinitely on a stalled
            connection.

    Returns:
        The ``href`` of the anchor with class ``"input popsok"`` and
        aria-label ``"Download file"``, or ``None`` when the page has no such
        anchor.

    Raises:
        requests.HTTPError: If the page request returns an error status.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, "html.parser")
    download_button = soup.find(
        "a", {"class": "input popsok", "aria-label": "Download file"}
    )
    if not download_button:
        return None
    return download_button.get("href")
def _save_response_body(response, destination, block_size=1024, bar_length=50):
    """Stream *response* into *destination* while printing a progress line."""
    total = int(response.headers.get("content-length", 0))
    received = 0
    with open(destination, "wb") as file:
        for data in response.iter_content(block_size):
            file.write(data)
            received += len(data)
            if total <= 0:
                # No usable Content-Length header: a percentage cannot be
                # computed (dividing by zero would crash), show bytes only.
                print(f"{received} bytes ", end="\r")
                continue
            fraction = received / total
            percent = int(fraction * 100)
            num_dots = int(fraction * bar_length)
            bar = "[" + "." * num_dots + " " * (bar_length - num_dots) + "]"
            print(f"{percent}% {bar} {received}/{total} ", end="\r")
            if percent == 100:
                print("\n")
    if total <= 0:
        # Terminate the carriage-return progress line.
        print("\n")


def _normalize_downloaded_names(directory):
    """Replace every dot except the extension separator with "_" in file names.

    Files found in sub-directories are moved into *directory* itself.
    """
    for current_path, _, file_names in os.walk(directory):
        for file_name in file_names:
            stem, dot, extension = file_name.rpartition(".")
            if not dot:
                # A name without an extension has nothing to normalise; the
                # join-based rename would otherwise turn "name" into ".name".
                continue
            new_name = stem.replace(".", "_") + "." + extension
            os.rename(
                os.path.join(current_path, file_name),
                os.path.join(directory, new_name),
            )


def download_from_url(url):
    """Download the file behind *url* into ``zips_path``.

    Supported sources, checked in this order: Google Drive links, Hugging Face
    ``/blob/`` or ``/resolve/`` file links, Hugging Face ``/tree/main`` pages
    (the first ``.zip`` link on the page is used), and any other URL, which is
    handed to ``wget``.

    Args:
        url: Address to download; an empty string downloads nothing.

    Returns:
        ``"downloaded"`` on success, ``"too much use"`` or ``"private link"``
        for the corresponding Google Drive errors, and ``None`` for every
        other failure.

    The working directory is changed to ``zips_path`` for the downloaders that
    write into the current directory and is always set back to ``now_dir``
    before returning, including when an exception propagates.
    """
    request_timeout = 30  # seconds; keeps a stalled server from hanging the script
    os.makedirs(zips_path, exist_ok=True)
    try:
        if url == "":
            return None

        if "drive.google.com" in url:
            if "file/d/" in url:
                file_id = url.split("file/d/")[1].split("/")[0]
            elif "id=" in url:
                file_id = url.split("id=")[1].split("&")[0]
            else:
                return None
            if not file_id:
                # Nothing can be fetched without an id, so do not report success.
                return None

            os.chdir(zips_path)
            try:
                gdown.download(
                    f"https://drive.google.com/uc?id={file_id}",
                    quiet=False,
                    fuzzy=True,
                )
            except Exception as error:
                error_message = str(error)
                if (
                    "Too many users have viewed or downloaded this file recently"
                    in error_message
                ):
                    return "too much use"
                if "Cannot retrieve the public link of the file." in error_message:
                    return "private link"
                print(error_message)
                return None

        elif "/blob/" in url or "/resolve/" in url:
            # "/blob/" addresses the HTML viewer; "/resolve/" serves the file.
            url = url.replace("/blob/", "/resolve/")
            file_name = re.sub(
                r"[^a-zA-Z0-9_.-]", "_", unquote(url.split("/")[-1])
            )
            if not file_name:
                return None
            with requests.get(
                url, stream=True, timeout=request_timeout
            ) as response:
                if response.status_code != 200:
                    return None
                _save_response_body(response, os.path.join(zips_path, file_name))

        elif "/tree/main" in url:
            os.chdir(zips_path)
            response = requests.get(url, timeout=request_timeout)
            soup = BeautifulSoup(response.content, "html.parser")
            temp_url = ""
            for link in soup.find_all("a", href=True):
                if link["href"].endswith(".zip"):
                    temp_url = link["href"]
                    break
            if not temp_url:
                return None
            # Only rewrite the path segment, not any "blob" substring that may
            # be part of a user, repository or file name.
            url = temp_url.replace("/blob/", "/resolve/")
            if "huggingface.co" not in url:
                url = "https://huggingface.co" + url
            wget.download(url)

        else:
            try:
                os.chdir(zips_path)
                wget.download(url)
            except Exception as error:
                print(error)
                return None

        _normalize_downloaded_names(zips_path)
        return "downloaded"
    finally:
        # Single exit point for restoring the caller's working directory.
        os.chdir(now_dir)
def extract_and_show_progress(zipfile_path, unzips_path):
    """Extract a zip archive and delete it afterwards.

    Args:
        zipfile_path: Path of the archive to extract.
        unzips_path: Directory that receives the archive's contents.

    Returns:
        ``True`` when the archive was extracted and removed, ``False`` when
        any step raised; the error is printed and the archive is left in
        place.
    """
    try:
        with zipfile.ZipFile(zipfile_path, "r") as zip_ref:
            zip_ref.extractall(unzips_path)
        # Remove only after the handle is closed; deleting an open file fails
        # on Windows.
        os.remove(zipfile_path)
        return True
    except Exception as error:
        print(error)
        return False
def unzip_file(zip_path, zip_file_name):
    """Extract ``<zip_path>/<zip_file_name>.zip`` and delete the archive.

    The contents are extracted into a folder named *zip_file_name* inside the
    module-level ``file_path`` directory.

    Args:
        zip_path: Directory that contains the archive.
        zip_file_name: Archive name without the ``.zip`` extension; also used
            as the name of the extraction folder.

    Raises:
        TypeError: If ``file_path`` is ``None`` (no "logs" folder was found
            when the module was loaded).
        FileNotFoundError: If the archive does not exist.
        zipfile.BadZipFile: If the file is not a valid zip archive.
    """
    zip_file_path = os.path.join(zip_path, zip_file_name + ".zip")
    extract_path = os.path.join(file_path, zip_file_name)
    with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
        zip_ref.extractall(extract_path)
    # Delete only once the archive handle is closed; removing an open file
    # fails on Windows.
    os.remove(zip_file_path)
def _flatten_single_subfolder(folder):
    """Move the contents of *folder*'s only sub-directory up into *folder*."""
    subfolders = [
        entry
        for entry in os.listdir(folder)
        if os.path.isdir(os.path.join(folder, entry))
    ]
    if len(subfolders) != 1:
        return
    subfolder_path = os.path.join(folder, subfolders[0])
    for item in os.listdir(subfolder_path):
        shutil.move(os.path.join(subfolder_path, item), os.path.join(folder, item))
    os.rmdir(subfolder_path)


def _rename_model_files(folder, model_name):
    """Rename the ``.pth`` and ``.index`` files in *folder* after *model_name*."""
    marker = "_nprobe_1_"
    for item in os.listdir(folder):
        source = os.path.join(folder, item)
        if item.endswith(".pth"):
            if item[: -len(".pth")] != model_name:
                os.rename(source, os.path.join(folder, model_name + ".pth"))
        elif item.endswith(".index") and marker in item:
            # Index names look like "<prefix>_nprobe_1_<model>_v1|_v2.index";
            # anything without a "v2" tag is treated as v1.
            version = "_v2" if "v2" in item else "_v1"
            prefix, _, remainder = item.partition(marker)
            if remainder.split(version)[0] != model_name:
                new_file_name = prefix + marker + model_name + version
                os.rename(source, os.path.join(folder, new_file_name + ".index"))
        # Any other file (readme, config, ...) is left untouched instead of
        # crashing on a missing "_nprobe_1_" marker.


def main():
    """Download the model archive given on the command line and unpack it.

    Every ``.zip`` found in ``zips_path`` after the download is extracted into
    ``logs/<model name>``, a single wrapping sub-folder is flattened, and the
    weight and index files are renamed after the model. The process exits
    with status 1 when the URL argument is missing, the download fails, an
    archive cannot be extracted, or no archive is found.
    """
    if len(sys.argv) < 2:
        print("Usage: model_download.py <url>")
        sys.exit(1)

    url = sys.argv[1].replace("?download=true", "")
    verify = download_from_url(url)
    if verify != "downloaded":
        # download_from_url reports "too much use" / "private link" for the
        # known Google Drive failures and None otherwise.
        print(f"Error: {verify}" if verify else "Error")
        sys.exit(1)

    extract_folder_path = ""
    for filename in os.listdir(zips_path):
        if not filename.endswith(".zip"):
            continue
        zipfile_path = os.path.join(zips_path, filename)
        print("Proceeding with the extraction...")

        model_name = format_title(filename.split(".zip")[0])
        extract_folder_path = os.path.join("logs", os.path.normpath(model_name))

        # Check the result before touching the target folder: after a failed
        # extraction it may not exist at all.
        if not extract_and_show_progress(zipfile_path, extract_folder_path):
            print(f"Error downloading {model_name}")
            sys.exit(1)

        _flatten_single_subfolder(extract_folder_path)
        _rename_model_files(extract_folder_path, model_name)
        print(f"Model {model_name} downloaded!")

    if extract_folder_path == "":
        print("Zip file was not found.")
        sys.exit(1)


if __name__ == "__main__":
    main()