"""Webhook server that mirrors Space pull requests into temporary CI Spaces.

A POST on ``/webhook`` for a pull request of a Space either syncs a dedicated
CI Space with the PR revision (actions ``create`` / ``update``) or deletes that
CI Space (action ``delete``), then posts a comment on the PR.
"""
import hmac
import os
from pathlib import Path
from typing import Literal, Optional

from fastapi import BackgroundTasks, FastAPI, Header, HTTPException
from fastapi.responses import FileResponse
from huggingface_hub import (
    CommitOperationAdd,
    CommitOperationDelete,
    comment_discussion,
    create_commit,
    create_repo,
    delete_repo,
    snapshot_download,
)
from pydantic import BaseModel
from requests import HTTPError

# Shared secret expected in the `X-Webhook-Secret` header of incoming requests.
WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET")
# Token passed to every Hub call made by this module.
HF_TOKEN = os.getenv("HF_TOKEN")


class WebhookPayloadEvent(BaseModel):
    """`event` section of the webhook payload."""

    action: Literal["create", "update", "delete"]
    scope: str


class WebhookPayloadRepo(BaseModel):
    """`repo` section of the webhook payload."""

    type: Literal["dataset", "model", "space"]
    name: str
    private: bool


class WebhookPayloadDiscussion(BaseModel):
    """`discussion` section of the webhook payload."""

    num: int
    isPullRequest: bool


class WebhookPayload(BaseModel):
    """Subset of the webhook payload that this server reads."""

    event: WebhookPayloadEvent
    repo: WebhookPayloadRepo
    # Explicit default so the field stays optional under pydantic v2 as well;
    # without it, `post_webhook`'s `discussion is None` branch is unreachable.
    discussion: Optional[WebhookPayloadDiscussion] = None


app = FastAPI()


@app.get("/")
async def home():
    """Serve the static landing page."""
    return FileResponse("home.html")


def _secret_matches(provided: str, expected: Optional[str]) -> bool:
    """Return True if `provided` equals `expected`, compared in constant time."""
    if expected is None:
        # No secret configured: nothing can match (same outcome as `!=`).
        return False
    # Compare bytes: `hmac.compare_digest` rejects non-ASCII `str` arguments.
    # "surrogatepass" keeps the encoding total for values read from os.environ.
    return hmac.compare_digest(
        provided.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass"),
    )


@app.post("/webhook")
async def post_webhook(
    payload: WebhookPayload,
    task_queue: BackgroundTasks,
    x_webhook_secret: Optional[str] = Header(default=None),
):
    """Validate a webhook call and schedule the matching CI Space task.

    Args:
        payload: Parsed webhook body.
        task_queue: FastAPI background task queue; the Hub work is queued on
            it and runs after the response is sent.
        x_webhook_secret: Value of the `X-Webhook-Secret` request header.

    Returns:
        A short status string: "Processed" when a task was queued, otherwise
        the reason the event was ignored.

    Raises:
        HTTPException: 401 if the secret header is missing, 403 if it does not
            match `WEBHOOK_SECRET`, 400 if the repo is not a Space.
    """
    # Taken from https://huggingface.co/spaces/huggingface-projects/auto-retrain
    if x_webhook_secret is None:
        raise HTTPException(401)
    if not _secret_matches(x_webhook_secret, WEBHOOK_SECRET):
        raise HTTPException(403)

    if payload.repo.type != "space":
        raise HTTPException(400, f"Must be a Space, not {payload.repo.type}")

    # Only events on pull requests are of interest.
    # NOTE(review): comments posted by `notify_pr` may themselves emit
    # discussion-scoped events -- confirm they cannot re-trigger a sync.
    if not payload.event.scope.startswith("discussion"):
        return "Not a discussion"
    if payload.discussion is None:
        return "Couldn't parse 'payload.discussion'"
    if not payload.discussion.isPullRequest:
        return "Not a Pull Request"

    if payload.event.action in ("create", "update"):
        task_queue.add_task(
            sync_ci_space,
            space_id=payload.repo.name,
            pr_num=payload.discussion.num,
            private=payload.repo.private,
        )
    elif payload.event.action == "delete":
        task_queue.add_task(
            delete_ci_space,
            space_id=payload.repo.name,
            pr_num=payload.discussion.num,
        )
    else:
        return f"Couldn't handle action {payload.event.action}"
    return "Processed"


