rg-demo / app.py
CharlieBoyer's picture
CharlieBoyer HF Staff
add app file
0d8c9ef
import hmac
from hashlib import sha256
from typing import Dict, Any
from fastapi import FastAPI, Header, HTTPException, Request, Response
from huggingface_hub import HfApi
import os
# --- Configuration ---
# Required settings, read from the Space secrets (environment variables).
WEBHOOK_SECRET = os.environ.get("WEBHOOK_SECRET")
HF_TOKEN = os.environ.get("HF_TOKEN")
RESOURCE_GROUP_ID = os.environ.get("RESOURCE_GROUP_ID")

# Abort at import time when any required setting is missing or empty.
if not all([WEBHOOK_SECRET, HF_TOKEN, RESOURCE_GROUP_ID]):
    raise ValueError("Missing one or more required environment variables: WEBHOOK_SECRET, HF_TOKEN, RESOURCE_GROUP_ID")

app = FastAPI()
# Reuse the validated token instead of reading the environment a second time.
hf_api = HfApi(token=HF_TOKEN)
# --- Webhook validation ---
async def validate_webhook(request: Request, x_webhook_secret: str = Header(...)) -> None:
    """Reject a request whose ``X-Webhook-Secret`` header does not match ``WEBHOOK_SECRET``.

    Args:
        request: The incoming request (not used by the check itself).
        x_webhook_secret: Value of the required ``X-Webhook-Secret`` header.

    Raises:
        HTTPException: 400 when the header value is empty, 401 when it does
            not match the configured secret.
    """
    if not x_webhook_secret:
        raise HTTPException(status_code=400, detail="X-Webhook-Secret header is missing")

    # Compare in constant time so response timing does not reveal how much of
    # a guessed secret was correct. Encoding both sides to bytes keeps
    # compare_digest from raising TypeError on non-ASCII header values.
    supplied = x_webhook_secret.encode("utf-8")
    expected = WEBHOOK_SECRET.encode("utf-8")
    if not hmac.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="Invalid webhook secret")
# --- Webhook endpoint ---
@app.get("/webhook")
def webhook_endpoint():
    """Handle a GET request to ``/webhook``.

    Fetches the ``CharlieBoyer`` user overview from the Hub, prints it to
    stdout, and returns a fixed ``event_not_handled`` status.
    """
    # NOTE(review): validate_webhook is not attached to this route, so the
    # X-Webhook-Secret header is never checked here - confirm this is intended.
    # TODO: the resource-group call is not implemented yet; the sketched target
    # was /api/models/<MODEL_REPO_ID>/resource-group with a body of
    # {"resourceGroupId": RESOURCE_GROUP_ID}.
    overview = hf_api.get_user_overview(username="CharlieBoyer", token=HF_TOKEN)
    print(overview)
    return {"status": "event_not_handled"}
@app.get("/")
def health_check():
    """Return a fixed message indicating that the listener is up."""
    status_message = "Webhook listener is running."
    return status_message