-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[KAFKA-8830] KIP-512: make Record Headers available in onAcknowledgement #17099
base: trunk
Are you sure you want to change the base?
[KAFKA-8830] KIP-512: make Record Headers available in onAcknowledgement #17099
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, I left few comments.
clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR.
Could you implement two different interceptor classes in ProducerInterceptorsTest
, one with the new signature of onAcknowledgement
and one with the old, and then run the same parameterized tests to show they are equivalent in behaviour?
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
Show resolved
Hide resolved
sure. I added 2nd interceptor at 5c1c843. I didn't use parameterized tests because the test already include two interceptors, I just change later one to new signature |
Future<RecordMetadata> future = producer.send(record, (recordMetadata, exception) -> { }); | ||
try { | ||
assertInstanceOf(TimeoutException.class, assertThrows(ExecutionException.class, future::get).getCause()); | ||
//ensure headers are writable if send failure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is header of producer record, different from onAcknowledgement's headers.
Producer record headers is writable because client might want to mutate it before retry. This is same behaviour as today, no change introduced from this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apart from a trivial formatting problem, looks good to me.
clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. lgtm
Thanks for reviewing. looks like the test failed in un-related tests. Anything I can help to proceed for the PR to merge? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hello, thanks for the PR! Took a first pass and left some comments for consideration.
clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
Outdated
Show resolved
Hide resolved
* @param metadata The metadata for the record that was sent. It includes the partition and offset of the record. | ||
* If an error occurred, the metadata will contain only the topic and possibly the partition. | ||
|
||
* If the partition was not assigned yet due to an error, | ||
* it will be set to {@link org.apache.kafka.clients.producer.RecordMetadata#UNKNOWN_PARTITION}. | ||
* before partition gets assigned, then partition will be set to RecordMetadata.NO_PARTITION. | ||
* The metadata may be null if the client passed null record to | ||
* {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}. | ||
* @param exception The exception thrown during the processing of this record, or null if no error occurred. | ||
* @param headers The headers for the record that was sent. This parameter may be null. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
format seems off here (lines not aligned)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks, is it aligned now?
clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rich-c-shop thanks for this contribution!
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rich-c-shop thanks for this patch. one small comment is left.
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
Outdated
Show resolved
Hide resolved
* The metadata may be null if the client passed null record to | ||
* {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}. | ||
* @param exception The exception thrown during processing of this record. Null if no error occurred. | ||
* @param headers The headers for the record that was sent. This parameter may be null. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since ProducerRecord#header
is never null, it would be more appropriate to pass an empty header rather than null. This aligns with the existing behavior and avoids introducing unnecessary null checks. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This suggestion makes sense to me. Thoughts @rich-c-shop ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree ProducerRecord#headers
is never null, however, ProducerRecord
could be null
It is possible to change
from
headers = record != null ? record.headers() : null;
to
headers = record != null ? record.headers() : new RecordHeaders(null);
below.
kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
Line 1564 in 235e53f
headers = record != null ? record.headers() : null; |
kafka/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
Line 113 in 235e53f
Headers headers = record != null ? record.headers() : null; |
Prefer new RecordHeaders(null)
❓ To me, I think new RecordHeaders(null)
is a bit leaking internal implementation outside ProducerRecord
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, but what about
headers = record != null ? record.headers() : new RecordHeaders();
Just to pass empty headers obj instead of null to the callback, to make it's implementation simpler (avoid null checks/NPE). WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that works: c927363
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* The metadata may be null if the client passed null record to | ||
* {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}. | ||
* @param exception The exception thrown during processing of this record. Null if no error occurred. | ||
* @param headers The headers for the record that was sent. This parameter may be null. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This suggestion makes sense to me. Thoughts @rich-c-shop ?
clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates! Just one follow-up comment, and also could you please merge trunk latest changes?
* The metadata may be null if the client passed null record to | ||
* {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}. | ||
* @param exception The exception thrown during processing of this record. Null if no error occurred. | ||
* @param headers The headers for the record that was sent. This parameter may be null. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, but what about
headers = record != null ? record.headers() : new RecordHeaders();
Just to pass empty headers obj instead of null to the callback, to make it's implementation simpler (avoid null checks/NPE). WDYT?
simplify if condition fix test to send record and trigger onAcknowledgement add 2nd ProducerInterceptor for test Apply suggestions from code review Co-authored-by: Omnia Ibrahim <o.g.h.ibrahim@gmail.com> ./gradlew :clients:spotlessApply new testHeadersFailure test to cover onAcknowledgement headers mutability reduce PR diff fix javadoc format address comments with minor doc change and headers isReadOnly checking set MAX_BLOCK_MS_CONFIG to timout fast in test add comment about why copy headers address comments shortern max block of test and fix unchecked use empty header
c927363
to
02298d5
Compare
squash and rebased |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates @rich-c-shop , LGMT.
@chia7712 and @AndrewJSchofield let me know if you have any other comments, otherwise I will carry on and merge.
Failed test EagerConsumerCoordinatorTest.testOutdatedCoordinatorAssignment
seems unrelated to me, and it's already tracked as failing in trunk with https://issues.apache.org/jira/browse/KAFKA-15900.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rich-c-shop thanks for this PR and sorry for late reviews. two minor comments are left. PTAL
@@ -1546,6 +1547,7 @@ private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interc | |||
// whole lifetime of the batch. | |||
// We don't want to have an NPE here, because the interceptors would not be notified (see .doSend). | |||
topic = record != null ? record.topic() : null; | |||
headers = record != null ? record.headers() : new RecordHeaders(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To maintain header immutability, especially given your efforts in this area, it might be prudent to pass immutable headers when record is null.
* The metadata may be null if the client passed null record to | ||
* {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}. | ||
* @param exception The exception thrown during processing of this record. Null if no error occurred. | ||
* @param headers The headers for the record that was sent. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we remind users that they should not change the headers?
close https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=125310116
https://issues.apache.org/jira/browse/KAFKA-8830
Two sets of tests are added:
Committer Checklist (excluded from commit message)