Qurio / backend-python /src /utils /json_stream.py
veeiiinnnnn's picture
Add backend-python and Dockerfile
4ef118d
"""
Streaming JSON utilities.
These helpers keep the HTTP connection active by sending lightweight
heartbeat bytes while the backend is waiting for model output, then emit
the final JSON payload as the last chunk.
"""
from __future__ import annotations
import asyncio
import json
import os
from collections.abc import AsyncGenerator, Awaitable, Mapping
from typing import Any
from fastapi.responses import StreamingResponse
def _heartbeat_interval_seconds() -> float:
raw = os.getenv("JSON_STREAM_HEARTBEAT_MS", "1000")
try:
millis = int(raw)
except (TypeError, ValueError):
millis = 1000
if millis < 0:
millis = 1000
return millis / 1000
def create_streaming_json_response(
    result_awaitable: Awaitable[Mapping[str, Any] | dict[str, Any]],
) -> StreamingResponse:
    """
    Return a streaming JSON response that emits periodic heartbeats.

    A single newline is sent immediately. Another newline follows each time
    the heartbeat interval (from ``_heartbeat_interval_seconds``) elapses
    while ``result_awaitable`` is still pending. Once it resolves, the result
    is serialized with ``json.dumps(..., ensure_ascii=False)`` and sent as
    the final UTF-8 chunk. Leading whitespace is legal before a JSON value,
    so the full body parses as ordinary JSON.

    Args:
        result_awaitable: A coroutine, Task or Future resolving to the
            mapping to serialize as the response body.

    Returns:
        A ``StreamingResponse`` with media type ``application/json``.
        Its ``Cache-Control: no-cache`` and ``X-Accel-Buffering: no`` headers
        keep caches and proxies from holding back the heartbeat bytes.

    Notes:
        If the awaitable raises, the exception propagates out of the body
        iterator. By then heartbeat bytes have already been sent, so the
        client sees a truncated body rather than an error status.
    """
    heartbeat_sec = _heartbeat_interval_seconds()

    async def _stream() -> AsyncGenerator[bytes, None]:
        # ensure_future accepts any awaitable (coroutine, Task, Future);
        # create_task would reject anything but a coroutine.
        task = asyncio.ensure_future(result_awaitable)
        try:
            # Send first byte immediately so clients receive headers/body quickly.
            yield b"\n"
            while not task.done():
                # asyncio.wait neither cancels the task nor raises on timeout,
                # sidestepping the asyncio.TimeoutError / builtin TimeoutError
                # split that exists before Python 3.11.
                await asyncio.wait({task}, timeout=heartbeat_sec)
                if not task.done():
                    yield b"\n"
            payload = task.result()
            if isinstance(payload, Mapping) and not isinstance(payload, dict):
                # json.dumps only serializes real dicts at the top level.
                payload = dict(payload)
            yield json.dumps(payload, ensure_ascii=False).encode("utf-8")
        finally:
            # Generator closed early (e.g. client disconnected): stop the
            # upstream work instead of leaving an orphaned task whose
            # exception would never be retrieved.
            if not task.done():
                task.cancel()

    headers = {
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(_stream(), media_type="application/json", headers=headers)