"""Gradio app that duplicates Hugging Face Hub repositories.

Three modes are offered, one per tab:

* One to One     -- copy a single repo (optionally into a subfolder of the
                    destination).
* Multi to One   -- copy several repos into subfolders of one destination.
* Multi to Multi -- duplicate several repos, each into its own new repo.
"""
import gradio as gr
import requests
from pathlib import Path
import re
import os
import tempfile
import shutil
import urllib
import urllib.parse  # `import urllib` alone does not guarantee the submodule is loaded
from huggingface_hub import whoami, HfApi, hf_hub_download, RepoCard
from huggingface_hub.utils import build_hf_headers, hf_raise_for_status
from gradio_huggingfacehub_search import HuggingfaceHubSearch

ENDPOINT = "https://huggingface.co"
# ENDPOINT = "http://localhost:5564"

REPO_TYPES = ["model", "dataset", "space"]
HF_REPO = os.environ.get("HF_REPO") or ""  # set your default repo
HF_REPO_PREFIX = os.environ.get("HF_REPO_PREFIX") or ""  # set your default repo prefix
HF_REPO_SUFFIX = os.environ.get("HF_REPO_SUFFIX") or ""  # set your default repo suffix
HF_USER = os.environ.get("HF_USER") or ""  # set your username

REGEX_HF_REPO = r'^[\w_\-\.]+/[\w_\-\.]+$'
# "<user>/<repo>" optionally followed by "/<path inside the repo>".
# (The previous pattern ended in "(/?:.+)?", a typo for a non-capturing group,
# which rejected every destination that carried a subfolder.)
REGEX_HF_PATH = r'^[\w_\-\.]+/[\w_\-\.]+(?:/.*)?$'

# Accepts a bare "<user>/<repo>[/<path>]" or a Hub URL, with optional
# "datasets/" / "spaces/" prefix and optional "blob|resolve|tree/main" segment.
_REGEX_SRC = re.compile(
    r'^(?:http.+\.co/)?(?:(?:datasets|spaces)/)?'
    r'([\w_\-\.]+/[\w_\-\.]+)/?'
    r'(?:(?:blob|resolve|tree)/main/?)?(.+)?$'
)
_REGEX_DST = re.compile(r'^([\w_\-\.]+/[\w_\-\.]+)/?(.+)?$')

# Web URL prefix for each repo type.
_REPO_URL_PREFIX = {
    "dataset": "https://huggingface.co/datasets/",
    "space": "https://huggingface.co/spaces/",
}


def is_valid_reponame(repo_id: str):
    """Return a match object if *repo_id* looks like ``user/repo``, else None."""
    return re.fullmatch(REGEX_HF_REPO, repo_id)


def is_valid_path(hf_path: str):
    """Return a match object if *hf_path* is ``user/repo`` with an optional sub-path."""
    return re.fullmatch(REGEX_HF_PATH, hf_path)


def create_repo(repo_id: str, repo_type: str, private: bool, hf_token: str):
    """Create *repo_id* on the Hub if it does not exist yet.

    Spaces are created with the ``gradio`` SDK, as in the original code.
    """
    api = HfApi(token=hf_token)
    extra = {"space_sdk": "gradio"} if repo_type == "space" else {}
    api.create_repo(repo_id=repo_id, repo_type=repo_type, private=private,
                    exist_ok=True, token=hf_token, **extra)


def extract_src_reponame(source_repo: str):
    """Split a source spec into ``(repo_id, target)``.

    *source_repo* may be a plain ``user/repo``, ``user/repo/some/path`` or a
    Hub URL.  ``target`` is the URL-unquoted path filter ("" when absent).
    If the input cannot be parsed it is returned unchanged with an empty target.
    """
    if not isinstance(source_repo, str):
        return source_repo, ""
    if is_valid_reponame(source_repo):
        return source_repo, ""
    m = _REGEX_SRC.match(source_repo)
    if m is None:
        return source_repo, ""
    target = (m.group(2) or "").removesuffix("/")
    return m.group(1), urllib.parse.unquote(target)


def extract_dst_reponame(dst_repo: str):
    """Split a destination spec into ``(repo_id, subfolder)``.

    ``subfolder`` is "" when the spec is a bare ``user/repo``.  Unparseable
    input is returned unchanged with an empty subfolder.
    """
    if not isinstance(dst_repo, str):
        return dst_repo, ""
    if is_valid_reponame(dst_repo):
        return dst_repo, ""
    m = _REGEX_DST.match(dst_repo)
    if m is None:
        return dst_repo, ""
    return m.group(1), (m.group(2) or "").removesuffix("/")


