Spaces:
Running
Running
| """ | |
| Streaming JSON utilities. | |
| These helpers keep the HTTP connection active by sending lightweight | |
| heartbeat bytes while the backend is waiting for model output, then emit | |
| the final JSON payload as the last chunk. | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import os | |
| from collections.abc import AsyncGenerator, Awaitable, Mapping | |
| from typing import Any | |
| from fastapi.responses import StreamingResponse | |
| def _heartbeat_interval_seconds() -> float: | |
| raw = os.getenv("JSON_STREAM_HEARTBEAT_MS", "1000") | |
| try: | |
| millis = int(raw) | |
| except (TypeError, ValueError): | |
| millis = 1000 | |
| if millis < 0: | |
| millis = 1000 | |
| return millis / 1000 | |
def create_streaming_json_response(
    result_awaitable: Awaitable[Mapping[str, Any] | dict[str, Any]],
) -> StreamingResponse:
    """
    Return a streaming JSON response that emits periodic heartbeats.

    The response body starts with a newline. Another newline is sent every
    heartbeat interval (``_heartbeat_interval_seconds``) until
    *result_awaitable* completes. The body ends with the result, serialized
    with ``json.dumps(..., ensure_ascii=False)`` and encoded as UTF-8.
    Leading newlines are insignificant whitespace to JSON parsers, so the
    whole body parses as the final payload.

    Args:
        result_awaitable: Any awaitable (coroutine, Task or Future) resolving
            to the mapping to serialize. It is scheduled when the response
            body starts streaming, not when this function is called.

    Returns:
        A ``StreamingResponse`` with ``application/json`` media type. Its
        headers discourage caching and ask reverse proxies
        (``X-Accel-Buffering``) not to buffer the body.

    Notes:
        The status line and headers go out before the result is known. If
        the awaitable raises, the exception propagates out of the stream
        after heartbeats were sent, and the client sees a truncated body.
        If the stream is closed before the result is ready (for example, on
        client disconnect), the pending work is cancelled rather than left
        running orphaned.
    """
    heartbeat_sec = _heartbeat_interval_seconds()

    async def _stream() -> AsyncGenerator[bytes, None]:
        # ensure_future accepts any awaitable; create_task only accepts
        # coroutines, which contradicts the Awaitable parameter type.
        task = asyncio.ensure_future(result_awaitable)
        try:
            # Send first byte immediately so clients receive headers/body quickly.
            yield b"\n"
            while not task.done():
                # asyncio.wait neither raises nor cancels the task on timeout,
                # so no shield is needed and there is no dependence on
                # asyncio.TimeoutError vs builtin TimeoutError (distinct
                # classes before Python 3.11).
                await asyncio.wait({task}, timeout=heartbeat_sec)
                if not task.done():
                    yield b"\n"
            # Re-raises the awaitable's exception, if any.
            payload = task.result()
        finally:
            # Reached with a pending task only when the generator is closed or
            # cancelled early (e.g. client disconnect); don't leak the work.
            if not task.done():
                task.cancel()
        if isinstance(payload, Mapping) and not isinstance(payload, dict):
            # json.dumps serializes real dicts only; copy other mappings.
            payload = dict(payload)
        yield json.dumps(payload, ensure_ascii=False).encode("utf-8")

    headers = {
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(_stream(), media_type="application/json", headers=headers)