# pagezyhf's picture
# pagezyhf HF Staff
# t
# 161b1fd
import hmac
import json
import logging
import os

import requests
from fastapi import FastAPI, Request, Response
# Logging: records at INFO and above, formatted as "time - level - message".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Settings read once from the environment at import time.
# Each value is None when the corresponding variable is not set.
SLACK_WEBHOOK_URL = os.getenv("SLACK_WEBHOOK_URL")

MODEL_CATALOG_WEBHOOK = os.getenv("MODEL_CATALOG_WEBHOOK")
MODEL_CATALOG_WEBHOOK_SECRET = os.getenv("MODEL_CATALOG_WEBHOOK_SECRET")

SIMSHIP_WEBHOOK = os.getenv("SIMSHIP_WEBHOOK")
SIMSHIP_WEBHOOK_SECRET = os.getenv("SIMSHIP_WEBHOOK_SECRET")
def send_slack_message(message: str, *, timeout: float = 10.0) -> None:
    """Post ``message`` to Slack through the configured webhook URL.

    When ``SLACK_WEBHOOK_URL`` is not set, the message is logged at WARNING
    level and nothing is sent. Delivery failures (including timeouts and
    non-2xx responses) are logged at ERROR level and swallowed, so a Slack
    outage never propagates to the caller.

    Args:
        message: Text sent as the ``text`` field of the JSON payload.
        timeout: Seconds to wait for Slack before giving up. ``requests``
            applies no timeout by default, so without this a stalled
            connection would block the caller indefinitely.
    """
    if not SLACK_WEBHOOK_URL:
        logger.warning(f"No Slack webhook URL configured. Message: {message}")
        return

    try:
        response = requests.post(
            SLACK_WEBHOOK_URL,
            json={"text": message},
            timeout=timeout,
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to send Slack message: {e}")
    else:
        logger.info(f"Slack message sent successfully: {message}")
def process_model_catalog_webhook(data: dict) -> None:
    """Send a Slack alert when a catalog model repo is renamed or deleted.

    Reads the ``event``, ``repo`` and ``movedTo`` sections of the payload.
    Only events with ``repo.type == "model"`` and ``event.scope == "repo"``
    are acted on:

    * ``event.action == "move"``   -> "renamed" alert (old name and new URL)
    * ``event.action == "delete"`` -> "deleted" alert (repo URL)

    Every other event is ignored. Sections that are missing or ``null`` in
    the payload are treated as empty.

    Args:
        data: Decoded JSON object of the webhook request body.
    """
    # `or {}` also covers keys that are present but null; `data.get(key, {})`
    # would hand back None in that case and crash on the following `.get`.
    event = data.get("event") or {}
    repo = data.get("repo") or {}
    moved_to = data.get("movedTo") or {}

    if repo.get("type") != "model" or event.get("scope") != "repo":
        return

    # Emoji and arrow below are restored from mis-decoded UTF-8 sequences.
    action = event.get("action")
    if action == "move":
        # repo name changed
        message = (
            "📚 Model Catalog Sync Alert 📚\n\n"
            "Model in the catalog renamed 🔄 \n"
            f"{repo.get('name', '')} → https://hf.co/{moved_to.get('name', '')}"
        )
    elif action == "delete":
        # repo deleted
        message = (
            "📚 Model Catalog Sync Alert 📚\n\n"
            "Model in the catalog deleted 🗑️\n"
            f"https://hf.co/{repo.get('name', '')}"
        )
    else:
        # other events
        return

    send_slack_message(message)
def process_simship_webhook(data: dict) -> None:
    """Send a Slack alert when a SimShip model is created or made public.

    Reads the ``event``, ``repo`` and ``updatedConfig`` sections of the
    payload. Only events with ``repo.type == "model"`` are acted on:

    * scope ``"repo"`` + action ``"create"`` -> "created" alert
    * scope ``"repo.config"`` + action ``"update"`` with
      ``updatedConfig.private`` exactly ``False`` -> "made public" alert

    Every other event is ignored. Sections that are missing or ``null`` in
    the payload are treated as empty.

    Args:
        data: Decoded JSON object of the webhook request body.
    """
    # `or {}` also covers keys that are present but null; `data.get(key, {})`
    # would hand back None in that case and crash on the following `.get`.
    event = data.get("event") or {}
    repo = data.get("repo") or {}
    updated_config = data.get("updatedConfig") or {}

    if repo.get("type") != "model":
        return

    scope = event.get("scope")
    action = event.get("action")

    # Emoji below are restored from mis-decoded UTF-8 sequences.
    if scope == "repo" and action == "create":
        # repo creation
        headline = "SimShip Model created 🆕\n"
    elif (
        scope == "repo.config"
        and action == "update"
        # `is False` on purpose: an absent "private" key must not match.
        and updated_config.get("private") is False
    ):
        # repo visibility update
        headline = "SimShip Model made public 🆕\n"
    else:
        # other events
        return

    message = (
        "🛥️ SimShip Provider Alert 🛥️\n\n"
        f"{headline}"
        f"https://hf.co/{repo.get('name', '')}"
    )
    send_slack_message(message)
# FastAPI application instance; the `@app.post` routes in this file are
# registered on it.
app = FastAPI()
@app.post("/webhook/model-catalog")
async def model_catalog_webhook(request: Request):
    """Authenticate a model-catalog webhook delivery and process it.

    The request must carry an ``X-Webhook-Secret`` header equal to
    ``MODEL_CATALOG_WEBHOOK_SECRET`` and a JSON object body whose
    ``webhook.id`` equals ``MODEL_CATALOG_WEBHOOK``.

    Returns:
        * 401 ``Invalid secret`` - secret not configured, header missing, or
          header does not match.
        * 400 ``Invalid JSON payload`` - body is not valid JSON or is not a
          JSON object.
        * 401 ``Invalid webhook ID`` - ``webhook.id`` does not match.
        * 200 - payload was handed to ``process_model_catalog_webhook``.
    """
    # `request.client` can be None, so do not dereference it blindly.
    client_host = request.client.host if request.client else "unknown"
    logger.info(f"Received model catalog webhook request from {client_host}")

    # Fail closed: with a plain `!=`, an unset secret plus a missing header
    # compares None to None and lets the request through. The comparison is
    # also done in constant time (on bytes, since compare_digest rejects
    # non-ASCII str) to avoid leaking the secret through response timing.
    provided_secret = request.headers.get("X-Webhook-Secret")
    if (
        not MODEL_CATALOG_WEBHOOK_SECRET
        or provided_secret is None
        or not hmac.compare_digest(
            provided_secret.encode("utf-8"),
            MODEL_CATALOG_WEBHOOK_SECRET.encode("utf-8"),
        )
    ):
        logger.warning("Invalid webhook secret received for model catalog")
        return Response("Invalid secret", status_code=401)

    try:
        data = await request.json()
    except ValueError:  # json.JSONDecodeError / UnicodeDecodeError
        logger.warning("Malformed JSON body received for model catalog")
        return Response("Invalid JSON payload", status_code=400)
    if not isinstance(data, dict):
        logger.warning("Non-object JSON body received for model catalog")
        return Response("Invalid JSON payload", status_code=400)

    logger.info(f"Model catalog webhook payload: {json.dumps(data, indent=2)}")

    # "webhook" may be absent, null, or not an object; treat all as "no id".
    webhook = data.get("webhook")
    webhook_id = webhook.get("id") if isinstance(webhook, dict) else None
    if webhook_id != MODEL_CATALOG_WEBHOOK:
        logger.warning("Invalid webhook ID received for model catalog")
        return Response("Invalid webhook ID", status_code=401)

    logger.info("Processing model catalog webhook")
    # NOTE(review): this call is synchronous inside an async handler;
    # consider offloading it if it performs blocking network I/O.
    process_model_catalog_webhook(data)
    return Response(
        "Model catalog webhook notification received and processed!",
        status_code=200,
    )
@app.post("/webhook/simship")
async def simship_webhook(request: Request):
    """Authenticate a SimShip webhook delivery and process it.

    The request must carry an ``X-Webhook-Secret`` header equal to
    ``SIMSHIP_WEBHOOK_SECRET`` and a JSON object body whose ``webhook.id``
    equals ``SIMSHIP_WEBHOOK``.

    Returns:
        * 401 ``Invalid secret`` - secret not configured, header missing, or
          header does not match.
        * 400 ``Invalid JSON payload`` - body is not valid JSON or is not a
          JSON object.
        * 401 ``Invalid webhook ID`` - ``webhook.id`` does not match.
        * 200 - payload was handed to ``process_simship_webhook``.
    """
    # `request.client` can be None, so do not dereference it blindly.
    client_host = request.client.host if request.client else "unknown"
    logger.info(f"Received simship webhook request from {client_host}")

    # Fail closed: with a plain `!=`, an unset secret plus a missing header
    # compares None to None and lets the request through. The comparison is
    # also done in constant time (on bytes, since compare_digest rejects
    # non-ASCII str) to avoid leaking the secret through response timing.
    provided_secret = request.headers.get("X-Webhook-Secret")
    if (
        not SIMSHIP_WEBHOOK_SECRET
        or provided_secret is None
        or not hmac.compare_digest(
            provided_secret.encode("utf-8"),
            SIMSHIP_WEBHOOK_SECRET.encode("utf-8"),
        )
    ):
        logger.warning("Invalid webhook secret received for simship")
        return Response("Invalid secret", status_code=401)

    try:
        data = await request.json()
    except ValueError:  # json.JSONDecodeError / UnicodeDecodeError
        logger.warning("Malformed JSON body received for simship")
        return Response("Invalid JSON payload", status_code=400)
    if not isinstance(data, dict):
        logger.warning("Non-object JSON body received for simship")
        return Response("Invalid JSON payload", status_code=400)

    logger.info(f"Simship webhook payload: {json.dumps(data, indent=2)}")

    # "webhook" may be absent, null, or not an object; treat all as "no id".
    webhook = data.get("webhook")
    webhook_id = webhook.get("id") if isinstance(webhook, dict) else None
    if webhook_id != SIMSHIP_WEBHOOK:
        logger.warning("Invalid webhook ID received for simship")
        return Response("Invalid webhook ID", status_code=401)

    logger.info("Processing simship webhook")
    # NOTE(review): this call is synchronous inside an async handler;
    # consider offloading it if it performs blocking network I/O.
    process_simship_webhook(data)
    return Response(
        "Simship webhook notification received and processed!",
        status_code=200,
    )