Spaces:
Sleeping
Sleeping
import os | |
import requests | |
from typing import Optional | |
from huggingface_hub import snapshot_download | |
from fastapi import FastAPI, Header, HTTPException, Request, Response | |
from huggingface_hub.hf_api import HfApi | |
from huggingface_hub import whoami | |
from huggingface_hub.utils import build_hf_headers, hf_raise_for_status | |
from huggingface_hub import delete_repo | |
# Application-wide objects shared by the handlers defined below.
app = FastAPI()
api = HfApi()

# Hugging Face access token passed to whoami / delete_repo / the duplicate call.
# It is read from the environment so the credential never lives in source
# control. The token that used to be hard-coded here has been exposed and
# should be revoked and replaced.
# NOTE(review): "HF_TOKEN" is assumed to be the name of the configured secret;
# confirm against the deployment settings.
token = os.environ.get("HF_TOKEN")
def read_root() -> dict:
    """Return the static greeting payload ``{"Hello": "World!"}``.

    NOTE(review): no route decorator is visible on this function in the
    source as received; confirm how it is registered on ``app``.
    """
    return {"Hello": "World!"}
async def webhook(request: Request):
    """Re-create the mirror model repo when a content-update webhook arrives.

    Only POST requests are processed; any other method falls through and
    returns ``None``. The caller must send the shared secret in the
    ``X-Webhook-Secret`` header. When the JSON payload describes an
    ``update`` event with scope ``repo.content`` on a repo of type
    ``model``, the destination repo ``shellplc/ThirdParty`` is deleted and
    then re-created by duplicating ``SakethTest/ThirdParty``.

    Args:
        request: Incoming HTTP request carrying the webhook headers and a
            JSON body.

    Returns:
        ``{"processed": True}`` after a successful duplication,
        ``{"processed": False}`` when the payload does not match the
        expected event, or a ``Response`` with status 401 (bad secret),
        400 (body is not valid JSON) or 500 (a Hub call failed).

    NOTE(review): no route decorator is visible on this function in the
    source as received; confirm how it is registered on ``app``.
    """
    import hmac

    if request.method != "POST":
        return None

    # The secret can be overridden through the environment; the historical
    # literal stays as the fallback so existing deployments keep working.
    expected_secret = os.environ.get("WEBHOOK_SECRET", "webhooksecret")
    provided_secret = request.headers.get("X-Webhook-Secret") or ""
    # Constant-time comparison on bytes: avoids leaking the secret through
    # timing and tolerates non-ASCII header values.
    secrets_match = hmac.compare_digest(
        provided_secret.encode("utf-8"),
        expected_secret.encode("utf-8"),
    )
    if not secrets_match:
        return Response("Invalid secret", status_code=401)

    try:
        data = await request.json()
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass.
        return Response("Invalid JSON payload", status_code=400)

    # Read the payload defensively so a missing key means "not our event"
    # instead of an unhandled KeyError/TypeError.
    event = data.get("event") if isinstance(data, dict) else None
    repo = data.get("repo") if isinstance(data, dict) else None
    is_model_content_update = (
        isinstance(event, dict)
        and isinstance(repo, dict)
        and event.get("action") == "update"
        and event.get("scope") == "repo.content"
        and repo.get("type") == "model"
    )
    if not is_model_content_update:
        print("cond didn't match")
        return {"processed": False}

    # NOTE(review): whoami, delete_repo and requests.post are synchronous
    # calls made from inside this async handler.
    # NOTE(review): the destination repo is deleted before the duplicate
    # request is sent, so a failed duplication leaves no destination repo.
    try:
        # whoami raises if the token is invalid, before anything is deleted.
        _ = whoami(token)
        delete_repo(
            repo_id="shellplc/ThirdParty",
            token=token,
            repo_type="model",
            missing_ok=True,
        )
        print("deleted")
        r = requests.post(
            "https://huggingface.co/api/models/SakethTest/ThirdParty/duplicate",
            headers=build_hf_headers(token=token),
            json={"repository": "shellplc/ThirdParty"},
        )
        hf_raise_for_status(r)
        repo_url = r.json().get("url")
        print(repo_url)
        return {"processed": True}
    except Exception as e:
        # Top-level boundary for the Hub calls: log the failure and report
        # it with an error status instead of a 200 response.
        print("its exception")
        print(e)
        return Response(f"Error: {e}", status_code=500)