webhook_space / main.py
Saketh-Reddy's picture
Update main.py
a50294e
import os
import requests
from typing import Optional
from huggingface_hub import snapshot_download
from fastapi import FastAPI, Header, HTTPException, Request, Response
from huggingface_hub.hf_api import HfApi
from huggingface_hub import whoami
from huggingface_hub.utils import build_hf_headers, hf_raise_for_status
from huggingface_hub import delete_repo
app = FastAPI()
api = HfApi()
token = "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"
@app.get("/")
def read_root():
return {"Hello": "World!"}
@app.post("/webhook")
async def webhook(request: Request):
if request.method == "POST":
if request.headers.get("X-Webhook-Secret") != "webhooksecret":
return Response("Invalid secret", status_code=401)
data = await request.json()
if(data["event"]["action"]=="update" and data["event"]["scope"]=="repo.content" and data["repo"]["type"]=="model"):
try:
_ = whoami(token)
# ^ this will throw if token is invalid
delete_repo(repo_id="shellplc/ThirdParty", token = token, repo_type="model",missing_ok=True)
print("deleted")
r = requests.post(
f"https://huggingface.co/api/models/SakethTest/ThirdParty/duplicate",
headers=build_hf_headers(token=token),
json={"repository": "shellplc/ThirdParty"},
)
hf_raise_for_status(r)
repo_url = r.json().get("url")
print(repo_url)
return {"processed": True}
except Exception as e:
pass
print("its exception")
print(e)
return (
f"""
### Error 😒😒😒
{e}
""",
None,
)
else:
print("cond didn't match")
return {"processed": False}