Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| import stripe | |
| import uvicorn | |
| from fastapi import FastAPI, Request, Header | |
| app = FastAPI() | |
| stripe.api_key = os.environ["STRIPE_KEY"] | |
| # This is a terrible idea, only used for demo purposes! | |
| app.state.stripe_customer_id = None | |
| def start(): | |
| return {"Status":"Deployed"} | |
| async def webhook_received(request: Request, stripe_signature: str = Header(None)): | |
| webhook_secret = os.environ["STRIPE_WEBHOOK_SECRET"] | |
| print("Entré al webhook 182...") | |
| time.sleep(1) | |
| data = await request.body() | |
| try: | |
| event = stripe.Webhook.construct_event( | |
| payload=data, | |
| sig_header=stripe_signature, | |
| secret=webhook_secret | |
| ) | |
| event_data = event['data'] | |
| except Exception as e: | |
| return {"error": str(e)} | |
| event_type = event['type'] | |
| print("Voy a imprimir el event type:") | |
| print(event_type) | |
| time.sleep(1) | |
| if event_type == 'checkout.session.completed': | |
| print('checkout session completed') | |
| elif event_type == 'invoice.paid': | |
| print('invoice paid') | |
| elif event_type == 'invoice.payment_failed': | |
| print('invoice payment failed') | |
| else: | |
| print(f'unhandled event: {event_type}') | |
| return {"status": "success"} | |
| if __name__ == '__main__': | |
| uvicorn.run("main:app", reload=True) |