Spaces:
Sleeping
Sleeping
import hmac | |
from hashlib import sha256 | |
from typing import Dict, Any | |
from fastapi import FastAPI, Header, HTTPException, Request, Response | |
from huggingface_hub import HfApi | |
import os | |
# --- Configuration --- | |
# Load from Space secrets | |
WEBHOOK_SECRET = os.environ.get("WEBHOOK_SECRET") | |
HF_TOKEN = os.environ.get("HF_TOKEN") | |
RESOURCE_GROUP_ID = os.environ.get("RESOURCE_GROUP_ID") | |
if not all([WEBHOOK_SECRET, HF_TOKEN, RESOURCE_GROUP_ID]): | |
raise ValueError("Missing one or more required environment variables: WEBHOOK_SECRET, HF_TOKEN, RESOURCE_GROUP_ID") | |
app = FastAPI() | |
hf_api = HfApi(token=os.getenv("HF_TOKEN")) | |
# --- Webhook validation --- | |
async def validate_webhook(request: Request, x_webhook_secret: str = Header(...)): | |
if not x_webhook_secret: | |
raise HTTPException(status_code=400, detail="X-Webhook-Secret header is missing") | |
# Make sure the secret matches | |
if x_webhook_secret != WEBHOOK_SECRET: | |
raise HTTPException(status_code=401, detail="Invalid webhook secret") | |
# --- Webhook endpoint --- | |
def webhook_endpoint(): | |
# path_to_call = f"/api/models/{MODEL_REPO_ID}/resource-group" | |
payload = {"resourceGroupId": RESOURCE_GROUP_ID} | |
response = hf_api.get_user_overview(username="CharlieBoyer", token=HF_TOKEN) | |
print(response) | |
return {"status": "event_not_handled"} | |
def health_check(): | |
return "Webhook listener is running." |