Spaces:
Build error
Build error
| import os | |
| import uuid | |
| import warnings | |
| from concurrent.futures import ThreadPoolExecutor | |
| from functools import wraps | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Literal, Optional, TypedDict, Union | |
| import gradio as gr | |
| from fastapi import BackgroundTasks, HTTPException, Response, status | |
| from huggingface_hub import ( | |
| SpaceHardware, | |
| SpaceStorage, | |
| WebhookPayload, | |
| WebhooksServer, | |
| add_space_secret, | |
| add_space_variable, | |
| comment_discussion, | |
| create_repo, | |
| delete_repo, | |
| get_discussion_details, | |
| get_repo_discussions, | |
| get_space_runtime, | |
| get_space_variables, | |
| repo_exists, | |
| request_space_hardware, | |
| request_space_storage, | |
| snapshot_download, | |
| space_info, | |
| upload_folder, | |
| ) | |
| from huggingface_hub.repocard import RepoCard | |
| from huggingface_hub.utils import ( | |
| RepositoryNotFoundError, | |
| build_hf_headers, | |
| get_session, | |
| hf_raise_for_status, | |
| ) | |
| from requests import HTTPError | |
# Repo id of the Space this code runs in; not set when running locally.
SPACE_ID = os.environ.get("SPACE_ID")
# Ephemeral CI Spaces are recognised by the "-ci-pr-" marker in their repo id
# (see `_get_ci_space_id`).
IS_EPHEMERAL_SPACE = SPACE_ID is not None and "-ci-pr-" in SPACE_ID
# Secret used to authenticate incoming webhook calls.
WEBHOOK_SECRET = os.environ.get("SPACE_CI_SECRET")

# Running in a Space that has no secret yet: generate one and store it as a
# Space secret (per the original note, this restarts the Space).
if SPACE_ID is not None and WEBHOOK_SECRET is None:
    WEBHOOK_SECRET = str(uuid.uuid4())
    add_space_secret(
        repo_id=SPACE_ID,
        key="SPACE_CI_SECRET",
        value=WEBHOOK_SECRET,
        description="This value is used by the SpaceCI. It is automatically generated and should not be changed.",
    )

# Settings applied to ephemeral Spaces. Filled in by `configure_space_ci` and
# read back by `create_ephemeral_space` / `configure_ephemeral_space`.
EPHEMERAL_SPACES_CONFIG: Dict[str, Any] = dict()
def enable_space_ci() -> None:
    """Enable Space CI for the current Space based on config from the README.md file.

    The `space_ci` section of the Space's README metadata is read once, then
    `gr.Blocks.launch` is monkey-patched so that the first call to `launch()`
    builds a `WebhooksServer` through `configure_space_ci` and launches that
    server instead of the bare Gradio app.

    Nothing is done when running outside of a Space or inside an ephemeral
    CI Space.

    Example:
    ```py
    import gradio as gr
    from gradio_space_ci import enable_space_ci

    enable_space_ci()

    with gr.Blocks() as demo:
        ...
    demo.launch()
    ```
    """
    if SPACE_ID is None:
        print("Not in a Space: Space CI disabled.")
        return
    if IS_EPHEMERAL_SPACE:
        print("In an ephemeral Space: Space CI disabled.")
        return

    card = RepoCard.load(repo_id_or_path=SPACE_ID, repo_type="space")
    # `or {}` also covers a README where `space_ci:` is present but empty (null).
    config = card.data.get("space_ci") or {}
    print(f"Enabling Space CI with config from README: {config}")

    old_launch = gr.Blocks.launch

    def new_launch(self: gr.Blocks, *args, **kwargs) -> None:
        server = configure_space_ci(
            blocks=self,
            trusted_authors=config.get("trusted_authors"),
            private=config.get("private", "auto"),
            variables=config.get("variables", "auto"),
            secrets=config.get("secrets"),
            hardware=config.get("hardware"),
            storage=config.get("storage"),
        )
        # De-monkey patch gradio (otherwise it will be called recursively)
        gr.Blocks.launch = old_launch
        # Forward keyword arguments as keywords: `*kwargs` would pass the
        # dict *keys* as positional arguments.
        server.launch(*args, **kwargs)

    # Monkey patch gradio
    gr.Blocks.launch = new_launch
def configure_space_ci(
    blocks: Optional["gr.Blocks"] = None,
    trusted_authors: Optional[List[str]] = None,
    private: Union[bool, Literal["auto"]] = "auto",
    variables: Union[Dict[str, str], Literal["auto"]] = "auto",
    secrets: Optional[List[str]] = None,
    hardware: Union[SpaceHardware, Literal["auto"], None] = None,
    storage: Union[SpaceStorage, Literal["auto"], None] = None,
) -> WebhooksServer:
    """Resolve the ephemeral-Space settings and build the webhook server.

    The resolved settings are stored in `EPHEMERAL_SPACES_CONFIG`. The webhook
    is then registered on the Hub and PRs that may have been missed while the
    server was down are recovered.

    Args:
        blocks: Gradio UI passed to the `WebhooksServer` as `ui`.
        trusted_authors: Extra users allowed to trigger a fully configured CI
            Space. Members of the namespace organization (or the namespace
            user itself) are always added. The list passed in is not modified.
        private: Visibility of ephemeral Spaces; "auto" copies the visibility
            of the current Space.
        variables: Variables to set on ephemeral Spaces; "auto" copies the
            variables of the current Space.
        secrets: Names of environment variables whose values are copied as
            secrets. Names missing from the environment are skipped with a
            warning.
        hardware: Hardware to request; "auto" copies the current runtime.
        storage: Storage to request; "auto" copies the current runtime.

    Returns:
        The `WebhooksServer` to launch. When running locally or in an
        ephemeral Space, a plain server without any CI webhook is returned.
    """
    if SPACE_ID is None or IS_EPHEMERAL_SPACE:
        # Runs locally => don't configure webhook
        # Runs in an ephemeral Space => don't configure webhook
        return WebhooksServer(ui=blocks)

    # Authors
    namespace = SPACE_ID.split("/")[0]
    try:  # Check if namespace is an organization => in this case all members are allowed to trigger CI by default
        response = get_session().get(
            f"https://huggingface.co/api/organizations/{namespace}/members", headers=build_hf_headers()
        )
        response.raise_for_status()
        default_authors = [user["user"] for user in response.json()]
    except Exception:  # Otherwise, it's a single user => only this user is allowed to trigger CI by default
        default_authors = [namespace]
    # Concatenate into a new list so the caller's list is never mutated in place.
    trusted_authors = sorted(set((trusted_authors or []) + default_authors))
    EPHEMERAL_SPACES_CONFIG["trusted_authors"] = trusted_authors

    # Private
    if private == "auto":
        private = space_info(SPACE_ID).private
    EPHEMERAL_SPACES_CONFIG["private"] = private

    # Variables
    if variables == "auto":
        variables = {value.key: value.value for value in get_space_variables(SPACE_ID).values()}
    EPHEMERAL_SPACES_CONFIG["variables"] = variables

    # Secrets
    secrets_with_values: Dict[str, str] = {}
    if secrets is not None:
        for secret in secrets:
            secret_value = os.environ.get(secret)
            if secret_value is None:
                warnings.warn(f"Secret {secret} not found in environment variables. Will skip it in ephemeral Space.")
                continue
            secrets_with_values[secret] = secret_value
    EPHEMERAL_SPACES_CONFIG["secrets"] = secrets_with_values

    # Hardware and storage
    if hardware == "auto" or storage == "auto":
        runtime = get_space_runtime(SPACE_ID)
        if hardware == "auto":
            hardware = runtime.hardware
        if storage == "auto":
            storage = runtime.storage
    EPHEMERAL_SPACES_CONFIG["hardware"] = hardware
    EPHEMERAL_SPACES_CONFIG["storage"] = storage

    # Summary (only secret names are printed, never their values)
    print(
        "Ephemeral Spaces config:"
        f"\n - trusted authors: {trusted_authors}"
        f"\n - private: {private}"
        f"\n - secrets: {', '.join(sorted(secrets_with_values.keys()))}"
        f"\n - variables: {variables}"
        f"\n - storage: {storage}"
        f"\n - hardware: {hardware}"
    )

    # Configure webhook
    server = WebhooksServer(ui=blocks, webhook_secret=WEBHOOK_SECRET)
    server.add_webhook()(trigger_ci_on_pr)
    configure_webhook_on_hub()

    # Recover missed webhooks
    recover_after_restart(space_id=SPACE_ID)
    return server
| ### | |
| # Recovery logic | |
| ### | |
| # Check if there are any PRs that need to be synced. | |
| # We might have missed some events while the server was down. | |
| # => called once at startup (see configure_space_ci) | |
# Single worker: recovery tasks submitted at startup run one after the other.
background_pool = ThreadPoolExecutor(max_workers=1)


def recover_after_restart(space_id: str) -> None:
    """Schedule the sync/delete tasks that were missed while the server was down.

    Walks through every discussion of the Space. Open PRs whose CI Space is
    not up to date get a sync task; merged or closed PRs whose CI Space still
    exists get a delete task. Tasks are submitted to `background_pool`.
    """
    print("Looping through PRs to check if any needs to be synced.")
    for discussion in get_repo_discussions(repo_id=space_id, repo_type="space"):
        if not discussion.is_pull_request:
            continue

        pr_num = discussion.num
        if discussion.status == "open":
            if is_pr_synced(space_id=space_id, pr_num=pr_num):
                continue
            # Found a PR that is not yet synced
            print(f"Recovery. Found an open PR that is not synced: {discussion.url}. Syncing it.")
            background_pool.submit(sync_ci_space, space_id=space_id, pr_num=pr_num)
        elif discussion.status in ("merged", "closed"):
            stale_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num)
            if not repo_exists(repo_id=stale_space_id, repo_type="space"):
                continue
            # Found a PR for which the CI space has not been deleted
            print(f"Recovery. Found a closed PR with an active CI space: {discussion.url}. Deleting it.")
            background_pool.submit(delete_ci_space, space_id=space_id, pr_num=pr_num)
| ### | |
| # Define webhook on the Hub logic | |
| ### | |
def configure_webhook_on_hub() -> None:
    """Make sure a Hub webhook targets this Space's `trigger_ci_on_pr` endpoint.

    The target URL is built from the `SPACE_HOST` environment variable. If a
    webhook with that exact URL is already registered nothing is done;
    otherwise a new webhook watching this Space (repo and discussion events)
    is created, signed with `WEBHOOK_SECRET`.

    Raises:
        RuntimeError: If `SPACE_HOST` is not set in the environment.
    """
    space_host = os.environ.get("SPACE_HOST")
    if space_host is None:
        # Fail with an explicit message instead of an AttributeError on `None.strip`.
        raise RuntimeError("SPACE_HOST environment variable is not set: cannot build the webhook URL.")
    url = "https://" + space_host.strip("/") + "/webhooks/trigger_ci_on_pr"

    # Check if webhook already exists
    for webhook in list_webhooks():
        if webhook["url"] == url:
            print("Webhook already configured")
            return

    # If not => create it
    create_webhook(
        watched=[{"type": "space", "name": SPACE_ID}], url=url, domains=["repo", "discussion"], secret=WEBHOOK_SECRET
    )
    print("New webhook configured!")
