-
Notifications
You must be signed in to change notification settings - Fork 194
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
-Support for kafka-sink #2950
-Support for kafka-sink #2950
Conversation
Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
…ani3021@gmail.com> Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
…haramdasani3021@gmail.com>" This reverts commit fff5398.
* feat: add include_key options to KeyValueProcessor Signed-off-by: Haidong <whaidong@amazon.com> --------- Signed-off-by: Haidong <whaidong@amazon.com> Co-authored-by: Haidong <whaidong@amazon.com> Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
…-project#2976) * Added Kafka config to support acknowledgments and MSK arn Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> * Modified to use data-prepper-core in testImplementation Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> * Addressed review comments Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> * Addressed failing test Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> --------- Signed-off-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> Co-authored-by: Krishna Kondaka <krishkdk@dev-dsk-krishkdk-2c-bd29c437.us-west-2.amazon.com> Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
4a4f1d6
to
657e89c
Compare
Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
import jakarta.validation.constraints.NotNull; | ||
import jakarta.validation.constraints.Size; | ||
|
||
public class AwsDLQConfig { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's merge this with AwsConfig.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file is no longer available. Please review the updated PR #2999.
String serdeFormat=kafkaSinkConfig.getSerdeFormat(); | ||
if (MessageFormat.JSON.toString().equalsIgnoreCase(serdeFormat)) { | ||
JsonNode dataNode = new ObjectMapper().convertValue(record.getData().toJsonString(), JsonNode.class); | ||
dataForDlq = dataNode; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be set in the exception path? We don't need this in the normal path, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please elaborate.
Schema schema =new Schema.Parser().parse(valueToParse); | ||
GenericRecord genericRecord = getGenericRecord(record.getData(),schema); | ||
dataForDlq = genericRecord; | ||
producer.send(new ProducerRecord(topic.getName(), genericRecord)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also you should release event handle once the event is successfully sent to Kafka or to the DLQ
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please review the updated PR #2999.
void test_aws_props(){ | ||
assertThat(awsDLQConfig.getBucket(), notNullValue()); | ||
assertThat(awsDLQConfig.getRegion(), notNullValue()); | ||
assertThat(awsDLQConfig.getRoleArn(), notNullValue()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assertions should be more than not null. For example, You can have something like String testArn
which can be used in the setup and then can be compared in the assertion here. Take a look at the unit test cases added recently in the KafkaSource, like KafkaSourceTest.java.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file is no longer available. Please review the updated PR #2999.
@BeforeEach | ||
void setUp() throws IOException { | ||
//Added to load Yaml file - Start | ||
Yaml yaml = new Yaml(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prefer this to be not read from a YAML file. You can set fields being tested and mock everything else.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has been addressed. Please review the updated PR #2999.
producer = new KafkaSinkProducer(mockProducer, kafkaSinkConfig,dlqSink); | ||
sinkProducer = spy(producer); | ||
sinkProducer.produceRecords(record); | ||
verify(sinkProducer).produceRecords(record); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not enough. We need to assert that the records are put in Kafka properly by mocking the KafkaProducer. See KafkaSourceCustomerConsumerTest.java on how this can be done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has been addressed. Please review the updated PR #2999.
multithreadedProducer = createObjectUnderTest(); | ||
Thread spySink = spy(new Thread(multithreadedProducer)); | ||
spySink.start(); | ||
verify(spySink).start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to be able to assert that it has functioned as expected in addition to verify.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file does not have any scope for assertions but produce method has some proper assertions in KafkaSinkProducerTest. Please review the updated PR #2999 .
…pper into kafka-sink-draft Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
…k-draft # Conflicts: # data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsDLQConfig.java # data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfig.java # data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSink.java # data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkaSink.java # data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/SinkPropertyConfigurer.java # data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/AwsDlqConfigTest.java # data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaSinkConfigTest.java # data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/KafkaSinkProducerTest.java # data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/producer/ProducerWorkerTest.java # data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/DLQSinkTest.java # data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/sink/KafkasinkTest.java
Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
…de/data-prepper into HEAD Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
we are closing this PR as we have raised the new PR #2999. |
Signed-off-by: rajeshLovesToCode rajesh.dharamdasani3021@gmail.com
Description
Kafk Sink with SASL_PLAINTEXT, OAuth, schema registry, multithreaded producer and DLQ.
Issues Resolved
Github issue #1986
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.