Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15057: Use new interface from zstd-jni #13814

Open
wants to merge 4 commits into
base: trunk
Choose a base branch
from

Conversation

divijvaidya
Copy link
Contributor

@divijvaidya divijvaidya commented Jun 5, 2023

Background

In Kafka's code, every batch of records is stored in an in-memory byte buffer. For compressed workload, this buffer contains data in compressed form. Before writing it to the log, Kafka performs some validations such as ensuring that offsets are monotonically increasing etc. To perform this validation, Kafka needs to uncompress the data stored in byte buffer.

For zstd compressed batches, Kafka uses ZstdInputStreamNoFinalizer interface provided by the downstream zstd-jni library to perform decompression. 

ZstdInputStreamNoFinalizer takes input an InputStream and provides output an InputStream. Since, Kafka stores the entire batch in a ByteBuffer, Kafka wraps the ByteBuffer into an InputStream to satisfy the input contract for ZstdInputStreamNoFinalizer.

Problem

ZstdInputStreamNoFinalizer is not a good fit for our use case because we already have the entire compressed data stored in a buffer. We don't have a need for an interface which takes InputStream as an input. Our requirement is for an interface which takes a ByteBuffer as an input and provides a stream of uncompressed data as output. Prior to zstd-jni 1.5.5, no such interface existed. Hence, we were forced to use ZstdInputStreamNoFinalizer.

Usage of ZstdInputStreamNoFinalizer has the following problems:

  1. When decompression of batch is complete, we try to read another byte to check if the actual batch size if equal to declared batch size. This is done at RecordIterator#next(). This extra call to read another byte leads to a JNI call in existing interface.
  2. Since this interface requires input as an InputStream, we take the ByteBuffer containing compressed batch and convert it into a InputStream. This interface internally uses an intermediate buffer to read data from this InputStream in chunks. The chunk size is determined by underlying zstd library and hence, we will allocate a new buffer with very batch. This leads to the following transformation: ByteBuffer (compressed batch) -> InputStream (compressed batch) -> data copy to intermediate ByteBuffer (chunk of compressed batch) -> send chunk to zstd library for decompression -> refill the intermediate buffer by copying the data to intermediate ByteBuffer (next chunk of compressed batch)

Solution

I have extended an an interface in downstream library zstd-jni to suit the use case of Kafka. The new interface is called ZstdBufferDecompressingStreamNoFinalizer. It provides an interface where it takes input as a ByteBuffer containing compressed data and provides output as an InputStream. It solves the above problems as follows:

  1. When we read the final decompressed frame, this interface sets a flag to mark that all uncompressed data has been consumed. When RecordIterator#next() tries to determine if the stream has ended, we simply read the flag and hence, do not have to make a JNI call.
  2. It does not require any buffer allocation for input. It takes the input buffer and passes it across the JNI boundary without any intermediate copying. Hence, we don't perform any buffer allocation.

Result

  1. Improvement in method throughput 10-20% as demonstrated by microbenchmark report at https://issues.apache.org/jira/secure/attachment/13058907/zstd-upgrade.xlsx . The microbenchmark uses existing benchmark at https://github.com/apache/kafka/blob/trunk/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java

  2. Reducing in allocation of a buffer as demonstrated by unit test

References

Changes in downstream zstd-jni

Add new interface - 
luben/zstd-jni@d65490e

Bug fixes in new interface - 
luben/zstd-jni@8bf8066438785ce55b62fc7e6816faafe1e3b39e 
luben/zstd-jni@100c434
luben/zstd-jni@355b8511a2967d097a619047a579930cac2ccd9d 

Copy link
Member

@soarez soarez left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice one!

@@ -269,7 +269,7 @@ public int partitionLeaderEpoch() {

public InputStream recordInputStream(BufferSupplier bufferSupplier) {
final ByteBuffer buffer = this.buffer.duplicate();
buffer.position(RECORDS_OFFSET);
buffer.position(buffer.position() + RECORDS_OFFSET);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change related? Perhaps a comment on why this is changing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a relates change but something I found while lurking in the code. I would probably revert it from this pr so that we don't pollute this with unnecessary changes.

Copy link
Member

@soarez soarez left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@divijvaidya divijvaidya requested review from showuon and dajac June 9, 2023 11:32
@dajac
Copy link
Contributor

dajac commented Jun 10, 2023

@divijvaidya Nice one! Out of curiosity, have you tried to run kafka-producer-perf-test.sh before/after the patch?

@github-actions
Copy link

github-actions bot commented Sep 9, 2023

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch)

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

@github-actions github-actions bot removed the stale Stale PRs label Nov 20, 2023
@mimaison
Copy link
Member

It would be good to get this merged.
@divijvaidya Can you shared the code for the micro benchmark you mention? Do you see an impact when running the kafka-producer-perf-test/kafka-consumer-perf-test tools?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants