Spaces:
Runtime error
Runtime error
from flask import Flask, request, Response | |
import os | |
import sys | |
import requests | |
import shutil | |
import subprocess | |
import wget | |
import signal | |
from bs4 import BeautifulSoup | |
import logging | |
import click | |
app = Flask(__name__) | |
# Disable flask starting message | |
log = logging.getLogger('werkzeug') | |
log.setLevel(logging.ERROR) | |
def secho(text, file=None, nl=None, err=None, color=None, **styles): | |
pass | |
def echo(text, file=None, nl=None, err=None, color=None, **styles): | |
pass | |
click.echo = echo | |
click.secho = secho | |
# Get the current directory path | |
now_dir = os.path.dirname(os.path.abspath(__file__)) | |
# Go up two levels in the directory hierarchy | |
for _ in range(2): | |
now_dir = os.path.dirname(now_dir) | |
# Add now_dir to sys.path so Python can find modules in that location | |
sys.path.append(now_dir) | |
from assets.i18n.i18n import I18nAuto | |
i18n = I18nAuto() | |
# Use the code from the resources module but with some changes | |
def find_folder_parent(search_dir, folder_name): | |
for dirpath, dirnames, filenames in os.walk(search_dir): | |
if folder_name in dirnames: | |
return os.path.abspath(dirpath) | |
return None | |
def get_mediafire_download_link(url): | |
response = requests.get(url) | |
response.raise_for_status() | |
soup = BeautifulSoup(response.text, 'html.parser') | |
download_button = soup.find('a', {'class': 'input popsok', 'aria-label': 'Download file'}) | |
if download_button: | |
download_link = download_button.get('href') | |
return download_link | |
else: | |
return None | |
def download_from_url(url): | |
file_path = find_folder_parent(now_dir, "assets") | |
print(file_path) | |
zips_path = os.path.join(file_path, "assets", "zips") | |
print(zips_path) | |
os.makedirs(zips_path, exist_ok=True) | |
if url != "": | |
print(i18n("Downloading the file: ") + f"{url}") | |
if "drive.google.com" in url: | |
if "file/d/" in url: | |
file_id = url.split("file/d/")[1].split("/")[0] | |
elif "id=" in url: | |
file_id = url.split("id=")[1].split("&")[0] | |
else: | |
return None | |
if file_id: | |
os.chdir(zips_path) | |
result = subprocess.run( | |
["gdown", f"https://drive.google.com/uc?id={file_id}", "--fuzzy"], | |
capture_output=True, | |
text=True, | |
encoding="utf-8", | |
) | |
if ( | |
"Too many users have viewed or downloaded this file recently" | |
in str(result.stderr) | |
): | |
return "too much use" | |
if "Cannot retrieve the public link of the file." in str(result.stderr): | |
return "private link" | |
print(result.stderr) | |
elif "/blob/" in url or "/resolve/" in url: | |
os.chdir(zips_path) | |
if "/blob/" in url: | |
url = url.replace("/blob/", "/resolve/") | |
response = requests.get(url, stream=True) | |
if response.status_code == 200: | |
file_name = url.split("/")[-1] | |
file_name = file_name.replace("%20", "_") | |
total_size_in_bytes = int(response.headers.get('content-length', 0)) | |
block_size = 1024 # 1 Kibibyte | |
progress_bar_length = 50 | |
progress = 0 | |
with open(os.path.join(zips_path, file_name), 'wb') as file: | |
for data in response.iter_content(block_size): | |
file.write(data) | |
progress += len(data) | |
progress_percent = int((progress / total_size_in_bytes) * 100) | |
num_dots = int((progress / total_size_in_bytes) * progress_bar_length) | |
progress_bar = "[" + "." * num_dots + " " * (progress_bar_length - num_dots) + "]" | |
print(f"{progress_percent}% {progress_bar} {progress}/{total_size_in_bytes} ", end="\r") | |
if progress_percent == 100: | |
print("\n") | |
else: | |
os.chdir(file_path) | |
return None | |
elif "mega.nz" in url: | |
if "#!" in url: | |
file_id = url.split("#!")[1].split("!")[0] | |
elif "file/" in url: | |
file_id = url.split("file/")[1].split("/")[0] | |
else: | |
return None | |
if file_id: | |
print("Mega.nz is unsupported due mega.py deprecation") | |
elif "/tree/main" in url: | |
response = requests.get(url) | |
soup = BeautifulSoup(response.content, "html.parser") | |
temp_url = "" | |
for link in soup.find_all("a", href=True): | |
if link["href"].endswith(".zip"): | |
temp_url = link["href"] | |
break | |
if temp_url: | |
url = temp_url | |
url = url.replace("blob", "resolve") | |
if "huggingface.co" not in url: | |
url = "https://huggingface.co" + url | |
wget.download(url) | |
else: | |
print("No .zip file found on the page.") | |
elif "cdn.discordapp.com" in url: | |
file = requests.get(url) | |
os.chdir("./assets/zips") | |
if file.status_code == 200: | |
name = url.split("/") | |
with open( | |
os.path.join(name[-1]), "wb" | |
) as newfile: | |
newfile.write(file.content) | |
else: | |
return None | |
elif "pixeldrain.com" in url: | |
try: | |
file_id = url.split("pixeldrain.com/u/")[1] | |
os.chdir(zips_path) | |
print(file_id) | |
response = requests.get(f"https://pixeldrain.com/api/file/{file_id}") | |
if response.status_code == 200: | |
file_name = ( | |
response.headers.get("Content-Disposition") | |
.split("filename=")[-1] | |
.strip('";') | |
) | |
os.makedirs(zips_path, exist_ok=True) | |
with open(os.path.join(zips_path, file_name), "wb") as newfile: | |
newfile.write(response.content) | |
os.chdir(file_path) | |
return "downloaded" | |
else: | |
os.chdir(file_path) | |
return None | |
except Exception as e: | |
print(e) | |
os.chdir(file_path) | |
return None | |
elif "mediafire.com" in url: | |
download_link = get_mediafire_download_link(url) | |
if download_link: | |
os.chdir(zips_path) | |
wget.download(download_link) | |
else: | |
return None | |
elif "www.weights.gg" in url: | |
#Pls weights creator dont fix this because yes. c: | |
url_parts = url.split("/") | |
weights_gg_index = url_parts.index("www.weights.gg") | |
if weights_gg_index != -1 and weights_gg_index < len(url_parts) - 1: | |
model_part = "/".join(url_parts[weights_gg_index + 1:]) | |
if "models" in model_part: | |
model_part = model_part.split("models/")[-1] | |
print(model_part) | |
if model_part: | |
download_url = f"https://www.weights.gg/es/models/{model_part}" | |
response = requests.get(download_url) | |
if response.status_code == 200: | |
soup = BeautifulSoup(response.text, "html.parser") | |
button_link = soup.find("a", class_="bg-black text-white px-3 py-2 rounded-lg flex items-center gap-1") | |
if button_link: | |
download_link = button_link["href"] | |
result = download_from_url(download_link) | |
if result == "downloaded": | |
return "downloaded" | |
else: | |
return None | |
else: | |
return None | |
else: | |
return None | |
else: | |
return None | |
else: | |
return None | |
else: | |
return None | |
else: | |
os.chdir(zips_path) | |
wget.download(url) | |
# Fix points in the zips | |
for currentPath, _, zipFiles in os.walk(zips_path): | |
for Files in zipFiles: | |
filePart = Files.split(".") | |
extensionFile = filePart[len(filePart) - 1] | |
filePart.pop() | |
nameFile = "_".join(filePart) | |
realPath = os.path.join(currentPath, Files) | |
os.rename(realPath, nameFile + "." + extensionFile) | |
os.chdir(file_path) | |
print(i18n("Full download")) | |
return "downloaded" | |
else: | |
return None | |
def extract_and_show_progress(zipfile_path, unzips_path): | |
try: | |
# Use shutil because zipfile not working | |
shutil.unpack_archive(zipfile_path, unzips_path) | |
return True | |
except Exception as e: | |
print(f"Error al descomprimir {zipfile_path}: {e}") | |
return False | |
def load_downloaded_model(url): | |
parent_path = find_folder_parent(now_dir, "assets") | |
response = requests.get(url) | |
response.raise_for_status() | |
try: | |
zips_path = os.path.join(parent_path, "assets", "zips") | |
unzips_path = os.path.join(parent_path, "assets", "unzips") | |
weights_path = os.path.join(parent_path, "logs", "weights") | |
logs_dir = "" | |
if os.path.exists(zips_path): | |
shutil.rmtree(zips_path) | |
if os.path.exists(unzips_path): | |
shutil.rmtree(unzips_path) | |
os.mkdir(zips_path) | |
os.mkdir(unzips_path) | |
download_file = download_from_url(url) | |
if not download_file: | |
print(i18n("The file could not be downloaded.")) | |
elif download_file == "downloaded": | |
print(i18n("It has been downloaded successfully.")) | |
elif download_file == "too much use": | |
raise Exception( | |
i18n("Too many users have recently viewed or downloaded this file") | |
) | |
elif download_file == "private link": | |
raise Exception(i18n("Cannot get file from this private link")) | |
for filename in os.listdir(zips_path): | |
if filename.endswith(".zip"): | |
zipfile_path = os.path.join(zips_path, filename) | |
print(i18n("Proceeding with the extraction...")) | |
model_name = os.path.basename(zipfile_path) | |
logs_dir = os.path.join( | |
parent_path, | |
"logs", | |
os.path.normpath(str(model_name).replace(".zip", "")), | |
) | |
success = extract_and_show_progress(zipfile_path, unzips_path) | |
if success: | |
print(f"Extracción exitosa: {model_name}") | |
else: | |
print(f"Fallo en la extracción: {model_name}") | |
else: | |
print(i18n("Unzip error.")) | |
return "" | |
index_file = False | |
model_file = False | |
for path, subdirs, files in os.walk(unzips_path): | |
for item in files: | |
item_path = os.path.join(path, item) | |
if not "G_" in item and not "D_" in item and item.endswith(".pth"): | |
model_file = True | |
model_name = item.replace(".pth", "") | |
logs_dir = os.path.join(parent_path, "logs", model_name) | |
if os.path.exists(logs_dir): | |
shutil.rmtree(logs_dir) | |
os.mkdir(logs_dir) | |
if not os.path.exists(weights_path): | |
os.mkdir(weights_path) | |
if os.path.exists(os.path.join(weights_path, item)): | |
os.remove(os.path.join(weights_path, item)) | |
if os.path.exists(item_path): | |
shutil.move(item_path, weights_path) | |
if not model_file and not os.path.exists(logs_dir): | |
os.mkdir(logs_dir) | |
for path, subdirs, files in os.walk(unzips_path): | |
for item in files: | |
item_path = os.path.join(path, item) | |
if item.startswith("added_") and item.endswith(".index"): | |
index_file = True | |
if os.path.exists(item_path): | |
if os.path.exists(os.path.join(logs_dir, item)): | |
os.remove(os.path.join(logs_dir, item)) | |
shutil.move(item_path, logs_dir) | |
if item.startswith("total_fea.npy") or item.startswith("events."): | |
if os.path.exists(item_path): | |
if os.path.exists(os.path.join(logs_dir, item)): | |
os.remove(os.path.join(logs_dir, item)) | |
shutil.move(item_path, logs_dir) | |
result = "" | |
if model_file: | |
if index_file: | |
print(i18n("The model works for inference, and has the .index file.")) | |
else: | |
print( | |
i18n( | |
"The model works for inference, but it doesn't have the .index file." | |
) | |
) | |
if not index_file and not model_file: | |
print(i18n("No relevant file was found to upload.")) | |
os.chdir(parent_path) | |
return result | |
except Exception as e: | |
os.chdir(parent_path) | |
if "too much use" in str(e): | |
print(i18n("Too many users have recently viewed or downloaded this file")) | |
elif "private link" in str(e): | |
print(i18n("Cannot get file from this private link")) | |
else: | |
print(i18n("An error occurred downloading")) | |
print(e) | |
finally: | |
os.chdir(parent_path) | |
def shoutdown(): | |
print("This flask server is shutting down please close the window") | |
pid = os.getpid() | |
os.kill(pid, signal.SIGTERM) | |
if __name__ == '__main__': | |
app.run(host='localhost', port=8000) | |