Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a wrapper around Kafka's ProducerInterceptor to support Kafka's ProducerConfig.INTERCEPTOR_CLASSES_CONFIG. #1090 #3843

Merged
merged 4 commits into from
Mar 20, 2019

Conversation

MarvinCai
Copy link
Contributor

Motivatio
Add a wrapper around Kafka's org.apache.kafka.clients.producer.ProducerInterceptor to support Kafka's ProducerConfig.INTERCEPTOR_CLASSES_CONFIG. #1090

The wrapper will try to delegate all call to underlying instance of Kafka's org.apache.kafka.clients.producer.ProducerInterceptor it holds.

When PulsarKafkaProducer convert a Kafka's ProducerRecord to Pulsar's Message, the schema(fixed to type of Schema<byte[]>), key, value, eventTimestamp and partitionID is set.
When doing the delegation, we'll do
PulsarMessage -> Kafka's ProducerRecord -> invoke underlying Kafka's org.apache.kafka.clients.producer.ProducerInterceptor#onSend -> PulsarMessage
It'll try to preserve all the information. Verified through unit test.
For org.apache.kafka.clients.producer.ProducerInterceptor#onSendAcknowledgement it'll call org.apache.kafka.clients.producer.ProducerInterceptor#onAcknowledgement only partitionID, eventTimestamp, key byte lenth, value byte length will be pass in.

@sijie sijie added component/kafka type/feature The PR added a new feature or issue requested a new feature labels Mar 17, 2019
@sijie sijie added this to the 2.4.0 milestone Mar 17, 2019
/**
* A wrapper for Kafka's {@link org.apache.kafka.clients.producer.ProducerInterceptor} to make pulsar support
* Kafka ProducerInterceptor. It holds an instance of {@link org.apache.kafka.clients.producer.ProducerInterceptor}
* ans it'll delegate all invocation to that instance.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo ans


interceptors = (List) producerConfig.getConfiguredInstances(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
//interceptorsClasses.forEach(interceptorClazz -> interceptors.add(createKafkaProducerInterceptor(interceptorClazz)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this?

Copy link
Member

@jiazhai jiazhai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, thanks for the work. left some nit comments

@MarvinCai
Copy link
Contributor Author

run integration tests
run java8 tests

1 similar comment
@MarvinCai
Copy link
Contributor Author

run integration tests
run java8 tests

@MarvinCai
Copy link
Contributor Author

run integration tests

@MarvinCai
Copy link
Contributor Author

run java8 tests

@MarvinCai
Copy link
Contributor Author

run integration tests

1 similar comment
@MarvinCai
Copy link
Contributor Author

run integration tests

@MarvinCai
Copy link
Contributor Author

run integration tests
run java8 tests

1 similar comment
@MarvinCai
Copy link
Contributor Author

run integration tests
run java8 tests

@MarvinCai
Copy link
Contributor Author

run integration tests

@MarvinCai
Copy link
Contributor Author

run java8 tests

@MarvinCai
Copy link
Contributor Author

run integration tests

1 similar comment
@MarvinCai
Copy link
Contributor Author

run integration tests

@MarvinCai
Copy link
Contributor Author

run java8 tests

@sijie sijie requested a review from codelipenghui March 19, 2019 11:32
@sijie
Copy link
Member

sijie commented Mar 19, 2019

@codelipenghui can you review this PR also? since it is related to interceptors.

Copy link
Contributor

@merlimat merlimat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@sijie sijie merged commit e77a165 into apache:master Mar 20, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/feature The PR added a new feature or issue requested a new feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants