Skip to content

Conversation

albertlockett
Copy link
Contributor

@albertlockett albertlockett commented Sep 22, 2025

Which issue does this PR close?

Rationale for this change

Reusing the zstd context between subsequent calls to compress_zstd in the Arrow IPC writer for performance improvement.

Benchmark results:

arrow_ipc_stream_writer/StreamWriter/write_10/zstd
                        time:   [4.0972 ms 4.1038 ms 4.1110 ms]
                        change: [-53.848% -53.586% -53.335%] (p = 0.00 < 0.05)
                        Performance has improved.

What changes are included in this PR?

Adds a CompressionContext struct, which when the zstd feature is enabled contains a zstd::bulk::Compressor object. This context object is owned by the ipc StreamWriter/FileWriter objects and is passed by mutable reference through the IpcDataGenerator to the CompressionCodec where it is used when compressing the ipc bytes.

Are these changes tested?

Yes the existing unit tests cover the changed code paths

Are there any user-facing changes?

Yes, the method IpcDataGenerator::encoded_batch now takes &mut CompressionContext as an argument.

@github-actions github-actions bot added arrow Changes to the arrow crate arrow-flight Changes to the arrow-flight crate labels Sep 22, 2025
@mbrobbel mbrobbel added this to the 57.0.0 milestone Sep 22, 2025
@mbrobbel mbrobbel added api-change Changes to the arrow API next-major-release the PR has API changes and it waiting on the next major version labels Sep 22, 2025
@alamb
Copy link
Contributor

alamb commented Sep 22, 2025

🤖 ./gh_compare_arrow.sh Benchmark Script Running
Linux aal-dev 6.14.0-1014-gcp #15~24.04.1-Ubuntu SMP Fri Jul 25 23:26:08 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing albert/8386 (e4b33d1) to de84ff5 diff
BENCH_NAME=ipc_writer
BENCH_COMMAND=cargo bench --features=arrow,async,test_common,experimental --bench ipc_writer
BENCH_FILTER=
BENCH_BRANCH_NAME=albert_8386
Results will be posted here when complete

@alamb
Copy link
Contributor

alamb commented Sep 22, 2025

🤖: Benchmark completed

Details

group                                                 albert_8386                            main
-----                                                 -----------                            ----
arrow_ipc_stream_writer/FileWriter/write_10           1.00   440.2±17.87µs        ? ?/sec    1.01   446.0±17.88µs        ? ?/sec
arrow_ipc_stream_writer/StreamWriter/write_10         1.02   413.0±22.05µs        ? ?/sec    1.00   406.2±17.86µs        ? ?/sec
arrow_ipc_stream_writer/StreamWriter/write_10/zstd    1.00      8.4±0.05ms        ? ?/sec    1.37     11.6±0.11ms        ? ?/sec

@alamb
Copy link
Contributor

alamb commented Sep 22, 2025

arrow_ipc_stream_writer/StreamWriter/write_10/zstd    1.00      8.4±0.05ms        ? ?/sec    1.37     11.6±0.11ms        

That is a very nice result 🚀

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @albertlockett and @mbrobbel

This code looks good to me (nice results!) but I wonder if we can avoid the breaking change)? I left some possible ideas

}

// the reason we allow derivable_impls here is because when zstd feature is not enabled, this
// becomes derivable. however with zstd feature want to be explicit about the compression level.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you for this context

/// let data_gen = IpcDataGenerator::default();
/// let (encoded_dictionaries, encoded_message) = data_gen
/// .encoded_batch(&batch, &mut dictionary_tracker, &options)
/// .encoded_batch(&batch, &mut dictionary_tracker, &options, &mut compression_context)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an API change, right?

Is there some reason we can't add a CompressionContext as a field to IpcDataGenerator to avoid the API change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh, that's tricky because the context needs to be &mut (because the zstd::bulk::Compressor it contains also needs to be a & mut to call compress), so if the context was a member of IpcDataGenerator, the encode_batch's receiver would also need to be an &mut self, which is again a breaking change.

Would it be too hacky if we added a encode_batch_with_context method, and then changed the implementation of encode_batch to just call that new method with &mut Default::default()?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally think that would be nicer for downstream users

I think we should also deprecate the old one as described here https://github.com/apache/arrow-rs?tab=readme-ov-file#deprecation-guidelines (to help guide downstream users how to change their code)

Perhaps we could cal it encode instead of encode_batch_with_context to keep it shorter 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb made that change

/// compression.
pub struct CompressionContext {
#[cfg(feature = "zstd")]
compressor: zstd::bulk::Compressor<'static>,
Copy link
Contributor

@Dandandan Dandandan Sep 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This always contains zstd::bulk::Compressor even when using lz4 compression?

Copy link
Contributor

@Dandandan Dandandan Sep 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do the same for lz4_flex::frame::FrameEncoder, does it help?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This always contains zstd::bulk::Compressor even when using lz4 compression?

If using lz4 compression, I imagine the the zstd feature wouldn't be enabled and this would just be an empty struct, right?

Can we do the same for lz4_flex::frame::FrameEncoder, does it help?

I imagine that it probably would help, although I didn't investigate as my use case was only focussed on zstd. My motivation behind adding this CompressionContext was that eventually it would be a good place to put something like this for lz4. Maybe we could do this in a followup issue/PR?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM -- thank you @albertlockett and @mbrobbel

/// compression.
pub struct CompressionContext {
#[cfg(feature = "zstd")]
compressor: zstd::bulk::Compressor<'static>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/// Encodes a batch to a number of [EncodedData] items (dictionary batches + the record batch).
/// The [DictionaryTracker] keeps track of dictionaries with new `dict_id`s (so they are only sent once)
/// Make sure the [DictionaryTracker] is initialized at the start of the stream.
#[deprecated(since = "57.0.0", note = "Use `encode` instead")]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@alamb alamb merged commit 0b10ad8 into apache:main Sep 24, 2025
26 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api-change Changes to the arrow API arrow Changes to the arrow crate arrow-flight Changes to the arrow-flight crate next-major-release the PR has API changes and it waiting on the next major version
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Reuse zstd context in arrow IPC writer
4 participants