| ### | |
| # Webhook logic | |
| ### | |
async def trigger_ci_on_pr(payload: WebhookPayload, task_queue: BackgroundTasks):
    """Webhook endpoint: schedule a sync or a deletion of ephemeral CI Spaces.

    Three kinds of events lead to a background task:
      - a PR has been opened => sync its CI Space (unless already synced);
      - a PR has been merged or closed => delete its CI Space;
      - content has been pushed to the Space (any branch) => sync every open
        PR that is not up to date.

    Returns a 202 response when at least one task was scheduled, a 200
    response otherwise. Raises a 400 `HTTPException` if the payload is not
    about a Space.
    """
    repo_type = payload.repo.type
    if repo_type != "space":
        raise HTTPException(400, f"Must be a Space, not {repo_type}")

    space_id = payload.repo.name
    scope = payload.event.scope
    action = payload.event.action
    pr = payload.discussion
    # True when the event is about a discussion that is a pull request.
    is_pr_event = scope.startswith("discussion") and pr is not None and pr.isPullRequest

    scheduled = False
    if is_pr_event and action == "create" and pr.status == "open":
        # Means "a new PR has been opened"
        if not is_pr_synced(space_id=space_id, pr_num=pr.num):
            # New PR! Sync task scheduled
            task_queue.add_task(sync_ci_space, space_id=space_id, pr_num=pr.num)
            scheduled = True
    elif is_pr_event and action == "update" and pr.status in ("merged", "closed"):
        # Means "a PR has been merged or closed"
        task_queue.add_task(delete_ci_space, space_id=space_id, pr_num=pr.num)
        scheduled = True
    elif scope.startswith("repo.content") and action == "update":
        # Means "some content has been pushed to the Space" (any branch).
        # Is it a commit on a PR? => loop through all PRs and check for new changes.
        for discussion in get_repo_discussions(repo_id=space_id, repo_type="space"):
            if not (discussion.is_pull_request and discussion.status == "open"):
                continue
            if is_pr_synced(space_id=space_id, pr_num=discussion.num):
                continue
            # Found a PR that is not yet synced
            task_queue.add_task(sync_ci_space, space_id=space_id, pr_num=discussion.num)
            scheduled = True

    if not scheduled:
        return Response("No task scheduled", status_code=status.HTTP_200_OK)
    return Response("Task scheduled to sync/delete Space", status_code=status.HTTP_202_ACCEPTED)
| ### | |
| # Internal logic | |
| ### | |
def is_pr_synced(space_id: str, pr_num: int) -> bool:
    """Tell whether the CI Space of a PR is in sync with the PR's latest commit.

    The sha stored as `synced_sha` in the CI Space's README metadata is
    compared with the sha of revision `refs/pr/{pr_num}` of the main Space.
    Returns False when loading the CI Space's card raises an `HTTPError`.
    """
    ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num)

    # Sha recorded at the last sync (None if the card carries no such field).
    try:
        ci_card = RepoCard.load(repo_id_or_path=ci_space_id, repo_type="space")
    except HTTPError:
        return False
    synced_sha = getattr(ci_card.data, "synced_sha", None)

    # Sha of the latest commit on the PR.
    pr_head_sha = space_info(repo_id=space_id, revision=f"refs/pr/{pr_num}").sha

    return synced_sha == pr_head_sha
def sync_ci_space(space_id: str, pr_num: int) -> None:
    """Create (if needed) and update the ephemeral CI Space of a PR.

    Steps:
      1. Create the CI Space if it does not exist yet and, when it has just
         been created, try to configure it (only done for trusted authors).
      2. Download the PR revision of the main Space.
      3. Rewrite the downloaded README so that it records the synced sha and
         carries an "ephemeral" title, then upload the folder to the CI Space.
      4. Post a comment on the PR describing what happened.

    The rewritten README lives in the download cache; it is removed again
    even if saving or uploading fails, so the altered copy is not left behind.
    """
    print(f"New task: sync ephemeral env for {space_id} (PR {pr_num})")
    ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num)

    # Create a temporary space for CI if didn't exist
    is_new = create_ephemeral_space(space_id=space_id, pr_num=pr_num)

    # Configure ephemeral Space if trusted author
    is_configured = False
    if is_new:
        is_configured = configure_ephemeral_space(space_id=space_id, pr_num=pr_num)

    # Download space codebase from PR revision
    snapshot_path = Path(snapshot_download(repo_id=space_id, revision=f"refs/pr/{pr_num}", repo_type="space"))

    # Prepare the README that will be uploaded to the CI Space
    readme_path = snapshot_path / "README.md"
    card = RepoCard.load(readme_path)
    setattr(card.data, "synced_sha", snapshot_path.name)  # latest sha
    card.data.title = f"{card.data.title} (ephemeral #{pr_num})"

    try:
        # Warning: this overwrites the README file inside the cache.
        card.save(readme_path)

        # Sync space codebase with PR revision
        upload_folder(
            repo_id=ci_space_id,
            repo_type="space",
            commit_message=f"Sync CI Space with PR {pr_num}.",
            folder_path=snapshot_path,
            delete_patterns="*",
        )
    finally:
        # Always delete the modified README from the cache, even on failure.
        readme_path.unlink(missing_ok=True)

    # Post a comment on the PR
    if is_new and is_configured:
        notify_pr(space_id=space_id, pr_num=pr_num, action="created_and_configured")
    elif is_new:
        notify_pr(space_id=space_id, pr_num=pr_num, action="created_not_configured")
    else:
        notify_pr(space_id=space_id, pr_num=pr_num, action="updated")
