Skip to content
This repository has been archived by the owner on May 15, 2023. It is now read-only.

Commit

Permalink
Delimit messages using varints, as per sass/embedded-protocol#38
Browse files Browse the repository at this point in the history
  • Loading branch information
nex3 committed Dec 22, 2020
1 parent 0aa346c commit 02855da
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 81 deletions.
108 changes: 63 additions & 45 deletions lib/src/util/length_delimited_transformer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import 'dart:typed_data';

import 'package:async/async.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:typed_data/typed_data.dart';

/// A [StreamChannelTransformer] that converts a channel that sends and receives
/// arbitrarily-chunked binary data to one that sends and receives packets of
Expand All @@ -21,17 +22,16 @@ final StreamChannelTransformer<Uint8List, List<int>> lengthDelimited =
/// into a stream of packet contents.
final lengthDelimitedDecoder =
StreamTransformer<List<int>, Uint8List>.fromBind((stream) {
// The buffer into which the four-byte little-endian length of the next packet
// will be written.
var lengthBuffer = Uint8List(4);
// The number of bits we've consumed so far to fill out [nextMessageLength].
int nextMessageLengthBits = 0;

// The index of the next byte to write to [lengthBuffer]. Once this is equal
// to [lengthBuffer.length], the full length is available.
var lengthBufferIndex = 0;

// The length of the next message, in bytes, read from [lengthBuffer] once
// it's full.
int nextMessageLength;
// The length of the next message, in bytes.
//
// This is built up from a [varint]. Once it's fully consumed, [buffer] is
// initialized and this is reset to 0.
//
// [varint]: https://developers.google.com/protocol-buffers/docs/encoding#varints
int nextMessageLength = 0;

// The buffer into which the packet data itself is written. Initialized once
// [nextMessageLength] is known.
Expand All @@ -53,47 +53,52 @@ final lengthDelimitedDecoder =
// multiple messages.
var i = 0;

// Adds bytes from [chunk] to [destination] at [destinationIndex] without
// overflowing the bounds of [destination], and increments [i] for each byte
// written.
//
// Returns the number of bytes written.
int writeFromChunk(Uint8List destination, int destinationIndex) {
var bytesToWrite =
math.min(destination.length - destinationIndex, chunk.length - i);
destination.setRange(
destinationIndex, destinationIndex + bytesToWrite, chunk, i);
i += bytesToWrite;
return bytesToWrite;
}

while (i < chunk.length) {
// We can be in one of two states here:
//
// * Both [nextMessageLength] and [buffer] are `null`, in which case we're
// waiting until we have four bytes in [lengthBuffer] to know how big of
// a buffer to allocate.
// * [buffer] is `null`, in which case we're adding data to
// [nextMessageLength] until we reach a byte with its most significant
// bit set to 0.
//
// * Neither [nextMessageLength] nor [buffer] are `null`, in which case
// we're waiting for [buffer] to have [nextMessageLength] in it before
// we send it to [queue.local.sink] and start waiting for the next
// message.
if (nextMessageLength == null) {
lengthBufferIndex += writeFromChunk(lengthBuffer, lengthBufferIndex);
if (lengthBufferIndex < 4) return;

nextMessageLength =
ByteData.view(lengthBuffer.buffer).getUint32(0, Endian.little);
// * [buffer] is not `null`, in which case we're waiting for [buffer] to
// have [nextMessageLength] bytes in it before we send it to
// [queue.local.sink] and start waiting for the next message.
if (buffer == null) {
var byte = chunk[i];

// Varints encode data in the 7 lower bits of each byte, which we access
// by masking with 0x7f = 0b01111111.
nextMessageLength += (byte & 0x7f) << nextMessageLengthBits;
nextMessageLengthBits += 7;
i++;

// If the byte is higher than 0x7f = 0b01111111, that means its high bit
// is set which and so there are more bytes to consume before we know
// the full message length.
if (byte > 0x7f) continue;

// Otherwise, [nextMessageLength] is now finalized and we can allocate
// the data buffer.
buffer = Uint8List(nextMessageLength);
bufferIndex = 0;
}

bufferIndex += writeFromChunk(buffer, bufferIndex);
// Copy as many bytes as we can from [chunk] to [buffer], making sure not
// to try to copy more than the buffer can hold (if the chunk has another
// message after the current one) or more than the chunk has available (if
// the current message is split across multiple chunks).
var bytesToWrite =
math.min(buffer.length - bufferIndex, chunk.length - i);
buffer.setRange(
bufferIndex, bufferIndex + bytesToWrite, chunk, i);
i += bytesToWrite;
bufferIndex += bytesToWrite;
if (bufferIndex < nextMessageLength) return;

sink.add(Uint8List.view(buffer.buffer, 0, nextMessageLength));
lengthBufferIndex = 0;
nextMessageLength = null;
// Once we've filled the buffer, emit it and reset our state.
sink.add(buffer);
nextMessageLength = 0;
nextMessageLengthBits = 0;
buffer = null;
bufferIndex = null;
}
Expand All @@ -106,9 +111,22 @@ final lengthDelimitedDecoder =
final lengthDelimitedEncoder =
StreamTransformer<Uint8List, List<int>>.fromHandlers(
handleData: (message, sink) {
var messageLength = Uint8List(4);
ByteData.view(messageLength.buffer)
.setUint32(0, message.length, Endian.little);
sink.add(messageLength);
var length = message.length;
if (length == 0) {
sink.add([0]);
return;
}

// Write the length in varint format, 7 bits at a time from least to most
// significant.
var lengthBuffer = Uint8Buffer();
while (length > 0) {
// The highest-order bit indicates whether more bytes are necessary to fully
// express the number. The lower 7 bits indicate the number's value.
lengthBuffer.add((length > 0x7f ? 0x80 : 0)|(length & 0x7f));
length >>= 7;
}

sink.add(Uint8List.view(lengthBuffer.buffer, 0, lengthBuffer.length));
sink.add(message);
});
3 changes: 2 additions & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: sass_embedded
version: 1.0.0-beta.5
version: 1.0.0-dev
description: An implementation of the Sass embedded protocol using Dart Sass.
author: Sass Team
homepage: https://github.com/sass/dart-sass-embedded
Expand All @@ -19,6 +19,7 @@ dependencies:
source_span: ^1.1.0
stack_trace: ^1.6.0
stream_channel: ">=1.6.0 <3.0.0"
typed_data: ^1.1.0

dev_dependencies:
cli_pkg: ^1.0.0-beta.8
Expand Down
52 changes: 18 additions & 34 deletions test/length_delimited_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,20 @@ void main() {
test("encodes an empty message", () {
sink.add([]);
sink.close();
expect(collectBytes(stream), completion(equals([0, 0, 0, 0])));
expect(collectBytes(stream), completion(equals([0])));
});

test("encodes a message of length 1", () {
sink.add([123]);
sink.close();
expect(collectBytes(stream), completion(equals([1, 0, 0, 0, 123])));
expect(collectBytes(stream), completion(equals([1, 123])));
});

test("encodes a message of length greater than 256", () {
sink.add(List.filled(300, 1));
sink.close();
expect(collectBytes(stream),
completion(equals([44, 1, 0, 0, ...List.filled(300, 1)])));
completion(equals([172, 2, ...List.filled(300, 1)])));
});

test("encodes multiple messages", () {
Expand All @@ -49,7 +49,7 @@ void main() {
expect(
collectBytes(stream),
completion(equals(
[1, 0, 0, 0, 10, 2, 0, 0, 0, 20, 30, 3, 0, 0, 0, 40, 50, 60])));
[1, 10, 2, 20, 30, 3, 40, 50, 60])));
});
});

Expand All @@ -64,75 +64,59 @@ void main() {

group("decodes an empty message", () {
test("from a single chunk", () {
sink.add([0, 0, 0, 0]);
expect(queue, emits(isEmpty));
});

test("from multiple chunks", () {
sink.add([0, 0]);
sink.add([0, 0]);
expect(queue, emits(isEmpty));
});

test("from one chunk per byte", () {
sink..add([0])..add([0])..add([0])..add([0]);
sink.add([0]);
expect(queue, emits(isEmpty));
});

test("from a chunk that contains more data", () {
sink.add([0, 0, 0, 0, 1, 0, 0, 0, 100]);
sink.add([0, 1, 100]);
expect(queue, emits(isEmpty));
});
});

group("decodes a longer message", () {
test("from a single chunk", () {
sink.add([4, 0, 0, 0, 1, 2, 3, 4]);
expect(queue, emits([1, 2, 3, 4]));
sink.add([172, 2, ...List.filled(300, 1)]);
expect(queue, emits(List.filled(300, 1)));
});

test("from multiple chunks", () {
sink..add([4, 0])..add([0, 0, 1, 2])..add([3, 4]);
expect(queue, emits([1, 2, 3, 4]));
sink..add([172])..add([2, 1])..add(List.filled(299, 1));
expect(queue, emits(List.filled(300, 1)));
});

test("from one chunk per byte", () {
for (var byte in [4, 0, 0, 0, 1, 2, 3, 4]) {
for (var byte in [172, 2, ...List.filled(300, 1)]) {
sink.add([byte]);
}
expect(queue, emits([1, 2, 3, 4]));
expect(queue, emits(List.filled(300, 1)));
});

test("from a chunk that contains more data", () {
sink.add([4, 0, 0, 0, 1, 2, 3, 4, 1, 0, 0, 0]);
expect(queue, emits([1, 2, 3, 4]));
});

test("of length greater than 256", () {
sink.add([44, 1, 0, 0, ...List.filled(300, 1)]);
sink.add([172, 2, ...List.filled(300, 1), 1, 10]);
expect(queue, emits(List.filled(300, 1)));
});
});

group("decodes multiple messages", () {
test("from single chunk", () {
sink.add([4, 0, 0, 0, 1, 2, 3, 4, 2, 0, 0, 0, 101, 102]);
sink.add([4, 1, 2, 3, 4, 2, 101, 102]);
expect(queue, emits([1, 2, 3, 4]));
expect(queue, emits([101, 102]));
});

test("from multiple chunks", () {
sink..add([4, 0])..add([0, 0, 1, 2, 3, 4, 2, 0])..add([0, 0, 101, 102]);
sink..add([4])..add([1, 2, 3, 4, 172])..add([2, ...List.filled(300, 1)]);
expect(queue, emits([1, 2, 3, 4]));
expect(queue, emits([101, 102]));
expect(queue, emits(List.filled(300, 1)));
});

test("from one chunk per byte", () {
for (var byte in [4, 0, 0, 0, 1, 2, 3, 4, 2, 0, 0, 0, 101, 102]) {
for (var byte in [4, 1, 2, 3, 4, 172, 2, ...List.filled(300, 1)]) {
sink.add([byte]);
}
expect(queue, emits([1, 2, 3, 4]));
expect(queue, emits([101, 102]));
expect(queue, emits(List.filled(300, 1)));
});
});
});
Expand Down
2 changes: 1 addition & 1 deletion test/protocol_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ void main() {
});

test("caused by an invalid message", () async {
process.stdin.add([1, 0, 0, 0, 0]);
process.stdin.add([1, 0]);
await expectParseError(
process, "Protocol message contained an invalid tag (zero).");
expect(await process.exitCode, 76);
Expand Down

0 comments on commit 02855da

Please sign in to comment.