def sync_ci_space(space_id: str, pr_num: int, private: bool) -> None:
    """Create the CI Space for a PR if needed and sync it with the PR revision.

    Args:
        space_id: Id of the Space the pull request belongs to.
        pr_num: Pull request number.
        private: Visibility passed to `create_repo` for the CI Space.

    Raises:
        HTTPError: If creating the CI Space fails for any reason other than
            the repo already existing (HTTP 409).
    """
    # Create a temporary space for CI if didn't exist
    ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num)
    try:
        create_repo(
            ci_space_id,
            repo_type="space",
            space_sdk="docker",
            private=private,
            token=HF_TOKEN,
        )
        is_new = True
    except HTTPError as err:
        # `err.response` can be None; guard so the original error is re-raised
        # instead of being masked by an AttributeError.
        if err.response is not None and err.response.status_code == 409:
            # already exists
            is_new = False
        else:
            raise

    # Download space codebase from PR revision
    snapshot_path = snapshot_download(
        repo_id=space_id,
        revision=f"refs/pr/{pr_num}",
        repo_type="space",
        token=HF_TOKEN,
    )

    # Sync space codebase with PR revision
    operations = [
        # little aggressive but works
        CommitOperationDelete(".", is_folder=True)
    ]
    operations += [
        CommitOperationAdd(
            # Repo paths use forward slashes whatever the local OS separator.
            path_in_repo=filepath.relative_to(snapshot_path).as_posix(),
            path_or_fileobj=filepath,
        )
        for filepath in Path(snapshot_path).glob("**/*")
        if filepath.is_file()
    ]
    create_commit(
        repo_id=ci_space_id,
        repo_type="space",
        operations=operations,
        commit_message=f"Sync CI Space with PR {pr_num}.",
        token=HF_TOKEN,
    )

    # Post a comment on the PR
    notify_pr(space_id=space_id, pr_num=pr_num, action="create" if is_new else "update")


def delete_ci_space(space_id: str, pr_num: int) -> None:
    """Delete the CI Space attached to a PR and post a comment about it.

    Args:
        space_id: Id of the Space the pull request belongs to.
        pr_num: Pull request number.
    """
    # Delete
    ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num)
    delete_repo(repo_id=ci_space_id, repo_type="space", token=HF_TOKEN)

    # Notify about deletion
    notify_pr(space_id=space_id, pr_num=pr_num, action="delete")


def notify_pr(
    space_id: str, pr_num: int, action: Literal["create", "update", "delete"]
) -> None:
    """Post the notification comment matching `action` on the PR.

    Args:
        space_id: Id of the Space the pull request belongs to.
        pr_num: Pull request number.
        action: Which notification template to use.

    Raises:
        ValueError: If `action` is not one of the handled values.
    """
    ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num)
    if action == "create":
        comment = NOTIFICATION_TEMPLATE_CREATE.format(ci_space_id=ci_space_id)
    elif action == "update":
        comment = NOTIFICATION_TEMPLATE_UPDATE.format(ci_space_id=ci_space_id)
    elif action == "delete":
        comment = NOTIFICATION_TEMPLATE_DELETE
    else:
        raise ValueError(f"Status {action} not handled.")

    comment_discussion(
        repo_id=space_id,
        repo_type="space",
        discussion_num=pr_num,
        comment=comment,
        token=HF_TOKEN,
    )


def _get_ci_space_id(space_id: str, pr_num: int) -> str:
    """Return the id of the CI Space used for PR `pr_num` of `space_id`."""
    return f"{space_id}-ci-pr-{pr_num}"


NOTIFICATION_TEMPLATE_CREATE = """\
Hey there!
Following the creation of this PR, a temporary test Space [{ci_space_id}](https://huggingface.co/spaces/{ci_space_id}) has been launched.
Any changes pushed to this PR will be synced with the test Space.

(This is an automated message)
"""

NOTIFICATION_TEMPLATE_UPDATE = """\
Hey there!
Following new commits that happened in this PR, the temporary test Space [{ci_space_id}](https://huggingface.co/spaces/{ci_space_id}) has been updated.

(This is an automated message)
"""

NOTIFICATION_TEMPLATE_DELETE = """\
Hey there!
PR is now merged. The temporary test Space has been deleted.

(This is an automated message)
"""