[KAFKA-8830] KIP-512: make Record Headers available in onAcknowledgement #17099

Open
wants to merge 1 commit into
base: trunk

@rich-c-shop rich-c-shop commented Sep 5, 2024

close https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=125310116
https://issues.apache.org/jira/browse/KAFKA-8830

Two sets of tests are added:

  1. KafkaProducerTest
    • when the send succeeds, both record.headers() and the onAcknowledgement headers are read-only
    • when the send fails, record.headers() is writable as before and the onAcknowledgement headers are read-only
  2. ProducerInterceptorsTest
    • make sure both the old and the new onAcknowledgement methods are called successfully
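
The mutability rules that the KafkaProducerTest cases check can be illustrated with a small, self-contained sketch. It does not use the Kafka classes: `SketchHeaders`, `readOnlyCopy` and `rejectsWrites` are hypothetical stand-ins, and the sketch assumes the callback receives a frozen copy of the record's headers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a record's header collection; this is not the Kafka class.
final class SketchHeaders {
    private final List<String[]> entries = new ArrayList<>();
    private boolean readOnly = false;

    // Appends a header, or fails if the collection has been frozen.
    SketchHeaders add(String key, String value) {
        if (readOnly) {
            throw new IllegalStateException("headers are read-only");
        }
        entries.add(new String[] {key, value});
        return this;
    }

    int size() {
        return entries.size();
    }

    // Freezes this collection in place.
    void setReadOnly() {
        readOnly = true;
    }

    // Returns an independent, frozen copy (assumed to be what the callback sees).
    SketchHeaders readOnlyCopy() {
        SketchHeaders copy = new SketchHeaders();
        copy.entries.addAll(entries);
        copy.readOnly = true;
        return copy;
    }
}

final class HeaderMutabilitySketch {
    // Tries to add a probe header; returns true if the write was rejected.
    static boolean rejectsWrites(SketchHeaders headers) {
        try {
            headers.add("probe", "x");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Success path: the record's headers are frozen and the callback gets a frozen copy.
        SketchHeaders sent = new SketchHeaders().add("trace-id", "abc");
        SketchHeaders ackOnSuccess = sent.readOnlyCopy();
        sent.setReadOnly();
        System.out.println("success: record read-only=" + rejectsWrites(sent)
                + ", callback read-only=" + rejectsWrites(ackOnSuccess));

        // Failure path: the record's headers stay writable, the callback copy is still frozen.
        SketchHeaders failed = new SketchHeaders().add("trace-id", "abc");
        SketchHeaders ackOnFailure = failed.readOnlyCopy();
        System.out.println("failure: record read-only=" + rejectsWrites(failed)
                + ", callback read-only=" + rejectsWrites(ackOnFailure));
    }
}
```

Copying before freezing is one way to keep the two header objects independent, so a client that mutates the record's headers before a retry cannot affect what the interceptor saw.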

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Collaborator

@OmniaGM OmniaGM left a comment

Thanks for the PR. I left a few comments.

Member

@AndrewJSchofield AndrewJSchofield left a comment

Thanks for the PR.

Could you implement two different interceptor classes in ProducerInterceptorsTest, one with the new signature of onAcknowledgement and one with the old, and then run the same parameterized tests to show they are equivalent in behaviour?

@rich-c-shop
Author

> Could you implement two different interceptor classes in ProducerInterceptorsTest, one with the new signature of onAcknowledgement and one with the old, and then run the same parameterized tests to show they are equivalent in behaviour?

Sure. I added a second interceptor in 5c1c843. I didn't use parameterized tests because the test already includes two interceptors; I just changed the latter one to the new signature.
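
A rough sketch of how an old-signature and a new-signature interceptor can coexist in one chain. The interface below is a hypothetical stand-in, not the Kafka `ProducerInterceptor`: metadata is reduced to a `String`, headers to a `List<String>`, and the sketch assumes the new three-argument overload is a default method that delegates to the old two-argument one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the interceptor interface (types simplified for the sketch).
interface SketchInterceptor {
    // Old callback: no headers.
    default void onAcknowledgement(String metadata, Exception exception) { }

    // New callback: assumed to delegate to the old one unless overridden,
    // so existing implementations keep working unchanged.
    default void onAcknowledgement(String metadata, Exception exception, List<String> headers) {
        onAcknowledgement(metadata, exception);
    }
}

// Implements only the old signature.
final class OldStyleInterceptor implements SketchInterceptor {
    final List<String> calls = new ArrayList<>();

    @Override
    public void onAcknowledgement(String metadata, Exception exception) {
        calls.add("old:" + metadata);
    }
}

// Implements only the new signature and can read the headers.
final class NewStyleInterceptor implements SketchInterceptor {
    final List<String> calls = new ArrayList<>();

    @Override
    public void onAcknowledgement(String metadata, Exception exception, List<String> headers) {
        calls.add("new:" + metadata + ":" + headers.size());
    }
}

final class InterceptorChainSketch {
    // The caller always invokes the new overload; old-style interceptors are reached via delegation.
    static void acknowledgeAll(List<SketchInterceptor> chain, String metadata, List<String> headers) {
        for (SketchInterceptor interceptor : chain) {
            interceptor.onAcknowledgement(metadata, null, headers);
        }
    }

    public static void main(String[] args) {
        OldStyleInterceptor oldOne = new OldStyleInterceptor();
        NewStyleInterceptor newOne = new NewStyleInterceptor();
        acknowledgeAll(Arrays.<SketchInterceptor>asList(oldOne, newOne),
                "topic-0@42", Arrays.asList("trace-id"));
        System.out.println(oldOne.calls);
        System.out.println(newOne.calls);
    }
}
```

With this shape a single test can put both interceptors in the same chain and assert that each was invoked once per acknowledged record.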

Future<RecordMetadata> future = producer.send(record, (recordMetadata, exception) -> { });
try {
    assertInstanceOf(TimeoutException.class, assertThrows(ExecutionException.class, future::get).getCause());
    //ensure headers are writable if send failure
Author

These are the headers of the producer record, which are different from onAcknowledgement's headers.

The producer record headers are writable because the client might want to mutate them before a retry. This is the same behaviour as today; this PR introduces no change here.

Member

@AndrewJSchofield AndrewJSchofield left a comment

Apart from a trivial formatting problem, looks good to me.

Member

@AndrewJSchofield AndrewJSchofield left a comment

Thanks for the PR. lgtm

@rich-c-shop
Author

continuous-integration/jenkins/pr-merge — This commit has test failures

Thanks for reviewing. It looks like the failures are in unrelated tests. Is there anything I can do to help get the PR merged?

@mjsax mjsax added the kip Requires or implements a KIP label Sep 28, 2024
Member

@lianetm lianetm left a comment

Hello, thanks for the PR! Took a first pass and left some comments for consideration.

Comment on lines 107 to 116
* @param metadata The metadata for the record that was sent. It includes the partition and offset of the record.
* If an error occurred, the metadata will contain only the topic and possibly the partition.

* If the partition was not assigned yet due to an error,
* it will be set to {@link org.apache.kafka.clients.producer.RecordMetadata#UNKNOWN_PARTITION}.
* before partition gets assigned, then partition will be set to RecordMetadata.NO_PARTITION.
* The metadata may be null if the client passed null record to
* {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}.
* @param exception The exception thrown during the processing of this record, or null if no error occurred.
* @param headers The headers for the record that was sent. This parameter may be null.
Member

@lianetm lianetm Oct 2, 2024

format seems off here (lines not aligned)

Author

@rich-c-shop rich-c-shop Nov 5, 2024

thanks, is it aligned now?

Member

@chia7712 chia7712 left a comment

@rich-c-shop thanks for this contribution!

Member

@chia7712 chia7712 left a comment

@rich-c-shop thanks for this patch. one small comment is left.

* The metadata may be null if the client passed null record to
* {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}.
* @param exception The exception thrown during processing of this record. Null if no error occurred.
* @param headers The headers for the record that was sent. This parameter may be null.
Member

Since ProducerRecord#headers is never null, it would be more appropriate to pass empty headers rather than null. This aligns with the existing behavior and avoids introducing unnecessary null checks. WDYT?

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java#L83

Member

This suggestion makes sense to me. Thoughts @rich-c-shop ?

Author

@rich-c-shop rich-c-shop Jan 2, 2025

I agree that ProducerRecord#headers is never null; however, the ProducerRecord itself could be null.

It is possible to change

headers = record != null ? record.headers() : null;

to

headers = record != null ? record.headers() : new RecordHeaders(null);

in the two places below:

headers = record != null ? record.headers() : null;

Headers headers = record != null ? record.headers() : null;

Would you prefer new RecordHeaders(null)? To me, new RecordHeaders(null) leaks a bit of ProducerRecord's internal implementation.

Member

@lianetm lianetm Jan 3, 2025

I see, but what about

headers = record != null ? record.headers() : new RecordHeaders();

Just to pass an empty headers object instead of null to the callback, which makes its implementation simpler (avoids null checks/NPEs). WDYT?

Author

that works: c927363
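
To make the trade-off in this thread concrete, here is a minimal sketch of the empty-headers fallback. `SketchRecord`, `headersForCallback` and `countHeaders` are hypothetical names, and headers are simplified to a list of keys rather than the Kafka `Headers` type.

```java
import java.util.ArrayList;
import java.util.List;

final class NullRecordSketch {
    // Hypothetical stand-in for a producer record; its headers are never null.
    static final class SketchRecord {
        private final List<String> headerKeys = new ArrayList<>();

        List<String> headers() {
            return headerKeys;
        }
    }

    // Mirrors the ternary discussed above: fall back to an empty collection
    // when the record itself is null, so callers never receive null.
    static List<String> headersForCallback(SketchRecord record) {
        return record != null ? record.headers() : new ArrayList<String>();
    }

    // A callback body can use the headers without a null check.
    static int countHeaders(SketchRecord record) {
        return headersForCallback(record).size();
    }

    public static void main(String[] args) {
        SketchRecord record = new SketchRecord();
        record.headers().add("trace-id");
        System.out.println(countHeaders(record));
        System.out.println(countHeaders(null));
    }
}
```

The fallback costs one small allocation per null record, which only occurs on an error path, in exchange for a callback contract without a null case.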

Member

@lianetm lianetm left a comment

Thanks for the updates! Left some minor comments, and there is this one from @chia7712 that hasn't been addressed. Please take a look when you can, this one seems almost ready to me. Thanks!

Member

@lianetm lianetm left a comment

Thanks for the updates! Just one follow-up comment. Also, could you please merge the latest changes from trunk?

simplify if condition

fix test to send record and trigger onAcknowledgement

add 2nd ProducerInterceptor for test

Apply suggestions from code review

Co-authored-by: Omnia Ibrahim <o.g.h.ibrahim@gmail.com>

./gradlew :clients:spotlessApply

new testHeadersFailure test to cover onAcknowledgement headers mutability

reduce PR diff

fix javadoc format

address comments with minor doc change and headers isReadOnly checking

set MAX_BLOCK_MS_CONFIG to timout fast in test

add comment about why copy headers

address comments

shortern max block of test and fix unchecked

use empty header
@rich-c-shop rich-c-shop force-pushed the KIP-512-add-headers-to-onAcknowledgement branch from c927363 to 02298d5 Compare January 3, 2025 16:46
@rich-c-shop
Author

squashed and rebased

Member

@lianetm lianetm left a comment

Thanks for the updates @rich-c-shop, LGTM.
@chia7712 and @AndrewJSchofield let me know if you have any other comments, otherwise I will carry on and merge.

Failed test EagerConsumerCoordinatorTest.testOutdatedCoordinatorAssignment seems unrelated to me, and it's already tracked as failing in trunk with https://issues.apache.org/jira/browse/KAFKA-15900.

Member

@chia7712 chia7712 left a comment

@rich-c-shop thanks for this PR, and sorry for the late review. Two minor comments are left. PTAL

@@ -1546,6 +1547,7 @@ private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interc
// whole lifetime of the batch.
// We don't want to have an NPE here, because the interceptors would not be notified (see .doSend).
topic = record != null ? record.topic() : null;
headers = record != null ? record.headers() : new RecordHeaders();
Member

To maintain header immutability, especially given your efforts in this area, it might be prudent to pass immutable headers when record is null.
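
One way to read this suggestion, sketched with plain collections rather than the Kafka classes: make the null-record fallback itself immutable, so the callback sees read-only headers on every path. `headersForCallback` and `isWritable` are hypothetical helpers, and headers are simplified to a list of keys.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class ImmutableFallbackSketch {
    // Returns headers for the callback; both branches yield an unmodifiable list.
    static List<String> headersForCallback(List<String> recordHeaders) {
        if (recordHeaders == null) {
            // Null record: immutable empty headers instead of a fresh mutable object.
            return Collections.emptyList();
        }
        // Non-null record: a read-only snapshot of the record's headers.
        return Collections.unmodifiableList(new ArrayList<String>(recordHeaders));
    }

    // Tries to add a probe entry; returns false if the list rejects writes.
    static boolean isWritable(List<String> headers) {
        try {
            headers.add("probe");
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> fromRecord = new ArrayList<String>();
        fromRecord.add("trace-id");
        System.out.println(isWritable(headersForCallback(fromRecord)));
        System.out.println(isWritable(headersForCallback(null)));
    }
}
```

In the producer itself this would presumably mean marking the fallback headers object read-only before handing it to the interceptors; that detail is an assumption and is not shown in the diff above.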

* The metadata may be null if the client passed null record to
* {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}.
* @param exception The exception thrown during processing of this record. Null if no error occurred.
* @param headers The headers for the record that was sent.
Member

Should we remind users that they should not change the headers?

Labels
ci-approved clients kip Requires or implements a KIP producer

6 participants