Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
from fastapi import FastAPI, Request, Response | |
import os | |
import requests | |
import json | |
import logging | |
# Set up logging | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger(__name__) | |
# Configurations | |
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL") | |
MODEL_CATALOG_WEBHOOK = os.environ.get("MODEL_CATALOG_WEBHOOK") | |
MODEL_CATALOG_WEBHOOK_SECRET = os.environ.get("MODEL_CATALOG_WEBHOOK_SECRET") | |
SIMSHIP_WEBHOOK = os.environ.get("SIMSHIP_WEBHOOK") | |
SIMSHIP_WEBHOOK_SECRET = os.environ.get("SIMSHIP_WEBHOOK_SECRET") | |
def send_slack_message(message: str): | |
"""Send a message to Slack using webhook URL""" | |
if not SLACK_WEBHOOK_URL: | |
logger.warning(f"No Slack webhook URL configured. Message: {message}") | |
return | |
payload = {"text": message} | |
try: | |
response = requests.post(SLACK_WEBHOOK_URL, json=payload) | |
response.raise_for_status() | |
logger.info(f"Slack message sent successfully: {message}") | |
except requests.exceptions.RequestException as e: | |
logger.error(f"Failed to send Slack message: {e}") | |
def process_model_catalog_webhook(data: dict): | |
""" | |
Process model catalog webhook | |
""" | |
event = data.get("event", {}) | |
repo = data.get("repo", {}) | |
movedTo = data.get("movedTo", {}) | |
# repo name changed | |
if ( | |
repo.get("type") == "model" and | |
event.get("scope") == "repo" and | |
event.get("action") == "move" | |
): | |
message = ( | |
"π Model Catalog Sync Alert π\n\n" | |
f"Model in the catalog renamed π \n" | |
f"{repo.get('name', '')} β https://hf.co/{movedTo.get('name', '')}" | |
) | |
send_slack_message(message) | |
# repo deleted | |
elif ( | |
repo.get("type") == "model" and | |
event.get("scope") == "repo" and | |
event.get("action") == "delete" | |
): | |
message = ( | |
"π Model Catalog Sync Alert π\n\n" | |
f"Model in the catalog deleted ποΈ\n" | |
f"https://hf.co/{repo.get('name', '')}" | |
) | |
send_slack_message(message) | |
# other events | |
else: | |
pass | |
def process_simship_webhook(data: dict): | |
""" | |
Process simship webhook | |
""" | |
event = data.get("event", {}) | |
repo = data.get("repo", {}) | |
updatedConfig = data.get("updatedConfig", {}) | |
# repo creation | |
if ( | |
repo.get("type") == "model" and | |
event.get("scope") == "repo" and | |
event.get("action") == "create" | |
): | |
message = ( | |
"π₯οΈ SimShip Provider Alert π₯οΈ\n\n" | |
f"SimShip Model created π\n" | |
f"https://hf.co/{repo.get('name', '')}" | |
) | |
send_slack_message(message) | |
# repo visibility update | |
elif ( | |
repo.get("type") == "model" and | |
event.get("scope") == "repo.config" and | |
event.get("action") == "update" and | |
updatedConfig.get("private") is False | |
): | |
message = ( | |
"π₯οΈ SimShip Provider Alert π₯οΈ\n\n" | |
f"SimShip Model made public π\n" | |
f"https://hf.co/{repo.get('name', '')}" | |
) | |
send_slack_message(message) | |
# other events | |
else: | |
pass | |
app = FastAPI() | |
async def model_catalog_webhook(request: Request): | |
logger.info(f"Received model catalog webhook request from {request.client.host}") | |
if request.headers.get("X-Webhook-Secret") != MODEL_CATALOG_WEBHOOK_SECRET: | |
logger.warning("Invalid webhook secret received for model catalog") | |
return Response("Invalid secret", status_code=401) | |
data = await request.json() | |
logger.info(f"Model catalog webhook payload: {json.dumps(data, indent=2)}") | |
webhook_id = data.get("webhook", {}).get("id", None) | |
if webhook_id == MODEL_CATALOG_WEBHOOK: | |
logger.info("Processing model catalog webhook") | |
process_model_catalog_webhook(data) | |
else: | |
logger.warning("Invalid webhook ID received for model catalog") | |
return Response("Invalid webhook ID", status_code=401) | |
return Response("Model catalog webhook notification received and processed!", status_code=200) | |
async def simship_webhook(request: Request): | |
logger.info(f"Received simship webhook request from {request.client.host}") | |
if request.headers.get("X-Webhook-Secret") != SIMSHIP_WEBHOOK_SECRET: | |
logger.warning("Invalid webhook secret received for simship") | |
return Response("Invalid secret", status_code=401) | |
data = await request.json() | |
logger.info(f"Simship webhook payload: {json.dumps(data, indent=2)}") | |
webhook_id = data.get("webhook", {}).get("id", None) | |
if webhook_id == SIMSHIP_WEBHOOK: | |
logger.info("Processing simship webhook") | |
process_simship_webhook(data) | |
else: | |
logger.warning("Invalid webhook ID received for simship") | |
return Response("Invalid webhook ID", status_code=401) | |
return Response("Simship webhook notification received and processed!", status_code=200) | |