// WindsurfAPI / src / connect.js
// github-actions[bot]
// Deploy from GitHub: 7495fde758f0be655f95e6331fec2898267f790c
// f6266b9
/**
* Connect-RPC envelope framing and compression.
*
* Connect-RPC frame format:
* [1 byte flags] [4 bytes big-endian length] [N bytes payload]
*
* Flags:
* 0x01 = gzip compressed
* 0x02 = end-of-stream (trailer frame, JSON payload)
* 0x03 = compressed + end-of-stream
*
* IMPORTANT: Connect-RPC uses HTTP/1.1 POST, NOT HTTP/2 gRPC.
* Content-Type: application/connect+proto
*/
import { gzipSync, gunzipSync } from 'zlib';
// ─── Compression helpers ───────────────────────────────────
/**
 * Gzip-compress data synchronously.
 *
 * @param {Buffer} buf - Data handed straight to zlib's `gzipSync`.
 * @returns {Buffer} The gzip-compressed bytes.
 */
export function gzip(buf) {
  const compressed = gzipSync(buf);
  return compressed;
}
/**
 * Decompress gzip data synchronously.
 *
 * @param {Buffer} buf - Gzip-compressed bytes handed straight to zlib's `gunzipSync`.
 * @returns {Buffer} The decompressed bytes.
 * @throws Whatever `gunzipSync` throws when `buf` is not valid gzip data.
 */
export function gunzip(buf) {
  const inflated = gunzipSync(buf);
  return inflated;
}
/**
 * Best-effort gzip decompression.
 *
 * @param {Buffer} buf - Bytes that may or may not be gzip-compressed.
 * @returns {Buffer|null} The decompressed bytes, or `null` when `gunzipSync`
 *   rejects the input.
 */
export function tryGunzip(buf) {
  let inflated = null;
  try {
    inflated = gunzipSync(buf);
  } catch {
    // Input was not decodable as gzip: report that as null instead of throwing.
  }
  return inflated;
}
// ─── Envelope wrapping ─────────────────────────────────────
/**
 * Wrap protobuf bytes in a Connect-RPC envelope frame.
 *
 * Frame layout: [1 byte flags] [4 bytes big-endian length] [N bytes payload].
 *
 * @param {Buffer|Uint8Array} protoBuf - Serialized message bytes. Plain
 *   `Uint8Array` values are accepted and converted to a Buffer before framing.
 * @param {object} [options]
 * @param {boolean} [options.compress=true] - Gzip the payload and set the 0x01
 *   flag. Empty payloads are never compressed.
 * @returns {Buffer} The complete envelope frame.
 */
export function wrapEnvelope(protoBuf, { compress = true } = {}) {
  // Normalise first: a bare Uint8Array has no `.copy()`, so the uncompressed
  // and empty-payload paths would otherwise throw a TypeError.
  let payload = Buffer.isBuffer(protoBuf) ? protoBuf : Buffer.from(protoBuf);
  let flags = 0;
  if (compress && payload.length > 0) {
    payload = gzipSync(payload);
    flags |= 0x01;
  }
  const frame = Buffer.alloc(5 + payload.length);
  frame[0] = flags;
  frame.writeUInt32BE(payload.length, 1);
  payload.copy(frame, 5);
  return frame;
}
/**
 * Frame an outgoing request as a single envelope with compression requested.
 *
 * @param {Buffer} protoBuf - Serialized request message.
 * @returns {Buffer} The result of `wrapEnvelope` called with `compress: true`.
 */
export function wrapRequest(protoBuf) {
  const requestOptions = { compress: true };
  return wrapEnvelope(protoBuf, requestOptions);
}
/**
 * Build the end-of-stream trailer frame whose payload is the JSON text `{}`.
 *
 * @returns {Buffer} A 7-byte frame: flag byte 0x02 (end-of-stream, not
 *   compressed), a big-endian length of 2, then the two bytes of `{}`.
 */
export function endOfStreamEnvelope() {
  const HEADER_BYTES = 5;
  const body = Buffer.from('{}');
  const envelope = Buffer.alloc(HEADER_BYTES + body.length);
  envelope.writeUInt8(0x02, 0); // end-of-stream, not compressed
  envelope.writeUInt32BE(body.length, 1);
  envelope.set(body, HEADER_BYTES);
  return envelope;
}
// ─── Request unwrapping ────────────────────────────────────
/**
 * Unwrap a Connect-RPC request body to raw protobuf bytes.
 * Handles both envelope-wrapped bodies and HTTP-level gzip.
 *
 * @param {Buffer|Uint8Array|string} body - Raw request body; non-Buffer values
 *   are converted with `Buffer.from`.
 * @param {object} [headers={}] - Request headers, looked up by the lowercase
 *   keys `content-encoding` and `connect-content-encoding`.
 * @returns {Buffer} The message bytes with any envelope header removed and any
 *   gzip layer inflated, or the (possibly inflated) body unchanged when it does
 *   not look like a single envelope.
 * @throws If a gzip layer that is decoded fails to decompress.
 */
