Lee Thanh
Upload 3012 files
5641073
raw
history blame
15.3 kB
/**
* @author jdiaz5513
*/
import initTrace from "debug";
import { DEFAULT_TRAVERSE_LIMIT, DEFAULT_BUFFER_SIZE } from "../constants";
import {
MSG_INVALID_FRAME_HEADER,
MSG_SEGMENT_OUT_OF_BOUNDS,
MSG_SEGMENT_TOO_SMALL,
MSG_NO_SEGMENTS_IN_ARENA,
} from "../errors";
import { dumpBuffer, format, padToWord } from "../util";
import { AnyArena, Arena, MultiSegmentArena, SingleSegmentArena, ArenaKind } from "./arena";
import { pack, unpack } from "./packing";
import { Pointer, StructCtor, PointerType, Struct } from "./pointers";
import { Segment } from "./segment";
import { getTargetStructSize, validate } from "./pointers/pointer";
import { resize, initStruct } from "./pointers/struct";
const trace = initTrace("capnp:message");
trace("load");
export interface _Message {
readonly arena: AnyArena;
segments: Segment[];
traversalLimit: number;
}
export class Message {
static readonly allocateSegment = allocateSegment;
static readonly dump = dump;
static readonly getRoot = getRoot;
static readonly getSegment = getSegment;
static readonly initRoot = initRoot;
static readonly readRawPointer = readRawPointer;
static readonly toArrayBuffer = toArrayBuffer;
static readonly toPackedArrayBuffer = toPackedArrayBuffer;
readonly _capnp: _Message;
/**
* A Cap'n Proto message.
*
* SECURITY WARNING: In nodejs do not pass a Buffer's internal array buffer into this constructor. Pass the buffer
* directly and everything will be fine. If not, your message will potentially be initialized with random memory
* contents!
*
* The constructor method creates a new Message, optionally using a provided arena for segment allocation, or a buffer
* to read from.
*
* @constructor {Message}
*
* @param {AnyArena|ArrayBufferView|ArrayBuffer} [src] The source for the message.
* A value of `undefined` will cause the message to initialize with a single segment arena only big enough for the
* root pointer; it will expand as you go. This is a reasonable choice for most messages.
*
* Passing an arena will cause the message to use that arena for its segment allocation. Contents will be accepted
* as-is.
*
* Passing an array buffer view (like `DataView`, `Uint8Array` or `Buffer`) will create a **copy** of the source
* buffer; beware of the potential performance cost!
*
* @param {boolean} [packed] Whether or not the message is packed. If `true` (the default), the message will be
* unpacked.
*
* @param {boolean} [singleSegment] If true, `src` will be treated as a message consisting of a single segment without
* a framing header.
*
*/
constructor(src?: AnyArena | ArrayBufferView | ArrayBuffer, packed = true, singleSegment = false) {
this._capnp = initMessage(src, packed, singleSegment);
if (src && !isAnyArena(src)) preallocateSegments(this);
trace("new %s", this);
}
allocateSegment(byteLength: number): Segment {
return allocateSegment(byteLength, this);
}
/**
* Create a pretty-printed string dump of this message; incredibly useful for debugging.
*
* WARNING: Do not call this method on large messages!
*
* @returns {string} A big steaming pile of pretty hex digits.
*/
dump(): string {
return dump(this);
}
/**
* Get a struct pointer for the root of this message. This is primarily used when reading a message; it will not
* overwrite existing data.
*
* @template T
* @param {StructCtor<T>} RootStruct The struct type to use as the root.
* @returns {T} A struct representing the root of the message.
*/
getRoot<T extends Struct>(RootStruct: StructCtor<T>): T {
return getRoot(RootStruct, this);
}
/**
* Get a segment by its id.
*
* This will lazily allocate the first segment if it doesn't already exist.
*
* @param {number} id The segment id.
* @returns {Segment} The requested segment.
*/
getSegment(id: number): Segment {
return getSegment(id, this);
}
/**
* Initialize a new message using the provided struct type as the root.
*
* @template T
* @param {StructCtor<T>} RootStruct The struct type to use as the root.
* @returns {T} An initialized struct pointing to the root of the message.
*/
initRoot<T extends Struct>(RootStruct: StructCtor<T>): T {
return initRoot(RootStruct, this);
}
/**
* Set the root of the message to a copy of the given pointer. Used internally
* to make copies of pointers for default values.
*
* @param {Pointer} src The source pointer to copy.
* @returns {void}
*/
setRoot(src: Pointer): void {
setRoot(src, this);
}
/**
* Combine the contents of this message's segments into a single array buffer and prepend a stream framing header
* containing information about the following segment data.
*
* @returns {ArrayBuffer} An ArrayBuffer with the contents of this message.
*/
toArrayBuffer(): ArrayBuffer {
return toArrayBuffer(this);
}
/**
* Like `toArrayBuffer()`, but also applies the packing algorithm to the output. This is typically what you want to
* use if you're sending the message over a network link or other slow I/O interface where size matters.
*
* @returns {ArrayBuffer} A packed message.
*/
toPackedArrayBuffer(): ArrayBuffer {
return toPackedArrayBuffer(this);
}
toString(): string {
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
return `Message_arena:${this._capnp.arena}`;
}
}
export interface CreateMessageOptions {
packed?: boolean;
singleSegment?: boolean;
}
export function initMessage(
src?: AnyArena | ArrayBufferView | ArrayBuffer,
packed = true,
singleSegment = false
): _Message {
if (src === undefined) {
return {
arena: new SingleSegmentArena(),
segments: [],
traversalLimit: DEFAULT_TRAVERSE_LIMIT,
};
}
if (isAnyArena(src)) {
return { arena: src, segments: [], traversalLimit: DEFAULT_TRAVERSE_LIMIT };
}
let buf: ArrayBuffer = src as ArrayBuffer;
if (isArrayBufferView(buf)) {
buf = buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength);
}
if (packed) buf = unpack(buf);
if (singleSegment) {
return {
arena: new SingleSegmentArena(buf),
segments: [],
traversalLimit: DEFAULT_TRAVERSE_LIMIT,
};
}
return {
arena: new MultiSegmentArena(getFramedSegments(buf)),
segments: [],
traversalLimit: DEFAULT_TRAVERSE_LIMIT,
};
}
/**
* Given an _unpacked_ message with a segment framing header, this will generate an ArrayBuffer for each segment in
* the message.
*
* This method is not typically called directly, but can be useful in certain cases.
*
* @static
* @param {ArrayBuffer} message An unpacked message with a framing header.
* @returns {ArrayBuffer[]} An array of buffers containing the segment data.
*/
export function getFramedSegments(message: ArrayBuffer): ArrayBuffer[] {
const dv = new DataView(message);
const segmentCount = dv.getUint32(0, true) + 1;
const segments = new Array(segmentCount) as ArrayBuffer[];
trace("reading %d framed segments from stream", segmentCount);
let byteOffset = 4 + segmentCount * 4;
byteOffset += byteOffset % 8;
if (byteOffset + segmentCount * 4 > message.byteLength) {
throw new Error(MSG_INVALID_FRAME_HEADER);
}
for (let i = 0; i < segmentCount; i++) {
const byteLength = dv.getUint32(4 + i * 4, true) * 8;
if (byteOffset + byteLength > message.byteLength) {
throw new Error(MSG_INVALID_FRAME_HEADER);
}
segments[i] = message.slice(byteOffset, byteOffset + byteLength);
byteOffset += byteLength;
}
return segments;
}
/**
* This method is called on messages that were constructed with existing data to prepopulate the segments array with
* everything we can find in the arena. Each segment will have it's `byteLength` set to the size of its buffer.
*
* Technically speaking, the message's segments will be "full" after calling this function. Calling this on your own
* may void your warranty.
*
* @param {Message} m The message to allocate.
* @returns {void}
*/
export function preallocateSegments(m: Message): void {
const numSegments = Arena.getNumSegments(m._capnp.arena);
if (numSegments < 1) throw new Error(MSG_NO_SEGMENTS_IN_ARENA);
m._capnp.segments = new Array(numSegments) as Segment[];
for (let i = 0; i < numSegments; i++) {
// Set up each segment so that they're fully allocated to the extents of the existing buffers.
const buffer = Arena.getBuffer(i, m._capnp.arena);
const segment = new Segment(i, m, buffer, buffer.byteLength);
m._capnp.segments[i] = segment;
}
}
function isArrayBufferView(src: ArrayBuffer | ArrayBufferView): src is ArrayBufferView {
return (src as { byteOffset?: number }).byteOffset !== undefined;
}
function isAnyArena(o: unknown): o is AnyArena {
return (o as { kind?: ArenaKind }).kind !== undefined;
}
export function allocateSegment(byteLength: number, m: Message): Segment {
trace("allocating %x bytes for %s", byteLength, m);
const res = Arena.allocate(byteLength, m._capnp.segments, m._capnp.arena);
let s: Segment;
if (res.id === m._capnp.segments.length) {
// Note how we're only allowing new segments in if they're exactly the next one in the array. There is no logical
// reason for segments to be created out of order.
s = new Segment(res.id, m, res.buffer);
trace("adding new segment %s", s);
m._capnp.segments.push(s);
} else if (res.id < 0 || res.id > m._capnp.segments.length) {
throw new Error(format(MSG_SEGMENT_OUT_OF_BOUNDS, res.id, m));
} else {
s = m._capnp.segments[res.id];
trace("replacing segment %s with buffer (len:%d)", s, res.buffer.byteLength);
s.replaceBuffer(res.buffer);
}
return s;
}
export function dump(m: Message): string {
let r = "";
if (m._capnp.segments.length === 0) {
return "================\nNo Segments\n================\n";
}
for (let i = 0; i < m._capnp.segments.length; i++) {
r += `================\nSegment #${i}\n================\n`;
const { buffer, byteLength } = m._capnp.segments[i];
const b = new Uint8Array(buffer, 0, byteLength);
r += dumpBuffer(b);
}
return r;
}
export function getRoot<T extends Struct>(RootStruct: StructCtor<T>, m: Message): T {
const root = new RootStruct(m.getSegment(0), 0);
validate(PointerType.STRUCT, root);
const ts = getTargetStructSize(root);
// Make sure the underlying pointer is actually big enough to hold the data and pointers as specified in the schema.
// If not a shallow copy of the struct contents needs to be made before returning.
if (
ts.dataByteLength < RootStruct._capnp.size.dataByteLength ||
ts.pointerLength < RootStruct._capnp.size.pointerLength
) {
trace("need to resize root struct %s", root);
resize(RootStruct._capnp.size, root);
}
return root;
}
export function getSegment(id: number, m: Message): Segment {
const segmentLength = m._capnp.segments.length;
if (id === 0 && segmentLength === 0) {
// Segment zero is special. If we have no segments in the arena we'll want to allocate a new one and leave room
// for the root pointer.
const arenaSegments = Arena.getNumSegments(m._capnp.arena);
if (arenaSegments === 0) {
allocateSegment(DEFAULT_BUFFER_SIZE, m);
} else {
// Okay, the arena already has a buffer we can use. This is totally fine.
m._capnp.segments[0] = new Segment(0, m, Arena.getBuffer(0, m._capnp.arena));
}
if (!m._capnp.segments[0].hasCapacity(8)) {
throw new Error(MSG_SEGMENT_TOO_SMALL);
}
// This will leave room for the root pointer.
m._capnp.segments[0].allocate(8);
return m._capnp.segments[0];
}
if (id < 0 || id >= segmentLength) {
throw new Error(format(MSG_SEGMENT_OUT_OF_BOUNDS, id, m));
}
return m._capnp.segments[id];
}
export function initRoot<T extends Struct>(RootStruct: StructCtor<T>, m: Message): T {
const root = new RootStruct(m.getSegment(0), 0);
initStruct(RootStruct._capnp.size, root);
trace("Initialized root pointer %s for %s.", root, m);
return root;
}
/**
* Read a pointer in raw form (a packed message with framing headers). Does not
* care or attempt to validate the input beyond parsing the message
* segments.
*
* This is typically used by the compiler to load default values, but can be
* useful to work with messages with an unknown schema.
*
* @param {ArrayBuffer} data The raw data to read.
* @returns {Pointer} A root pointer.
*/
export function readRawPointer(data: ArrayBuffer): Pointer {
return new Pointer(new Message(data).getSegment(0), 0);
}
export function setRoot(src: Pointer, m: Message): void {
Pointer.copyFrom(src, new Pointer(m.getSegment(0), 0));
}
export function toArrayBuffer(m: Message): ArrayBuffer {
const streamFrame = getStreamFrame(m);
// Make sure the first segment is allocated.
if (m._capnp.segments.length === 0) getSegment(0, m);
const segments = m._capnp.segments;
// Add space for the stream framing.
const totalLength = streamFrame.byteLength + segments.reduce((l, s) => l + padToWord(s.byteLength), 0);
const out = new Uint8Array(new ArrayBuffer(totalLength));
let o = streamFrame.byteLength;
out.set(new Uint8Array(streamFrame));
segments.forEach((s) => {
const segmentLength = padToWord(s.byteLength);
out.set(new Uint8Array(s.buffer, 0, segmentLength), o);
o += segmentLength;
});
return out.buffer;
}
export function toPackedArrayBuffer(m: Message): ArrayBuffer {
const streamFrame = pack(getStreamFrame(m));
// Make sure the first segment is allocated.
if (m._capnp.segments.length === 0) m.getSegment(0);
// NOTE: A copy operation can be avoided here if we capture the intermediate array and use that directly in the copy
// loop below, rather than have `pack()` copy it to an ArrayBuffer just to have to copy it again later. If the
// intermediate array can be avoided altogether that's even better!
const segments = m._capnp.segments.map((s) => pack(s.buffer, 0, padToWord(s.byteLength)));
const totalLength = streamFrame.byteLength + segments.reduce((l, s) => l + s.byteLength, 0);
const out = new Uint8Array(new ArrayBuffer(totalLength));
let o = streamFrame.byteLength;
out.set(new Uint8Array(streamFrame));
segments.forEach((s) => {
out.set(new Uint8Array(s), o);
o += s.byteLength;
});
return out.buffer;
}
export function getStreamFrame(m: Message): ArrayBuffer {
const length = m._capnp.segments.length;
if (length === 0) {
// Don't bother allocating the first segment, just return a single zero word for the frame header.
return new Float64Array(1).buffer;
}
const frameLength = 4 + length * 4 + (1 - (length % 2)) * 4;
const out = new DataView(new ArrayBuffer(frameLength));
trace("Writing message stream frame with segment count: %d.", length);
out.setUint32(0, length - 1, true);
m._capnp.segments.forEach((s, i) => {
trace("Message segment %d word count: %d.", s.id, s.byteLength / 8);
out.setUint32(i * 4 + 4, s.byteLength / 8, true);
});
return out.buffer;
}