Spaces:
Sleeping
Sleeping
import os
from datetime import datetime, timezone

from fastapi import FastAPI, Request, Header
from pymongo import MongoClient
# FastAPI application object for this service.
app = FastAPI()

# The connection string comes from the MONGODB_URI environment variable;
# a MongoDB instance on localhost is the fallback when it is unset.
MONGODB_URI = os.environ.get('MONGODB_URI', 'mongodb://localhost:27017/')

# One module-wide client, plus handles to the database and the collection
# that the webhook handler writes event documents into.
client = MongoClient(MONGODB_URI)
db = client.get_database("github_webhooks")
collection = db.get_collection("events")
# Fully-qualified ref prefixes that are stripped to obtain a short ref name.
_REF_PREFIXES = ("refs/heads/", "refs/tags/")


def _short_ref_name(ref):
    """Return *ref* without its ``refs/heads/`` or ``refs/tags/`` prefix."""
    for prefix in _REF_PREFIXES:
        if ref.startswith(prefix):
            # Keep everything after the prefix so that branch names which
            # themselves contain "/" (e.g. "feature/login") stay intact.
            return ref[len(prefix):]
    # Unrecognised ref shape: fall back to the last path segment.
    return ref.split("/")[-1]


def _format_timestamp(moment):
    """Render *moment* as e.g. ``1 April 2021 - 9:30 PM UTC``."""
    # The "%-d" / "%-I" strftime directives are a platform extension, so the
    # unpadded day and 12-hour clock value are computed by hand instead.
    hour_12 = moment.hour % 12 or 12
    return f"{moment.day} {moment:%B %Y} - {hour_12}:{moment:%M %p} UTC"


# NOTE(review): no route decorator is visible in this excerpt; confirm that
# this handler is registered on `app`.
async def webhook(request: Request, x_github_event: str = Header(None)):
    """Store a one-line summary of a GitHub push or pull-request event.

    Args:
        request: Incoming request whose JSON body is the event payload.
        x_github_event: Event name taken from the ``X-GitHub-Event`` header.

    Returns:
        ``{"message": "Event stored"}`` after a summary document has been
        inserted into ``collection``; otherwise a dict whose ``message``
        explains why the event was ignored.

    Raises:
        KeyError: If the payload lacks a field that the handled event type
            reads (for example ``pusher`` on a push event).
    """
    payload = await request.json()

    # Sample the clock once so that the human-readable message and the stored
    # ``timestamp`` field always describe the same instant.
    now = datetime.now(timezone.utc)
    timestamp = _format_timestamp(now)

    if x_github_event == "push":
        author = payload["pusher"]["name"]
        to_branch = _short_ref_name(payload["ref"])
        msg = f'{author} pushed to {to_branch} on {timestamp}'
    elif x_github_event == "pull_request":
        action = payload["action"]
        pr = payload["pull_request"]
        author = pr["user"]["login"]
        from_branch = pr["head"]["ref"]
        to_branch = pr["base"]["ref"]
        if action == "opened":
            msg = f'{author} submitted a pull request from {from_branch} to {to_branch} on {timestamp}'
        elif action == "closed" and pr.get("merged"):
            msg = f'{author} merged branch {from_branch} to {to_branch} on {timestamp}'
        else:
            # Reached for every other pull-request action as well, not only
            # for an unmerged close; the response text is kept unchanged.
            return {"message": "Ignored non-merge PR close"}
    else:
        return {"message": "Event ignored"}

    # NOTE(review): `insert_one` on a PyMongo collection is a synchronous
    # call, so the event loop is blocked while it runs.
    collection.insert_one({"message": msg, "timestamp": now})
    return {"message": "Event stored"}
# NOTE(review): no route decorator is visible in this excerpt; confirm that
# this handler is registered on `app`.
async def root():
    """Return a static payload announcing that the API is running."""
    banner = {
        "message": "GitHub Webhook API is running!",
        "status": "active",
    }
    return banner
# NOTE(review): no route decorator is visible in this excerpt; confirm that
# this handler is registered on `app`.
async def health_check():
    """Report whether the database answers a ``ping`` command.

    Returns:
        ``{"status": "healthy", "database": "connected"}`` when the ping
        succeeds, otherwise ``{"status": "unhealthy", "error": <text>}``
        carrying the string form of the exception that was raised.
    """
    try:
        # Test database connection
        client.admin.command('ping')
    except Exception as exc:
        # Failures are reported in the response body rather than raised.
        return {"status": "unhealthy", "error": str(exc)}
    else:
        return {"status": "healthy", "database": "connected"}