export function unwrapRequest(body, headers = {}) {
  let buf = Buffer.isBuffer(body) ? body : Buffer.from(body);

  // HTTP-level content-encoding gzip
  const httpEncoding = headers['content-encoding'];
  const encoding = httpEncoding || headers['connect-content-encoding'] || '';
  if (encoding === 'gzip') {
    // `connect-content-encoding` names the algorithm for individually
    // compressed envelopes, so under that header the body is normally an
    // envelope rather than one gzip stream. Only inflate the whole body there
    // when it actually starts with the gzip magic bytes; an envelope accepted
    // below starts with 0x00 or 0x01, so the two cannot be confused.
    const startsWithGzipMagic = buf.length >= 2 && buf[0] === 0x1f && buf[1] === 0x8b;
    if (httpEncoding === 'gzip' || startsWithGzipMagic) {
      buf = gunzipSync(buf);
    }
  }

  // Check if it's envelope-wrapped (flags byte + 4-byte length)
  if (buf.length >= 5) {
    const flags = buf[0];
    const len = buf.readUInt32BE(1);
    // Heuristic: treat as an envelope only when the declared length accounts
    // for the whole body and the flags are a plain data-frame value.
    if (len === buf.length - 5 && (flags === 0 || flags === 1)) {
      let payload = buf.subarray(5);
      if (flags & 0x01) payload = gunzipSync(payload);
      return payload;
    }
  }
  return buf;
}
// ─── Streaming frame parser ───────────────────────────────
/**
 * Incremental Connect-RPC frame parser.
 *
 * Feed raw bytes in with `push()` as they arrive, then call `drain()` to pull
 * out every frame that is fully buffered. Bytes of an incomplete trailing
 * frame stay in `this.buffer` until more data is pushed.
 */
export class StreamingFrameParser {
  constructor() {
    // Bytes received but not yet consumed by drain().
    this.buffer = Buffer.alloc(0);
  }

  /**
   * Append a chunk of incoming bytes to the pending buffer.
   * @param {Buffer} chunk
   */
  push(chunk) {
    const pending = this.buffer;
    this.buffer = Buffer.concat([pending, chunk]);
  }

  /**
   * Drain all complete frames. Returns [{ flags, isEndStream, payload }].
   *
   * Frames flagged as compressed (0x01) are gunzipped; a frame whose payload
   * fails to decompress is consumed and left out of the result.
   */
  drain() {
    const HEADER_SIZE = 5;
    const completed = [];
    const data = this.buffer;
    let cursor = 0;

    for (;;) {
      const available = data.length - cursor;
      if (available < HEADER_SIZE) break;

      const bodySize = data.readUInt32BE(cursor + 1);
      const frameSize = HEADER_SIZE + bodySize;
      if (available < frameSize) break; // frame body not fully received yet

      const flags = data[cursor];
      const raw = data.subarray(cursor + HEADER_SIZE, cursor + frameSize);
      // Advance before decoding so an undecodable frame is still consumed.
      cursor += frameSize;

      let payload = raw;
      if ((flags & 0x01) !== 0) {
        try {
          payload = gunzipSync(raw);
        } catch {
          continue;
        }
      }

      completed.push({
        flags,
        isEndStream: (flags & 0x02) !== 0,
        payload,
      });
    }

    // Keep only the unconsumed tail; leave the buffer alone if nothing was read.
    if (cursor > 0) {
      this.buffer = data.subarray(cursor);
    }
    return completed;
  }
}
// ─── Connect-RPC headers ──────────────────────────────────
/**
 * Build the HTTP headers for a Connect-RPC request.
 *
 * @param {object} [extra={}] - Additional headers; entries here are applied
 *   after the defaults, so a matching key replaces the default value.
 * @returns {object} A new header object.
 */
export function connectHeaders(extra = {}) {
  const defaults = {
    'Content-Type': 'application/connect+proto',
    'Connect-Protocol-Version': '1',
    'Connect-Accept-Encoding': 'gzip',
    'User-Agent': 'connect-es/2.0.0',
  };
  return { ...defaults, ...extra };
}