"""Gradio Space that copies a model repository from Hugging Face to WiseModel.

The pipeline downloads a snapshot of a Hugging Face model repo into a
scratch directory, clones the (pre-created) target WiseModel repo, moves the
downloaded files into the clone, then commits and pushes.

Configuration is read from environment variables:

* ``HF_TOKEN``  - Hugging Face access token (may be empty for public repos).
* ``WM_TOKEN``  - WiseModel access token.
* ``GIT_USER``  - git user name used for the commit.
* ``GIT_EMAIL`` - git e-mail used for the commit.
"""

import logging
import os
import shutil
import subprocess
from typing import List, Optional

import gradio as gr

from git_commd import GitCommandWrapper

HF_TOKEN = os.environ.get("HF_TOKEN", "")
WiseModel_TOKEN = os.environ.get("WM_TOKEN", "")
GIT_USER = os.environ.get("GIT_USER", "")
GIT_EMAIL = os.environ.get("GIT_EMAIL", "")

# Help text rendered at the top of the page. Kept at column 0 so that the
# markdown never picks up accidental indentation.
_USAGE_MARKDOWN = """
# HF-to-wisemodel/从Huggingface上拉取模型、数据集等到wisemodel
# 这是一个示例Space,实际使用请参考下面说明,先Duplicate一个私有的space
- 这个space可以把已经发布在Huggingface上的模型拉取到wisemodel上。
- This space uploads model from Huggingface to wisemodel.
- **请确认您是repo的拥有者或者有权限操作!**
- **Please make sure that you're the owner of the repo or have permission from the owner to do so!**
# 如何使用这个空间?
# How to use this Space?
- 点击右上角settings后面的“…”按钮,选择“Duplicate this Space”创建一个私有的space,同时输入wisemodel的token(必填)、wisemodel的用户名(GIT_USER 必填)、wisemodel的注册邮箱(GIT_EMAIL 必填),确保有相应repo写入的权限。你还需要输入你的git用户名和邮箱。
- Duplicate this Space and providing WiseModel token (mandatory) and your read/write HF token (mandatory).you also need to provide your git username and email to push the model to WiseModel.
- 在wiseModel上创建一个空的repo,这一步需要手动完成,Space不会为您创建一个空的repo。
- Create your target model repo on WiseModel. This step is not automated.
- 在刚刚自己创建的私有space里填写相应的信息,wisemodel的git clone链接,以及Huggingface的repo名称。
- In your own private Space, fill in information below.
- 点击submit按钮,然后可以通过logs按钮查看进度。
- Click submit then watch for output in container log for progress.
"""


def get_cache_dir():
    """Return a random word to be used as the per-run scratch directory name."""
    from random_word import RandomWords

    return RandomWords().get_random_word()


def check_disk():
    """Print root filesystem usage to the container log.

    Returns:
        The exit status reported by ``os.system`` for ``df -h /``.
    """
    return os.system("df -h /")


def init_git_user(username: str, email: str):
    """Set the global git identity used for the commit pushed to WiseModel.

    Args:
        username: Value for ``git config --global user.name``.
        email: Value for ``git config --global user.email``.

    Raises:
        gr.Error: If either value is empty.
        subprocess.CalledProcessError: If ``git config`` exits non-zero.
    """
    if not username or not email:
        raise gr.Error(
            "Please config your git username and email in the environment variables."
        )
    # Argument-list form (no shell): values containing spaces or shell
    # metacharacters are passed to git verbatim instead of being re-parsed.
    subprocess.run(["git", "config", "--global", "user.email", email], check=True)
    subprocess.run(["git", "config", "--global", "user.name", username], check=True)


def _ensure_git_lfs(git_cmd: GitCommandWrapper) -> None:
    """Install git-lfs through *git_cmd* when it is not already installed."""
    if not git_cmd.is_lfs_installed():
        git_cmd.git_lfs_install()


def pull_from_wisemodel(
    token: str, url: str, repo_name: str, cache_dir, branch: Optional[str] = None
):
    """Clone a WiseModel repository into *cache_dir*.

    Args:
        token: WiseModel access token handed to the git wrapper.
        url: Repository URL.
        repo_name: Repository name handed to the git wrapper.
        cache_dir: Directory to clone into; created if missing.
        branch: Branch to clone, forwarded to the git wrapper.

    Returns:
        A human-readable status message.
    """
    print("pull_from_wisemodel start")
    print(cache_dir)
    os.makedirs(cache_dir, exist_ok=True)
    git_cmd = GitCommandWrapper()
    _ensure_git_lfs(git_cmd)
    git_cmd.clone(cache_dir, token, url, repo_name, branch)
    print("pull_from_wisemodel end")
    return f"Pulled {branch} to temp folder {cache_dir}: {url}"


def push_to_wiseModel(
    token: str,
    url: str,
    repo_name: str,
    cache_dir: str,
    hf_repo_id: str,
    branch: Optional[str] = None,
):
    """Clone the WiseModel repo, copy the HF snapshot into it, commit and push.

    The Hugging Face snapshot is expected under ``./{cache_dir}/hf/{hf_repo_id}``
    (where ``pull_from_hf`` places it).

    Args:
        token: WiseModel access token handed to the git wrapper.
        url: Target repository URL.
        repo_name: Name of the directory the clone lives in under *cache_dir*.
        cache_dir: Scratch directory for this run.
        hf_repo_id: Hugging Face repo id whose snapshot is being pushed.
        branch: Branch to clone from and push to.

    Returns:
        A human-readable status message.
    """
    print("push_to_wiseModel start")
    git_cmd = GitCommandWrapper()
    _ensure_git_lfs(git_cmd)
    git_cmd.clone(cache_dir, token, url, repo_name, branch)

    repo_dir = f"./{cache_dir}/{repo_name}"
    os.makedirs(repo_dir, exist_ok=True)
    source_dir = f"./{cache_dir}/hf/{hf_repo_id}"
    # NOTE(review): move_file reports failures via its return value, which is
    # not inspected here, so a failed move still proceeds to commit/push.
    move_file(source_dir, repo_dir)

    git_cmd.add(repo_dir, all_files=True)
    git_cmd.commit(repo_dir, message="commit from hf to wisemodel")
    git_cmd.push(repo_dir, token, url, branch, branch)
    print("push_to_wiseModel end")
    return f"Pushed {branch} to {url}"


def move_file(source: str, destination: str, excludes: Optional[List[str]] = None):
    """Move every entry of *source* into *destination*, replacing existing ones.

    Args:
        source: Directory whose direct children are moved.
        destination: Directory receiving the entries.
        excludes: Entry names (not paths) to leave in *source*. Defaults to
            no exclusions.

    Returns:
        ``"file moved"`` on success, otherwise an error description. Errors
        are logged and reported through the return value rather than raised.
    """
    skipped = set(excludes or ())
    try:
        for entry in os.listdir(source):
            if entry in skipped:
                continue
            target = os.path.join(destination, entry)
            # shutil.move refuses to overwrite an existing directory entry,
            # so clear whatever is already at the target first.
            if os.path.exists(target):
                logging.info("Removing %s from %s", entry, destination)
                if os.path.isdir(target):
                    shutil.rmtree(target, ignore_errors=True)
                else:
                    os.remove(target)
            logging.info("Moving %s to %s", entry, destination)
            shutil.move(os.path.join(source, entry), destination)
    except Exception as e:
        logging.exception(e)
        return f"Error moving files from {source} to {destination}, {e}"
    return "file moved"


def remove_file(cache_dir, repo_name):
    """Best-effort removal of the file at ``{cache_dir}/{repo_name}``.

    Returns:
        ``"README.md file removed"`` if the file was removed, otherwise an
        empty string (missing path, directory, permission problem, ...).
    """
    try:
        os.remove(f"{cache_dir}/{repo_name}")
    except OSError:
        return ""
    return "README.md file removed"


def push_to_hf(cache_dir, WiseModel_repo_name, hf_repo_id):
    """Upload ``{cache_dir}/{WiseModel_repo_name}`` to a Hugging Face model repo.

    Raises:
        gr.Error: If ``HF_TOKEN`` is not configured.

    Returns:
        A human-readable status message.
    """
    from huggingface_hub import HfApi

    if not HF_TOKEN:
        raise gr.Error("Please enter your HF_TOKEN")
    print("push_to_hf start")
    # The token is passed per client instance instead of being logged in.
    api = HfApi(token=HF_TOKEN)
    api.upload_folder(
        folder_path=f"{cache_dir}/{WiseModel_repo_name}",
        repo_id=hf_repo_id,
        repo_type="model",
    )
    print("push_to_hf end")
    return f"Pushed to {hf_repo_id}"


def pull_from_hf(cache_dir, hf_repo_id):
    """Download a snapshot of a Hugging Face model repo.

    Files are placed in ``{cache_dir}/hf/{hf_repo_id}``. ``HF_TOKEN`` is used
    when set but is not required by this function.

    Returns:
        A human-readable status message.
    """
    from huggingface_hub import HfApi

    print("pull_from_hf start")
    api = HfApi(token=HF_TOKEN)
    output = api.snapshot_download(
        repo_id=hf_repo_id,
        repo_type="model",
        local_dir=cache_dir + "/hf/" + hf_repo_id,
        local_dir_use_symlinks=False,
    )
    print(f"pull_from_hf end, output: {output}")
    return f"Pulled from {hf_repo_id}"


def _parse_wisemodel_link(link: str) -> tuple[str, str]:
    """Split a pasted WiseModel link into ``(repo_url, repo_name)``.

    Accepts either a bare URL or a full ``git clone <url>`` command. Only a
    leading ``git clone`` and a trailing ``.git`` are stripped, so URLs or
    repo names that merely contain "git" or "clone" are left intact.
    """
    tokens = (link or "").split()
    if tokens[:2] == ["git", "clone"]:
        tokens = tokens[2:]
    url = "".join(tokens).rstrip("/")
    if url.endswith(".git"):
        url = url[: -len(".git")]
    return url, url.rsplit("/", 1)[-1]


def handle(wisemodel_link, hf_repo_id):
    """Run the Hugging Face -> WiseModel pipeline for one submission.

    Args:
        wisemodel_link: Git URL (or ``git clone`` command) of the target repo.
        hf_repo_id: Source Hugging Face model repo id.

    Returns:
        ``(output, error)``: the stage results and the error messages, each
        joined with blank lines. Execution stops at the first failing stage.
    """
    wiseModel_repo_url, wisemodel_repo_name = _parse_wisemodel_link(wisemodel_link)
    hf_repo_id = (hf_repo_id or "").strip()
    if not wisemodel_repo_name or not hf_repo_id:
        return "", "Please provide both the wisemodel git link and the HF repo id."

    cache_dir = get_cache_dir()
    print(wiseModel_repo_url.split("/"))

    # The reverse direction (pull_from_wisemodel -> remove_file -> push_to_hf)
    # is supported by the helpers above but is not part of this pipeline.
    stages = [
        (check_disk, (), {}),
        (init_git_user, (GIT_USER, GIT_EMAIL), {}),
        (pull_from_hf, (cache_dir, hf_repo_id), {}),
        # NOTE(review): arguments are in the opposite order from
        # remove_file(cache_dir, repo_name); with either order the path does
        # not exist, so this stage is currently a no-op. Confirm the intent.
        (remove_file, (hf_repo_id, cache_dir), {}),
        (check_disk, (), {}),
        (
            push_to_wiseModel,
            (
                WiseModel_TOKEN,
                wiseModel_repo_url,
                wisemodel_repo_name,
                cache_dir,
                hf_repo_id,
                "main",
            ),
            {},
        ),
        (check_disk, (), {}),
    ]

    results = []
    errors = []
    for func, args, kwargs in stages:
        try:
            results.append(str(func(*args, **kwargs)))
        except Exception as e:
            logging.exception(e)
            errors.append(str(e))
            # Later stages depend on earlier ones; stop at the first failure.
            break

    return "\n\n".join(results), "\n\n".join(errors)


with gr.Blocks() as demo:
    gr.Markdown(_USAGE_MARKDOWN)
    wisemodel_link = gr.Textbox(
        label="Copy the git download link from the model detail page of wisemodel(从wisemodel上获取该模型的完整git clone链接) "
    )
    hf_repo_id = gr.Textbox(
        label="Source HF Model Repo ID (case sensitive). \nPlease make sure that this model has already been created"
    )
    with gr.Row():
        button = gr.Button("Submit", variant="primary")
        clear = gr.Button("Clear")
    error = gr.Textbox(label="Error")
    output = gr.Textbox(label="Output")

    button.click(handle, [wisemodel_link, hf_repo_id], [output, error])
    # Reset both inputs and both result boxes.
    clear.click(
        lambda: ("", "", "", ""),
        None,
        [wisemodel_link, hf_repo_id, output, error],
    )

if __name__ == "__main__":
    demo.launch(debug=True)