feat: Implement IPC RecordBatch body buffer compression #14

Open · wants to merge 20 commits into base: main

Commits (20)
80f47ca
Define compression registry
Djjanks May 19, 2025
43905c8
Implement RecordBatch body decompression
Djjanks May 19, 2025
8eb03e7
Export compressionType, Codec and compressionRegistry
Djjanks May 19, 2025
f932ebc
Avoid copying decompressed data to contiguous ArrayBuffer
Djjanks May 21, 2025
7bcd611
Refactor: Move vector decompressed data reader to CompressedVectorLoader
Djjanks May 22, 2025
b6c2753
Refactor _CompressionRegistry
Djjanks May 27, 2025
a05e7c0
Add codec encode validators
Djjanks May 27, 2025
7d2aa05
Implement BodyCompression encode/decode to metadata message
Djjanks May 27, 2025
7900afb
Implement compression to RecordBatch writer
Djjanks May 27, 2025
d1cabbf
Refactor padding calculation for compressed Record Batch
Djjanks May 27, 2025
76b1e30
Refactor RecordBatch writing to handle compression inline
Djjanks May 29, 2025
deb0b18
Add ZSTD validation
Djjanks May 29, 2025
01b2ab8
Fix buffer size calculation in _writeBodyBuffers
Djjanks May 29, 2025
d02694a
fix(ipc/writer): prevent compression with writeLegacyIpcFormat=true
Djjanks Jun 6, 2025
c82914f
test(ipc/writer): add compression test to stream writer
Djjanks Jun 6, 2025
ff01e9f
Merge branch 'main' into feature/arrow-compression
Djjanks Jun 6, 2025
33789f9
License header
Djjanks Jun 7, 2025
0487c44
feat(ipc/writer): add options to RecordBatchFileWriter constructor
Djjanks Jun 8, 2025
56845c2
fix(ipc/writer): handle dictionary batch correctly when compression i…
Djjanks Jun 8, 2025
0ca03f2
test(ipc/writer): add compression test to file writer
Djjanks Jun 8, 2025
Files changed
5 changes: 4 additions & 1 deletion package.json
@@ -71,6 +71,7 @@
"@types/benchmark": "2.1.5",
"@types/glob": "8.1.0",
"@types/jest": "29.5.14",
"@types/lz4js": "0.2.1",
"@types/multistream": "4.1.3",
"async-done": "2.0.0",
"benny": "3.7.1",
@@ -96,6 +97,7 @@
"ix": "7.0.0",
"jest": "29.7.0",
"jest-silent-reporter": "0.6.0",
"lz4js": "0.2.0",
"memfs": "4.17.2",
"mkdirp": "3.0.1",
"multistream": "4.1.0",
@@ -114,7 +116,8 @@
"webpack": "5.99.9",
"webpack-bundle-analyzer": "4.10.2",
"webpack-stream": "7.0.0",
"xml2js": "0.6.2"
"xml2js": "0.6.2",
"zstd-codec": "0.1.5"
},
"engines": {
"node": ">=12.0"
1 change: 1 addition & 0 deletions src/Arrow.dom.ts
@@ -76,6 +76,7 @@ export {
RecordBatch,
util,
Builder, makeBuilder, builderThroughIterable, builderThroughAsyncIterable,
compressionRegistry, CompressionType,
} from './Arrow.js';

export {
2 changes: 2 additions & 0 deletions src/Arrow.ts
@@ -16,6 +16,7 @@
// under the License.

export { MessageHeader } from './fb/message-header.js';
export { CompressionType } from './fb/compression-type.js';

export {
Type,
@@ -92,6 +93,7 @@ export type { ReadableSource, WritableSink } from './io/stream.js';
export { RecordBatchReader, RecordBatchFileReader, RecordBatchStreamReader, AsyncRecordBatchFileReader, AsyncRecordBatchStreamReader } from './ipc/reader.js';
export { RecordBatchWriter, RecordBatchFileWriter, RecordBatchStreamWriter, RecordBatchJSONWriter } from './ipc/writer.js';
export { tableToIPC, tableFromIPC } from './ipc/serialization.js';
export { compressionRegistry } from './ipc/compression/registry.js';
export { MessageReader, AsyncMessageReader, JSONMessageReader } from './ipc/message.js';
export { Message } from './ipc/metadata/message.js';
export { RecordBatch } from './recordbatch.js';
19 changes: 19 additions & 0 deletions src/ipc/compression/constants.ts
@@ -0,0 +1,19 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

export const LENGTH_NO_COMPRESSED_DATA = -1;
export const COMPRESS_LENGTH_PREFIX = 8;
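
These two constants mirror the Arrow IPC buffer-compression layout: when a record batch body is compressed, every buffer is written with an 8-byte little-endian int64 prefix holding its uncompressed length, and a prefix of -1 marks a buffer that was stored uncompressed. A minimal reading sketch (illustrative only, not part of this diff):

```ts
import { COMPRESS_LENGTH_PREFIX, LENGTH_NO_COMPRESSED_DATA } from './constants.js';

// Decode a single body buffer: read the int64 length prefix, then either
// return the remaining bytes unchanged (prefix === -1) or decompress them.
function decodeBodyBuffer(raw: Uint8Array, decode: (data: Uint8Array) => Uint8Array): Uint8Array {
    const view = new DataView(raw.buffer, raw.byteOffset, raw.byteLength);
    const uncompressedLength = Number(view.getBigInt64(0, /* littleEndian */ true));
    const body = raw.subarray(COMPRESS_LENGTH_PREFIX);
    return uncompressedLength === LENGTH_NO_COMPRESSED_DATA ? body : decode(body);
}
```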
46 changes: 46 additions & 0 deletions src/ipc/compression/registry.ts
@@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import { CompressionType } from '../../fb/compression-type.js';
import { compressionValidators } from './validators.js';

export interface Codec {
encode?(data: Uint8Array): Uint8Array;
decode?(data: Uint8Array): Uint8Array;
}

class _CompressionRegistry {
protected declare registry: { [key in CompressionType]?: Codec };

constructor() {
this.registry = {};
}

set(compression: CompressionType, codec: Codec) {
if (codec?.encode && typeof codec.encode === 'function' && !compressionValidators[compression].isValidCodecEncode(codec)) {
throw new Error(`Encoder for ${CompressionType[compression]} is not valid.`);
}
this.registry[compression] = codec;
}

get(compression: CompressionType): Codec | null {
return this.registry?.[compression] || null;
}

}

export const compressionRegistry = new _CompressionRegistry();
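
Usage sketch (not part of this diff): an application registers its own codec before writing or reading compressed IPC data. The lz4js calls and the 'apache-arrow' import path below are assumptions for illustration; any implementation that produces LZ4 frame format would work, and set() rejects encoders whose output lacks a valid frame header.

```ts
import * as lz4js from 'lz4js';
import { compressionRegistry, CompressionType } from 'apache-arrow';

compressionRegistry.set(CompressionType.LZ4_FRAME, {
    // lz4js is assumed here to produce/consume LZ4 *frame* format, which is
    // what the Arrow IPC spec (and the validators added below) require.
    encode: (data: Uint8Array) => new Uint8Array(lz4js.compress(data)),
    decode: (data: Uint8Array) => new Uint8Array(lz4js.decompress(data)),
});
```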
92 changes: 92 additions & 0 deletions src/ipc/compression/validators.ts
@@ -0,0 +1,92 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import type { Codec } from './registry.js';
import { CompressionType } from '../../fb/compression-type.js';

export interface CompressionValidator {
isValidCodecEncode(codec: Codec): boolean;
}

class Lz4FrameValidator implements CompressionValidator {
private readonly LZ4_FRAME_MAGIC = new Uint8Array([4, 34, 77, 24]);
private readonly MIN_HEADER_LENGTH = 7; // 4 (magic) + 2 (FLG + BD) + 1 (header checksum) = 7 min bytes

isValidCodecEncode(codec: Codec): boolean {
Author comment on lines +25 to +29:

Since many libraries use the raw LZ4 format instead of the framed format, I decided to add validation for the encode function. This ensures that Arrow files compressed with LZ4 can be correctly read in other languages. Initially, I considered comparing compressed and decompressed buffers, but due to optional metadata flags, this might not be reliable. Instead, I validate that the encode function generates a correct metadata header. I'm unsure if similar validation is needed for decode since users should notice if their data decompresses incorrectly.

const testData = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8]);
const compressed = codec.encode!(testData);
return this._isValidCompressed(compressed);
}

private _isValidCompressed(buffer: Uint8Array): boolean {
return (
this._hasMinimumLength(buffer) &&
this._hasValidMagicNumber(buffer) &&
this._hasValidVersion(buffer)
);
}

private _hasMinimumLength(buffer: Uint8Array): boolean {
return buffer.length >= this.MIN_HEADER_LENGTH;
}

private _hasValidMagicNumber(buffer: Uint8Array): boolean {
return this.LZ4_FRAME_MAGIC.every(
(byte, i) => buffer[i] === byte
);
}

private _hasValidVersion(buffer: Uint8Array): boolean {
const flg = buffer[4];
const versionBits = (flg & 0xC0) >> 6;
return versionBits === 1;
}

}

class ZstdValidator implements CompressionValidator {
private readonly ZSTD_MAGIC = new Uint8Array([40, 181, 47, 253]);
private readonly MIN_HEADER_LENGTH = 6; // 4 (magic) + 2 (min Frame_Header) = 6 min bytes

isValidCodecEncode(codec: Codec): boolean {
const testData = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8]);
const compressed = codec.encode!(testData);
return this._isValidCompressed(compressed);
}

private _isValidCompressed(buffer: Uint8Array): boolean {
return (
this._hasMinimumLength(buffer) &&
this._hasValidMagicNumber(buffer)
);
}

private _hasMinimumLength(buffer: Uint8Array): boolean {
return buffer.length >= this.MIN_HEADER_LENGTH;
}

private _hasValidMagicNumber(buffer: Uint8Array): boolean {
return this.ZSTD_MAGIC.every(
(byte, i) => buffer[i] === byte
);
}
}

export const compressionValidators: Record<CompressionType, CompressionValidator> = {
[CompressionType.LZ4_FRAME]: new Lz4FrameValidator(),
[CompressionType.ZSTD]: new ZstdValidator(),
};
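
For reference, the hard-coded byte arrays above are the little-endian encodings of the frame magic numbers: 0x184D2204 for LZ4 frames and 0xFD2FB528 for Zstandard. A quick sketch (illustrative only) confirming the correspondence:

```ts
// JS typed arrays use the platform byte order, which is little-endian on all
// mainstream engines, so the Uint32 magics map to the byte sequences above.
const toBytes = (magic: number) => Array.from(new Uint8Array(new Uint32Array([magic]).buffer));

console.log(toBytes(0x184D2204)); // [4, 34, 77, 24]   -> LZ4_FRAME_MAGIC
console.log(toBytes(0xFD2FB528)); // [40, 181, 47, 253] -> ZSTD_MAGIC
```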
3 changes: 2 additions & 1 deletion src/ipc/metadata/json.ts
@@ -40,7 +40,8 @@ export function recordBatchFromJSON(b: any) {
return new RecordBatch(
b['count'],
fieldNodesFromJSON(b['columns']),
buffersFromJSON(b['columns'])
buffersFromJSON(b['columns']),
null
);
}

68 changes: 63 additions & 5 deletions src/ipc/metadata/message.ts
@@ -40,6 +40,9 @@ import { FixedSizeBinary as _FixedSizeBinary } from '../../fb/fixed-size-binary.
import { FixedSizeList as _FixedSizeList } from '../../fb/fixed-size-list.js';
import { Map as _Map } from '../../fb/map.js';
import { Message as _Message } from '../../fb/message.js';
import { CompressionType as _CompressionType } from '../../fb/compression-type.js';
import { BodyCompression as _BodyCompression } from '../../fb/body-compression.js';
import { BodyCompressionMethod as _BodyCompressionMethod } from '../../fb/body-compression-method.js';

import { Schema, Field } from '../../schema.js';
import { toUint8Array } from '../../util/buffer.js';
@@ -122,9 +125,11 @@
protected _headerType: T;
protected _bodyLength: number;
protected _version: MetadataVersion;
protected _compression: BodyCompression | null;
public get type() { return this.headerType; }
public get version() { return this._version; }
public get headerType() { return this._headerType; }
public get compression() { return this._compression; }
public get bodyLength() { return this._bodyLength; }
declare protected _createHeader: MessageHeaderDecoder;
public header() { return this._createHeader<T>(); }
@@ -136,6 +141,7 @@
this._version = version;
this._headerType = headerType;
this.body = new Uint8Array(0);
this._compression = header?.compression;
header && (this._createHeader = () => header);
this._bodyLength = bigIntToNumber(bodyLength);
}
@@ -149,13 +155,21 @@
protected _length: number;
protected _nodes: FieldNode[];
protected _buffers: BufferRegion[];
protected _compression: BodyCompression | null;
public get nodes() { return this._nodes; }
public get length() { return this._length; }
public get buffers() { return this._buffers; }
constructor(length: bigint | number, nodes: FieldNode[], buffers: BufferRegion[]) {
public get compression() { return this._compression; }
constructor(
length: bigint | number,
nodes: FieldNode[],
buffers: BufferRegion[],
compression: BodyCompression | null
) {
this._nodes = nodes;
this._buffers = buffers;
this._length = bigIntToNumber(length);
this._compression = compression;
}
}

@@ -208,6 +222,19 @@
}
}

/**
* @ignore
* @private
**/
export class BodyCompression {
public type: _CompressionType;
public method: _BodyCompressionMethod;
constructor(type: _CompressionType, method: _BodyCompressionMethod = _BodyCompressionMethod.BUFFER) {
this.type = type;
this.method = method;
}
}

/** @ignore */
function messageHeaderFromJSON(message: any, type: MessageHeader) {
return (() => {
@@ -254,6 +281,9 @@ FieldNode['decode'] = decodeFieldNode;
BufferRegion['encode'] = encodeBufferRegion;
BufferRegion['decode'] = decodeBufferRegion;

BodyCompression['encode'] = encodeBodyCompression;
BodyCompression['decode'] = decodeBodyCompression;

declare module '../../schema' {
namespace Field {
export { encodeField as encode };
@@ -286,6 +316,10 @@ declare module './message' {
export { encodeBufferRegion as encode };
export { decodeBufferRegion as decode };
}
namespace BodyCompression {
export { encodeBodyCompression as encode };
export { decodeBodyCompression as decode };
}
}

/** @ignore */
@@ -296,10 +330,13 @@ function decodeSchema(_schema: _Schema, dictionaries: Map<number, DataType> = ne

/** @ignore */
function decodeRecordBatch(batch: _RecordBatch, version = MetadataVersion.V5) {
if (batch.compression() !== null) {
throw new Error('Record batch compression not implemented');
}
return new RecordBatch(batch.length(), decodeFieldNodes(batch), decodeBuffers(batch, version));
const recordBatch = new RecordBatch(
batch.length(),
decodeFieldNodes(batch),
decodeBuffers(batch, version),
decodeBodyCompression(batch.compression())
);
return recordBatch;
}

/** @ignore */
@@ -491,6 +528,11 @@ function decodeFieldType(f: _Field, children?: Field[]): DataType<any> {
throw new Error(`Unrecognized type: "${Type[typeId]}" (${typeId})`);
}

/** @ignore */
function decodeBodyCompression(b: _BodyCompression | null) {
return b ? new BodyCompression(b.codec(), b.method()) : null;
}

/** @ignore */
function encodeSchema(b: Builder, schema: Schema) {

@@ -583,13 +625,29 @@ function encodeRecordBatch(b: Builder, recordBatch: RecordBatch) {

const buffersVectorOffset = b.endVector();

let bodyCompressionOffset = null;
if (recordBatch.compression !== null) {
bodyCompressionOffset = encodeBodyCompression(b, recordBatch.compression);
}

_RecordBatch.startRecordBatch(b);
_RecordBatch.addLength(b, BigInt(recordBatch.length));
_RecordBatch.addNodes(b, nodesVectorOffset);
_RecordBatch.addBuffers(b, buffersVectorOffset);
if (recordBatch.compression !== null && bodyCompressionOffset) {
_RecordBatch.addCompression(b, bodyCompressionOffset);
}
return _RecordBatch.endRecordBatch(b);
}

/** @ignore */
function encodeBodyCompression(b: Builder, node: BodyCompression) {
_BodyCompression.startBodyCompression(b);
_BodyCompression.addCodec(b, node.type);
_BodyCompression.addMethod(b, node.method);
return _BodyCompression.endBodyCompression(b);
}

/** @ignore */
function encodeDictionaryBatch(b: Builder, dictionaryBatch: DictionaryBatch) {
const dataOffset = RecordBatch.encode(b, dictionaryBatch.data);
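
To tie the metadata changes back to the registry (a sketch, not code from this PR): once decodeBodyCompression has populated Message.compression, a reader can use it to look up the matching codec before decompressing body buffers. The top-level 'apache-arrow' imports below assume the exports added in src/Arrow.ts.

```ts
import { compressionRegistry, CompressionType, Message, MessageHeader } from 'apache-arrow';

// Returns the codec to use for a decoded RecordBatch message, or null when
// the batch was written without body compression.
function codecForMessage(message: Message<MessageHeader.RecordBatch>) {
    const compression = message.compression;   // BodyCompression | null
    if (compression == null) return null;
    const codec = compressionRegistry.get(compression.type);
    if (!codec?.decode) {
        throw new Error(`No decoder registered for ${CompressionType[compression.type]}`);
    }
    return codec;
}
```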