feat: Implement IPC RecordBatch body buffer compression #14
Conversation
src/ipc/reader.ts
const combined = new Uint8Array(totalSize);

for (const [i, decompressedBuffer] of decompressedBuffers.entries()) {
    combined.set(decompressedBuffer, newBufferRegions[i].offset);
We should be able to implement this without copying the inflated data back into a single contiguous ArrayBuffer.
I think it's possible to implement a VirtualUint8Array class that takes an array of Uint8Array chunks and implements the necessary methods to behave like a contiguous Uint8Array. I'm going to experiment with that approach soon.
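A minimal sketch of the shape I have in mind (read-only access only; a full implementation would need to mirror more of the Uint8Array surface, and the method names here are illustrative):

// Present an array of chunks through a contiguous-looking read API
// without copying them into one buffer.
class VirtualUint8Array {
    private readonly starts: number[] = [];
    readonly byteLength: number;

    constructor(private readonly chunks: Uint8Array[]) {
        let offset = 0;
        for (const chunk of chunks) {
            this.starts.push(offset);
            offset += chunk.byteLength;
        }
        this.byteLength = offset;
    }

    // Read a single byte at a virtual (contiguous) index.
    get(index: number): number | undefined {
        for (let i = this.chunks.length - 1; i >= 0; --i) {
            if (index >= this.starts[i]) {
                return this.chunks[i][index - this.starts[i]];
            }
        }
        return undefined;
    }

    // Copy out a contiguous slice; a fuller implementation would also
    // mirror subarray(), set(), iteration, etc.
    slice(begin = 0, end = this.byteLength): Uint8Array {
        const out = new Uint8Array(Math.max(0, end - begin));
        for (let i = 0; i < this.chunks.length; ++i) {
            const chunk = this.chunks[i];
            const chunkStart = this.starts[i];
            const from = Math.max(begin, chunkStart);
            const to = Math.min(end, chunkStart + chunk.byteLength);
            if (to > from) {
                out.set(chunk.subarray(from - chunkStart, to - chunkStart), from - begin);
            }
        }
        return out;
    }
}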
I think that might be more complicated than necessary.
IIUC, the new logic loops through all buffers, decompresses them, and collects them into a list. Then it packs all the decompressed buffers into a contiguous ArrayBuffer that matches the equivalent IPC format without compression.
In order to avoid the last step of re-packing into an ArrayBuffer, we'd need to return the list of uncompressed buffers and use a VectorLoader instance that accepts the list and selects the buffers by index (vs. the current behavior, which accepts the contiguous ArrayBuffer and slices from it). Luckily, that's exactly what the JSONVectorLoader does!
I don't think you can use the JSONVectorLoader directly, since it assumes the list of buffers are JSON-encoded representations of the values, but you could implement a new CompressedVectorLoader class that closely follows its structure but doesn't call methods like packBools() and binaryDataFromJSON().
The logic in your function here would also need to return a list of BufferRegion instances whose offset field corresponds to the Array index of each decompressed buffer (rather than the byteOffset of each buffer in the contiguous ArrayBuffer).
Something like this:
export class CompressedVectorLoader extends VectorLoader {
    private sources: any[][];
    constructor(sources: Uint8Array[][], nodes: FieldNode[], buffers: BufferRegion[], dictionaries: Map<number, Vector<any>>, metadataVersion: MetadataVersion) {
        super(new Uint8Array(0), nodes, buffers, dictionaries, metadataVersion);
        this.sources = sources;
    }
    protected readNullBitmap<T extends DataType>(_type: T, nullCount: number, { offset } = this.nextBufferRange()) {
        return nullCount <= 0 ? new Uint8Array(0) : this.sources[offset];
    }
    protected readOffsets<T extends DataType>(_type: T, { offset } = this.nextBufferRange()) {
        return this.sources[offset];
    }
    protected readTypeIds<T extends DataType>(_type: T, { offset } = this.nextBufferRange()) {
        return this.sources[offset];
    }
    protected readData<T extends DataType>(_type: T, { offset } = this.nextBufferRange()) {
        return this.sources[offset];
    }
}
I ended up solving this issue without implementing a VirtualUint8Array. Instead, I modified the body parameter signature in _loadVectors and the VectorLoader constructor to accept Uint8Array | Uint8Array[].
It worked out nicely because the class already has a buffersIndex parameter that points to the correct buffer, and in my case, the decompression order matches the BufferRegion[] sequence. This approach required minimal changes, and thanks to the type signature, TypeScript will prevent errors in future modifications to VectorLoader.
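Roughly, the idea looks like this (a simplified sketch rather than the actual diff; the real VectorLoader takes more constructor arguments, and only the widened body type and the index-vs-slice branch are the point here):

// Sketch only: buffersIndex and nextBufferRange() mirror names from VectorLoader;
// the rest is trimmed down for illustration.
interface BufferRegion { offset: number; length: number; }

class BodySketchLoader {
    private buffersIndex = -1;

    constructor(
        private body: Uint8Array | Uint8Array[],
        private bufferRegions: BufferRegion[],
    ) {}

    protected nextBufferRange(): BufferRegion {
        return this.bufferRegions[++this.buffersIndex];
    }

    protected readData({ offset, length } = this.nextBufferRange()): Uint8Array {
        return Array.isArray(this.body)
            // compressed path: the decompression order matches the BufferRegion[]
            // sequence, so the running buffers index selects the right chunk
            ? this.body[this.buffersIndex]
            // uncompressed path: slice the contiguous body by byte offset, as before
            : this.body.subarray(offset, offset + length);
    }
}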
Your suggested approach (with CompressedVectorLoader) is also interesting; it would help isolate the logic for compressed buffers. If you think it's the better solution, I can refactor the code to use it instead.
Could you help me figure out how to properly test this functionality?
Do you intend to add compression support to the writer? Typically that's how we'd test this sort of behavior, since the reader and writer are duals.
No problem. I'll refactor this to use the CompressedVectorLoader class instead of overloading the type signatures.
As for testing, I do plan to add compression support to the writer, though likely not for another month (depending on my project’s needs). Initially, I assumed the tests should be entirely independent, but I agree that aligning them with the writer’s behavior makes more sense and will be more maintainable long-term.
I can take a look at adding it to the writer. I wouldn't want to merge this PR without at least a limited set of tests, and verifying we can read what we write is the easiest way to do that.
Okay, I'll try to find time this weekend to implement compression support in the writer.
Hi, @trxcllnt!
I've implemented compression support for the writer and performed some minor refactoring to improve the structure. Here are the key changes:
- Added compression support for the writer (debugged and tested)
- Successfully verified LZ4 writer locally - it works correctly
- Small refactoring to streamline the code
- Introduced codec validators to prevent potential library mismatch issues
The main motivation for the validators came from realizing that the current CompressionRegistry approach might cause problems for users when trying to match compression/decompression libraries across different environments.
Could you please review my changes, especially the validation logic? Maybe you can suggest something about ZSTD validation?
class Lz4FrameValidator implements CompressionValidator {
    private readonly LZ4_FRAME_MAGIC = new Uint8Array([4, 34, 77, 24]);
    private readonly MIN_HEADER_LENGTH = 7; // 4 (magic) + 2 (FLG + BD) + 1 (header checksum) = 7 min bytes

    isValidCodecEncode(codec: Codec): boolean {
Since many libraries use the raw LZ4 format instead of the framed format, I decided to add validation for the encode function. This ensures that Arrow files compressed with LZ4 can be correctly read in other languages. Initially, I considered comparing compressed and decompressed buffers, but due to optional metadata flags, this might not be reliable. Instead, I validate that the encode function generates a correct metadata header. I'm unsure if similar validation is needed for decode, since users should notice if their data decompresses incorrectly.
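The check itself is small; a simplified sketch of the idea (the helper name and Codec shape here are illustrative, and the actual validator in this PR also enforces the minimum header length shown above):

// A framed LZ4 payload always begins with the 4-byte magic 0x184D2204
// (bytes 04 22 4D 18 on the wire); raw LZ4 blocks carry no such header.
const LZ4_FRAME_MAGIC = new Uint8Array([0x04, 0x22, 0x4d, 0x18]);
const MIN_HEADER_LENGTH = 7; // magic (4) + FLG + BD + header checksum

function encodesLz4Frames(codec: { encode(data: Uint8Array): Uint8Array }): boolean {
    const sample = codec.encode(new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8]));
    return sample.byteLength >= MIN_HEADER_LENGTH &&
        LZ4_FRAME_MAGIC.every((byte, i) => sample[i] === byte);
}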
src/ipc/compression/validators.ts
class ZstdValidator implements CompressionValidator {
    // private readonly ZSTD_MAGIC = new Uint8Array([40, 181, 47, 253]);
    isValidCodecEncode(_: Codec): boolean {
        console.warn('ZSTD encode validator is not implemented yet.');
        return true;
    }
}
For ZSTD, I need to research how its metadata is structured and whether different formats exist (similar to LZ4's raw vs. framed formats). This will help determine if additional validation is necessary.
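If the same magic-number approach turns out to be enough, the check could mirror the LZ4 one, since ZSTD frames start with the magic 0xFD2FB528 (the bytes in the commented-out ZSTD_MAGIC above). A sketch under that assumption, with skippable frames and magicless variants left as the open question:

// Assumes checking the standard ZSTD frame magic is sufficient; skippable
// frames and magicless container formats still need to be investigated.
const ZSTD_FRAME_MAGIC = new Uint8Array([40, 181, 47, 253]); // 0xFD2FB528, little-endian

function encodesZstdFrames(codec: { encode(data: Uint8Array): Uint8Array }): boolean {
    const sample = codec.encode(new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8]));
    return sample.byteLength >= ZSTD_FRAME_MAGIC.length &&
        ZSTD_FRAME_MAGIC.every((byte, i) => sample[i] === byte);
}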
@trxcllnt Before finalizing the PR, I'd like to add proper tests. Could you advise whether there are specific test patterns or files I should follow as a reference?
@Djjanks Since compression mode is an option on the reader and writer, the easiest way to integrate is probably to add/update the existing stream-writer and file-writer tests. It looks like the RecordBatchFileWriter tests need to be updated to accept writer options, and both will need to also pass the compression option to the reader, but that should be straightforward.
@trxcllnt Thanks for the testing guidance. I've implemented the stream-writer and file-writer tests successfully, but ran into some problems with:
- the dictionary batch compression logic
- importing the zstd libraries
Can you give your advice on the above points?
if ((padding = ((size + 7) & ~7) - size) > 0) {
    this._writePadding(padding);
}

protected _writeBodyBuffers(buffers: ArrayBufferView[], batchType: "record" | "dictionary" = "record") {
According to the Arrow format documentation, only record batches are compressed, not dictionary batches. Is this correct? I added the batchType attribute to avoid the bufGroupSize compression logic. Have I understood this correctly?
import { Codec, compressionRegistry } from 'apache-arrow/ipc/compression/registry';
import * as lz4js from 'lz4js';

export async function registerCompressionCodecs(): Promise<void> {
I've implemented ZSTD compression with async initialization, since most popular libraries require WASM/Node.js. I used a dynamic import for ZSTD to avoid bundling issues. I duplicated the registration logic from the stream writer because separating it into a shared module caused Jest import errors. Maybe somebody knows a more elegant way to import zstd?
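For reference, the registration pattern looks roughly like this (a sketch: the registry import and the Codec encode/decode shape come from this PR, while the CompressionType import path and the @bokuweb/zstd-wasm binding are assumptions; substitute whatever your build actually uses):

import { Codec, compressionRegistry } from 'apache-arrow/ipc/compression/registry';
import { CompressionType } from 'apache-arrow/fb/compression-type'; // assumed path for the flatbuffers enum
import * as lz4js from 'lz4js';

export async function registerCompressionCodecs(): Promise<void> {
    // lz4js implements the LZ4 frame format, which is what the Arrow IPC spec expects.
    const lz4Codec: Codec = {
        encode: (data: Uint8Array) => lz4js.compress(data),
        decode: (data: Uint8Array) => lz4js.decompress(data),
    };
    compressionRegistry.set(CompressionType.LZ4_FRAME, lz4Codec);

    // A dynamic import keeps the WASM-backed ZSTD dependency out of bundles that
    // never register it, and gives us a place to await its async initialization.
    const zstd = await import('@bokuweb/zstd-wasm'); // assumed binding; any WASM ZSTD library works
    await zstd.init();
    const zstdCodec: Codec = {
        encode: (data: Uint8Array) => new Uint8Array(zstd.compress(data)),
        decode: (data: Uint8Array) => new Uint8Array(zstd.decompress(data)),
    };
    compressionRegistry.set(CompressionType.ZSTD, zstdCodec);
}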
Rationale for this change
This change introduces support for reading compressed Arrow IPC streams in JavaScript. The primary motivation is the need to read Arrow IPC streams in the browser when they are transmitted over the network in a compressed format to reduce network load.
Several reasons support this enhancement:
What changes are included in this PR?
Additional notes:
Not all JavaScript LZ4 libraries are compatible with the Arrow IPC format. For example:
This can result in silent or cryptic errors. To improve developer experience, we could:
After decompressing the buffers, new BufferRegion entries are calculated to match the uncompressed data layout. A new metadata.RecordBatch is constructed with the updated buffer regions and passed into _loadVectors().
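A sketch of that recalculation step (assuming offsets are padded to the 8-byte alignment the IPC format requires at minimum, matching the (size + 7) & ~7 padding used by the writer; the exact padding in the PR may differ):

// Rebuild BufferRegion metadata for the already-decompressed buffers.
interface BufferRegion { offset: number; length: number; }

function regionsForDecompressed(buffers: Uint8Array[]): BufferRegion[] {
    const regions: BufferRegion[] = [];
    let offset = 0;
    for (const buffer of buffers) {
        regions.push({ offset, length: buffer.byteLength });
        offset += (buffer.byteLength + 7) & ~7; // round up to the next multiple of 8
    }
    return regions;
}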
This introduces a mutation-like pattern that may break assumptions in the current design. However, it's necessary because:
When reconstructing the metadata, the compression field is explicitly set to null, since the data is already decompressed in memory.
This decision is somewhat debatable — feedback is welcome on whether it's better to retain the original compression metadata or to reflect the current state of the buffer (uncompressed). The current implementation assumes the latter.
Are these changes tested?
Are there any user-facing changes?
Yes, Arrow JS users can now read compressed IPC streams, assuming they register an appropriate codec using compressionRegistry.set().
Example:
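(A sketch: lz4js stands in for any LZ4 implementation that produces the frame format, and the CompressionType key reflects how the registry is assumed to be keyed.)

import { tableFromIPC } from 'apache-arrow';
import { compressionRegistry } from 'apache-arrow/ipc/compression/registry';
import { CompressionType } from 'apache-arrow/fb/compression-type'; // assumed path
import * as lz4js from 'lz4js';

// Register an LZ4 frame codec once, up front...
compressionRegistry.set(CompressionType.LZ4_FRAME, {
    encode: (data: Uint8Array) => lz4js.compress(data),
    decode: (data: Uint8Array) => lz4js.decompress(data),
});

// ...then read the compressed IPC stream as usual.
const response = await fetch('/data.arrows'); // placeholder URL
const table = tableFromIPC(new Uint8Array(await response.arrayBuffer()));
console.log(table.numRows);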
This change does not affect writing or serialization.
This PR includes breaking changes to public APIs.
No. The change adds functionality but does not modify any existing API behavior.
This PR contains a "Critical Fix".
No. This is a new feature, not a critical fix.
Checklist
- yarn test
- yarn build

Closes #109.