Skip to content

Commit

Permalink
Delimit messages using varints, as per sass/embedded-protocol#38
Browse files Browse the repository at this point in the history
  • Loading branch information
nex3 committed Dec 30, 2020
1 parent c48e216 commit 434e1a5
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 102 deletions.
36 changes: 36 additions & 0 deletions lib/src/buffer-builder.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// TODO(nweiz): Use the type package once DefinitelyTyped/DefinitelyTyped#50228
// lands.

declare module 'buffer-builder' {
class BufferBuilder {
constructor(initialCapacity?: number | Buffer);
appendBuffer(source: Buffer): BufferBuilder;
appendUInt8(value: number): BufferBuilder;
appendUInt16LE(value: number): BufferBuilder;
appendUInt16BE(value: number): BufferBuilder;
appendUInt32LE(value: number): BufferBuilder;
appendUInt32BE(value: number): BufferBuilder;
appendInt8(value: number): BufferBuilder;
appendInt16LE(value: number): BufferBuilder;
appendInt16BE(value: number): BufferBuilder;
appendInt32LE(value: number): BufferBuilder;
appendInt32BE(value: number): BufferBuilder;
appendFloatLE(value: number): BufferBuilder;
appendFloatBE(value: number): BufferBuilder;
appendDoubleLE(value: number): BufferBuilder;
appendDoubleBE(value: number): BufferBuilder;
appendString(str: string, encoding?: string): BufferBuilder;
appendStringZero(str: string, encoding?: string): BufferBuilder;
appendFill(value: number, count: number): BufferBuilder;
get(): Buffer;
copy(
targetBuffer: Buffer,
targetStart?: number,
sourceStart?: number,
sourceEnd?: number
): number;
readonly length: number;
}

export = BufferBuilder;
}
76 changes: 25 additions & 51 deletions lib/src/embedded/packet-transformer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,20 @@ describe('packet transformer', () => {
it('encodes an empty message', () => {
packets.writeInboundProtobuf(Buffer.from([]));

expect(encodedBuffers).toEqual([Buffer.from([0, 0, 0, 0])]);
expect(encodedBuffers).toEqual([Buffer.from([0])]);
});

it('encodes a message of length 1', () => {
packets.writeInboundProtobuf(Buffer.from([123]));

expect(encodedBuffers).toEqual([Buffer.from([1, 0, 0, 0, 123])]);
expect(encodedBuffers).toEqual([Buffer.from([1, 123])]);
});

it('encodes a message of length greater than 256', () => {
packets.writeInboundProtobuf(Buffer.alloc(300, 1));

expect(encodedBuffers).toEqual([
Buffer.from([44, 1, 0, 0, ...new Array(300).fill(1)]),
Buffer.from([172, 2, ...new Array(300).fill(1)]),
]);
});

Expand All @@ -45,9 +45,9 @@ describe('packet transformer', () => {
packets.writeInboundProtobuf(Buffer.from([40, 50, 60]));

expect(encodedBuffers).toEqual([
Buffer.from([1, 0, 0, 0, 10]),
Buffer.from([2, 0, 0, 0, 20, 30]),
Buffer.from([3, 0, 0, 0, 40, 50, 60]),
Buffer.from([1, 10]),
Buffer.from([2, 20, 30]),
Buffer.from([3, 40, 50, 60]),
]);
});
});
Expand Down Expand Up @@ -76,32 +76,14 @@ describe('packet transformer', () => {
it('decodes a single chunk', async done => {
expectDecoding([Buffer.from([])], done);

rawBuffers$.next(Buffer.from([0, 0, 0, 0]));
rawBuffers$.complete();
});

it('decodes multiple chunks', async done => {
expectDecoding([Buffer.from([])], done);

rawBuffers$.next(Buffer.from([0, 0]));
rawBuffers$.next(Buffer.from([0, 0]));
rawBuffers$.complete();
});

it('decodes one chunk per byte', async done => {
expectDecoding([Buffer.from([])], done);

rawBuffers$.next(Buffer.from([0]));
rawBuffers$.next(Buffer.from([0]));
rawBuffers$.next(Buffer.from([0]));
rawBuffers$.next(Buffer.from([0]));
rawBuffers$.complete();
});

it('decodes a chunk that contains more data', async done => {
expectDecoding([Buffer.from([]), Buffer.from([100])], done);

rawBuffers$.next(Buffer.from([0, 0, 0, 0, 1, 0, 0, 0, 100]));
rawBuffers$.next(Buffer.from([0, 1, 100]));
rawBuffers$.complete();
});
});
Expand All @@ -110,39 +92,39 @@ describe('packet transformer', () => {
it('decodes a single chunk', async done => {
expectDecoding([Buffer.from(Buffer.from([1, 2, 3, 4]))], done);

rawBuffers$.next(Buffer.from([4, 0, 0, 0, 1, 2, 3, 4]));
rawBuffers$.next(Buffer.from([4, 1, 2, 3, 4]));
rawBuffers$.complete();
});

it('decodes multiple chunks', async done => {
expectDecoding([Buffer.from([1, 2, 3, 4])], done);
expectDecoding([Buffer.alloc(300, 1)], done);

rawBuffers$.next(Buffer.from([4, 0]));
rawBuffers$.next(Buffer.from([0, 0, 1, 2]));
rawBuffers$.next(Buffer.from([3, 4]));
rawBuffers$.next(Buffer.from([172]));
rawBuffers$.next(Buffer.from([2, 1]));
rawBuffers$.next(Buffer.from(Buffer.alloc(299, 1)));
rawBuffers$.complete();
});

it('decodes one chunk per byte', async done => {
expectDecoding([Buffer.from([1, 2, 3, 4])], done);
expectDecoding([Buffer.alloc(300, 1)], done);

for (const byte of [4, 0, 0, 0, 1, 2, 3, 4]) {
for (const byte of [172, 2, ...Buffer.alloc(300, 1)]) {
rawBuffers$.next(Buffer.from([byte]));
}
rawBuffers$.complete();
});

it('decodes a chunk that contains more data', async done => {
expectDecoding([Buffer.from([1, 2, 3, 4])], done);
expectDecoding([Buffer.from([1, 2, 3, 4]), Buffer.from([0])], done);

rawBuffers$.next(Buffer.from([4, 0, 0, 0, 1, 2, 3, 4, 1, 0, 0, 0]));
rawBuffers$.next(Buffer.from([4, 1, 2, 3, 4, 1, 0]));
rawBuffers$.complete();
});

it('decodes a chunk of length greater than 256', async done => {
it('decodes a full chunk of length greater than 256', async done => {
expectDecoding([Buffer.from(new Array(300).fill(1))], done);

rawBuffers$.next(Buffer.from([44, 1, 0, 0, ...new Array(300).fill(1)]));
rawBuffers$.next(Buffer.from([172, 2, ...new Array(300).fill(1)]));
rawBuffers$.complete();
});
});
Expand All @@ -154,31 +136,23 @@ describe('packet transformer', () => {
done
);

rawBuffers$.next(
Buffer.from([4, 0, 0, 0, 1, 2, 3, 4, 2, 0, 0, 0, 101, 102])
);
rawBuffers$.next(Buffer.from([4, 1, 2, 3, 4, 2, 101, 102]));
rawBuffers$.complete();
});

it('decodes multiple chunks', async done => {
expectDecoding(
[Buffer.from([1, 2, 3, 4]), Buffer.from([101, 102])],
done
);
expectDecoding([Buffer.from([1, 2, 3, 4]), Buffer.alloc(300, 1)], done);

rawBuffers$.next(Buffer.from([4, 0]));
rawBuffers$.next(Buffer.from([0, 0, 1, 2, 3, 4, 2, 0]));
rawBuffers$.next(Buffer.from([0, 0, 101, 102]));
rawBuffers$.next(Buffer.from([4]));
rawBuffers$.next(Buffer.from([1, 2, 3, 4, 172]));
rawBuffers$.next(Buffer.from([2, ...Buffer.alloc(300, 1)]));
rawBuffers$.complete();
});

it('decodes one chunk per byte', async done => {
expectDecoding(
[Buffer.from([1, 2, 3, 4]), Buffer.from([101, 102])],
done
);
expectDecoding([Buffer.from([1, 2, 3, 4]), Buffer.alloc(300, 1)], done);

for (const byte of [4, 0, 0, 0, 1, 2, 3, 4, 2, 0, 0, 0, 101, 102]) {
for (const byte of [4, 1, 2, 3, 4, 172, 2, ...Buffer.alloc(300, 1)]) {
rawBuffers$.next(Buffer.from([byte]));
}
rawBuffers$.complete();
Expand Down
132 changes: 81 additions & 51 deletions lib/src/embedded/packet-transformer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
// MIT-style license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

import '../buffer-builder';
import {Observable, Subject} from 'rxjs';
import {mergeMap} from 'rxjs/operators';
import BufferBuilder = require('buffer-builder');

/**
* Decodes arbitrarily-chunked buffers, for example
Expand Down Expand Up @@ -48,9 +50,26 @@ export class PacketTransformer {
*/
writeInboundProtobuf(protobuf: Buffer): void {
try {
const packet = Buffer.alloc(Packet.headerByteSize + protobuf.length);
packet.writeInt32LE(protobuf.length, 0);
packet.set(protobuf, Packet.headerByteSize);
let length = protobuf.length;
if (length === 0) {
this.writeInboundBuffer(Buffer.alloc(1));
return;
}

// Write the length in varint format, 7 bits at a time from least to most
// significant.
const header = new BufferBuilder(8);
while (length > 0) {
// The highest-order bit indicates whether more bytes are necessary to
// fully express the number. The lower 7 bits indicate the number's
// value.
header.appendUInt8((length > 0x7f ? 0x80 : 0) | (length & 0x7f));
length >>= 7;
}

const packet = Buffer.alloc(header.length + protobuf.length);
header.copy(packet);
packet.set(protobuf, header.length);
this.writeInboundBuffer(packet);
} catch (error) {
this.outboundProtobufsInternal$.error(error);
Expand All @@ -75,33 +94,29 @@ export class PacketTransformer {

/** A length-delimited packet comprised of a header and payload. */
class Packet {
/**
* The length of a packet header--the 4-byte little-endian number that
* indicates the payload's length.
*/
static headerByteSize = 4;
// The number of bits we've consumed so far to fill out `payloadLength`.
private payloadLengthBits = 0;

// The packet's header, indicating the payload's length. Constructed by calls
// to write().
private header = Buffer.alloc(Packet.headerByteSize);
// The length of the next message, in bytes.
//
// This is built up from a [varint]. Once it's fully consumed, `payload` is
// initialized and this is reset to 0.
//
// [varint]: https://developers.google.com/protocol-buffers/docs/encoding#varints
private payloadLength = 0;

/**
* The packet's payload. Constructed by calls to write().
* @see write
*/
payload?: Buffer;

// These track the progress of constructing the packet.
private headerOffset = 0;
// The offset in [payload] that should be written to next time data arrives.
private payloadOffset = 0;

/** Whether the packet construction is complete. */
get isComplete() {
return (
this.headerOffset >= this.header.length &&
this.payload &&
this.payloadOffset >= this.payload?.length
);
get isComplete(): boolean {
return !!(this.payload && this.payloadOffset >= this.payloadLength);
}

/**
Expand All @@ -115,42 +130,57 @@ class Packet {
throw Error('Cannot write to a completed Packet.');
}

let bytesWritten = 0;

if (this.headerOffset < this.header.length) {
bytesWritten = writeBuffer(source, this.header, this.headerOffset);
this.headerOffset += bytesWritten;
if (this.headerOffset < this.header.length) return bytesWritten;
}

// The index of the next byte to read from [source]. We have to track this
// because the source may contain the length *and* the message.
let i = 0;

// We can be in one of two states here:
//
// * [payload] is `null`, in which case we're adding data to [payloadLength]
// until we reach a byte with its most significant bit set to 0.
//
// * [payload] is not `null`, in which case we're waiting for
// [payloadOffset] to reach [payloadLength] bytes in it so this packet is
// complete.
if (!this.payload) {
const payloadLength = this.header.readUInt32LE(0);
this.payload = Buffer.alloc(payloadLength);
for (;;) {
const byte = source[i];

// Varints encode data in the 7 lower bits of each byte, which we access
// by masking with 0x7f = 0b01111111.
this.payloadLength += (byte & 0x7f) << this.payloadLengthBits;
this.payloadLengthBits += 7;
i++;

if (byte <= 0x7f) {
// If the byte is lower than 0x7f = 0b01111111, that means its high
// bit is unset which and we now know the full message length and can
// initialize [this.payload].
this.payload = Buffer.alloc(this.payloadLength);
break;
} else if (i === source.length) {
// If we've hit the end of the source chunk, we need to wait for the
// next chunk to arrive. Just return the number of bytes we've
// consumed so far.
return i;
} else {
// Otherwise, we continue reading bytes from the source data to fill
// in [this.payloadLength].
}
}
}

const payloadBytesWritten = writeBuffer(
source.slice(bytesWritten),
this.payload,
this.payloadOffset
// Copy as many bytes as we can from [source] to [payload], making sure not
// to try to copy more than the payload can hold (if the source has another
// message after the current one) or more than the source has available (if
// the current message is split across multiple chunks).
const bytesToWrite = Math.min(
this.payload.length - this.payloadOffset,
source.length - i
);
this.payloadOffset += payloadBytesWritten;
this.payload.set(source.subarray(i, i + bytesToWrite), this.payloadOffset);
this.payloadOffset += bytesToWrite;

return bytesWritten + payloadBytesWritten;
return i + bytesToWrite;
}
}

// Fills the destination buffer, starting at the offset index, with bytes from
// the source buffer. Returns the number of bytes written. Does not write beyond
// the length of the destination.
function writeBuffer(
source: Buffer,
destination: Buffer,
destinationOffset: number
): number {
const bytesToWrite = Math.min(
source.length,
destination.length - destinationOffset
);
destination.set(source.slice(0, bytesToWrite), destinationOffset);
return bytesToWrite;
}
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"test": "ts-node ./node_modules/jasmine/bin/jasmine"
},
"dependencies": {
"buffer-builder": "^0.2.0",
"extract-zip": "^2.0.1",
"google-protobuf": "^3.11.4",
"immutable": "^4.0.0-rc.12",
Expand Down

0 comments on commit 434e1a5

Please sign in to comment.