From 434e1a52e5da180b888f3d1ff7f2e5e1a1f38b69 Mon Sep 17 00:00:00 2001 From: Natalie Weizenbaum Date: Mon, 21 Dec 2020 20:38:40 -0800 Subject: [PATCH] Delimit messages using varints, as per sass/embedded-protocol#38 See sass/embedded-protocol#37 --- lib/src/buffer-builder.d.ts | 36 ++++++ lib/src/embedded/packet-transformer.test.ts | 76 ++++------- lib/src/embedded/packet-transformer.ts | 132 ++++++++++++-------- package.json | 1 + 4 files changed, 143 insertions(+), 102 deletions(-) create mode 100644 lib/src/buffer-builder.d.ts diff --git a/lib/src/buffer-builder.d.ts b/lib/src/buffer-builder.d.ts new file mode 100644 index 00000000..e2fa5419 --- /dev/null +++ b/lib/src/buffer-builder.d.ts @@ -0,0 +1,36 @@ +// TODO(nweiz): Use the type package once DefinitelyTyped/DefinitelyTyped#50228 +// lands. + +declare module 'buffer-builder' { + class BufferBuilder { + constructor(initialCapacity?: number | Buffer); + appendBuffer(source: Buffer): BufferBuilder; + appendUInt8(value: number): BufferBuilder; + appendUInt16LE(value: number): BufferBuilder; + appendUInt16BE(value: number): BufferBuilder; + appendUInt32LE(value: number): BufferBuilder; + appendUInt32BE(value: number): BufferBuilder; + appendInt8(value: number): BufferBuilder; + appendInt16LE(value: number): BufferBuilder; + appendInt16BE(value: number): BufferBuilder; + appendInt32LE(value: number): BufferBuilder; + appendInt32BE(value: number): BufferBuilder; + appendFloatLE(value: number): BufferBuilder; + appendFloatBE(value: number): BufferBuilder; + appendDoubleLE(value: number): BufferBuilder; + appendDoubleBE(value: number): BufferBuilder; + appendString(str: string, encoding?: string): BufferBuilder; + appendStringZero(str: string, encoding?: string): BufferBuilder; + appendFill(value: number, count: number): BufferBuilder; + get(): Buffer; + copy( + targetBuffer: Buffer, + targetStart?: number, + sourceStart?: number, + sourceEnd?: number + ): number; + readonly length: number; + } + + export = 
BufferBuilder; +} diff --git a/lib/src/embedded/packet-transformer.test.ts b/lib/src/embedded/packet-transformer.test.ts index 8d6c6332..0e9eeee0 100644 --- a/lib/src/embedded/packet-transformer.test.ts +++ b/lib/src/embedded/packet-transformer.test.ts @@ -22,20 +22,20 @@ describe('packet transformer', () => { it('encodes an empty message', () => { packets.writeInboundProtobuf(Buffer.from([])); - expect(encodedBuffers).toEqual([Buffer.from([0, 0, 0, 0])]); + expect(encodedBuffers).toEqual([Buffer.from([0])]); }); it('encodes a message of length 1', () => { packets.writeInboundProtobuf(Buffer.from([123])); - expect(encodedBuffers).toEqual([Buffer.from([1, 0, 0, 0, 123])]); + expect(encodedBuffers).toEqual([Buffer.from([1, 123])]); }); it('encodes a message of length greater than 256', () => { packets.writeInboundProtobuf(Buffer.alloc(300, 1)); expect(encodedBuffers).toEqual([ - Buffer.from([44, 1, 0, 0, ...new Array(300).fill(1)]), + Buffer.from([172, 2, ...new Array(300).fill(1)]), ]); }); @@ -45,9 +45,9 @@ describe('packet transformer', () => { packets.writeInboundProtobuf(Buffer.from([40, 50, 60])); expect(encodedBuffers).toEqual([ - Buffer.from([1, 0, 0, 0, 10]), - Buffer.from([2, 0, 0, 0, 20, 30]), - Buffer.from([3, 0, 0, 0, 40, 50, 60]), + Buffer.from([1, 10]), + Buffer.from([2, 20, 30]), + Buffer.from([3, 40, 50, 60]), ]); }); }); @@ -76,24 +76,6 @@ describe('packet transformer', () => { it('decodes a single chunk', async done => { expectDecoding([Buffer.from([])], done); - rawBuffers$.next(Buffer.from([0, 0, 0, 0])); - rawBuffers$.complete(); - }); - - it('decodes multiple chunks', async done => { - expectDecoding([Buffer.from([])], done); - - rawBuffers$.next(Buffer.from([0, 0])); - rawBuffers$.next(Buffer.from([0, 0])); - rawBuffers$.complete(); - }); - - it('decodes one chunk per byte', async done => { - expectDecoding([Buffer.from([])], done); - - rawBuffers$.next(Buffer.from([0])); - rawBuffers$.next(Buffer.from([0])); - 
rawBuffers$.next(Buffer.from([0])); rawBuffers$.next(Buffer.from([0])); rawBuffers$.complete(); }); @@ -101,7 +83,7 @@ describe('packet transformer', () => { it('decodes a chunk that contains more data', async done => { expectDecoding([Buffer.from([]), Buffer.from([100])], done); - rawBuffers$.next(Buffer.from([0, 0, 0, 0, 1, 0, 0, 0, 100])); + rawBuffers$.next(Buffer.from([0, 1, 100])); rawBuffers$.complete(); }); }); @@ -110,39 +92,39 @@ describe('packet transformer', () => { it('decodes a single chunk', async done => { expectDecoding([Buffer.from(Buffer.from([1, 2, 3, 4]))], done); - rawBuffers$.next(Buffer.from([4, 0, 0, 0, 1, 2, 3, 4])); + rawBuffers$.next(Buffer.from([4, 1, 2, 3, 4])); rawBuffers$.complete(); }); it('decodes multiple chunks', async done => { - expectDecoding([Buffer.from([1, 2, 3, 4])], done); + expectDecoding([Buffer.alloc(300, 1)], done); - rawBuffers$.next(Buffer.from([4, 0])); - rawBuffers$.next(Buffer.from([0, 0, 1, 2])); - rawBuffers$.next(Buffer.from([3, 4])); + rawBuffers$.next(Buffer.from([172])); + rawBuffers$.next(Buffer.from([2, 1])); + rawBuffers$.next(Buffer.from(Buffer.alloc(299, 1))); rawBuffers$.complete(); }); it('decodes one chunk per byte', async done => { - expectDecoding([Buffer.from([1, 2, 3, 4])], done); + expectDecoding([Buffer.alloc(300, 1)], done); - for (const byte of [4, 0, 0, 0, 1, 2, 3, 4]) { + for (const byte of [172, 2, ...Buffer.alloc(300, 1)]) { rawBuffers$.next(Buffer.from([byte])); } rawBuffers$.complete(); }); it('decodes a chunk that contains more data', async done => { - expectDecoding([Buffer.from([1, 2, 3, 4])], done); + expectDecoding([Buffer.from([1, 2, 3, 4]), Buffer.from([0])], done); - rawBuffers$.next(Buffer.from([4, 0, 0, 0, 1, 2, 3, 4, 1, 0, 0, 0])); + rawBuffers$.next(Buffer.from([4, 1, 2, 3, 4, 1, 0])); rawBuffers$.complete(); }); - it('decodes a chunk of length greater than 256', async done => { + it('decodes a full chunk of length greater than 256', async done => { 
expectDecoding([Buffer.from(new Array(300).fill(1))], done); - rawBuffers$.next(Buffer.from([44, 1, 0, 0, ...new Array(300).fill(1)])); + rawBuffers$.next(Buffer.from([172, 2, ...new Array(300).fill(1)])); rawBuffers$.complete(); }); }); @@ -154,31 +136,23 @@ describe('packet transformer', () => { done ); - rawBuffers$.next( - Buffer.from([4, 0, 0, 0, 1, 2, 3, 4, 2, 0, 0, 0, 101, 102]) - ); + rawBuffers$.next(Buffer.from([4, 1, 2, 3, 4, 2, 101, 102])); rawBuffers$.complete(); }); it('decodes multiple chunks', async done => { - expectDecoding( - [Buffer.from([1, 2, 3, 4]), Buffer.from([101, 102])], - done - ); + expectDecoding([Buffer.from([1, 2, 3, 4]), Buffer.alloc(300, 1)], done); - rawBuffers$.next(Buffer.from([4, 0])); - rawBuffers$.next(Buffer.from([0, 0, 1, 2, 3, 4, 2, 0])); - rawBuffers$.next(Buffer.from([0, 0, 101, 102])); + rawBuffers$.next(Buffer.from([4])); + rawBuffers$.next(Buffer.from([1, 2, 3, 4, 172])); + rawBuffers$.next(Buffer.from([2, ...Buffer.alloc(300, 1)])); rawBuffers$.complete(); }); it('decodes one chunk per byte', async done => { - expectDecoding( - [Buffer.from([1, 2, 3, 4]), Buffer.from([101, 102])], - done - ); + expectDecoding([Buffer.from([1, 2, 3, 4]), Buffer.alloc(300, 1)], done); - for (const byte of [4, 0, 0, 0, 1, 2, 3, 4, 2, 0, 0, 0, 101, 102]) { + for (const byte of [4, 1, 2, 3, 4, 172, 2, ...Buffer.alloc(300, 1)]) { rawBuffers$.next(Buffer.from([byte])); } rawBuffers$.complete(); diff --git a/lib/src/embedded/packet-transformer.ts b/lib/src/embedded/packet-transformer.ts index eefa89df..daaf4d6b 100644 --- a/lib/src/embedded/packet-transformer.ts +++ b/lib/src/embedded/packet-transformer.ts @@ -2,8 +2,10 @@ // MIT-style license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. 
+import '../buffer-builder'; import {Observable, Subject} from 'rxjs'; import {mergeMap} from 'rxjs/operators'; +import BufferBuilder = require('buffer-builder'); /** * Decodes arbitrarily-chunked buffers, for example @@ -48,9 +50,26 @@ export class PacketTransformer { */ writeInboundProtobuf(protobuf: Buffer): void { try { - const packet = Buffer.alloc(Packet.headerByteSize + protobuf.length); - packet.writeInt32LE(protobuf.length, 0); - packet.set(protobuf, Packet.headerByteSize); + let length = protobuf.length; + if (length === 0) { + this.writeInboundBuffer(Buffer.alloc(1)); + return; + } + + // Write the length in varint format, 7 bits at a time from least to most + // significant. + const header = new BufferBuilder(8); + while (length > 0) { + // The highest-order bit indicates whether more bytes are necessary to + // fully express the number. The lower 7 bits indicate the number's + // value. + header.appendUInt8((length > 0x7f ? 0x80 : 0) | (length & 0x7f)); + length >>= 7; + } + + const packet = Buffer.alloc(header.length + protobuf.length); + header.copy(packet); + packet.set(protobuf, header.length); this.writeInboundBuffer(packet); } catch (error) { this.outboundProtobufsInternal$.error(error); @@ -75,15 +94,16 @@ export class PacketTransformer { /** A length-delimited packet comprised of a header and payload. */ class Packet { - /** - * The length of a packet header--the 4-byte little-endian number that - * indicates the payload's length. - */ - static headerByteSize = 4; + // The number of bits we've consumed so far to fill out `payloadLength`. + private payloadLengthBits = 0; - // The packet's header, indicating the payload's length. Constructed by calls - // to write(). - private header = Buffer.alloc(Packet.headerByteSize); + // The length of the next message, in bytes. + // + // This is built up from a [varint]. Once it's fully consumed, `payload` is + // initialized and this is reset to 0. 
+ // + // [varint]: https://developers.google.com/protocol-buffers/docs/encoding#varints + private payloadLength = 0; /** * The packet's payload. Constructed by calls to write(). @@ -91,17 +111,12 @@ class Packet { */ payload?: Buffer; - // These track the progress of constructing the packet. - private headerOffset = 0; + // The offset in [payload] that should be written to next time data arrives. private payloadOffset = 0; /** Whether the packet construction is complete. */ - get isComplete() { - return ( - this.headerOffset >= this.header.length && - this.payload && - this.payloadOffset >= this.payload?.length - ); + get isComplete(): boolean { + return !!(this.payload && this.payloadOffset >= this.payloadLength); } /** @@ -115,42 +130,57 @@ class Packet { throw Error('Cannot write to a completed Packet.'); } - let bytesWritten = 0; - - if (this.headerOffset < this.header.length) { - bytesWritten = writeBuffer(source, this.header, this.headerOffset); - this.headerOffset += bytesWritten; - if (this.headerOffset < this.header.length) return bytesWritten; - } - + // The index of the next byte to read from [source]. We have to track this + // because the source may contain the length *and* the message. + let i = 0; + + // We can be in one of two states here: + // + // * [payload] is `null`, in which case we're adding data to [payloadLength] + // until we reach a byte with its most significant bit set to 0. + // + // * [payload] is not `null`, in which case we're waiting for + // [payloadOffset] to reach [payloadLength] bytes in it so this packet is + // complete. if (!this.payload) { - const payloadLength = this.header.readUInt32LE(0); - this.payload = Buffer.alloc(payloadLength); + for (;;) { + const byte = source[i]; + + // Varints encode data in the 7 lower bits of each byte, which we access + // by masking with 0x7f = 0b01111111. 
+ this.payloadLength += (byte & 0x7f) << this.payloadLengthBits; + this.payloadLengthBits += 7; + i++; + + if (byte <= 0x7f) { + // If the byte is at most 0x7f = 0b01111111, its high bit is unset, + // which means we now know the full message length and can + // initialize [this.payload]. + this.payload = Buffer.alloc(this.payloadLength); + break; + } else if (i === source.length) { + // If we've hit the end of the source chunk, we need to wait for the + // next chunk to arrive. Just return the number of bytes we've + // consumed so far. + return i; + } else { + // Otherwise, we continue reading bytes from the source data to fill + // in [this.payloadLength]. + } + } } - const payloadBytesWritten = writeBuffer( - source.slice(bytesWritten), - this.payload, - this.payloadOffset + // Copy as many bytes as we can from [source] to [payload], making sure not + // to try to copy more than the payload can hold (if the source has another + // message after the current one) or more than the source has available (if + // the current message is split across multiple chunks). + const bytesToWrite = Math.min( + this.payload.length - this.payloadOffset, + source.length - i ); - this.payloadOffset += payloadBytesWritten; + this.payload.set(source.subarray(i, i + bytesToWrite), this.payloadOffset); + this.payloadOffset += bytesToWrite; - return bytesWritten + payloadBytesWritten; + return i + bytesToWrite; } } - -// Fills the destination buffer, starting at the offset index, with bytes from -// the source buffer. Returns the number of bytes written. Does not write beyond -// the length of the destination. 
-function writeBuffer( - source: Buffer, - destination: Buffer, - destinationOffset: number -): number { - const bytesToWrite = Math.min( - source.length, - destination.length - destinationOffset - ); - destination.set(source.slice(0, bytesToWrite), destinationOffset); - return bytesToWrite; -} diff --git a/package.json b/package.json index 7317013d..8d8abf24 100644 --- a/package.json +++ b/package.json @@ -21,6 +21,7 @@ "test": "ts-node ./node_modules/jasmine/bin/jasmine" }, "dependencies": { + "buffer-builder": "^0.2.0", "extract-zip": "^2.0.1", "google-protobuf": "^3.11.4", "immutable": "^4.0.0-rc.12",