def remove_repo_tags(repo_id: str, tags: list[str], repo_type: str, hf_token: str):
    """Best-effort removal of *tags* from the repo card metadata of *repo_id*.

    Failures (missing README, network errors, ...) are logged and swallowed on
    purpose: tag removal must never abort a duplication that already succeeded.
    """
    try:
        card = RepoCard.load(repo_id, repo_type=repo_type, token=hf_token)
        orig_content = card.content
        for tag in tags:
            if 'tags' in card.data and tag in card.data['tags']:
                card.data['tags'].remove(tag)
        if card.content == orig_content:
            return  # nothing changed, skip the commit
        card.push_to_hub(repo_id=repo_id, repo_type=repo_type, token=hf_token)
    except Exception as e:
        print(f"Failed to remove tags from repocard. {e}")


def _repo_url(repo_type: str, repo_id: str) -> str:
    """Return the web URL of *repo_id* for the given repo type."""
    return _REPO_URL_PREFIX.get(repo_type, "https://huggingface.co/") + repo_id


def _require_login(repo_type: str, oauth_token: gr.OAuthToken | None) -> str:
    """Validate the repo type and the OAuth token; return the raw token string.

    Raises ``gr.Error`` when the user is not logged in (``oauth_token`` is
    None), the token is rejected by the Hub, or the repo type is invalid.
    """
    try:
        if oauth_token is None:
            raise ValueError("no OAuth token available")
        if repo_type not in REPO_TYPES:
            raise ValueError("need to select valid repo type")
        whoami(oauth_token.token)  # throws if the token is invalid
    except Exception as e:
        raise gr.Error(f"""Oops, you forgot to login. Please use the loggin button on the top left to migrate your repo {e}""") from e
    return oauth_token.token


def _copy_repo_files(api: HfApi, source_repo: str, dst_repo: str, repo_type: str,
                     subfolder: str, target: str, hf_token: str,
                     skip_identical: bool = False):
    """Copy files one by one from *source_repo* into *dst_repo*.

    Files land under ``subfolder/`` when given.  When *target* is non-empty
    only paths containing it are copied.  With *skip_identical*, files that
    ``is_same_file_hf`` reports as already present are skipped.  Downloads go
    through a temporary directory that is removed even if an upload fails.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        for path in api.list_repo_files(repo_id=source_repo, repo_type=repo_type, token=hf_token):
            if target and target not in path:
                continue
            path_in_repo = f"{subfolder}/{path}" if subfolder else path
            if skip_identical and is_same_file_hf(source_repo, path, repo_type,
                                                  dst_repo, path_in_repo, repo_type, hf_token):
                print(f"{dst_repo}/{path_in_repo} is already exists. Skipping.")
                continue
            file = Path(hf_hub_download(repo_id=source_repo, filename=path, repo_type=repo_type,
                                        local_dir=temp_dir, token=hf_token))
            if not file.exists():
                continue
            if file.is_dir():  # unused for now
                api.upload_folder(repo_id=dst_repo, folder_path=str(file), path_in_repo=path_in_repo,
                                  repo_type=repo_type, token=hf_token)
            elif file.is_file():
                api.upload_file(repo_id=dst_repo, path_or_fileobj=str(file), path_in_repo=path_in_repo,
                                repo_type=repo_type, token=hf_token)
                # Free disk space right away; large repos may not fit at once.
                file.unlink()


def _server_side_duplicate(source_repo: str, dst_repo: str, repo_type: str,
                           private: bool, hf_token: str) -> str:
    """Duplicate a repo through the Hub's ``/duplicate`` endpoint; return its URL."""
    r = requests.post(
        f"{ENDPOINT}/api/{repo_type}s/{source_repo}/duplicate",
        headers=build_hf_headers(token=hf_token),
        json={"repository": dst_repo, "private": private},
    )
    hf_raise_for_status(r)
    return r.json().get("url") or _repo_url(repo_type, dst_repo)


def duplicate(source_repo, dst_repo, repo_type, private, overwrite, auto_dir, remove_tag,
              oauth_token: gr.OAuthToken | None, progress=gr.Progress(track_tqdm=True)):
    """One-to-one duplication handler.

    Uses a file-by-file copy when overwriting or when a destination subfolder
    is requested, otherwise the Hub's server-side duplicate endpoint.
    Returns ``(markdown_message, image_path)`` for the output components.
    """
    hf_token = _require_login(repo_type, oauth_token)
    api = HfApi(token=hf_token)
    if not is_valid_path(dst_repo):
        raise gr.Error(f"Invalid dst_repo: {dst_repo}")
    try:
        source_repo, target = extract_src_reponame(source_repo)
        dst_repo, subfolder = extract_dst_reponame(dst_repo)
        if auto_dir:
            subfolder = source_repo
        if not overwrite and api.repo_exists(repo_id=dst_repo, repo_type=repo_type, token=hf_token):
            raise gr.Error(f"Repo already exists {dst_repo}")

        if overwrite or subfolder:
            create_repo(dst_repo, repo_type, private, hf_token)
            _copy_repo_files(api, source_repo, dst_repo, repo_type, subfolder, target, hf_token)
            repo_url = _repo_url(repo_type, dst_repo)
        else:
            repo_url = _server_side_duplicate(source_repo, dst_repo, repo_type, private, hf_token)

        # NOTE(review): indentation was lost in the source; tag removal is
        # applied after either copy path since it is best-effort and opt-in.
        if remove_tag:
            remove_repo_tags(dst_repo, ["not-for-all-audiences"], repo_type, hf_token)

        return (
            f'Find your repo [here]({repo_url})',
            "sp.jpg",
        )
    except gr.Error:
        raise  # already user-facing; do not re-wrap
    except Exception as e:
        print(e)
        raise gr.Error(f"Error occured: {e}") from e


def parse_repos(s):
    """Extract every ``user/repo`` looking token from free text.

    URLs are stripped first so their path segments are not mistaken for repo
    ids.  Returns a list of repo id strings (empty on any failure).
    """
    repo_pattern = r'[^\w_\-\.]?([\w_\-\.]+/[\w_\-\.]+)[^\w_\-\.]?'
    try:
        s = re.sub("https?://[\\w/:%#\\$&\\?\\(\\)~\\.=\\+\\-]+", "", s)
        return re.findall(repo_pattern, s)
    except Exception:
        return []


def is_same_file_hf(src_repo: str, src_path: str, src_type: str,
                    dst_repo: str, dst_path: str, dst_type: str, hf_token: str):
    """Return True when both files exist and share size and LFS sha256.

    Only LFS-tracked files can be compared; anything else returns False so
    the caller re-uploads it.
    """
    api = HfApi(token=hf_token)
    if not api.file_exists(repo_id=src_repo, filename=src_path, repo_type=src_type, token=hf_token):
        return False
    if not api.file_exists(repo_id=dst_repo, filename=dst_path, repo_type=dst_type, token=hf_token):
        return False
    src_info = api.get_paths_info(repo_id=src_repo, paths=src_path, repo_type=src_type, token=hf_token)
    dst_info = api.get_paths_info(repo_id=dst_repo, paths=dst_path, repo_type=dst_type, token=hf_token)
    if not src_info or not dst_info or len(src_info) != 1 or len(dst_info) != 1:
        return False
    src, dst = src_info[0], dst_info[0]
    # Either side may be a regular (non-LFS) file, in which case `lfs` is None.
    if src.lfs is None or dst.lfs is None:
        return False
    return src.size == dst.size and src.lfs.sha256 == dst.lfs.sha256


def duplicate_m2o(source_repos_str, dst_repo, repo_type, private, overwrite,
                  oauth_token: gr.OAuthToken | None, progress=gr.Progress(track_tqdm=True)):
    """Multi-to-one handler: copy each source repo into ``dst/<prefix>/<source>``.

    Files already present with identical LFS hash are skipped.
    Returns ``(markdown_message, image_path)``.
    """
    hf_token = _require_login(repo_type, oauth_token)
    api = HfApi(token=hf_token)
    if not is_valid_path(dst_repo):
        raise gr.Error(f"Invalid dst_repo: {dst_repo}")
    try:
        dst_repo, subfolder_prefix = extract_dst_reponame(dst_repo)
        if not overwrite and api.repo_exists(repo_id=dst_repo, repo_type=repo_type, token=hf_token):
            raise gr.Error(f"Repo already exists {dst_repo}")
        for source_repo in parse_repos(source_repos_str):
            source_repo, target = extract_src_reponame(source_repo)
            subfolder = f"{subfolder_prefix}/{source_repo}" if subfolder_prefix else source_repo
            create_repo(dst_repo, repo_type, private, hf_token)  # idempotent (exist_ok)
            _copy_repo_files(api, source_repo, dst_repo, repo_type, subfolder, target, hf_token,
                             skip_identical=True)
        repo_url = _repo_url(repo_type, dst_repo)
        return (
            f'Find your repo [here]({repo_url})',
            "sp.jpg",
        )
    except gr.Error:
        raise
    except Exception as e:
        print(e)
        raise gr.Error(f"Error occured: {e}") from e


def duplicate_m2m(source_repos_str, hf_user, repo_type, private, overwrite, remove_tag,
                  repo_prefix, repo_suffix,
                  oauth_token: gr.OAuthToken | None, progress=gr.Progress(track_tqdm=True)):
    """Multi-to-multi handler: duplicate each source into ``hf_user/<prefix><name><suffix>``.

    Sources that are invalid or missing, and destinations that already exist
    (unless *overwrite*), are skipped.  Returns ``(markdown_message, image_path)``.
    """
    hf_token = _require_login(repo_type, oauth_token)
    api = HfApi(token=hf_token)
    try:
        links = []
        for source_repo in parse_repos(source_repos_str):
            if not is_valid_reponame(source_repo) or not api.repo_exists(
                    repo_id=source_repo, repo_type=repo_type, token=hf_token):
                continue
            dst_repo = f"{hf_user}/{repo_prefix}{source_repo.split('/')[-1]}{repo_suffix}"
            if not is_valid_reponame(dst_repo):
                continue
            if not overwrite and api.repo_exists(repo_id=dst_repo, repo_type=repo_type, token=hf_token):
                gr.Info(f"Repo already exists {dst_repo}")
                continue
            repo_url = _server_side_duplicate(source_repo, dst_repo, repo_type, private, hf_token)
            links.append(f'[{dst_repo}]({repo_url})')
            if remove_tag:
                remove_repo_tags(dst_repo, ["not-for-all-audiences"], repo_type, hf_token)
        return (
            'Find your repo ' + '<br>\n'.join(links),
            "sp.jpg",
        )
    except gr.Error:
        raise
    except Exception as e:
        print(e)
        raise gr.Error(f"Error occured: {e}") from e


def add_repo_text(repo_id: str, source_repos: str):
    """Append *repo_id* as a new line to the *source_repos* textbox content."""
    return source_repos + "\n" + repo_id if source_repos else repo_id


def swap_visibilty(profile: gr.OAuthProfile | None):
    """Enable the main UI when a profile is present, dim/disable it otherwise."""
    css_class = "main_ui_logged_in" if profile else "main_ui_logged_out"
    return gr.update(elem_classes=[css_class])


css = '''
.main_ui_logged_out{opacity: 0.3; pointer-events: none}
.title {text-align: center; align-items: center}
'''

# NOTE(review): layout nesting below was reconstructed from a
# whitespace-collapsed source; component order is preserved exactly.
with gr.Blocks(css=css) as demo:
    gr.LoginButton()
    with gr.Column(elem_classes="main_ui_logged_out") as main_ui:
        gr.Markdown("# Duplicate your repo!", elem_classes="title")
        gr.Markdown("Duplicate a Hugging Face repository! This Space is a an experimental demo.")
        with gr.Tab("One to One"):
            with gr.Row():
                with gr.Column():
                    search = HuggingfaceHubSearch(
                        label="source_repo",
                        placeholder="Source repository (e.g. osanseviero/src)",
                        search_type=["model", "dataset", "space"],
                        sumbit_on_select=False,  # (sic) parameter name as spelled by the component
                    )
                    with gr.Group():
                        dst_repo = gr.Textbox(label="dst_repo", placeholder="Destination repository (e.g. osanseviero/dst)", value=HF_REPO)
                        repo_type = gr.Dropdown(label="repo_type", choices=REPO_TYPES, value="model")
                        with gr.Row():
                            is_private = gr.Checkbox(label="Make new repo private?", value=True)
                            is_overwrite = gr.Checkbox(label="Overwrite existing repo?", value=False)
                            is_subdir = gr.Checkbox(label="Create subdirectories automatically?", value=False)
                            is_remtag = gr.Checkbox(label="Remove NFAA tag?", info="To avoid Inference API bug", value=False)
                    with gr.Row():
                        submit_button = gr.Button("Submit", variant="primary")
                        clear_button = gr.Button("Clear", variant="secondary")
                with gr.Column():
                    output_md = gr.Markdown(label="output")
                    output_image = gr.Image(show_label=False)
        with gr.Tab("Multi to One"):
            with gr.Row():
                with gr.Column():
                    m2o_search = HuggingfaceHubSearch(
                        label="source_repo",
                        placeholder="Source repository (e.g. osanseviero/src)",
                        search_type=["model", "dataset", "space"],
                        sumbit_on_select=True,
                    )
                    m2o_source_repos = gr.Textbox(label="source_repos", placeholder="Source repositories (e.g. osanseviero/src)\n...", value="", lines=10)
                    with gr.Group():
                        m2o_dst_repo = gr.Textbox(label="dst_repo", placeholder="Destination repository (e.g. osanseviero/dst)", value=HF_REPO)
                        m2o_repo_type = gr.Dropdown(label="repo_type", choices=REPO_TYPES, value="model")
                        with gr.Row():
                            m2o_is_private = gr.Checkbox(label="Make new repo private?", value=True)
                            m2o_is_overwrite = gr.Checkbox(label="Overwrite existing repo?", value=True)
                    with gr.Row():
                        m2o_submit_button = gr.Button("Submit", variant="primary")
                        m2o_clear_button = gr.Button("Clear", variant="secondary")
                with gr.Column():
                    m2o_output_md = gr.Markdown(label="output")
                    m2o_output_image = gr.Image(show_label=False)
        with gr.Tab("Multi to Multi"):
            with gr.Row():
                with gr.Column():
                    m2m_search = HuggingfaceHubSearch(
                        label="source_repo",
                        placeholder="Source repository (e.g. osanseviero/src)",
                        search_type=["model", "dataset", "space"],
                        sumbit_on_select=True,
                    )
                    m2m_source_repos = gr.Textbox(label="source_repos", placeholder="Source repositories (e.g. osanseviero/src)\n...", value="", lines=10)
                    with gr.Group():
                        with gr.Row():
                            m2m_user = gr.Textbox(label="hf_user", placeholder="Your HF username", value=HF_USER)
                            m2m_prefix = gr.Textbox(label="repo_prefix", value=HF_REPO_PREFIX)
                            m2m_suffix = gr.Textbox(label="repo_suffix", value=HF_REPO_SUFFIX)
                        m2m_repo_type = gr.Dropdown(label="repo_type", choices=REPO_TYPES, value="model")
                        with gr.Row():
                            m2m_is_private = gr.Checkbox(label="Make new repo private?", value=True)
                            m2m_is_overwrite = gr.Checkbox(label="Overwrite existing repo?", value=False)
                            m2m_is_remtag = gr.Checkbox(label="Remove NFAA tag?", info="To avoid Inference API bug", value=True)
                    with gr.Row():
                        m2m_submit_button = gr.Button("Submit", variant="primary")
                        m2m_clear_button = gr.Button("Clear", variant="secondary")
                with gr.Column():
                    m2m_output_md = gr.Markdown(label="output")
                    m2m_output_image = gr.Image(show_label=False)

    demo.load(fn=swap_visibilty, outputs=main_ui)

    # --- One to One ---
    submit_button.click(duplicate, [search, dst_repo, repo_type, is_private, is_overwrite, is_subdir, is_remtag], [output_md, output_image])
    # Reset to the same values the components are created with above
    # (previously every checkbox, including "Overwrite", was reset to True).
    clear_button.click(lambda: ("", HF_REPO, "model", True, False, False, False), None,
                       [search, dst_repo, repo_type, is_private, is_overwrite, is_subdir, is_remtag], queue=False)

    # --- Multi to One ---
    m2o_search.submit(add_repo_text, [m2o_search, m2o_source_repos], [m2o_source_repos], queue=False)
    m2o_submit_button.click(duplicate_m2o, [m2o_source_repos, m2o_dst_repo, m2o_repo_type, m2o_is_private, m2o_is_overwrite], [m2o_output_md, m2o_output_image])
    m2o_clear_button.click(lambda: ("", HF_REPO, "model", True, True, ""), None,
                           [m2o_search, m2o_dst_repo, m2o_repo_type, m2o_is_private, m2o_is_overwrite, m2o_source_repos], queue=False)

    # --- Multi to Multi ---
    m2m_search.submit(add_repo_text, [m2m_search, m2m_source_repos], [m2m_source_repos], queue=False)
    m2m_submit_button.click(duplicate_m2m, [m2m_source_repos, m2m_user, m2m_repo_type, m2m_is_private, m2m_is_overwrite, m2m_is_remtag, m2m_prefix, m2m_suffix], [m2m_output_md, m2m_output_image])
    m2m_clear_button.click(lambda: ("", HF_USER, "model", True, False, True, "", HF_REPO_PREFIX, HF_REPO_SUFFIX), None,
                           [m2m_search, m2m_user, m2m_repo_type, m2m_is_private, m2m_is_overwrite, m2m_is_remtag, m2m_source_repos, m2m_prefix, m2m_suffix], queue=False)

demo.queue()
demo.launch()