def create_ephemeral_space(space_id: str, pr_num: int) -> bool:
    """Create the CI Space of a PR.

    Returns True when the Space has just been created and False when the Hub
    answers with a 409 (the Space already exists). Any other `HTTPError` is
    re-raised.
    """
    target_repo = _get_ci_space_id(space_id=space_id, pr_num=pr_num)
    is_private: bool = EPHEMERAL_SPACES_CONFIG["private"]

    try:
        create_repo(
            target_repo,
            repo_type="space",
            space_sdk="docker",  # Will be overwritten by sync
            private=is_private,
            exist_ok=False,
        )
    except HTTPError as err:
        already_exists = err.response is not None and err.response.status_code == 409
        if not already_exists:
            raise
        return False
    return True
def configure_ephemeral_space(space_id: str, pr_num: int) -> bool:
    """Apply variables, secrets, hardware and storage to the CI Space of a PR.

    The settings come from `EPHEMERAL_SPACES_CONFIG`. Nothing is applied when
    the PR author is not one of the trusted authors.

    Returns:
        True if the CI Space has been configured, False if the PR author is
        not trusted.
    """
    # Config values
    ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num)
    trusted_authors: List[str] = EPHEMERAL_SPACES_CONFIG["trusted_authors"]
    variables: Dict[str, str] = EPHEMERAL_SPACES_CONFIG["variables"]
    secrets: Dict[str, str] = EPHEMERAL_SPACES_CONFIG["secrets"]
    hardware: Optional[SpaceHardware] = EPHEMERAL_SPACES_CONFIG["hardware"]
    storage: Optional[SpaceStorage] = EPHEMERAL_SPACES_CONFIG["storage"]

    # Check if trusted author
    details = get_discussion_details(repo_id=space_id, repo_type="space", discussion_num=pr_num)
    if details.author not in trusted_authors:
        return False  # not a trusted author => do NOT set secrets, hardware, storage, etc.

    # Configure space
    for key, value in variables.items():
        add_space_variable(ci_space_id, key, value)
    for key, value in secrets.items():
        add_space_secret(ci_space_id, key, value)

    # Request hardware/storage for space (basic CPU needs no explicit request)
    if hardware is not None and hardware != SpaceHardware.CPU_BASIC:
        request_space_hardware(ci_space_id, hardware, sleep_time=5 * 60)  # sleep after 5min on PR Spaces with GPU
    if storage is not None:
        request_space_storage(ci_space_id, storage)
    return True
def delete_ci_space(space_id: str, pr_num: int) -> None:
    """Delete the CI Space of a PR and post a comment on the PR about it.

    If the CI Space does not exist, nothing is posted.
    """
    print(f"New task: delete ephemeral env for {space_id} (PR {pr_num})")

    target_repo = _get_ci_space_id(space_id=space_id, pr_num=pr_num)
    try:
        delete_repo(repo_id=target_repo, repo_type="space")
    except RepositoryNotFoundError:
        # Repo did not exist: no need to notify
        pass
    else:
        # Notify about deletion
        notify_pr(space_id=space_id, pr_num=pr_num, action="deleted")
