Spaces:
Sleeping
Sleeping
| """ | |
| Tiny FastAPI proxy for the Polymarket Control Deck. | |
| - Serves index.html at / | |
| - Proxies /api/events -> https://gamma-api.polymarket.com/events | |
| - Proxies /api/prices-history-> https://clob.polymarket.com/prices-history | |
| The Gamma API whitelists CORS to https://polymarket.com only, so a browser | |
| running on huggingface.co/spaces/... cannot call it directly. This proxy | |
| sidesteps that by running on the same origin as the static frontend. | |
| """ | |
| from __future__ import annotations | |
| import httpx | |
| from fastapi import FastAPI, Request, Response | |
| from fastapi.responses import FileResponse | |
| GAMMA = "https://gamma-api.polymarket.com" | |
| CLOB = "https://clob.polymarket.com" | |
| app = FastAPI(title="Polymarket Control Deck") | |
| client = httpx.AsyncClient( | |
| timeout=httpx.Timeout(20.0), | |
| headers={ | |
| # Gamma responds to this origin with the right CORS headers; it also | |
| # happens to serve the canonical response, so we impersonate it here. | |
| "Origin": "https://polymarket.com", | |
| "Referer": "https://polymarket.com/", | |
| "User-Agent": ( | |
| "Mozilla/5.0 polymarket-control-deck/1.0 (FastAPI proxy)" | |
| ), | |
| }, | |
| ) | |
| async def _proxy(upstream_url: str, request: Request) -> Response: | |
| params = dict(request.query_params) | |
| try: | |
| r = await client.get(upstream_url, params=params) | |
| except httpx.HTTPError as e: | |
| return Response( | |
| content=f'{{"error":"upstream_error","detail":"{type(e).__name__}"}}', | |
| status_code=502, | |
| media_type="application/json", | |
| ) | |
| # Pass through JSON. Strip hop-by-hop headers. | |
| return Response( | |
| content=r.content, | |
| status_code=r.status_code, | |
| media_type=r.headers.get("content-type", "application/json"), | |
| ) | |
| async def events(request: Request) -> Response: | |
| return await _proxy(f"{GAMMA}/events", request) | |
| async def prices_history(request: Request) -> Response: | |
| return await _proxy(f"{CLOB}/prices-history", request) | |
| async def health() -> dict: | |
| return {"status": "nominal"} | |
| async def root() -> FileResponse: | |
| return FileResponse("index.html") | |