Spaces:
Running
on
Zero
Running
on
Zero
import os, zipfile, rarfile, shutil, subprocess, shlex, sys # noqa | |
from .logging_setup import logger | |
from urllib.parse import urlparse | |
from IPython.utils import capture | |
import re | |
VIDEO_EXTENSIONS = [ | |
".mp4", | |
".avi", | |
".mov", | |
".mkv", | |
".wmv", | |
".flv", | |
".webm", | |
".m4v", | |
".mpeg", | |
".mpg", | |
".3gp" | |
] | |
AUDIO_EXTENSIONS = [ | |
".mp3", | |
".wav", | |
".aiff", | |
".aif", | |
".flac", | |
".aac", | |
".ogg", | |
".wma", | |
".m4a", | |
".alac", | |
".pcm", | |
".opus", | |
".ape", | |
".amr", | |
".ac3", | |
".vox", | |
".caf" | |
] | |
SUBTITLE_EXTENSIONS = [ | |
".srt", | |
".vtt", | |
".ass" | |
] | |
def run_command(command): | |
logger.debug(command) | |
if isinstance(command, str): | |
command = shlex.split(command) | |
sub_params = { | |
"stdout": subprocess.PIPE, | |
"stderr": subprocess.PIPE, | |
"creationflags": subprocess.CREATE_NO_WINDOW | |
if sys.platform == "win32" | |
else 0, | |
} | |
process_command = subprocess.Popen(command, **sub_params) | |
output, errors = process_command.communicate() | |
if ( | |
process_command.returncode != 0 | |
): # or not os.path.exists(mono_path) or os.path.getsize(mono_path) == 0: | |
logger.error("Error comnand") | |
raise Exception(errors.decode()) | |
def print_tree_directory(root_dir, indent=""): | |
if not os.path.exists(root_dir): | |
logger.error(f"{indent} Invalid directory or file: {root_dir}") | |
return | |
items = os.listdir(root_dir) | |
for index, item in enumerate(sorted(items)): | |
item_path = os.path.join(root_dir, item) | |
is_last_item = index == len(items) - 1 | |
if os.path.isfile(item_path) and item_path.endswith(".zip"): | |
with zipfile.ZipFile(item_path, "r") as zip_file: | |
print( | |
f"{indent}{'└──' if is_last_item else '├──'} {item} (zip file)" | |
) | |
zip_contents = zip_file.namelist() | |
for zip_item in sorted(zip_contents): | |
print( | |
f"{indent}{' ' if is_last_item else '│ '}{zip_item}" | |
) | |
else: | |
print(f"{indent}{'└──' if is_last_item else '├──'} {item}") | |
if os.path.isdir(item_path): | |
new_indent = indent + (" " if is_last_item else "│ ") | |
print_tree_directory(item_path, new_indent) | |
def upload_model_list(): | |
weight_root = "weights" | |
models = [] | |
for name in os.listdir(weight_root): | |
if name.endswith(".pth"): | |
models.append("weights/" + name) | |
if models: | |
logger.debug(models) | |
index_root = "logs" | |
index_paths = [None] | |
for name in os.listdir(index_root): | |
if name.endswith(".index"): | |
index_paths.append("logs/" + name) | |
if index_paths: | |
logger.debug(index_paths) | |
return models, index_paths | |
def manual_download(url, dst): | |
if "drive.google" in url: | |
logger.info("Drive url") | |
if "folders" in url: | |
logger.info("folder") | |
os.system(f'gdown --folder "{url}" -O {dst} --fuzzy -c') | |
else: | |
logger.info("single") | |
os.system(f'gdown "{url}" -O {dst} --fuzzy -c') | |
elif "huggingface" in url: | |
logger.info("HuggingFace url") | |
if "/blob/" in url or "/resolve/" in url: | |
if "/blob/" in url: | |
url = url.replace("/blob/", "/resolve/") | |
download_manager(url=url, path=dst, overwrite=True, progress=True) | |
else: | |
os.system(f"git clone {url} {dst+'repo/'}") | |
elif "http" in url: | |
logger.info("URL") | |
download_manager(url=url, path=dst, overwrite=True, progress=True) | |
elif os.path.exists(url): | |
logger.info("Path") | |
copy_files(url, dst) | |
else: | |
logger.error(f"No valid URL: {url}") | |
def download_list(text_downloads): | |
if os.environ.get("ZERO_GPU") == "TRUE": | |
raise RuntimeError("This option is disabled in this demo.") | |
try: | |
urls = [elem.strip() for elem in text_downloads.split(",")] | |
except Exception as error: | |
raise ValueError(f"No valid URL. {str(error)}") | |
create_directories(["downloads", "logs", "weights"]) | |
path_download = "downloads/" | |
for url in urls: | |
manual_download(url, path_download) | |
# Tree | |
print("####################################") | |
print_tree_directory("downloads", indent="") | |
print("####################################") | |
# Place files | |
select_zip_and_rar_files("downloads/") | |
models, _ = upload_model_list() | |
# hf space models files delete | |
remove_directory_contents("downloads/repo") | |
return f"Downloaded = {models}" | |
def select_zip_and_rar_files(directory_path="downloads/"): | |
# filter | |
zip_files = [] | |
rar_files = [] | |
for file_name in os.listdir(directory_path): | |
if file_name.endswith(".zip"): | |
zip_files.append(file_name) | |
elif file_name.endswith(".rar"): | |
rar_files.append(file_name) | |
# extract | |
for file_name in zip_files: | |
file_path = os.path.join(directory_path, file_name) | |
with zipfile.ZipFile(file_path, "r") as zip_ref: | |
zip_ref.extractall(directory_path) | |
for file_name in rar_files: | |
file_path = os.path.join(directory_path, file_name) | |
with rarfile.RarFile(file_path, "r") as rar_ref: | |
rar_ref.extractall(directory_path) | |
# set in path | |
def move_files_with_extension(src_dir, extension, destination_dir): | |
for root, _, files in os.walk(src_dir): | |
for file_name in files: | |
if file_name.endswith(extension): | |
source_file = os.path.join(root, file_name) | |
destination = os.path.join(destination_dir, file_name) | |
shutil.move(source_file, destination) | |
move_files_with_extension(directory_path, ".index", "logs/") | |
move_files_with_extension(directory_path, ".pth", "weights/") | |
return "Download complete" | |
def is_file_with_extensions(string_path, extensions): | |
return any(string_path.lower().endswith(ext) for ext in extensions) | |
def is_video_file(string_path): | |
return is_file_with_extensions(string_path, VIDEO_EXTENSIONS) | |
def is_audio_file(string_path): | |
return is_file_with_extensions(string_path, AUDIO_EXTENSIONS) | |
def is_subtitle_file(string_path): | |
return is_file_with_extensions(string_path, SUBTITLE_EXTENSIONS) | |
def get_directory_files(directory): | |
audio_files = [] | |
video_files = [] | |
sub_files = [] | |
for item in os.listdir(directory): | |
item_path = os.path.join(directory, item) | |
if os.path.isfile(item_path): | |
if is_audio_file(item_path): | |
audio_files.append(item_path) | |
elif is_video_file(item_path): | |
video_files.append(item_path) | |
elif is_subtitle_file(item_path): | |
sub_files.append(item_path) | |
logger.info( | |
f"Files in path ({directory}): " | |
f"{str(audio_files + video_files + sub_files)}" | |
) | |
return audio_files, video_files, sub_files | |
def get_valid_files(paths): | |
valid_paths = [] | |
for path in paths: | |
if os.path.isdir(path): | |
audio_files, video_files, sub_files = get_directory_files(path) | |
valid_paths.extend(audio_files) | |
valid_paths.extend(video_files) | |
valid_paths.extend(sub_files) | |
else: | |
valid_paths.append(path) | |
return valid_paths | |
def extract_video_links(link): | |
params_dlp = {"quiet": False, "no_warnings": True, "noplaylist": False} | |
try: | |
from yt_dlp import YoutubeDL | |
with capture.capture_output() as cap: | |
with YoutubeDL(params_dlp) as ydl: | |
info_dict = ydl.extract_info( # noqa | |
link, download=False, process=True | |
) | |
urls = re.findall(r'\[youtube\] Extracting URL: (.*?)\n', cap.stdout) | |
logger.info(f"List of videos in ({link}): {str(urls)}") | |
del cap | |
except Exception as error: | |
logger.error(f"{link} >> {str(error)}") | |
urls = [link] | |
return urls | |
def get_link_list(urls): | |
valid_links = [] | |
for url_video in urls: | |
if "youtube.com" in url_video and "/watch?v=" not in url_video: | |
url_links = extract_video_links(url_video) | |
valid_links.extend(url_links) | |
else: | |
valid_links.append(url_video) | |
return valid_links | |
# ===================================== | |
# Download Manager | |
# ===================================== | |
def load_file_from_url( | |
url: str, | |
model_dir: str, | |
file_name: str | None = None, | |
overwrite: bool = False, | |
progress: bool = True, | |
) -> str: | |
"""Download a file from `url` into `model_dir`, | |
using the file present if possible. | |
Returns the path to the downloaded file. | |
""" | |
os.makedirs(model_dir, exist_ok=True) | |
if not file_name: | |
parts = urlparse(url) | |
file_name = os.path.basename(parts.path) | |
cached_file = os.path.abspath(os.path.join(model_dir, file_name)) | |
# Overwrite | |
if os.path.exists(cached_file): | |
if overwrite or os.path.getsize(cached_file) == 0: | |
remove_files(cached_file) | |
# Download | |
if not os.path.exists(cached_file): | |
logger.info(f'Downloading: "{url}" to {cached_file}\n') | |
from torch.hub import download_url_to_file | |
download_url_to_file(url, cached_file, progress=progress) | |
else: | |
logger.debug(cached_file) | |
return cached_file | |
def friendly_name(file: str): | |
if file.startswith("http"): | |
file = urlparse(file).path | |
file = os.path.basename(file) | |
model_name, extension = os.path.splitext(file) | |
return model_name, extension | |
def download_manager( | |
url: str, | |
path: str, | |
extension: str = "", | |
overwrite: bool = False, | |
progress: bool = True, | |
): | |
url = url.strip() | |
name, ext = friendly_name(url) | |
name += ext if not extension else f".{extension}" | |
if url.startswith("http"): | |
filename = load_file_from_url( | |
url=url, | |
model_dir=path, | |
file_name=name, | |
overwrite=overwrite, | |
progress=progress, | |
) | |
else: | |
filename = path | |
return filename | |
# ===================================== | |
# File management | |
# ===================================== | |
# only remove files | |
def remove_files(file_list): | |
if isinstance(file_list, str): | |
file_list = [file_list] | |
for file in file_list: | |
if os.path.exists(file): | |
os.remove(file) | |
def remove_directory_contents(directory_path): | |
""" | |
Removes all files and subdirectories within a directory. | |
Parameters: | |
directory_path (str): Path to the directory whose | |
contents need to be removed. | |
""" | |
if os.path.exists(directory_path): | |
for filename in os.listdir(directory_path): | |
file_path = os.path.join(directory_path, filename) | |
try: | |
if os.path.isfile(file_path): | |
os.remove(file_path) | |
elif os.path.isdir(file_path): | |
shutil.rmtree(file_path) | |
except Exception as e: | |
logger.error(f"Failed to delete {file_path}. Reason: {e}") | |
logger.info(f"Content in '{directory_path}' removed.") | |
else: | |
logger.error(f"Directory '{directory_path}' does not exist.") | |
# Create directory if not exists | |
def create_directories(directory_path): | |
if isinstance(directory_path, str): | |
directory_path = [directory_path] | |
for one_dir_path in directory_path: | |
if not os.path.exists(one_dir_path): | |
os.makedirs(one_dir_path) | |
logger.debug(f"Directory '{one_dir_path}' created.") | |
def move_files(source_dir, destination_dir, extension=""): | |
""" | |
Moves file(s) from the source path to the destination path. | |
Parameters: | |
source_dir (str): Path to the source directory. | |
destination_dir (str): Path to the destination directory. | |
extension (str): Only move files with this extension. | |
""" | |
create_directories(destination_dir) | |
for filename in os.listdir(source_dir): | |
source_path = os.path.join(source_dir, filename) | |
destination_path = os.path.join(destination_dir, filename) | |
if extension and not filename.endswith(extension): | |
continue | |
os.replace(source_path, destination_path) | |
def copy_files(source_path, destination_path): | |
""" | |
Copies a file or multiple files from a source path to a destination path. | |
Parameters: | |
source_path (str or list): Path or list of paths to the source | |
file(s) or directory. | |
destination_path (str): Path to the destination directory. | |
""" | |
create_directories(destination_path) | |
if isinstance(source_path, str): | |
source_path = [source_path] | |
if os.path.isdir(source_path[0]): | |
# Copy all files from the source directory to the destination directory | |
base_path = source_path[0] | |
source_path = os.listdir(source_path[0]) | |
source_path = [ | |
os.path.join(base_path, file_name) for file_name in source_path | |
] | |
for one_source_path in source_path: | |
if os.path.exists(one_source_path): | |
shutil.copy2(one_source_path, destination_path) | |
logger.debug( | |
f"File '{one_source_path}' copied to '{destination_path}'." | |
) | |
else: | |
logger.error(f"File '{one_source_path}' does not exist.") | |
def rename_file(current_name, new_name): | |
file_directory = os.path.dirname(current_name) | |
if os.path.exists(current_name): | |
dir_new_name_file = os.path.join(file_directory, new_name) | |
os.rename(current_name, dir_new_name_file) | |
logger.debug(f"File '{current_name}' renamed to '{new_name}'.") | |
return dir_new_name_file | |
else: | |
logger.error(f"File '{current_name}' does not exist.") | |
return None | |