Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

-Support for kafka-sink #2950

Conversation

rajeshLovesToCode
Copy link
Contributor

Signed-off-by: rajeshLovesToCode rajesh.dharamdasani3021@gmail.com

Description

Kafk Sink with SASL_PLAINTEXT, OAuth, schema registry, multithreaded producer and DLQ.

Issues Resolved

Github issue #1986

Check List

  • New functionality includes testing.
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

rajeshLovesToCode and others added 9 commits June 28, 2023 21:50
Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
…ani3021@gmail.com>

Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
…haramdasani3021@gmail.com>"

This reverts commit fff5398.
* feat: add include_key options to KeyValueProcessor

Signed-off-by: Haidong <whaidong@amazon.com>

---------

Signed-off-by: Haidong <whaidong@amazon.com>
Co-authored-by: Haidong <whaidong@amazon.com>
Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
…-project#2976)

* Added Kafka config to support acknowledgments and MSK arn

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Modified to use data-prepper-core in testImplementation

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Addressed review comments

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

* Addressed failing test

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>

---------

Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Co-authored-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com>
Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;

public class AwsDLQConfig {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's merge this with AwsConfig.java

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file is no longer available. Please review the updated PR #2999.

String serdeFormat=kafkaSinkConfig.getSerdeFormat();
if (MessageFormat.JSON.toString().equalsIgnoreCase(serdeFormat)) {
JsonNode dataNode = new ObjectMapper().convertValue(record.getData().toJsonString(), JsonNode.class);
dataForDlq = dataNode;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be set in the exception path? We don't need this in the normal path, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please elaborate.

Schema schema =new Schema.Parser().parse(valueToParse);
GenericRecord genericRecord = getGenericRecord(record.getData(),schema);
dataForDlq = genericRecord;
producer.send(new ProducerRecord(topic.getName(), genericRecord));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also you should release event handle once the event is successfully sent to Kafka or to the DLQ

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please review the updated PR #2999.

void test_aws_props(){
assertThat(awsDLQConfig.getBucket(), notNullValue());
assertThat(awsDLQConfig.getRegion(), notNullValue());
assertThat(awsDLQConfig.getRoleArn(), notNullValue());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assertions should be more than not null. For example, You can have something like String testArn which can be used in the setup and then can be compared in the assertion here. Take a look at the unit test cases added recently in the KafkaSource, like KafkaSourceTest.java.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file is no longer available. Please review the updated PR #2999.

@BeforeEach
void setUp() throws IOException {
//Added to load Yaml file - Start
Yaml yaml = new Yaml();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer this to be not read from a YAML file. You can set fields being tested and mock everything else.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has been addressed. Please review the updated PR #2999.

producer = new KafkaSinkProducer(mockProducer, kafkaSinkConfig,dlqSink);
sinkProducer = spy(producer);
sinkProducer.produceRecords(record);
verify(sinkProducer).produceRecords(record);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not enough. We need to assert that the records are put in Kafka properly by mocking the KafkaProducer. See KafkaSourceCustomerConsumerTest.java on how this can be done.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has been addressed. Please review the updated PR #2999.

multithreadedProducer = createObjectUnderTest();
Thread spySink = spy(new Thread(multithreadedProducer));
spySink.start();
verify(spySink).start();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to be able to assert that it has functioned as expected in addition to verify.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file does not have any scope for assertions but produce method has some proper assertions in KafkaSinkProducerTest. Please review the updated PR #2999 .

rajeshLovesToCode and others added 3 commits July 7, 2023 13:12
…pper into kafka-sink-draft

Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
rajeshLovesToCode and others added 6 commits July 7, 2023 22:11
…k-draft

# Conflicts:
#	data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsDLQConfig.java
#	data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfig.java
#	data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSink.java
#	data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java
#	data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurer.java
#	data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsDlqConfigTest.java
#	data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfigTest.java
#	data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducerTest.java
#	data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorkerTest.java
#	data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSinkTest.java
#	data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkasinkTest.java
Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
…de/data-prepper into HEAD

Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
@rajeshLovesToCode
Copy link
Contributor Author

we are closing this PR as we have raised the new PR #2999.

@rajeshLovesToCode rajeshLovesToCode deleted the kafka-sink-draft branch September 21, 2023 07:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants