# (Page-header residue "Spaces: Running" kept as a comment; it is not part of the program.)
import gradio as gr | |
import requests | |
from pathlib import Path | |
import re | |
import os | |
import tempfile | |
import shutil | |
import urllib | |
from huggingface_hub import whoami, HfApi, hf_hub_download, RepoCard | |
from huggingface_hub.utils import build_hf_headers, hf_raise_for_status | |
from gradio_huggingfacehub_search import HuggingfaceHubSearch | |
# Base URL used when calling the Hub's raw "duplicate" REST endpoint.
ENDPOINT = "https://huggingface.co"
# ENDPOINT = "http://localhost:5564"
# Repository kinds selectable in the UI and accepted by the duplicate functions.
REPO_TYPES = ["model", "dataset", "space"]
# Optional UI defaults read from the environment; unset or empty values become "".
HF_REPO = os.environ.get("HF_REPO") or ""  # set your default repo
HF_REPO_PREFIX = os.environ.get("HF_REPO_PREFIX") or ""  # set your default repo prefix
HF_REPO_SUFFIX = os.environ.get("HF_REPO_SUFFIX") or ""  # set your default repo suffix
HF_USER = os.environ.get("HF_USER") or ""  # set your username
# "<owner>/<name>" with nothing after it.
REGEX_HF_REPO = r'^[\w_\-\.]+/[\w_\-\.]+$'
# "<owner>/<name>" optionally followed by "/<subfolder...>".
# The optional tail must be a non-capturing group "(?:/...)"; the previous
# "(/?:.+)?" required a literal ":" and therefore rejected every subfolder path.
REGEX_HF_PATH = r'^[\w_\-\.]+/[\w_\-\.]+(?:/.*)?$'
def is_valid_reponame(repo_id: str):
    """Return a match object if ``repo_id`` fully matches ``REGEX_HF_REPO``, else ``None``."""
    repo_pattern = re.compile(REGEX_HF_REPO)
    return repo_pattern.fullmatch(repo_id)
def is_valid_path(hf_path: str):
    """Return a match object if ``hf_path`` fully matches ``REGEX_HF_PATH``, else ``None``."""
    path_pattern = re.compile(REGEX_HF_PATH)
    return path_pattern.fullmatch(hf_path)
def create_repo(repo_id: str, repo_type: str, private: bool, hf_token: str):
    """Create ``repo_id`` on the Hub, tolerating an already existing repo.

    Spaces are created with the "gradio" SDK; other repo types get no SDK argument.
    """
    client = HfApi(token=hf_token)
    options = {
        "repo_id": repo_id,
        "repo_type": repo_type,
        "private": private,
        "exist_ok": True,
        "token": hf_token,
    }
    if repo_type == "space":
        options["space_sdk"] = "gradio"
    client.create_repo(**options)
def extract_src_reponame(source_repo: str):
    """Split a source reference into ``(repo_id, target)``.

    Accepts a bare ``owner/name`` id, an ``owner/name/path`` string, or a Hub URL
    (optionally under ``datasets/`` or ``spaces/`` and optionally containing
    ``blob/main/``, ``resolve/main/`` or ``tree/main/``).

    Returns:
        A tuple ``(repo_id, target)`` where ``target`` is the URL-decoded path
        after the repo id with one trailing "/" removed, or "" when absent.
        If the input cannot be parsed it is returned unchanged with target "".
    """
    # Imported here because a bare ``import urllib`` does not guarantee that
    # the ``urllib.parse`` submodule has been loaded.
    from urllib.parse import unquote

    if not isinstance(source_repo, str):
        return source_repo, ""
    match = re.match(
        r'^(?:http.+?\.co/)?'  # lazy, so a later ".co/" inside the file path is not swallowed
        r'(?:datasets/|spaces/)?'  # the slash is part of the prefix; without it "datasets/owner" was taken as the repo id
        r'([\w\-.]+/[\w\-.]+)'
        r'/?(?:(?:blob|resolve|tree)/main(?:/|$))?'
        r'(.+)?$',
        source_repo,
    )
    if match is None:
        print(f"Could not parse source repo: {source_repo}")
        return source_repo, ""
    repo_id = match.group(1)
    # group(2) is None when nothing follows the repo id.
    target = unquote((match.group(2) or "").removesuffix("/"))
    return repo_id, target
def extract_dst_reponame(dst_repo: str):
    """Split a destination string into ``(repo_id, subfolder)``.

    A plain ``owner/name`` yields an empty subfolder. Otherwise the text after
    the repo id (minus one trailing "/") becomes the subfolder. On any failure
    the error is printed and the input is returned with an empty subfolder.
    """
    try:
        if is_valid_reponame(dst_repo):
            return dst_repo, ""
        matches = re.findall(r'^([\w_\-\.]+/[\w_\-\.]+)/?(.+)?$', dst_repo)
        repo_part, remainder = matches[0]
        return repo_part, remainder.removesuffix("/")
    except Exception as e:
        print(e)
        return dst_repo, ""
def remove_repo_tags(repo_id: str, tags: list[str], repo_type: str, hf_token: str):
    """Remove each of ``tags`` from the repo card's tag list and push the card.

    The card is pushed only if its content differs from what was loaded.
    Any failure is printed and otherwise ignored (best effort).
    """
    try:
        card = RepoCard.load(repo_id, repo_type=repo_type, token=hf_token)
        content_before = card.content
        for tag in tags:
            if 'tags' not in card.data:
                continue
            if tag not in card.data['tags']:
                continue
            card.data['tags'].remove(tag)
        if card.content != content_before:
            card.push_to_hub(repo_id=repo_id, repo_type=repo_type, token=hf_token)
    except Exception as e:
        print(f"Failed to remove tags from repocard. {e}")
def duplicate(source_repo, dst_repo, repo_type, private, overwrite, auto_dir, remove_tag, oauth_token: gr.OAuthToken | None, progress=gr.Progress(track_tqdm=True)):
    """Duplicate one Hub repository into ``dst_repo``.

    When ``overwrite`` is set or a destination subfolder is in effect, files are
    downloaded one at a time and re-uploaded. Otherwise the Hub's
    ``/duplicate`` endpoint is called.

    Args:
        source_repo: Repo id, repo path or Hub URL of the source.
        dst_repo: ``owner/name`` optionally followed by ``/subfolder``.
        repo_type: One of ``REPO_TYPES``.
        private: Whether a newly created destination repo is private.
        overwrite: Allow writing into an already existing destination repo.
        auto_dir: Use the source repo id as the destination subfolder.
        remove_tag: Remove the "not-for-all-audiences" tag from the destination card.
        oauth_token: Token object of the logged-in user, or ``None`` when logged out.
        progress: Gradio progress tracker.

    Returns:
        ``(html_link, "sp.jpg")`` for the two output components.

    Raises:
        gr.Error: On missing/invalid login, invalid input, or any failure while copying.
    """
    try:
        # The token may be None when nobody is logged in; check it before any
        # attribute access so the user sees the login hint instead of a raw error.
        if oauth_token is None:
            raise ValueError("no OAuth token available")
        if repo_type not in REPO_TYPES:
            raise ValueError("need to select valid repo type")
        hf_token = oauth_token.token
        _ = whoami(hf_token)
        # ^ this will throw if token is invalid
    except Exception as e:
        raise gr.Error(f"""Oops, you forgot to login. Please use the loggin button on the top left to migrate your repo {e}""")
    api = HfApi(token=hf_token)
    if not is_valid_path(dst_repo):
        raise gr.Error(f"Invalid dst_repo: {dst_repo}")
    try:
        source_repo, target = extract_src_reponame(source_repo)
        dst_repo, subfolder = extract_dst_reponame(dst_repo)
        if auto_dir:
            subfolder = source_repo
        if not overwrite and api.repo_exists(repo_id=dst_repo, repo_type=repo_type, token=hf_token):
            raise gr.Error(f"Repo already exists {dst_repo}")
        if overwrite or subfolder:
            create_repo(dst_repo, repo_type, private, hf_token)
            # The context manager removes the scratch directory even if a
            # download or upload fails part-way through.
            with tempfile.TemporaryDirectory() as temp_dir:
                for path in api.list_repo_files(repo_id=source_repo, repo_type=repo_type, token=hf_token):
                    # Keep only paths that contain the requested target substring.
                    if target and target not in path:
                        continue
                    local_path = Path(hf_hub_download(repo_id=source_repo, filename=path, repo_type=repo_type, local_dir=temp_dir, token=hf_token))
                    if not local_path.exists():
                        continue
                    path_in_repo = f"{subfolder}/{path}" if subfolder else path
                    if local_path.is_dir():  # unused for now
                        api.upload_folder(repo_id=dst_repo, folder_path=str(local_path), path_in_repo=path_in_repo, repo_type=repo_type, token=hf_token)
                    elif local_path.is_file():
                        api.upload_file(repo_id=dst_repo, path_or_fileobj=str(local_path), path_in_repo=path_in_repo, repo_type=repo_type, token=hf_token)
                    # Free disk space right away instead of waiting for the final cleanup.
                    if local_path.is_file():
                        local_path.unlink()
            if repo_type == "dataset":
                repo_url = f"https://huggingface.co/datasets/{dst_repo}"
            elif repo_type == "space":
                repo_url = f"https://huggingface.co/spaces/{dst_repo}"
            else:
                repo_url = f"https://huggingface.co/{dst_repo}"
        else:
            r = requests.post(
                f"{ENDPOINT}/api/{repo_type}s/{source_repo}/duplicate",
                headers=build_hf_headers(token=hf_token),
                json={"repository": dst_repo, "private": private},
            )
            hf_raise_for_status(r)
            repo_url = r.json().get("url")
        if remove_tag:
            remove_repo_tags(dst_repo, ["not-for-all-audiences"], repo_type, hf_token)
        return (
            f'Find your repo <a href=\'{repo_url}\' target="_blank" style="text-decoration:underline">here</a>',
            "sp.jpg",
        )
    except Exception as e:
        print(e)
        raise gr.Error(f"Error occured: {e}")
def parse_repos(s):
    """Return every ``owner/name`` style token found in ``s``.

    URLs are stripped from the text first. Any failure (for example a
    non-string argument) yields an empty list.
    """
    repo_pattern = r'[^\w_\-\.]?([\w_\-\.]+/[\w_\-\.]+)[^\w_\-\.]?'
    url_pattern = "https?://[\\w/:%#\\$&\\?\\(\\)~\\.=\\+\\-]+"
    try:
        text_without_urls = re.compile(url_pattern).sub("", s)
        return re.compile(repo_pattern).findall(text_without_urls)
    except Exception:
        return []
def is_same_file_hf(src_repo: str, src_path: str, src_type: str, dst_repo: str, dst_path: str, dst_type: str, hf_token: str):
    """Report whether two Hub files are the same LFS object.

    Returns ``True`` only when both files exist, both carry LFS metadata, and
    their size and LFS sha256 agree. Every other case returns ``False``, so
    the caller falls back to copying the file.
    """
    api = HfApi(token=hf_token)
    if not api.file_exists(repo_id=src_repo, filename=src_path, repo_type=src_type, token=hf_token):
        return False
    if not api.file_exists(repo_id=dst_repo, filename=dst_path, repo_type=dst_type, token=hf_token):
        return False
    src_info = api.get_paths_info(repo_id=src_repo, paths=src_path, repo_type=src_type, token=hf_token)
    dst_info = api.get_paths_info(repo_id=dst_repo, paths=dst_path, repo_type=dst_type, token=hf_token)
    if not src_info or not dst_info or len(src_info) != 1 or len(dst_info) != 1:
        return False
    src_file, dst_file = src_info[0], dst_info[0]
    src_lfs = getattr(src_file, "lfs", None)
    dst_lfs = getattr(dst_file, "lfs", None)
    # Both sides need LFS metadata; previously only the source was checked and
    # a non-LFS destination raised AttributeError on ``.lfs.sha256``.
    if src_lfs is None or dst_lfs is None:
        return False
    return src_file.size == dst_file.size and src_lfs.sha256 == dst_lfs.sha256
def duplicate_m2o(source_repos_str, dst_repo, repo_type, private, overwrite, oauth_token: gr.OAuthToken | None, progress=gr.Progress(track_tqdm=True)):
    """Copy several source repos into subfolders of one destination repo.

    Each source repo's files are uploaded under
    ``[<dst subfolder>/]<source repo id>/`` in ``dst_repo``. Files for which
    ``is_same_file_hf`` reports an identical destination copy are skipped.

    Args:
        source_repos_str: Free text from which repo ids are parsed.
        dst_repo: ``owner/name`` optionally followed by ``/subfolder``.
        repo_type: One of ``REPO_TYPES``.
        private: Whether a newly created destination repo is private.
        overwrite: Allow writing into an already existing destination repo.
        oauth_token: Token object of the logged-in user, or ``None`` when logged out.
        progress: Gradio progress tracker.

    Returns:
        ``(html_link, "sp.jpg")`` for the two output components.

    Raises:
        gr.Error: On missing/invalid login, invalid input, or any failure while copying.
    """
    try:
        # Check for a missing token before any attribute access so a logged-out
        # user gets the login hint instead of a raw AttributeError.
        if oauth_token is None:
            raise ValueError("no OAuth token available")
        if repo_type not in REPO_TYPES:
            raise ValueError("need to select valid repo type")
        hf_token = oauth_token.token
        _ = whoami(hf_token)
        # ^ this will throw if token is invalid
    except Exception as e:
        raise gr.Error(f"""Oops, you forgot to login. Please use the loggin button on the top left to migrate your repo {e}""")
    api = HfApi(token=hf_token)
    if not is_valid_path(dst_repo):
        raise gr.Error(f"Invalid dst_repo: {dst_repo}")
    try:
        dst_repo, subfolder_prefix = extract_dst_reponame(dst_repo)
        if not overwrite and api.repo_exists(repo_id=dst_repo, repo_type=repo_type, token=hf_token):
            raise gr.Error(f"Repo already exists {dst_repo}")
        source_repos = parse_repos(source_repos_str)
        # Without this guard an empty list would fall through to the return
        # with nothing copied (and possibly no URL computed).
        if not source_repos:
            raise ValueError("no source repos found")
        # Both the destination repo and its URL are the same for every source,
        # so they are prepared once, outside the loop.
        create_repo(dst_repo, repo_type, private, hf_token)
        if repo_type == "dataset":
            repo_url = f"https://huggingface.co/datasets/{dst_repo}"
        elif repo_type == "space":
            repo_url = f"https://huggingface.co/spaces/{dst_repo}"
        else:
            repo_url = f"https://huggingface.co/{dst_repo}"
        for source_repo in source_repos:
            source_repo, target = extract_src_reponame(source_repo)
            subfolder = subfolder_prefix + "/" + source_repo if subfolder_prefix else source_repo
            # One scratch directory per source repo, removed even when a
            # download or upload fails.
            with tempfile.TemporaryDirectory() as temp_dir:
                for path in api.list_repo_files(repo_id=source_repo, repo_type=repo_type, token=hf_token):
                    # Keep only paths that contain the requested target substring.
                    if target and target not in path:
                        continue
                    path_in_repo = f"{subfolder}/{path}" if subfolder else path
                    if is_same_file_hf(source_repo, path, repo_type, dst_repo, path_in_repo, repo_type, hf_token):
                        print(f"{dst_repo}/{path_in_repo} is already exists. Skipping.")
                        continue
                    local_path = Path(hf_hub_download(repo_id=source_repo, filename=path, repo_type=repo_type, local_dir=temp_dir, token=hf_token))
                    if not local_path.exists():
                        continue
                    if local_path.is_dir():  # unused for now
                        api.upload_folder(repo_id=dst_repo, folder_path=str(local_path), path_in_repo=path_in_repo, repo_type=repo_type, token=hf_token)
                    elif local_path.is_file():
                        api.upload_file(repo_id=dst_repo, path_or_fileobj=str(local_path), path_in_repo=path_in_repo, repo_type=repo_type, token=hf_token)
                    # Free disk space right away instead of waiting for the final cleanup.
                    if local_path.is_file():
                        local_path.unlink()
        return (
            f'Find your repo <a href=\'{repo_url}\' target="_blank" style="text-decoration:underline">here</a>',
            "sp.jpg",
        )
    except Exception as e:
        print(e)
        raise gr.Error(f"Error occured: {e}")
def duplicate_m2m(source_repos_str, hf_user, repo_type, private, overwrite, remove_tag, repo_prefix, repo_suffix, oauth_token: gr.OAuthToken | None, progress=gr.Progress(track_tqdm=True)):
    """Duplicate several source repos, each into its own destination repo.

    Each destination id is ``<hf_user>/<repo_prefix><source name><repo_suffix>``.
    Sources that are invalid or do not exist, and destinations with an invalid
    id, are skipped silently. Existing destinations are skipped with an info
    message unless ``overwrite`` is set.

    Args:
        source_repos_str: Free text from which repo ids are parsed.
        hf_user: Owner namespace for the new repos.
        repo_type: One of ``REPO_TYPES``.
        private: Whether the new repos are private.
        overwrite: Attempt duplication even if the destination already exists.
        remove_tag: Remove the "not-for-all-audiences" tag from each new repo card.
        repo_prefix: Text prepended to each source repo name.
        repo_suffix: Text appended to each source repo name.
        oauth_token: Token object of the logged-in user, or ``None`` when logged out.
        progress: Gradio progress tracker.

    Returns:
        ``(html_links, "sp.jpg")`` for the two output components.

    Raises:
        gr.Error: On missing/invalid login or any failure while duplicating.
    """
    try:
        # Check for a missing token before any attribute access so a logged-out
        # user gets the login hint instead of a raw AttributeError.
        if oauth_token is None:
            raise ValueError("no OAuth token available")
        if repo_type not in REPO_TYPES:
            raise ValueError("need to select valid repo type")
        hf_token = oauth_token.token
        _ = whoami(hf_token)
        # ^ this will throw if token is invalid
    except Exception as e:
        raise gr.Error(f"""Oops, you forgot to login. Please use the loggin button on the top left to migrate your repo {e}""")
    api = HfApi(token=hf_token)
    try:
        source_repos = parse_repos(source_repos_str)
        links = []
        for source_repo in source_repos:
            if not is_valid_reponame(source_repo) or not api.repo_exists(repo_id=source_repo, repo_type=repo_type, token=hf_token):
                continue
            dst_repo = hf_user + "/" + repo_prefix + source_repo.split("/")[-1] + repo_suffix
            if not is_valid_reponame(dst_repo):
                continue
            if not overwrite and api.repo_exists(repo_id=dst_repo, repo_type=repo_type, token=hf_token):
                gr.Info(f"Repo already exists {dst_repo}")
                continue
            r = requests.post(
                f"{ENDPOINT}/api/{repo_type}s/{source_repo}/duplicate",
                headers=build_hf_headers(token=hf_token),
                json={"repository": dst_repo, "private": private},
            )
            hf_raise_for_status(r)
            repo_url = r.json().get("url")
            links.append(f'<a href=\'{repo_url}\' target="_blank" style="text-decoration:underline">{dst_repo}</a><br>\n')
            if remove_tag:
                remove_repo_tags(dst_repo, ["not-for-all-audiences"], repo_type, hf_token)
        return (
            'Find your repo ' + "".join(links),
            "sp.jpg",
        )
    except Exception as e:
        print(e)
        raise gr.Error(f"Error occured: {e}")
def add_repo_text(repo_id: str, source_repos: str):
    """Append ``repo_id`` as a new line to ``source_repos``; return ``repo_id`` alone if the list is empty."""
    if not source_repos:
        return repo_id
    return "\n".join((source_repos, repo_id))
def swap_visibilty(profile: gr.OAuthProfile | None):
    """Return a component update selecting the logged-in or logged-out CSS class."""
    css_class = "main_ui_logged_in" if profile else "main_ui_logged_out"
    return gr.update(elem_classes=[css_class])
# Styling: the main column is dimmed and non-interactive while it carries the
# logged-out class; swap_visibilty switches the class on page load.
css = '''
.main_ui_logged_out{opacity: 0.3; pointer-events: none}
.title {text-align: center; align-items: center}
'''

# NOTE(review): the widget nesting below was reconstructed from a source whose
# indentation had been stripped; confirm the layout (which widgets share a Row/Group).
with gr.Blocks(css=css) as demo:
    gr.LoginButton()
    with gr.Column(elem_classes="main_ui_logged_out") as main_ui:
        gr.Markdown("# Duplicate your repo!", elem_classes="title")
        gr.Markdown("Duplicate a Hugging Face repository! This Space is a an experimental demo.")
        with gr.Tab("One to One"):
            with gr.Row():
                with gr.Column():
                    search = HuggingfaceHubSearch(
                        label="source_repo",
                        placeholder="Source repository (e.g. osanseviero/src)",
                        search_type=["model", "dataset", "space"],
                        sumbit_on_select=False,
                    )
                    with gr.Group():
                        dst_repo = gr.Textbox(label="dst_repo", placeholder="Destination repository (e.g. osanseviero/dst)", value=HF_REPO)
                        repo_type = gr.Dropdown(label="repo_type", choices=REPO_TYPES, value="model")
                        with gr.Row():
                            is_private = gr.Checkbox(label="Make new repo private?", value=True)
                            is_overwrite = gr.Checkbox(label="Overwrite existing repo?", value=False)
                            is_subdir = gr.Checkbox(label="Create subdirectories automatically?", value=False)
                            is_remtag = gr.Checkbox(label="Remove NFAA tag?", info="To avoid Inference API bug", value=False)
                    with gr.Row():
                        submit_button = gr.Button("Submit", variant="primary")
                        clear_button = gr.Button("Clear", variant="secondary")
                with gr.Column():
                    output_md = gr.Markdown(label="output")
                    output_image = gr.Image(show_label=False)
        with gr.Tab("Multi to One"):
            with gr.Row():
                with gr.Column():
                    m2o_search = HuggingfaceHubSearch(
                        label="source_repo",
                        placeholder="Source repository (e.g. osanseviero/src)",
                        search_type=["model", "dataset", "space"],
                        sumbit_on_select=True,
                    )
                    m2o_source_repos = gr.Textbox(label="source_repos", placeholder="Source repositories (e.g. osanseviero/src)\n...", value="", lines=10)
                    with gr.Group():
                        m2o_dst_repo = gr.Textbox(label="dst_repo", placeholder="Destination repository (e.g. osanseviero/dst)", value=HF_REPO)
                        m2o_repo_type = gr.Dropdown(label="repo_type", choices=REPO_TYPES, value="model")
                        with gr.Row():
                            m2o_is_private = gr.Checkbox(label="Make new repo private?", value=True)
                            m2o_is_overwrite = gr.Checkbox(label="Overwrite existing repo?", value=True)
                    with gr.Row():
                        m2o_submit_button = gr.Button("Submit", variant="primary")
                        m2o_clear_button = gr.Button("Clear", variant="secondary")
                with gr.Column():
                    m2o_output_md = gr.Markdown(label="output")
                    m2o_output_image = gr.Image(show_label=False)
        with gr.Tab("Multi to Multi"):
            with gr.Row():
                with gr.Column():
                    m2m_search = HuggingfaceHubSearch(
                        label="source_repo",
                        placeholder="Source repository (e.g. osanseviero/src)",
                        search_type=["model", "dataset", "space"],
                        sumbit_on_select=True,
                    )
                    m2m_source_repos = gr.Textbox(label="source_repos", placeholder="Source repositories (e.g. osanseviero/src)\n...", value="", lines=10)
                    with gr.Group():
                        with gr.Row():
                            m2m_user = gr.Textbox(label="hf_user", placeholder="Your HF username", value=HF_USER)
                            m2m_prefix = gr.Textbox(label="repo_prefix", value=HF_REPO_PREFIX)
                            m2m_suffix = gr.Textbox(label="repo_suffix", value=HF_REPO_SUFFIX)
                        m2m_repo_type = gr.Dropdown(label="repo_type", choices=REPO_TYPES, value="model")
                        with gr.Row():
                            m2m_is_private = gr.Checkbox(label="Make new repo private?", value=True)
                            m2m_is_overwrite = gr.Checkbox(label="Overwrite existing repo?", value=False)
                            m2m_is_remtag = gr.Checkbox(label="Remove NFAA tag?", info="To avoid Inference API bug", value=True)
                    with gr.Row():
                        m2m_submit_button = gr.Button("Submit", variant="primary")
                        m2m_clear_button = gr.Button("Clear", variant="secondary")
                with gr.Column():
                    m2m_output_md = gr.Markdown(label="output")
                    m2m_output_image = gr.Image(show_label=False)

    # Switch the main column's CSS class on page load depending on login state.
    demo.load(fn=swap_visibilty, outputs=main_ui)

    # --- One to One ---
    submit_button.click(duplicate, [search, dst_repo, repo_type, is_private, is_overwrite, is_subdir, is_remtag], [output_md, output_image])
    # "Clear" restores the initial widget values declared above
    # (private=True, overwrite/subdir/remove-tag=False); it previously set
    # the last three to True.
    clear_button.click(lambda: ("", HF_REPO, "model", True, False, False, False), None, [search, dst_repo, repo_type, is_private, is_overwrite, is_subdir, is_remtag], queue=False)

    # --- Multi to One ---
    # Selecting a search result appends it to the multi-line source list.
    m2o_search.submit(add_repo_text, [m2o_search, m2o_source_repos], [m2o_source_repos], queue=False)
    m2o_submit_button.click(duplicate_m2o, [m2o_source_repos, m2o_dst_repo, m2o_repo_type, m2o_is_private, m2o_is_overwrite], [m2o_output_md, m2o_output_image])
    m2o_clear_button.click(lambda: ("", HF_REPO, "model", True, True, ""), None,
                           [m2o_search, m2o_dst_repo, m2o_repo_type, m2o_is_private, m2o_is_overwrite, m2o_source_repos], queue=False)

    # --- Multi to Multi ---
    m2m_search.submit(add_repo_text, [m2m_search, m2m_source_repos], [m2m_source_repos], queue=False)
    m2m_submit_button.click(duplicate_m2m, [m2m_source_repos, m2m_user, m2m_repo_type, m2m_is_private, m2m_is_overwrite, m2m_is_remtag, m2m_prefix, m2m_suffix],
                            [m2m_output_md, m2m_output_image])
    m2m_clear_button.click(lambda: ("", HF_USER, "model", True, False, True, "", HF_REPO_PREFIX, HF_REPO_SUFFIX), None,
                           [m2m_search, m2m_user, m2m_repo_type, m2m_is_private, m2m_is_overwrite, m2m_is_remtag, m2m_source_repos, m2m_prefix, m2m_suffix], queue=False)

demo.queue()
demo.launch()