-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a wrapper around Kafka's ProducerInterceptor to support Kafka's ProducerConfig.INTERCEPTOR_CLASSES_CONFIG. #1090 #3843
Conversation
…roducerConfig.INTERCEPTOR_CLASSES_CONFIG.
/** | ||
* A wrapper for Kafka's {@link org.apache.kafka.clients.producer.ProducerInterceptor} to make pulsar support | ||
* Kafka ProducerInterceptor. It holds an instance of {@link org.apache.kafka.clients.producer.ProducerInterceptor} | ||
* ans it'll delegate all invocation to that instance. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo ans
|
||
interceptors = (List) producerConfig.getConfiguredInstances( | ||
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class); | ||
//interceptorsClasses.forEach(interceptorClazz -> interceptors.add(createKafkaProducerInterceptor(interceptorClazz))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, thanks for the work. left some nit comments
run integration tests |
1 similar comment
run integration tests |
run integration tests |
run java8 tests |
run integration tests |
1 similar comment
run integration tests |
run integration tests |
1 similar comment
run integration tests |
run integration tests |
run java8 tests |
run integration tests |
1 similar comment
run integration tests |
run java8 tests |
@codelipenghui can you review this PR also? since it is related to interceptors. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
Motivatio
Add a wrapper around Kafka's
org.apache.kafka.clients.producer.ProducerInterceptor
to support Kafka's ProducerConfig.INTERCEPTOR_CLASSES_CONFIG. #1090The wrapper will try to delegate all call to underlying instance of Kafka's
org.apache.kafka.clients.producer.ProducerInterceptor
it holds.When
PulsarKafkaProducer
convert a Kafka'sProducerRecord
to Pulsar'sMessage
, the schema(fixed to type of Schema<byte[]>), key, value, eventTimestamp and partitionID is set.When doing the delegation, we'll do
Pulsar
Message
-> Kafka'sProducerRecord
-> invoke underlying Kafka'sorg.apache.kafka.clients.producer.ProducerInterceptor#onSend
-> PulsarMessage
It'll try to preserve all the information. Verified through unit test.
For
org.apache.kafka.clients.producer.ProducerInterceptor#onSendAcknowledgement
it'll callorg.apache.kafka.clients.producer.ProducerInterceptor#onAcknowledgement
only partitionID, eventTimestamp, key byte lenth, value byte length will be pass in.