# John6666's picture
# Upload app.py
# 3598758 verified
import gradio as gr
import requests
from pathlib import Path
import re
import os
import tempfile
import shutil
import urllib
from huggingface_hub import whoami, HfApi, hf_hub_download, RepoCard
from huggingface_hub.utils import build_hf_headers, hf_raise_for_status
from gradio_huggingfacehub_search import HuggingfaceHubSearch
# Base URL used when calling the Hub's server-side "duplicate" API.
ENDPOINT = "https://huggingface.co"
# ENDPOINT = "http://localhost:5564"
# Repo types selectable in the UI and accepted by the duplicate functions.
REPO_TYPES = ["model", "dataset", "space"]
# Optional UI defaults read from the environment; unset or empty values become "".
HF_REPO = os.environ.get("HF_REPO") or ""  # set your default repo
HF_REPO_PREFIX = os.environ.get("HF_REPO_PREFIX") or ""  # set your default repo prefix
HF_REPO_SUFFIX = os.environ.get("HF_REPO_SUFFIX") or ""  # set your default repo suffix
HF_USER = os.environ.get("HF_USER") or ""  # set your username
# Exactly "owner/name".
REGEX_HF_REPO = r'^[\w_\-\.]+/[\w_\-\.]+$'
# "owner/name", optionally followed by "/" and a subfolder path.
# The optional tail must be a non-capturing group "(?:...)"; the previous
# "(/?:.+)?" demanded a literal ":" and therefore rejected every subfolder.
REGEX_HF_PATH = r'^[\w_\-\.]+/[\w_\-\.]+(?:/.*)?$'
def is_valid_reponame(repo_id: str):
    """Full-match *repo_id* against REGEX_HF_REPO.

    Returns the resulting match object (truthy) or None when it does not match.
    """
    repo_matcher = re.compile(REGEX_HF_REPO)
    return repo_matcher.fullmatch(repo_id)
def is_valid_path(hf_path: str):
    """Full-match *hf_path* against REGEX_HF_PATH.

    Returns the resulting match object (truthy) or None when it does not match.
    """
    path_matcher = re.compile(REGEX_HF_PATH)
    return path_matcher.fullmatch(hf_path)
def create_repo(repo_id: str, repo_type: str, private: bool, hf_token: str):
    """Create *repo_id* on the Hub, tolerating an already existing repo.

    Spaces are additionally created with the "gradio" SDK; other repo types
    get no extra options.
    """
    options = {
        "repo_id": repo_id,
        "repo_type": repo_type,
        "private": private,
        "exist_ok": True,
        "token": hf_token,
    }
    if repo_type == "space":
        options["space_sdk"] = "gradio"
    client = HfApi(token=hf_token)
    client.create_repo(**options)
def extract_src_reponame(source_repo: str):
    """Split a source reference into ``(repo_id, target)``.

    Accepts a bare ``owner/name``, ``owner/name/some/path`` or a Hub URL such
    as ``https://huggingface.co/datasets/owner/name/blob/main/some/file``.

    Args:
        source_repo: Repo id, repo id plus path, or URL.

    Returns:
        A tuple ``(repo_id, target)``. ``target`` is the URL-decoded path
        inside the repo with one trailing "/" removed, or "" when absent.
        When the input cannot be parsed it is returned unchanged together
        with "".
    """
    # Imported here because a bare `import urllib` does not guarantee that the
    # `urllib.parse` submodule has been loaded.
    from urllib.parse import unquote

    if not isinstance(source_repo, str):
        return source_repo, ""

    pattern = (
        r'^(?:https?://[^/]+/)?'                # optional scheme and host
        r'(?:(?:datasets|spaces)/)?'            # optional repo-type URL prefix
        r'([\w_\-\.]+/[\w_\-\.]+)'              # owner/name
        r'/?(?:(?:blob|resolve|tree)/main/)?'   # optional file-view prefix
        r'(.+)?$'                               # optional path inside the repo
    )
    found = re.match(pattern, source_repo.strip())
    if found is None:
        print(f"Could not parse source repo: {source_repo}")
        return source_repo, ""

    repo_id = found.group(1)
    raw_target = found.group(2) or ""
    return repo_id, unquote(raw_target.removesuffix("/"))
def extract_dst_reponame(dst_repo: str):
    """Split a destination reference into ``(repo_id, subfolder)``.

    A plain ``owner/name`` yields an empty subfolder; otherwise everything
    after ``owner/name/`` becomes the subfolder (one trailing "/" removed).
    Any failure is printed and reported as ``(dst_repo, "")``.
    """
    try:
        if is_valid_reponame(dst_repo):
            return dst_repo, ""
        matches = re.findall(r'^([\w_\-\.]+/[\w_\-\.]+)/?(.+)?$', dst_repo)
        dst_repo, folder = matches[0]
        folder = folder.removesuffix("/")
        return dst_repo, folder
    except Exception as e:
        print(e)
        return dst_repo, ""
def remove_repo_tags(repo_id: str, tags: list[str], repo_type: str, hf_token: str):
    """Strip the given *tags* from a repo card and push it if it changed.

    Best effort: any exception is printed and swallowed.
    """
    try:
        repo_card = RepoCard.load(repo_id, repo_type=repo_type, token=hf_token)
        content_before = repo_card.content
        for unwanted in tags:
            if 'tags' not in repo_card.data:
                continue
            if unwanted not in repo_card.data['tags']:
                continue
            repo_card.data['tags'].remove(unwanted)
        # Only push when removing tags actually altered the card.
        if repo_card.content != content_before:
            repo_card.push_to_hub(repo_id=repo_id, repo_type=repo_type, token=hf_token)
    except Exception as e:
        print(f"Failed to remove tags from repocard. {e}")
def duplicate(
    source_repo,
    dst_repo,
    repo_type,
    private,
    overwrite,
    auto_dir,
    remove_tag,
    oauth_token: gr.OAuthToken | None,
    progress=gr.Progress(track_tqdm=True),
):
    """Duplicate one source repo into one destination repo.

    When *overwrite* is set or a destination subfolder is in effect, files are
    downloaded one by one and re-uploaded; otherwise the Hub's server-side
    duplicate endpoint is used.

    Args:
        source_repo: Source repo id, repo id plus path, or Hub URL.
        dst_repo: Destination ``owner/name`` optionally followed by a subfolder.
        repo_type: One of REPO_TYPES.
        private: Whether the destination repo is created private.
        overwrite: Allow writing into an already existing destination repo.
        auto_dir: Use the source repo id as the destination subfolder.
        remove_tag: Remove the "not-for-all-audiences" tag from the destination card.
        oauth_token: Login token injected by Gradio; None when logged out.
        progress: Gradio progress tracker.

    Returns:
        A tuple of an HTML snippet linking to the destination repo and an image path.

    Raises:
        gr.Error: On missing/invalid login, invalid input, or any failure while copying.
    """
    try:
        if repo_type not in REPO_TYPES:
            raise ValueError("need to select valid repo type")
        # The token must be read inside this try: when the user is logged out
        # oauth_token is None and must lead to the login message below.
        if oauth_token is None:
            raise ValueError("not logged in")
        hf_token = oauth_token.token
        _ = whoami(hf_token)
        # ^ this will throw if token is invalid
    except Exception as e:
        raise gr.Error(f"""Oops, you forgot to login. Please use the loggin button on the top left to migrate your repo {e}""")

    if not is_valid_path(dst_repo):
        raise gr.Error(f"Invalid dst_repo: {dst_repo}")

    api = HfApi(token=hf_token)
    try:
        source_repo, target = extract_src_reponame(source_repo)
        dst_repo, subfolder = extract_dst_reponame(dst_repo)
        if auto_dir:
            subfolder = source_repo
        if not overwrite and api.repo_exists(repo_id=dst_repo, repo_type=repo_type, token=hf_token):
            raise gr.Error(f"Repo already exists {dst_repo}")

        if overwrite or subfolder:
            create_repo(dst_repo, repo_type, private, hf_token)
            # The context manager removes the scratch directory even when a
            # download or upload fails part-way through.
            with tempfile.TemporaryDirectory() as temp_dir:
                for path in api.list_repo_files(repo_id=source_repo, repo_type=repo_type, token=hf_token):
                    if target and target not in path:
                        continue
                    file = Path(hf_hub_download(repo_id=source_repo, filename=path, repo_type=repo_type, local_dir=temp_dir, token=hf_token))
                    if not file.exists():
                        continue
                    path_in_repo = f"{subfolder}/{path}" if subfolder else path
                    if file.is_dir():  # unused for now
                        api.upload_folder(repo_id=dst_repo, folder_path=str(file), path_in_repo=path_in_repo, repo_type=repo_type, token=hf_token)
                        shutil.rmtree(file, ignore_errors=True)
                    elif file.is_file():
                        api.upload_file(repo_id=dst_repo, path_or_fileobj=str(file), path_in_repo=path_in_repo, repo_type=repo_type, token=hf_token)
                        # Delete right away to keep disk usage low during large copies.
                        file.unlink()
            if repo_type == "dataset":
                repo_url = f"https://huggingface.co/datasets/{dst_repo}"
            elif repo_type == "space":
                repo_url = f"https://huggingface.co/spaces/{dst_repo}"
            else:
                repo_url = f"https://huggingface.co/{dst_repo}"
        else:
            r = requests.post(
                f"{ENDPOINT}/api/{repo_type}s/{source_repo}/duplicate",
                headers=build_hf_headers(token=hf_token),
                json={"repository": dst_repo, "private": private},
            )
            hf_raise_for_status(r)
            repo_url = r.json().get("url")

        if remove_tag:
            remove_repo_tags(dst_repo, ["not-for-all-audiences"], repo_type, hf_token)
        return (
            f'Find your repo <a href=\'{repo_url}\' target="_blank" style="text-decoration:underline">here</a>',
            "sp.jpg",
        )
    except Exception as e:
        print(e)
        raise gr.Error(f"Error occured: {e}")
def parse_repos(s):
    """Extract every ``owner/name`` token from free-form text.

    http(s) URLs are removed first, then all remaining ``owner/name`` matches
    are returned in order of appearance. Any failure yields an empty list.
    """
    url_pattern = "https?://[\\w/:%#\\$&\\?\\(\\)~\\.=\\+\\-]+"
    repo_pattern = r'[^\w_\-\.]?([\w_\-\.]+/[\w_\-\.]+)[^\w_\-\.]?'
    try:
        text_without_urls = re.sub(url_pattern, "", s)
        return [hit.group(1) for hit in re.finditer(repo_pattern, text_without_urls)]
    except Exception:
        return []
def is_same_file_hf(src_repo: str, src_path: str, src_type: str, dst_repo: str, dst_path: str, dst_type: str, hf_token: str):
    """Tell whether two Hub files are the same LFS object.

    Both files must exist, both must carry LFS metadata, and their size and
    LFS sha256 must be equal. Anything else yields False.

    Returns:
        True when the files are the same LFS object, False otherwise.
    """
    api = HfApi(token=hf_token)
    if not api.file_exists(repo_id=src_repo, filename=src_path, repo_type=src_type, token=hf_token):
        return False
    if not api.file_exists(repo_id=dst_repo, filename=dst_path, repo_type=dst_type, token=hf_token):
        return False
    src_info = api.get_paths_info(repo_id=src_repo, paths=src_path, repo_type=src_type, token=hf_token)
    dst_info = api.get_paths_info(repo_id=dst_repo, paths=dst_path, repo_type=dst_type, token=hf_token)
    if not src_info or not dst_info or len(src_info) != 1 or len(dst_info) != 1:
        return False
    src_file, dst_file = src_info[0], dst_info[0]
    # Both sides need LFS metadata; checking only the source would raise
    # AttributeError when the destination file is not LFS-tracked.
    if src_file.lfs is None or dst_file.lfs is None:
        return False
    return src_file.size == dst_file.size and src_file.lfs.sha256 == dst_file.lfs.sha256
def duplicate_m2o(
    source_repos_str,
    dst_repo,
    repo_type,
    private,
    overwrite,
    oauth_token: gr.OAuthToken | None,
    progress=gr.Progress(track_tqdm=True),
):
    """Copy the files of several source repos into one destination repo.

    Each source repo is placed under ``[<dst subfolder>/]<source repo id>/``.
    Files that are already present as the same LFS object are skipped.

    Args:
        source_repos_str: Free-form text containing ``owner/name`` repo ids.
        dst_repo: Destination ``owner/name`` optionally followed by a subfolder.
        repo_type: One of REPO_TYPES.
        private: Whether the destination repo is created private.
        overwrite: Allow writing into an already existing destination repo.
        oauth_token: Login token injected by Gradio; None when logged out.
        progress: Gradio progress tracker.

    Returns:
        A tuple of an HTML snippet linking to the destination repo and an image path.

    Raises:
        gr.Error: On missing/invalid login, invalid input, or any failure while copying.
    """
    try:
        if repo_type not in REPO_TYPES:
            raise ValueError("need to select valid repo type")
        # The token must be read inside this try: when the user is logged out
        # oauth_token is None and must lead to the login message below.
        if oauth_token is None:
            raise ValueError("not logged in")
        hf_token = oauth_token.token
        _ = whoami(hf_token)
        # ^ this will throw if token is invalid
    except Exception as e:
        raise gr.Error(f"""Oops, you forgot to login. Please use the loggin button on the top left to migrate your repo {e}""")

    if not is_valid_path(dst_repo):
        raise gr.Error(f"Invalid dst_repo: {dst_repo}")

    api = HfApi(token=hf_token)
    try:
        dst_repo, subfolder_prefix = extract_dst_reponame(dst_repo)
        if not overwrite and api.repo_exists(repo_id=dst_repo, repo_type=repo_type, token=hf_token):
            raise gr.Error(f"Repo already exists {dst_repo}")

        source_repos = parse_repos(source_repos_str)
        # Without this guard nothing would be copied and the result link
        # below would be built from an unbound variable.
        if not source_repos:
            raise ValueError("No valid source repos found")

        # The destination is the same for every source, so create it once.
        create_repo(dst_repo, repo_type, private, hf_token)

        for source_repo in source_repos:
            source_repo, target = extract_src_reponame(source_repo)
            subfolder = subfolder_prefix + "/" + source_repo if subfolder_prefix else source_repo
            # The context manager removes the scratch directory even when a
            # download or upload fails part-way through.
            with tempfile.TemporaryDirectory() as temp_dir:
                for path in api.list_repo_files(repo_id=source_repo, repo_type=repo_type, token=hf_token):
                    if target and target not in path:
                        continue
                    path_in_repo = f"{subfolder}/{path}" if subfolder else path
                    if is_same_file_hf(source_repo, path, repo_type, dst_repo, path_in_repo, repo_type, hf_token):
                        print(f"{dst_repo}/{path_in_repo} is already exists. Skipping.")
                        continue
                    file = Path(hf_hub_download(repo_id=source_repo, filename=path, repo_type=repo_type, local_dir=temp_dir, token=hf_token))
                    if not file.exists():
                        continue
                    if file.is_dir():  # unused for now
                        api.upload_folder(repo_id=dst_repo, folder_path=str(file), path_in_repo=path_in_repo, repo_type=repo_type, token=hf_token)
                        shutil.rmtree(file, ignore_errors=True)
                    elif file.is_file():
                        api.upload_file(repo_id=dst_repo, path_or_fileobj=str(file), path_in_repo=path_in_repo, repo_type=repo_type, token=hf_token)
                        # Delete right away to keep disk usage low during large copies.
                        file.unlink()

        if repo_type == "dataset":
            repo_url = f"https://huggingface.co/datasets/{dst_repo}"
        elif repo_type == "space":
            repo_url = f"https://huggingface.co/spaces/{dst_repo}"
        else:
            repo_url = f"https://huggingface.co/{dst_repo}"
        return (
            f'Find your repo <a href=\'{repo_url}\' target="_blank" style="text-decoration:underline">here</a>',
            "sp.jpg",
        )
    except Exception as e:
        print(e)
        raise gr.Error(f"Error occured: {e}")
def duplicate_m2m(
    source_repos_str,
    hf_user,
    repo_type,
    private,
    overwrite,
    remove_tag,
    repo_prefix,
    repo_suffix,
    oauth_token: gr.OAuthToken | None,
    progress=gr.Progress(track_tqdm=True),
):
    """Duplicate several source repos, each into its own destination repo.

    Every destination is named ``<hf_user>/<repo_prefix><source name><repo_suffix>``
    and is created through the Hub's server-side duplicate endpoint. Sources
    that are invalid or missing, and destinations that are invalid, are
    skipped; existing destinations are skipped unless *overwrite* is set.

    Args:
        source_repos_str: Free-form text containing ``owner/name`` repo ids.
        hf_user: Owner (user or organisation) of the destination repos.
        repo_type: One of REPO_TYPES.
        private: Whether the destination repos are created private.
        overwrite: Do not skip destinations that already exist.
        remove_tag: Remove the "not-for-all-audiences" tag from each destination card.
        repo_prefix: Text put in front of each destination repo name.
        repo_suffix: Text appended to each destination repo name.
        oauth_token: Login token injected by Gradio; None when logged out.
        progress: Gradio progress tracker.

    Returns:
        A tuple of an HTML snippet linking to the created repos and an image path.

    Raises:
        gr.Error: On missing/invalid login or any failure while duplicating.
    """
    try:
        if repo_type not in REPO_TYPES:
            raise ValueError("need to select valid repo type")
        # The token must be read inside this try: when the user is logged out
        # oauth_token is None and must lead to the login message below.
        if oauth_token is None:
            raise ValueError("not logged in")
        hf_token = oauth_token.token
        _ = whoami(hf_token)
        # ^ this will throw if token is invalid
    except Exception as e:
        raise gr.Error(f"""Oops, you forgot to login. Please use the loggin button on the top left to migrate your repo {e}""")

    api = HfApi(token=hf_token)
    try:
        links = []
        for source_repo in parse_repos(source_repos_str):
            if not is_valid_reponame(source_repo) or not api.repo_exists(repo_id=source_repo, repo_type=repo_type, token=hf_token):
                continue
            dst_repo = hf_user + "/" + repo_prefix + source_repo.split("/")[-1] + repo_suffix
            if not is_valid_reponame(dst_repo):
                continue
            if not overwrite and api.repo_exists(repo_id=dst_repo, repo_type=repo_type, token=hf_token):
                gr.Info(f"Repo already exists {dst_repo}")
                continue
            r = requests.post(
                f"{ENDPOINT}/api/{repo_type}s/{source_repo}/duplicate",
                headers=build_hf_headers(token=hf_token),
                json={"repository": dst_repo, "private": private},
            )
            hf_raise_for_status(r)
            repo_url = r.json().get("url")
            links.append(f'<a href=\'{repo_url}\' target="_blank" style="text-decoration:underline">{dst_repo}</a><br>\n')
            if remove_tag:
                remove_repo_tags(dst_repo, ["not-for-all-audiences"], repo_type, hf_token)

        # Avoid a dangling "Find your repo " with no link behind it.
        if links:
            repo_url_result = 'Find your repo ' + "".join(links)
        else:
            repo_url_result = 'No repos were duplicated.'
        return (
            repo_url_result,
            "sp.jpg",
        )
    except Exception as e:
        print(e)
        raise gr.Error(f"Error occured: {e}")
def add_repo_text(repo_id: str, source_repos: str):
    """Append *repo_id* as a new line to *source_repos*.

    When *source_repos* is empty, *repo_id* alone is returned.
    """
    if not source_repos:
        return repo_id
    return "\n".join((source_repos, repo_id))
def swap_visibilty(profile: gr.OAuthProfile | None):
    """Pick the CSS class of the main UI depending on whether a profile is present."""
    if profile:
        css_classes = ["main_ui_logged_in"]
    else:
        css_classes = ["main_ui_logged_out"]
    return gr.update(elem_classes=css_classes)
# Styles: the main UI is dimmed and made non-interactive while logged out.
css = '''
.main_ui_logged_out{opacity: 0.3; pointer-events: none}
.title {text-align: center; align-items: center}
'''

with gr.Blocks(css=css) as demo:
    gr.LoginButton()
    # Starts in the logged-out style; swap_visibilty updates the class on page load.
    with gr.Column(elem_classes="main_ui_logged_out") as main_ui:
        gr.Markdown("# Duplicate your repo!", elem_classes="title")
        gr.Markdown("Duplicate a Hugging Face repository! This Space is a an experimental demo.")
        with gr.Tab("One to One"):
            with gr.Row():
                with gr.Column():
                    search = HuggingfaceHubSearch(
                        label="source_repo",
                        placeholder="Source repository (e.g. osanseviero/src)",
                        search_type=["model", "dataset", "space"],
                        sumbit_on_select=False,
                    )
                    with gr.Group():
                        dst_repo = gr.Textbox(label="dst_repo", placeholder="Destination repository (e.g. osanseviero/dst)", value=HF_REPO)
                        repo_type = gr.Dropdown(label="repo_type", choices=REPO_TYPES, value="model")
                        with gr.Row():
                            is_private = gr.Checkbox(label="Make new repo private?", value=True)
                            is_overwrite = gr.Checkbox(label="Overwrite existing repo?", value=False)
                            is_subdir = gr.Checkbox(label="Create subdirectories automatically?", value=False)
                            is_remtag = gr.Checkbox(label="Remove NFAA tag?", info="To avoid Inference API bug", value=False)
                    with gr.Row():
                        submit_button = gr.Button("Submit", variant="primary")
                        clear_button = gr.Button("Clear", variant="secondary")
                with gr.Column():
                    output_md = gr.Markdown(label="output")
                    output_image = gr.Image(show_label=False)
        with gr.Tab("Multi to One"):
            with gr.Row():
                with gr.Column():
                    m2o_search = HuggingfaceHubSearch(
                        label="source_repo",
                        placeholder="Source repository (e.g. osanseviero/src)",
                        search_type=["model", "dataset", "space"],
                        sumbit_on_select=True,
                    )
                    m2o_source_repos = gr.Textbox(label="source_repos", placeholder="Source repositories (e.g. osanseviero/src)\n...", value="", lines=10)
                    with gr.Group():
                        m2o_dst_repo = gr.Textbox(label="dst_repo", placeholder="Destination repository (e.g. osanseviero/dst)", value=HF_REPO)
                        m2o_repo_type = gr.Dropdown(label="repo_type", choices=REPO_TYPES, value="model")
                        with gr.Row():
                            m2o_is_private = gr.Checkbox(label="Make new repo private?", value=True)
                            m2o_is_overwrite = gr.Checkbox(label="Overwrite existing repo?", value=True)
                    with gr.Row():
                        m2o_submit_button = gr.Button("Submit", variant="primary")
                        m2o_clear_button = gr.Button("Clear", variant="secondary")
                with gr.Column():
                    m2o_output_md = gr.Markdown(label="output")
                    m2o_output_image = gr.Image(show_label=False)
        with gr.Tab("Multi to Multi"):
            with gr.Row():
                with gr.Column():
                    m2m_search = HuggingfaceHubSearch(
                        label="source_repo",
                        placeholder="Source repository (e.g. osanseviero/src)",
                        search_type=["model", "dataset", "space"],
                        sumbit_on_select=True,
                    )
                    m2m_source_repos = gr.Textbox(label="source_repos", placeholder="Source repositories (e.g. osanseviero/src)\n...", value="", lines=10)
                    with gr.Group():
                        with gr.Row():
                            m2m_user = gr.Textbox(label="hf_user", placeholder="Your HF username", value=HF_USER)
                            m2m_prefix = gr.Textbox(label="repo_prefix", value=HF_REPO_PREFIX)
                            m2m_suffix = gr.Textbox(label="repo_suffix", value=HF_REPO_SUFFIX)
                        m2m_repo_type = gr.Dropdown(label="repo_type", choices=REPO_TYPES, value="model")
                        with gr.Row():
                            m2m_is_private = gr.Checkbox(label="Make new repo private?", value=True)
                            m2m_is_overwrite = gr.Checkbox(label="Overwrite existing repo?", value=False)
                            m2m_is_remtag = gr.Checkbox(label="Remove NFAA tag?", info="To avoid Inference API bug", value=True)
                    with gr.Row():
                        m2m_submit_button = gr.Button("Submit", variant="primary")
                        m2m_clear_button = gr.Button("Clear", variant="secondary")
                with gr.Column():
                    m2m_output_md = gr.Markdown(label="output")
                    m2m_output_image = gr.Image(show_label=False)

    demo.load(fn=swap_visibilty, outputs=main_ui)

    # One to One
    submit_button.click(duplicate, [search, dst_repo, repo_type, is_private, is_overwrite, is_subdir, is_remtag], [output_md, output_image])
    # Clear restores the initial widget values declared above (private on,
    # overwrite / auto-subdir / remove-tag off).
    clear_button.click(lambda: ("", HF_REPO, "model", True, False, False, False), None, [search, dst_repo, repo_type, is_private, is_overwrite, is_subdir, is_remtag], queue=False)

    # Multi to One: submitting the search box appends the repo to the source list.
    m2o_search.submit(add_repo_text, [m2o_search, m2o_source_repos], [m2o_source_repos], queue=False)
    m2o_submit_button.click(duplicate_m2o, [m2o_source_repos, m2o_dst_repo, m2o_repo_type, m2o_is_private, m2o_is_overwrite], [m2o_output_md, m2o_output_image])
    m2o_clear_button.click(lambda: ("", HF_REPO, "model", True, True, ""), None,
                           [m2o_search, m2o_dst_repo, m2o_repo_type, m2o_is_private, m2o_is_overwrite, m2o_source_repos], queue=False)

    # Multi to Multi: submitting the search box appends the repo to the source list.
    m2m_search.submit(add_repo_text, [m2m_search, m2m_source_repos], [m2m_source_repos], queue=False)
    m2m_submit_button.click(duplicate_m2m, [m2m_source_repos, m2m_user, m2m_repo_type, m2m_is_private, m2m_is_overwrite, m2m_is_remtag, m2m_prefix, m2m_suffix],
                            [m2m_output_md, m2m_output_image])
    m2m_clear_button.click(lambda: ("", HF_USER, "model", True, False, True, "", HF_REPO_PREFIX, HF_REPO_SUFFIX), None,
                           [m2m_search, m2m_user, m2m_repo_type, m2m_is_private, m2m_is_overwrite, m2m_is_remtag, m2m_source_repos, m2m_prefix, m2m_suffix], queue=False)

demo.queue()
demo.launch()