KAFKA-15057: Use new interface from zstd-jni #13814
Nice one!
@@ -269,7 +269,7 @@ public int partitionLeaderEpoch() {

 public InputStream recordInputStream(BufferSupplier bufferSupplier) {
     final ByteBuffer buffer = this.buffer.duplicate();
-    buffer.position(RECORDS_OFFSET);
+    buffer.position(buffer.position() + RECORDS_OFFSET);
Is this change related? Perhaps a comment on why this is changing?
Not a related change, but something I found while lurking in the code. I would probably revert it from this PR so that we don't pollute it with unnecessary changes.
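For anyone following the thread, here is a minimal sketch of how the two `position(...)` calls in the diff differ. It assumes only standard `java.nio.ByteBuffer` semantics. The class name, the helper methods, and the small `RECORDS_OFFSET` value are hypothetical stand-ins for illustration, not Kafka's actual code or constant. The two forms agree when the source buffer's position is 0 and diverge otherwise.

```java
import java.nio.ByteBuffer;

public class DuplicatePositionSketch {
    // Hypothetical stand-in value; the real RECORDS_OFFSET constant in Kafka differs.
    static final int RECORDS_OFFSET = 4;

    // Old form in the diff: absolute position, ignores where the source buffer currently is.
    static int absolute(ByteBuffer source) {
        ByteBuffer dup = source.duplicate(); // duplicate() copies the source's position and limit
        dup.position(RECORDS_OFFSET);
        return dup.position();
    }

    // New form in the diff: position relative to the source buffer's current position.
    static int relative(ByteBuffer source) {
        ByteBuffer dup = source.duplicate();
        dup.position(dup.position() + RECORDS_OFFSET);
        return dup.position();
    }

    public static void main(String[] args) {
        ByteBuffer atZero = ByteBuffer.allocate(16); // position 0
        ByteBuffer atTwo = ByteBuffer.allocate(16);
        atTwo.position(2);                           // position 2

        System.out.println(absolute(atZero) + " " + relative(atZero)); // prints "4 4"
        System.out.println(absolute(atTwo) + " " + relative(atTwo));   // prints "4 6"
    }
}
```

Whether the relative form is needed depends on whether the batch buffer can ever have a non-zero position at this call site, which this sketch does not establish.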
LGTM
@divijvaidya Nice one! Out of curiosity, have you tried to run kafka-producer-perf-test.sh before/after the patch?
This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
It would be good to get this merged.
Background
In Kafka's code, every batch of records is stored in an in-memory byte buffer. For compressed workloads, this buffer contains the data in compressed form. Before writing it to the log, Kafka performs some validations, such as ensuring that offsets are monotonically increasing. To perform these validations, Kafka needs to decompress the data stored in the byte buffer.
For zstd-compressed batches, Kafka uses the ZstdInputStreamNoFinalizer interface provided by the downstream zstd-jni library to perform decompression.
ZstdInputStreamNoFinalizer takes an InputStream as input and provides an InputStream as output. Since Kafka stores the entire batch in a ByteBuffer, Kafka wraps the ByteBuffer in an InputStream to satisfy the input contract of ZstdInputStreamNoFinalizer.
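The wrapping pattern described above can be sketched roughly as follows. This is an illustration only, not Kafka's code: `BufferInputStream` and the other names are hypothetical, and `java.util.zip.GZIPInputStream` is swapped in for ZstdInputStreamNoFinalizer so that the example runs with the JDK alone. The shape is the same: a ByteBuffer is adapted to an InputStream purely to satisfy the decompressor's input contract.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class BufferBackedStreamSketch {

    // Hypothetical adapter: exposes the remaining bytes of a ByteBuffer as an InputStream.
    static final class BufferInputStream extends InputStream {
        private final ByteBuffer buffer;

        BufferInputStream(ByteBuffer buffer) {
            this.buffer = buffer;
        }

        @Override
        public int read() {
            return buffer.hasRemaining() ? (buffer.get() & 0xFF) : -1;
        }

        @Override
        public int read(byte[] dst, int off, int len) {
            if (len == 0) return 0;
            if (!buffer.hasRemaining()) return -1;
            int n = Math.min(len, buffer.remaining());
            buffer.get(dst, off, n);
            return n;
        }
    }

    // Produces a compressed "batch" (gzip here, standing in for zstd).
    static byte[] compress(byte[] raw) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(raw);
        }
        return out.toByteArray();
    }

    // Decompresses a batch that is already fully in memory, going through an InputStream wrapper.
    static byte[] decompress(ByteBuffer compressedBatch) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // duplicate() so the caller's buffer position is left untouched
        try (InputStream in = new GZIPInputStream(new BufferInputStream(compressedBatch.duplicate()))) {
            byte[] chunk = new byte[256];
            int n;
            while ((n = in.read(chunk, 0, chunk.length)) != -1) {
                out.write(chunk, 0, n);
            }
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] raw = "hello kafka".getBytes(StandardCharsets.UTF_8);
        ByteBuffer batch = ByteBuffer.wrap(compress(raw));
        System.out.println(new String(decompress(batch), StandardCharsets.UTF_8)); // prints "hello kafka"
    }
}
```

Note that the decompressor only ever sees a stream, even though the whole compressed batch is already sitting in the buffer.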
Problem
ZstdInputStreamNoFinalizer is not a good fit for our use case because we already have the entire compressed data stored in a buffer. We have no need for an interface that takes an InputStream as input. What we need is an interface that takes a ByteBuffer as input and provides a stream of uncompressed data as output. Prior to zstd-jni 1.5.5, no such interface existed, so we were forced to use ZstdInputStreamNoFinalizer.
Usage of ZstdInputStreamNoFinalizer has the following problems:
Solution
I have extended an interface in the downstream library zstd-jni to suit Kafka's use case. The new interface is called ZstdBufferDecompressingStreamNoFinalizer. It takes a ByteBuffer containing compressed data as input and provides an InputStream as output. It solves the above problems as follows:
Result
Method throughput improves by 10-20%, as demonstrated by the microbenchmark report at https://issues.apache.org/jira/secure/attachment/13058907/zstd-upgrade.xlsx . The microbenchmark uses the existing benchmark at https://github.com/apache/kafka/blob/trunk/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/record/RecordBatchIterationBenchmark.java
Reduction in buffer allocation, as demonstrated by a unit test
References
Changes in downstream zstd-jni
Add new interface -
luben/zstd-jni@d65490e
Bug fixes in new interface -
luben/zstd-jni@8bf8066438785ce55b62fc7e6816faafe1e3b39e
luben/zstd-jni@100c434
luben/zstd-jni@355b8511a2967d097a619047a579930cac2ccd9d