"""Webhook server that mirrors Space pull requests into ephemeral CI Spaces.

For each open PR on a watched Space, a Space named after the source Space and
the PR number is created under the ``CI_BOT_NAME`` account and kept in sync
with the PR revision. It is deleted once the PR is merged or closed, and a
comment is posted on the PR for each of these actions.
"""

import os
from pathlib import Path
from typing import Literal

from fastapi import BackgroundTasks, HTTPException, Response, status
from huggingface_hub import (
    CommitOperationAdd,
    CommitOperationDelete,
    WebhookPayload,
    WebhooksServer,
    comment_discussion,
    create_commit,
    create_repo,
    delete_repo,
    get_repo_discussions,
    login,
    snapshot_download,
    space_info,
)
from huggingface_hub.repocard import RepoCard
from huggingface_hub.utils import RepositoryNotFoundError
from requests import HTTPError

from ui import generate_ui

# Authenticate every subsequent Hub call with the token from the environment.
login(token=os.getenv("HF_TOKEN"))

# Account under which the ephemeral CI Spaces are created.
CI_BOT_NAME = "spaces-ci-bot"

NOTIFICATION_TEMPLATE_CREATE = """\
Following the creation of this PR, an ephemeral Space [{ci_space_id}](https://huggingface.co/spaces/{ci_space_id})
has been launched. Any changes pushed to this PR will be synced with the test Space.

If your Space requires configuration (secrets or upgraded hardware), you must duplicate the ephemeral Space to your account and configure the settings by
yourself. You are responsible of making sure that the changes introduced in the PR are not harmful
(leak secrets, run malicious scripts,...).

_(This is an automated message.)_
"""

NOTIFICATION_TEMPLATE_UPDATE = """\
Following new commits that happened in this PR, the ephemeral Space [{ci_space_id}](https://huggingface.co/spaces/{ci_space_id}) has been updated.

_(This is an automated message.)_
"""

NOTIFICATION_TEMPLATE_DELETE = """\
PR is now merged/closed. The ephemeral Space has been deleted.

_(This is an automated message.)_
"""

app = WebhooksServer(ui=generate_ui())


@app.add_webhook
async def trigger_ci_on_pr(payload: WebhookPayload, task_queue: BackgroundTasks):
    """Schedule a sync or delete task for the CI Space matching a webhook event.

    Three kinds of events are handled:

    * a PR is opened: sync its CI Space unless it is already up to date;
    * a PR is merged or closed: delete its CI Space;
    * content is pushed to the Space: sync every open PR that is out of date.

    Args:
        payload: Webhook payload describing the event.
        task_queue: Queue on which the sync/delete work is scheduled, so the
            response is returned without waiting for it.

    Returns:
        A 202 response when at least one task was scheduled, a 200 response
        otherwise.

    Raises:
        HTTPException: With status 400 when the event is not about a Space.
    """
    if payload.repo.type != "space":
        raise HTTPException(400, f"Must be a Space, not {payload.repo.type}")

    space_id = payload.repo.name
    event = payload.event
    discussion = payload.discussion
    # Shared precondition of the "PR opened" and "PR merged/closed" branches.
    is_pr_event = (
        event.scope.startswith("discussion")
        and discussion is not None
        and discussion.isPullRequest
    )
    has_task = False

    if is_pr_event and event.action == "create" and discussion.status == "open":
        # Means "a new PR has been opened"
        if not is_pr_synced(space_id=space_id, pr_num=discussion.num):
            # New PR! Sync task scheduled
            task_queue.add_task(
                sync_ci_space,
                space_id=space_id,
                pr_num=discussion.num,
                private=payload.repo.private,
            )
            has_task = True
    elif (
        is_pr_event
        and event.action == "update"
        and discussion.status in ("merged", "closed")
    ):
        # Means "a PR has been merged or closed"
        task_queue.add_task(
            delete_ci_space,
            space_id=space_id,
            pr_num=discussion.num,
        )
        has_task = True
    elif event.scope.startswith("repo.content") and event.action == "update":
        # Means "some content has been pushed to the Space" (any branch).
        # New repo change. Is it a commit on a PR?
        # => loop through all PRs and check if new changes happened
        for pr in get_repo_discussions(repo_id=space_id, repo_type="space"):
            if not (pr.is_pull_request and pr.status == "open"):
                continue
            if is_pr_synced(space_id=space_id, pr_num=pr.num):
                continue
            # Found a PR that is not yet synced
            task_queue.add_task(
                sync_ci_space,
                space_id=space_id,
                pr_num=pr.num,
                private=payload.repo.private,
            )
            has_task = True

    if has_task:
        return Response(
            "Task scheduled to sync/delete Space", status_code=status.HTTP_202_ACCEPTED
        )
    return Response("No task scheduled", status_code=status.HTTP_200_OK)


def is_pr_synced(space_id: str, pr_num: int) -> bool:
    """Return whether the CI Space already mirrors the latest commit of a PR.

    The sha recorded as ``synced_sha`` in the CI Space's repo card is compared
    with the sha of the ``refs/pr/{pr_num}`` revision of the source Space.

    Args:
        space_id: Id of the source Space.
        pr_num: Number of the pull request on the source Space.

    Returns:
        ``True`` when both shas are equal, ``False`` otherwise (including when
        loading the CI Space card fails with an ``HTTPError``).
    """
    # What is the last synced commit for this PR?
    ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num)
    try:
        card = RepoCard.load(repo_id_or_path=ci_space_id, repo_type="space")
    except HTTPError:
        # Card could not be fetched: treat the PR as never synced.
        last_synced_sha = None
    else:
        last_synced_sha = getattr(card.data, "synced_sha", None)

    # What is the last commit id for this PR?
    last_pr_sha = space_info(repo_id=space_id, revision=f"refs/pr/{pr_num}").sha

    # Is it up to date ?
    return last_synced_sha == last_pr_sha


def sync_ci_space(space_id: str, pr_num: int, private: bool) -> None:
    """Create the CI Space if needed, mirror the PR content into it and notify.

    Args:
        space_id: Id of the source Space.
        pr_num: Number of the pull request to mirror.
        private: Visibility used when the CI Space has to be created.

    Raises:
        HTTPError: When creating the CI Space fails for any reason other than
            the repo already existing (HTTP 409).
    """
    print(f"New task: sync ephemeral env for {space_id} (PR {pr_num})")

    # Create a temporary space for CI if didn't exist
    ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num)
    try:
        create_repo(
            ci_space_id,
            repo_type="space",
            space_sdk="docker",  # Will be overwritten by sync
            private=private,
        )
    except HTTPError as err:
        # Only "already exists" is expected here. An error without a response
        # attached is re-raised as-is instead of failing on attribute access.
        if err.response is None or err.response.status_code != 409:
            raise
        is_new = False
    else:
        is_new = True

    # Download space codebase from PR revision
    snapshot_path = Path(
        snapshot_download(
            repo_id=space_id, revision=f"refs/pr/{pr_num}", repo_type="space"
        )
    )

    # Sync space codebase with PR revision: wipe everything, then re-add files.
    # NOTE(review): some huggingface_hub releases may reject "." as a
    # `path_in_repo` - confirm against the version pinned for this project.
    operations = [  # little aggressive but works
        CommitOperationDelete(".", is_folder=True)
    ]
    for filepath in snapshot_path.glob("**/*"):
        if not filepath.is_file():
            continue
        # Always use "/" separators so the README check and the repo paths
        # are correct whatever the OS running the bot.
        path_in_repo = filepath.relative_to(snapshot_path).as_posix()

        # Upload all files without changes except for the README file
        if path_in_repo == "README.md":
            card = RepoCard.load(filepath)
            # The snapshot folder name is used as the sha of the synced commit.
            setattr(card.data, "synced_sha", snapshot_path.name)  # latest sha
            path_or_fileobj = str(card).encode()
        else:
            path_or_fileobj = filepath

        operations.append(
            CommitOperationAdd(
                path_in_repo=path_in_repo, path_or_fileobj=path_or_fileobj
            )
        )

    create_commit(
        repo_id=ci_space_id,
        repo_type="space",
        operations=operations,
        commit_message=f"Sync CI Space with PR {pr_num}.",
    )

    # Post a comment on the PR
    notify_pr(space_id=space_id, pr_num=pr_num, action="create" if is_new else "update")


def delete_ci_space(space_id: str, pr_num: int) -> None:
    """Delete the CI Space of a PR and comment on the PR about it.

    Nothing is posted when the CI Space does not exist.

    Args:
        space_id: Id of the source Space.
        pr_num: Number of the pull request whose CI Space must be removed.
    """
    print(f"New task: delete ephemeral env for {space_id} (PR {pr_num})")

    # Delete
    ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num)
    try:
        delete_repo(repo_id=ci_space_id, repo_type="space")
    except RepositoryNotFoundError:
        # Repo did not exist: no need to notify
        return

    # Notify about deletion
    notify_pr(space_id=space_id, pr_num=pr_num, action="delete")


def notify_pr(
    space_id: str, pr_num: int, action: Literal["create", "update", "delete"]
) -> None:
    """Post the automated comment matching ``action`` on the PR discussion.

    Args:
        space_id: Id of the source Space holding the PR.
        pr_num: Number of the pull request to comment on.
        action: What happened to the CI Space.

    Raises:
        ValueError: When ``action`` is not one of the supported values.
    """
    ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num)
    if action == "create":
        comment = NOTIFICATION_TEMPLATE_CREATE.format(ci_space_id=ci_space_id)
    elif action == "update":
        comment = NOTIFICATION_TEMPLATE_UPDATE.format(ci_space_id=ci_space_id)
    elif action == "delete":
        # The delete template has no placeholder, so it is used verbatim.
        comment = NOTIFICATION_TEMPLATE_DELETE
    else:
        raise ValueError(f"Status {action} not handled.")

    comment_discussion(
        repo_id=space_id, repo_type="space", discussion_num=pr_num, comment=comment
    )


def _get_ci_space_id(space_id: str, pr_num: int) -> str:
    """Return the id of the CI Space associated with a PR of ``space_id``.

    Slashes in ``space_id`` are replaced by dashes so the result is a valid
    repo name under the ``CI_BOT_NAME`` namespace.
    """
    return f"{CI_BOT_NAME}/{space_id.replace('/', '-')}-ci-pr-{pr_num}"


app.run()