feat: Implement IPC RecordBatch body buffer compression #14

Open

Djjanks wants to merge 20 commits into main from feature/arrow-compression

Conversation


@Djjanks Djjanks commented May 19, 2025

Rationale for this change

This change introduces support for reading compressed Arrow IPC streams in JavaScript. The primary motivation is the need to read Arrow IPC streams in the browser when they are transmitted over the network in a compressed format to reduce network load.

Several reasons support this enhancement:

  • A personal need in another project to read compressed Arrow IPC streams.
  • Community demand, as seen in Issue apache/arrow-js#109.
  • A similar implementation was attempted in PR apache/arrow#13076 but was never merged. I am very grateful to @kylebarron for that work.
  • Other language implementations (e.g., C++, Python, Rust) already support IPC compression.

What changes are included in this PR?

  • Support for decoding compressed RecordBatch buffers during reading.
  • Each buffer is decompressed individually, offsets are recalculated with 8-byte alignment, and a new metadata.RecordBatch is constructed before loading vectors.
  • Only decompression is implemented; compression (writing) is not supported yet.
  • Currently tested with the LZ4 codec using the lz4js library. lz4-wasm was evaluated but rejected due to incompatibility with the LZ4 Frame format.
  • The decompression logic is isolated to _loadRecordBatch() in the RecordBatchReaderImpl class.
  • A codec.decode function is retrieved from the compressionRegistry and applied per buffer, so users can choose a suitable library.
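For illustration, a minimal sketch of the per-buffer decode step, assuming a registered codec exposing decode(data: Uint8Array): Uint8Array. The helper name decompressBuffer is mine; the 8-byte little-endian length prefix (with -1 meaning "not compressed") follows the Arrow IPC body-compression layout rather than this PR's exact code:

function decompressBuffer(
    body: Uint8Array,
    region: { offset: number; length: number },
    decode: (data: Uint8Array) => Uint8Array
): Uint8Array {
    const view = new DataView(body.buffer, body.byteOffset + region.offset, region.length);
    // First 8 bytes: uncompressed length as a little-endian int64; -1 means the buffer is not compressed.
    const uncompressedLength = view.getBigInt64(0, true);
    const payload = body.subarray(region.offset + 8, region.offset + region.length);
    return uncompressedLength === -1n ? payload : decode(payload);
}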

Additional notes:

  1. Codec compatibility caveats
    Not all JavaScript LZ4 libraries are compatible with the Arrow IPC format. For example:
  • lz4js works correctly, as it supports the LZ4 Frame format.
  • lz4-wasm is not compatible, as it expects raw LZ4 blocks and fails to decompress LZ4 frame data.
    This can result in silent or cryptic errors. To improve developer experience, we could:
  • Wrap codec.decode calls in try/catch and surface a clearer error message if decompression fails.
  • Add an optional check in compressionRegistry.set() to validate that the codec supports the LZ4 Frame format. One way would be to compress dummy data and inspect the first 4 bytes for the expected LZ4 Frame magic header (0x04 0x22 0x4D 0x18); see the sketch after this list.
  2. Reconstruction of metadata.RecordBatch
    After decompressing the buffers, new BufferRegion entries are calculated to match the uncompressed data layout. A new metadata.RecordBatch is constructed with the updated buffer regions and passed into _loadVectors().
    This introduces a mutation-like pattern that may break assumptions in the current design. However, it's necessary because:
  • _loadVectors() depends strictly on the offsets in header.buffers, which no longer match the decompressed buffer layout.
  • Without changing either _loadVectors() or metadata.RecordBatch, the current approach is the least intrusive.
  3. Setting compression = null in the new RecordBatch
    When reconstructing the metadata, the compression field is explicitly set to null, since the data is already decompressed in memory.
    This decision is somewhat debatable; feedback is welcome on whether it's better to retain the original compression metadata or to reflect the current state of the buffers (uncompressed). The current implementation assumes the latter.
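A sketch of the optional registry check from note 1, assuming the registered codec exposes encode(data: Uint8Array): Uint8Array. The function name looksLikeLz4Frame is illustrative and not part of the PR:

const LZ4_FRAME_MAGIC = [0x04, 0x22, 0x4D, 0x18];

// Compress a small dummy payload and verify the LZ4 Frame magic bytes are present.
function looksLikeLz4Frame(codec: { encode(data: Uint8Array): Uint8Array }): boolean {
    const sample = codec.encode(new Uint8Array([1, 2, 3, 4]));
    return sample.length >= 4 && LZ4_FRAME_MAGIC.every((byte, i) => sample[i] === byte);
}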

Are these changes tested?

  • The changes were tested in my own project using LZ4-compressed Arrow streams.
  • Tested uncompressed, compressed, and pseudo-compressed (uncompressed data length = -1) data.
  • No unit tests are included in this PR yet.
  • The decompression was verified with real-world data and the lz4js codec (lz4-wasm is not compatible).
  • No issues were observed with alignment, vector loading, or decompression integrity.
  • Exception handling is not yet added around codec.decode. This may be useful for catching codec incompatibility and providing better user feedback.

Are there any user-facing changes?

Yes. Arrow JS users can now read compressed IPC streams, provided they register an appropriate codec using compressionRegistry.set().

Example:

import { Codec, compressionRegistry, CompressionType } from 'apache-arrow';
import * as lz4js from 'lz4js';

const lz4Codec: Codec = {
    encode(data: Uint8Array): Uint8Array { return lz4js.compress(data); },
    decode(data: Uint8Array): Uint8Array { return lz4js.decompress(data); }
};

compressionRegistry.set(CompressionType.LZ4_FRAME, lz4Codec);
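Once the codec is registered, reading goes through the usual entry points. A minimal usage sketch; the endpoint '/data/table.arrow' is a hypothetical URL serving an LZ4_FRAME-compressed IPC stream:

import { tableFromIPC } from 'apache-arrow';

// No extra read options are needed once the codec is registered.
const response = await fetch('/data/table.arrow');
const table = tableFromIPC(new Uint8Array(await response.arrayBuffer()));
console.log(table.numRows);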

This change does not affect writing or serialization.

This PR includes breaking changes to public APIs.
No. The change adds functionality but does not modify any existing API behavior.

This PR contains a "Critical Fix".
No. This is a new feature, not a critical fix.

Checklist

  • All tests pass (yarn test)
  • Build completes (yarn build)
  • I have added a new test for compressed batches

Closes #109.

@Djjanks Djjanks closed this May 19, 2025
@Djjanks Djjanks reopened this May 19, 2025
@Djjanks Djjanks closed this May 19, 2025
@trxcllnt trxcllnt reopened this May 20, 2025
@trxcllnt trxcllnt self-requested a review May 20, 2025 19:38
const combined = new Uint8Array(totalSize);

for (const [i, decompressedBuffer] of decompressedBuffers.entries()) {
    combined.set(decompressedBuffer, newBufferRegions[i].offset);
}
Contributor

@trxcllnt trxcllnt May 20, 2025

We should be able to implement this without copying the inflated data back into a single contiguous ArrayBuffer.

Author

I think it's possible to implement a VirtualUint8Array class that takes an array of Uint8Array chunks and implements the necessary methods to behave like a contiguous Uint8Array. I'm going to experiment with that approach soon.

Contributor

@trxcllnt trxcllnt May 21, 2025

I think that might be more complicated than necessary.

IIUC, the new logic loops through all buffers, decompresses them, and collects them into a list. Then it packs all the decompressed buffers into a contiguous ArrayBuffer that matches the equivalent IPC format without compression.

In order to avoid the last step of re-packing into an ArrayBuffer, we'd need to return the list of uncompressed buffers and use a VectorLoader instance that accepts the list and selects the buffers by index (vs. the current behavior which accepts the contiguous ArrayBuffer and slices from it). Luckily, that's exactly what the JSONVectorLoader does!

I don't think you can use the JSONVectorLoader directly, since it assumes the list of buffers are JSON-encoded representations of the values, but you could implement a new CompressedVectorLoader class that closely follows its structure but doesn't call methods like packBools() and binaryDataFromJSON().

The logic in your function here would need to also return a list of BufferRegion instances whose offset field corresponds to the Array index of each decompressed buffer (rather than the byteOffset of each buffer in the contiguous ArrayBuffer).

Contributor

@trxcllnt trxcllnt May 21, 2025

Something like this:

export class CompressedVectorLoader extends VectorLoader {
    private sources: any[][];
    constructor(sources: Uint8Array[][], nodes: FieldNode[], buffers: BufferRegion[], dictionaries: Map<number, Vector<any>>, metadataVersion: MetadataVersion) {
        super(new Uint8Array(0), nodes, buffers, dictionaries, metadataVersion);
        this.sources = sources;
    }
    protected readNullBitmap<T extends DataType>(_type: T, nullCount: number, { offset } = this.nextBufferRange()) {
        return nullCount <= 0 ? new Uint8Array(0) : this.sources[offset];
    }
    protected readOffsets<T extends DataType>(_type: T, { offset } = this.nextBufferRange()) {
        return this.sources[offset];
    }
    protected readTypeIds<T extends DataType>(_type: T, { offset } = this.nextBufferRange()) {
        return this.sources[offset];
    }
    protected readData<T extends DataType>(_type: T, { offset } = this.nextBufferRange()) {
        return this.sources[offset];
    }
}

Author

I ended up solving this issue without implementing a VirtualUint8Array. Instead, I modified the body parameter signature in _loadVectors and the VectorLoader constructor to accept Uint8Array | Uint8Array[].

It worked out nicely because the class already has a buffersIndex parameter that points to the correct buffer, and in my case, the decompression order matches the BufferRegion[] sequence. This approach required minimal changes, and thanks to the type signature, TypeScript will prevent errors in future modifications to VectorLoader.
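A standalone sketch of that idea (not the exact diff); selectBuffer and its parameter names are illustrative:

// Either the contiguous IPC body or the list of decompressed buffers can back the loader:
// in the list case the buffer is picked by index, in the contiguous case it is sliced by region.
function selectBuffer(
    bodyOrBuffers: Uint8Array | Uint8Array[],
    index: number,
    region: { offset: number; length: number }
): Uint8Array {
    return Array.isArray(bodyOrBuffers)
        ? bodyOrBuffers[index]
        : bodyOrBuffers.subarray(region.offset, region.offset + region.length);
}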

Your suggested approach (with CompressedVectorLoader) is also interesting—it would help isolate the logic for compressed buffers. If you think it’s the better solution, I can refactor the code to use it instead.

Contributor

"could you help me figure out how to properly test this functionality"

Do you intend to add compression support to the writer? Typically that's how we'd test this sort of behavior, since the reader and writer are duals.

Author

No problem—I’ll refactor this to use the CompressedVectorLoader class instead of overloading the type signatures.

As for testing, I do plan to add compression support to the writer, though likely not for another month (depending on my project’s needs). Initially, I assumed the tests should be entirely independent, but I agree that aligning them with the writer’s behavior makes more sense and will be more maintainable long-term.

Contributor

I can take a look at adding it to the writer. I wouldn't want to merge this PR without at least a limited set of tests, and verifying we can read what we write is the easiest way to do that.

Author

Okay, I'll try to find time this weekend to implement compression support in the writer.

Author

Hi, @trxcllnt!

I've implemented compression support for the writer and performed some minor refactoring to improve the structure. Here are the key changes:

  • Added compression support for the writer (debugged and tested)
  • Successfully verified LZ4 writer locally - it works correctly
  • Small refactoring to streamline the code
  • Introduced codec validators to prevent potential library mismatch issues

The main motivation for validators came from realizing that the current CompressionRegistry approach might cause problems for users when trying to match compression/decompression libraries across different environments.

Could you please review my changes, especially the validation logic? Maybe you can suggest something about ZSTD validation?

@Djjanks Djjanks force-pushed the feature/arrow-compression branch from c739044 to d1cabbf Compare May 27, 2025 22:11
Comment on lines +25 to +29
class Lz4FrameValidator implements CompressionValidator {
    private readonly LZ4_FRAME_MAGIC = new Uint8Array([4, 34, 77, 24]);
    private readonly MIN_HEADER_LENGTH = 7; // 4 (magic) + 2 (FLG + BD) + 1 (header checksum) = 7 min bytes

    isValidCodecEncode(codec: Codec): boolean {
Author

Since many libraries use the raw LZ4 format instead of the framed format, I decided to add validation for the encode function. This ensures that Arrow files compressed with LZ4 can be correctly read in other languages. Initially, I considered comparing compressed and decompressed buffers, but due to optional metadata flags, this might not be reliable. Instead, I validate that the encode function generates a correct metadata header. I'm unsure if similar validation is needed for decode since users should notice if their data decompresses incorrectly.

Comment on lines 61 to 67
class ZstdValidator implements CompressionValidator {
    // private readonly ZSTD_MAGIC = new Uint8Array([40, 181, 47, 253]);
    isValidCodecEncode(_: Codec): boolean {
        console.warn('ZSTD encode validator is not implemented yet.');
        return true;
    }
}
Author

For ZSTD, I need to research how its metadata is structured and whether different formats exist (similar to LZ4's raw vs. framed formats). This will help determine if additional validation is necessary.
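If a header check turns out to be sufficient, one possible shape mirrors the LZ4 validator and reuses the magic bytes commented out in the excerpt above; the function name is illustrative:

const ZSTD_MAGIC = new Uint8Array([40, 181, 47, 253]); // 0xFD2FB528 in little-endian byte order

// Compress a small dummy payload and verify the ZSTD frame magic number is present.
function looksLikeZstdFrame(codec: { encode(data: Uint8Array): Uint8Array }): boolean {
    const sample = codec.encode(new Uint8Array([1, 2, 3, 4]));
    return sample.length >= 4 && ZSTD_MAGIC.every((byte, i) => sample[i] === byte);
}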

@kou kou changed the title from "GH-24833 Implement IPC RecordBatch body buffer compression" to "feat: Implement IPC RecordBatch body buffer compression" May 29, 2025
Author

@Djjanks Djjanks commented May 31, 2025

@trxcllnt Before finalizing the PR, I'd like to add proper tests. Could you advise:

  1. Should I create new dedicated test files for compression/decompression functionality, or
  2. Integrate tests into existing test files?

Are there specific test patterns or files I should follow as a reference?

Contributor

@trxcllnt trxcllnt commented Jun 2, 2025

@Djjanks Since compression mode is an option on the reader and writer, the easiest way to integrate is probably to add/update the tests in stream-writer-tests.ts and file-writer-tests.ts.

The testStreamWriter and testFileWriter helper functions can accept the writer options and verify the input table round-trips through the writer -> reader successfully.

It looks like the RecordBatchFileWriter tests need to be updated to accept writer options, and both will need to also pass the compression option to the reader, but that should be straightforward.

@Djjanks Djjanks force-pushed the feature/arrow-compression branch from 4e7c09c to 090c28a Compare June 8, 2025 13:25
@Djjanks Djjanks force-pushed the feature/arrow-compression branch from 090c28a to 0ca03f2 Compare June 8, 2025 13:52
Author

@Djjanks Djjanks left a comment

@trxcllnt Thanks for the testing guidance. I've implemented the stream-writer and file-writer tests successfully, but I've run into some problems:

  1. dictionary batch compression logic
  2. importing ZSTD libraries

Can you give your advice on the above points?

if ((padding = ((size + 7) & ~7) - size) > 0) {
    this._writePadding(padding);
}

protected _writeBodyBuffers(buffers: ArrayBufferView[], batchType: "record" | "dictionary" = "record") {
Author

According to the Arrow format documentation, only record batches are compressed, not dictionary batches. Is this correct? I added the batchType attribute to avoid the bufGroupSize compression logic for dictionary batches. Have I understood this correctly?

import { Codec, compressionRegistry } from 'apache-arrow/ipc/compression/registry';
import * as lz4js from 'lz4js';

export async function registerCompressionCodecs(): Promise<void> {
Author

I've implemented ZSTD compression with async initialization, since most popular libraries require WASM/Node.js. I used a dynamic import for ZSTD to avoid bundling issues. I duplicated the registration logic from the stream-writer tests because separating it into a shared module caused Jest import errors. Maybe someone knows a more elegant way to import ZSTD?
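For reference, one possible shape for the async registration described here; the placeholder package name and its init/compress/decompress API are assumptions for illustration, not what this PR necessarily uses:

import { compressionRegistry, CompressionType } from 'apache-arrow';

export async function registerZstdCodec(): Promise<void> {
    // Dynamic import keeps the WASM payload out of the main bundle and defers initialization.
    const zstd = await import('some-zstd-wasm-lib'); // placeholder package name
    await zstd.init?.(); // many WASM builds require an explicit async init step
    compressionRegistry.set(CompressionType.ZSTD, {
        encode: (data: Uint8Array): Uint8Array => zstd.compress(data),
        decode: (data: Uint8Array): Uint8Array => zstd.decompress(data),
    });
}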


Successfully merging this pull request may close these issues.

[JS] Implement IPC RecordBatch body buffer compression from ARROW-300