Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: simplify snappy frame encoding #5617

Merged
merged 3 commits into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions packages/beacon-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@
"@chainsafe/libp2p-gossipsub": "^6.2.0",
"@chainsafe/libp2p-noise": "^11.0.4",
"@chainsafe/persistent-merkle-tree": "^0.5.0",
"@chainsafe/snappy-stream": "^5.1.2",
"@chainsafe/ssz": "^0.10.2",
"@chainsafe/threads": "^1.11.0",
"@ethersproject/abi": "^5.7.0",
Expand Down Expand Up @@ -149,7 +148,6 @@
"prometheus-gc-stats": "^0.6.4",
"qs": "^6.11.1",
"snappyjs": "^0.7.0",
"stream-to-it": "^0.2.4",
"strict-event-emitter-types": "^2.0.0",
"systeminformation": "^5.17.12",
"uint8arraylist": "^2.4.3",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,9 @@ describe("reqresp encoder", () => {
}

const chunks = await all(stream.source);
const join = (c: string[]): string => c.join("").replace(/0x/g, "");
const chunksHex = chunks.map((chunk) => toHex(chunk.slice(0, chunk.byteLength)));
expect(chunksHex).deep.equals(expectedChunks, `not expected response to ${protocol}`);
expect(join(chunksHex)).deep.equals(join(expectedChunks), `not expected response to ${protocol}`);
}

it("assert correct handler switch between metadata v2 and v1", async () => {
Expand Down
4 changes: 2 additions & 2 deletions packages/reqresp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@
"check-readme": "typescript-docs-verifier"
},
"dependencies": {
"@chainsafe/snappy-stream": "^5.1.2",
"@chainsafe/fast-crc32c": "^4.1.1",
"@libp2p/interface-connection": "^3.0.2",
"@libp2p/interface-peer-id": "^2.0.1",
"@lodestar/config": "^1.8.0",
"@lodestar/params": "^1.8.0",
"@lodestar/utils": "^1.8.0",
"it-all": "^3.0.1",
"snappy": "^7.2.2",
"snappyjs": "^0.7.0",
"stream-to-it": "^0.2.4",
"uint8arraylist": "^2.4.3",
"varint": "^6.0.0"
},
Expand Down
21 changes: 3 additions & 18 deletions packages/reqresp/src/encodingStrategies/sszSnappy/encode.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import varint from "varint";
import {source} from "stream-to-it";
import snappy from "@chainsafe/snappy-stream";
import {encodeSnappy} from "./snappyFrames/compress.js";

/**
* ssz_snappy encoding strategy writer.
Expand All @@ -9,9 +8,7 @@ import snappy from "@chainsafe/snappy-stream";
* <encoding-dependent-header> | <encoded-payload>
* ```
*/
export async function* writeSszSnappyPayload(bodyData: Uint8Array): AsyncGenerator<Buffer> {
yield* encodeSszSnappy(bodyData as Buffer);
}
export const writeSszSnappyPayload = encodeSszSnappy as (bytes: Uint8Array) => AsyncGenerator<Buffer>;

/**
* Buffered Snappy writer
Expand All @@ -22,17 +19,5 @@ export async function* encodeSszSnappy(bytes: Buffer): AsyncGenerator<Buffer> {

// By first computing and writing the SSZ byte length, the SSZ encoder can then directly
// write the chunk contents to the stream. Snappy writer compresses frame by frame

/**
* Use sync version (default) for compress as it is almost 2x faster than async
* one and most payloads are "1 chunk" and 100kb payloads (which would mostly be
* big bellatrix blocks with transactions) are just 2 chunks
*
* To use async version (for e.g. on big payloads) instantiate the stream with
* `createCompressStream({asyncCompress: true})`
*/
const stream = snappy.createCompressStream();
stream.write(bytes);
stream.end();
yield* source<Buffer>(stream);
yield* encodeSnappy(bytes);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/**
 * Chunk type identifiers of the snappy framing format.
 *
 * The chunk type is the first byte of every frame header; it is followed by a
 * 3-byte little-endian length and then the chunk data.
 */
export enum ChunkType {
// Stream identifier chunk, emitted once at the very start of a stream.
IDENTIFIER = 0xff,
// Chunk whose payload is snappy-compressed data (preceded by a 4-byte masked CRC-32C).
COMPRESSED = 0x00,
// Chunk whose payload is stored as-is (preceded by a 4-byte masked CRC-32C).
UNCOMPRESSED = 0x01,
// Padding chunk as named by the framing format; NOTE(review): confirm how the decoder treats it.
PADDING = 0xfe,
}

// Body of the stream identifier chunk: the six ASCII bytes "sNaPpY".
export const IDENTIFIER = Buffer.from([0x73, 0x4e, 0x61, 0x50, 0x70, 0x59]);
// Complete stream identifier frame: chunk type 0xff, 3-byte little-endian length (6),
// followed by the same "sNaPpY" bytes as IDENTIFIER.
export const IDENTIFIER_FRAME = Buffer.from([0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59]);
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import snappy from "snappy";
import crc32c from "@chainsafe/fast-crc32c";
import {ChunkType, IDENTIFIER_FRAME} from "./common.js";

// The logic in this file is largely copied (in simplified form) from https://github.com/ChainSafe/node-snappy-stream/

/**
* As per the snappy framing format for streams, the uncompressed data in a single chunk
* must be no longer than 65536 bytes.
*
* From: https://github.com/google/snappy/blob/main/framing_format.txt#L90:L92
*/
const UNCOMPRESSED_CHUNK_SIZE = 65536;

/**
 * Computes the masked CRC-32C checksum that prefixes the data of a snappy frame.
 *
 * Masking is defined in section 3 of https://github.com/google/snappy/blob/master/framing_format.txt
 * Reference implementations:
 * - Go: https://github.com/golang/snappy/blob/2e65f85255dbc3072edf28d6b5b8efc472979f5a/snappy.go#L97
 * - Python: https://github.com/andrix/python-snappy/blob/602e9c10d743f71bef0bac5e4c4dffa17340d7b3/snappy/snappy.py#L70
 *
 * @param value - The uncompressed bytes to checksum.
 * @returns A 4-byte buffer holding the masked checksum in little-endian order.
 */
function checksum(value: Buffer): Buffer {
  const crc = crc32c.calculate(value);

  // Rotate right by 15 bits: `>>>` keeps the high part unsigned, and the low
  // (32 - 17) = 15 bits are masked with 0x7fff before being moved to the top.
  const rotated = (crc >>> 15) | ((crc & 0x7fff) << 17);

  // Add the masking constant, then `>>> 0` wraps the sum back to an unsigned 32-bit value.
  const masked = (rotated + 0xa282ead8) >>> 0;

  const out = Buffer.allocUnsafe?.(4) ?? Buffer.alloc(4);
  out.writeUInt32LE(masked, 0);
  return out;
}

/**
 * Encodes `bytes` as a snappy framed stream.
 *
 * Yields the stream identifier frame first, then one frame per slice of at most
 * `UNCOMPRESSED_CHUNK_SIZE` input bytes. Each data frame consists of:
 * a 4-byte header (chunk type + 3-byte little-endian length), the 4-byte masked
 * checksum of the uncompressed slice, and the payload.
 *
 * @param bytes - The data to encode. Empty input yields only the identifier frame.
 * @returns An async generator of encoded frames, in stream order.
 */
export async function* encodeSnappy(bytes: Buffer): AsyncGenerator<Buffer> {
  yield IDENTIFIER_FRAME;

  let offset = 0;
  while (offset < bytes.length) {
    const uncompressed = bytes.subarray(offset, offset + UNCOMPRESSED_CHUNK_SIZE);
    const compressed = snappy.compressSync(uncompressed);

    // Keep the compressed form only when it is strictly smaller than the input slice;
    // otherwise the slice is stored verbatim in an UNCOMPRESSED frame.
    const useCompressed = compressed.length < uncompressed.length;
    const frameType = useCompressed ? ChunkType.COMPRESSED : ChunkType.UNCOMPRESSED;
    const payload = useCompressed ? compressed : uncompressed;

    // The frame length covers the 4-byte checksum plus the payload.
    // Buffer.from truncates each array entry to its low byte, giving a 3-byte little-endian length.
    const frameLength = payload.length + 4;
    const header = Buffer.from([frameType, frameLength, frameLength >> 8, frameLength >> 16]);

    // The checksum is always computed over the uncompressed slice.
    yield Buffer.concat([header, checksum(uncompressed), payload]);

    offset += UNCOMPRESSED_CHUNK_SIZE;
  }
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import {uncompress} from "snappyjs";
import {Uint8ArrayList} from "uint8arraylist";

const IDENTIFIER = Buffer.from([0x73, 0x4e, 0x61, 0x50, 0x70, 0x59]);
import {ChunkType, IDENTIFIER} from "./common.js";

export class SnappyFramesUncompress {
private buffer = new Uint8ArrayList();
Expand Down Expand Up @@ -70,13 +69,6 @@ type UncompressState = {
foundIdentifier: boolean;
};

enum ChunkType {
IDENTIFIER = 0xff,
COMPRESSED = 0x00,
UNCOMPRESSED = 0x01,
PADDING = 0xfe,
}

/**
 * Reads the 3-byte little-endian frame length stored at `offset`.
 *
 * @param buffer - Byte list to read from.
 * @param offset - Index of the least-significant length byte.
 * @returns The combined length value from the three bytes at `offset`, `offset + 1`, `offset + 2`.
 */
function getFrameSize(buffer: Uint8ArrayList, offset: number): number {
  const low = buffer.get(offset);
  const mid = buffer.get(offset + 1);
  const high = buffer.get(offset + 2);
  return low + (mid << 8) + (high << 16);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,45 +1,46 @@
import {expect} from "chai";
import {Uint8ArrayList} from "uint8arraylist";
import snappy from "@chainsafe/snappy-stream";
import {pipe} from "it-pipe";
import {SnappyFramesUncompress} from "../../../../../src/encodingStrategies/sszSnappy/snappyFrames/uncompress.js";
import {encodeSnappy} from "../../../../../src/encodingStrategies/sszSnappy/snappyFrames/compress.js";

describe("encodingStrategies / sszSnappy / snappy frames / uncompress", function () {
it("should work with short input", function (done) {
const compressStream = snappy.createCompressStream();
const testData = "Small test data";
const compressIterable = encodeSnappy(Buffer.from(testData));

const decompress = new SnappyFramesUncompress();

const testData = "Small test data";

compressStream.on("data", function (data) {
const result = decompress.uncompress(data);
if (result) {
expect(result.subarray().toString()).to.be.equal(testData);
done();
void pipe(compressIterable, async function (source) {
for await (const data of source) {
const result = decompress.uncompress(new Uint8ArrayList(data));
if (result) {
expect(result.subarray().toString()).to.be.equal(testData);
done();
}
}
});

compressStream.write(testData);
});

it("should work with huge input", function (done) {
const compressStream = snappy.createCompressStream();

const decompress = new SnappyFramesUncompress();

const testData = Buffer.alloc(100000, 4).toString();
const compressIterable = encodeSnappy(Buffer.from(testData));
let result = Buffer.alloc(0);
const decompress = new SnappyFramesUncompress();

compressStream.on("data", function (data) {
// testData will come compressed as two or more chunks
result = Buffer.concat([result, decompress.uncompress(data)?.subarray() ?? Buffer.alloc(0)]);
if (result.length === testData.length) {
expect(result.toString()).to.be.equal(testData);
done();
void pipe(compressIterable, async function (source) {
for await (const data of source) {
// testData will come compressed as two or more chunks
result = Buffer.concat([
result,
decompress.uncompress(new Uint8ArrayList(data))?.subarray() ?? Buffer.alloc(0),
]);
if (result.length === testData.length) {
expect(result.toString()).to.be.equal(testData);
done();
}
}
});

compressStream.write(testData);
});

it("should detect malformed input", function () {
Expand Down
5 changes: 4 additions & 1 deletion packages/reqresp/test/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ export async function* arrToSource<T>(arr: T[]): AsyncGenerator<T> {
* Wrapper for type-safety to ensure an array of Buffers is equal, with a diff in hex
*/
export function expectEqualByteChunks(chunks: Uint8Array[], expectedChunks: Uint8Array[], message?: string): void {
expect(chunks.map(toHexString)).to.deep.equal(expectedChunks.map(toHexString), message);
expect(chunks.map(toHexString).join("").replace(/0x/g, "")).to.deep.equal(
expectedChunks.map(toHexString).join("").replace(/0x/g, ""),
message
);
}

export function expectInEqualByteChunks(chunks: Uint8Array[], expectedChunks: Uint8Array[], message?: string): void {
Expand Down
6 changes: 0 additions & 6 deletions types/snappy-stream/index.d.ts

This file was deleted.

4 changes: 0 additions & 4 deletions types/stream-to-it/index.d.ts

This file was deleted.

Loading