def notify_pr(
    space_id: str,
    pr_num: int,
    action: Literal["created_not_configured", "created_and_configured", "updated", "deleted"],
) -> None:
    """Post an automated comment on the PR describing what happened to its CI Space.

    Raises:
        ValueError: If `action` is not one of the handled values.
    """
    ci_space_id = _get_ci_space_id(space_id=space_id, pr_num=pr_num)

    # Templates that embed a link to the CI Space.
    templates_with_link = {
        "created_not_configured": NOTIFICATION_TEMPLATE_CREATED_NOT_CONFIGURED,
        "created_and_configured": NOTIFICATION_TEMPLATE_CREATED_AND_CONFIGURED,
        "updated": NOTIFICATION_TEMPLATE_UPDATED,
    }

    if isinstance(action, str) and action in templates_with_link:
        comment = templates_with_link[action].format(ci_space_id=ci_space_id)
    elif action == "deleted":
        # The deletion message has no placeholder and is used verbatim.
        comment = NOTIFICATION_TEMPLATE_DELETED
    else:
        raise ValueError(f"Status {action} not handled.")

    comment_discussion(repo_id=space_id, repo_type="space", discussion_num=pr_num, comment=comment)
def _get_ci_space_id(space_id: str, pr_num: int) -> str:
    """Build the repo id of the ephemeral CI Space for PR `pr_num` of `space_id`."""
    return "{}-ci-pr-{}".format(space_id, pr_num)
# Comment templates posted on PRs by `notify_pr`. The first three are filled
# in with `str.format(ci_space_id=...)`; the "deleted" one is used verbatim.

# Posted when the CI Space has just been created and configured.
NOTIFICATION_TEMPLATE_CREATED_AND_CONFIGURED = """\
Following the creation of this PR, an ephemeral Space [{ci_space_id}](https://huggingface.co/spaces/{ci_space_id}) has been started. Any changes pushed to this PR will be synced with the test Space.
Since this PR has been created by a trusted author, the ephemeral Space has been configured with the correct hardware, storage, and secrets.
_(This is an automated message.)_
"""
# Posted when the CI Space has just been created but NOT configured.
NOTIFICATION_TEMPLATE_CREATED_NOT_CONFIGURED = """\
Following the creation of this PR, an ephemeral Space [{ci_space_id}](https://huggingface.co/spaces/{ci_space_id}) has been started. Any changes pushed to this PR will be synced with the test Space.
Since this PR has not been created by a trusted author, the ephemeral Space has not been configured with the correct hardware, storage, and secrets. An admin must configure it manually.
_(This is an automated message.)_
"""
# Posted when an existing CI Space has been re-synced with the PR.
NOTIFICATION_TEMPLATE_UPDATED = """\
Following new commits that happened in this PR, the ephemeral Space [{ci_space_id}](https://huggingface.co/spaces/{ci_space_id}) has been updated.
_(This is an automated message.)_
"""
# Posted when the CI Space has been deleted.
NOTIFICATION_TEMPLATE_DELETED = """\
PR is now merged/closed. The ephemeral Space has been deleted.
_(This is an automated message.)_
"""
| ### TO MOVE TO ITS OWN MODULE | |
| # Taken from https://github.com/huggingface/huggingface_hub/issues/1808#issuecomment-1802341663 | |
# HTTP headers shared by all webhook helpers below. Built once, when the
# module is imported.
headers = build_hf_headers()
class WatchedItem(TypedDict):
    """An entity watched by a webhook: a kind of entity plus its name."""

    # Examples:
    # {"type": "user", "name": "julien-c"}
    # {"type": "org", "name": "HuggingFaceH4"}
    # {"type": "model", "name": "HuggingFaceH4/zephyr-7b-beta"}
    # {"type": "dataset", "name": "HuggingFaceH4/ultrachat_200k"}
    # {"type": "space", "name": "HuggingFaceH4/zephyr-chat"}
    type: Literal["model", "dataset", "space", "org", "user"]
    name: str
# Event domains a webhook can subscribe to: "repo" for repo updates (code
# changes), "discussion" for discussion updates (issues, PRs, comments).
# A webhook may subscribe to one or both.
DOMAIN_T = Literal["repo", "discussion"]
def get_webhook(webhook_id: str) -> Dict:
    """Fetch a single webhook from the Hub settings API, given its id."""
    endpoint = f"https://huggingface.co/api/settings/webhooks/{webhook_id}"
    resp = get_session().get(endpoint, headers=headers)
    hf_raise_for_status(resp)
    return resp.json()
def list_webhooks() -> List[Dict]:
    """Return every webhook configured in the Hub settings."""
    endpoint = "https://huggingface.co/api/settings/webhooks"
    resp = get_session().get(endpoint, headers=headers)
    hf_raise_for_status(resp)
    return resp.json()
def create_webhook(watched: List[WatchedItem], url: str, domains: List[DOMAIN_T], secret: Optional[str]) -> Dict:
    """Register a new webhook on the Hub.

    Args:
        watched (List[WatchedItem]):
            Items to watch. They can be users, orgs, models, datasets or
            spaces. See `WatchedItem` for more details.
        url (str):
            URL the payload is sent to.
        domains (List[Literal["repo", "discussion"]]):
            Domains to watch: "repo", "discussion" or both.
        secret (str, optional):
            Secret used to sign the payload.

    Returns:
        dict: The JSON body returned by the Hub for the created webhook.

    Example:
    ```python
    >>> payload = create_webhook(
    ...     watched=[{"type": "user", "name": "julien-c"}, {"type": "org", "name": "HuggingFaceH4"}],
    ...     url="https://webhook.site/a2176e82-5720-43ee-9e06-f91cb4c91548",
    ...     domains=["repo", "discussion"],
    ...     secret="my-secret",
    ... )
    {
        "webhook": {
            "id": "654bbbc16f2ec14d77f109cc",
            "watched": [{"type": "user", "name": "julien-c"}, {"type": "org", "name": "HuggingFaceH4"}],
            "url": "https://webhook.site/a2176e82-5720-43ee-9e06-f91cb4c91548",
            "secret": "my-secret",
            "domains": ["repo", "discussion"],
            "disabled": False,
        },
    }
    ```
    """
    body = {"watched": watched, "url": url, "domains": domains, "secret": secret}

    print("Creating webhook")
    # Log the request, showing only the type of the secret and not its value.
    print({**body, "secret": str(type(secret))})

    resp = get_session().post("https://huggingface.co/api/settings/webhooks", json=body, headers=headers)
    hf_raise_for_status(resp)
    return resp.json()
def update_webhook(
    webhook_id: str, watched: List[WatchedItem], url: str, domains: List[DOMAIN_T], secret: Optional[str]
) -> Dict:
    """Overwrite the settings of an existing webhook.

    Takes the same arguments as `create_webhook`, plus the id of the webhook
    to update. Every field is sent again, so all of them are updated.
    """
    endpoint = f"https://huggingface.co/api/settings/webhooks/{webhook_id}"
    body = {"watched": watched, "url": url, "domains": domains, "secret": secret}
    resp = get_session().post(endpoint, json=body, headers=headers)
    hf_raise_for_status(resp)
    return resp.json()
def enable_webhook(webhook_id: str) -> Dict:
    """Switch a webhook to the "active" state."""
    endpoint = f"https://huggingface.co/api/settings/webhooks/{webhook_id}/enable"
    resp = get_session().post(endpoint, headers=headers)
    hf_raise_for_status(resp)
    return resp.json()
def disable_webhook(webhook_id: str) -> Dict:
    """Switch a webhook to the "disabled" state."""
    endpoint = f"https://huggingface.co/api/settings/webhooks/{webhook_id}/disable"
    resp = get_session().post(endpoint, headers=headers)
    hf_raise_for_status(resp)
    return resp.json()
def delete_webhook(webhook_id: str):
    """Remove a webhook from the Hub settings. Raises if the Hub reports an error."""
    endpoint = f"https://huggingface.co/api/settings/webhooks/{webhook_id}"
    resp = get_session().delete(endpoint, headers=headers)
    hf_raise_for_status(resp)