Support for kafka-sink #2999
Conversation
properties.put(SASL_SECURITY_PROTOCOL, SASL_SSL_PROTOCOL);
properties.put("ssl.endpoint.identification.algorithm",
        kafkaSinkConfig.getAuthConfig().getSslConfig().getSslEndpointAlgorithm());
if ("USER_INFO".equalsIgnoreCase(kafkaSinkConfig.getAuthConfig().getSslConfig().getBasicAuthSource())) {
I am not quite getting the purpose of USER_INFO
This is needed when you want to connect to the Confluent Schema Registry.
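To illustrate the answer above: `USER_INFO` is the Confluent serializer setting that tells the schema-registry client to read its credentials from a dedicated property rather than from the URL. A minimal sketch, assuming the standard Confluent property names (the registry URL and key/secret values are placeholders):

```java
import java.util.Properties;

// Sketch of what USER_INFO enables; the URL and credential values are placeholders.
class SchemaRegistryAuthExample {
    static Properties schemaRegistryProps(String apiKey, String apiSecret) {
        Properties props = new Properties();
        props.put("schema.registry.url", "https://my-registry.example.com");
        // USER_INFO tells the schema-registry client to take credentials
        // from basic.auth.user.info instead of the URL or SASL_INHERIT.
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", apiKey + ":" + apiSecret);
        return props;
    }
}
```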
region: "ap-south-1"
serde_format: plaintext
topics:
What's the use case for having more than one topic in the sink? Are we going to write the same message to multiple topics?
Yes, it is as per the HLD.
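The fan-out behavior discussed above can be sketched as follows. This is an illustrative stand-in, not the PR's actual producer code: `send` here just records what `producer.send` would emit, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical fan-out sketch: the same serialized record is sent once per
// configured topic. send() is a stand-in for producer.send().
class TopicFanOut {
    final List<String> sent = new ArrayList<>();

    void send(String topic, String value) {
        sent.add(topic + "=" + value);
    }

    void publishToAll(List<String> topics, String value) {
        for (String topic : topics) {
            send(topic, value); // identical payload, multiple destinations
        }
    }
}
```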
sasl_ssl:
  ssl_endpoint_identification_algorithm: https
  basic_auth_credentials_source: USER_INFO
  api_key: XXXXXXXXXXX
What's the difference between api_key/secret and username/password?
The api key and secret are for the schema registry; client_key and client_secret can be compared to username and password.
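The two credential pairs from the answer above land in different properties. A hedged sketch, assuming Confluent Cloud-style configuration (all values are placeholders, and the class name is illustrative):

```java
import java.util.Properties;

// Sketch distinguishing the two credential scopes discussed above.
class KafkaCredentialScopes {
    static Properties clientProps(String clientKey, String clientSecret,
                                  String apiKey, String apiSecret) {
        Properties props = new Properties();
        // client key/secret authenticate the producer to the Kafka brokers:
        // they play the role of username/password in SASL PLAIN.
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"" + clientKey + "\" password=\"" + clientSecret + "\";");
        // api key/secret authenticate only the schema-registry client.
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", apiKey + ":" + apiSecret);
        return props;
    }
}
```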
schema:
  #registry_url: https://psrc-znpo0.ap-southeast-2.aws.confluent.cloud
  version: 1
authentication:
There is an authentication block in the source as well. I think this should gel with what we accept in the source config. Please check with Krishna on the authentication block.
OK, we will make the changes.
ba3bcd1 to 07f00c2
Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
updated changes.
@JsonProperty("ssl_endpoint_identification_algorithm")
private String sslEndpointAlgorithm;

@JsonProperty("plain_config")
Why this? The plaintext config above has exactly the same config.
Yes, we have incorporated the changes.
@@ -0,0 +1,26 @@
package org.opensearch.dataprepper.plugins.kafka.configuration;
This is pretty much the same as PlainTextConfig.
Yes, we have incorporated the changes.
}

private void publishPlaintextMessage(Record<Event> record, TopicConfig topic, String key, Object dataForDlq) {
    producer.send(new ProducerRecord(topic.getName(), key, record.getData().toJsonString()), callBack(dataForDlq));
I just realized that you are not handling "tagsTargetKey". I think Umair/Omkar is familiar with this. You can also see the OpenSearchSink code for an example.
Yes, we have incorporated the changes.
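The tagsTargetKey handling mentioned above can be sketched roughly like this: when the sink is configured with a tags target key, the event's tags are copied into the outgoing document under that key before serialization. All names here (`withTags`, the document-as-map shape) are illustrative, not the PR's actual implementation.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of tagsTargetKey handling (names assumed).
class TagsTargetKeySketch {
    static Map<String, Object> withTags(Map<String, Object> doc,
                                        List<String> tags,
                                        String tagsTargetKey) {
        Map<String, Object> out = new LinkedHashMap<>(doc);
        // Only add the tags field when a target key is configured and tags exist.
        if (tagsTargetKey != null && !tags.isEmpty()) {
            out.put(tagsTargetKey, tags);
        }
        return out;
    }
}
```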
Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
}

public int getRetries() {
    if (retries == 0) {
Might it be better to assign a default value?
Kafka already has default values.
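One way to lean on Kafka's defaults, as the answer suggests, is to only set the property when the user actually configured it; kafka-clients then falls back to its own default for `retries` (a very large value, Integer.MAX_VALUE in recent versions). A sketch under that assumption, with illustrative names:

```java
import java.util.Properties;

// Sketch: a nullable Integer distinguishes "not configured" from a real value,
// so the Kafka client default applies when the user sets nothing.
class RetriesConfigSketch {
    private final Integer retries; // null means "not configured"

    RetriesConfigSketch(Integer retries) {
        this.retries = retries;
    }

    Properties toProducerProps() {
        Properties props = new Properties();
        if (retries != null) {
            props.put("retries", retries.toString());
        }
        // When the key is absent, kafka-clients uses its built-in default.
        return props;
    }
}
```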
if (record.getData().getEventHandle() != null) {
    bufferedEventHandles.add(record.getData().getEventHandle());
}
TopicConfig topic = kafkaSinkConfig.getTopic();
This can be a class-level property.
We will incorporate this change in the next PR.
Event event = getEvent(record);
final String key = event.formatString(kafkaSinkConfig.getPartitionKey(), expressionEvaluator);
Object dataForDlq = event.toJsonString();
LOG.info("Producing record " + dataForDlq);
This has to be removed.
We will incorporate this change in the next PR.
Object dataForDlq = event.toJsonString();
LOG.info("Producing record " + dataForDlq);
try {
    final String serdeFormat = kafkaSinkConfig.getSerdeFormat();
This also seems to be a good candidate for a class-level property.
We will incorporate this change in the next PR.
private void publishJsonMessage(Record<Event> record, TopicConfig topic, String key, Object dataForDlq) throws IOException, RestClientException, ProcessingException {
    final JsonNode dataNode = new ObjectMapper().convertValue(record.getData().toJsonString(), JsonNode.class);
    if (validateJson(topic.getName(), dataForDlq)) {
        producer.send(new ProducerRecord(topic.getName(), key, dataNode), callBack(dataForDlq));
producer.send is repeated in all the functions. They can be refactored into getJsonMessage etc., with producer.send in one place.
We will incorporate this change in the next PR.
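The refactoring suggested above could look roughly like this: each serde format only builds the message value, and one `publish` method owns the single `producer.send` call. This is a stand-in sketch (the enum, `buildValue`, and the list that records sends are all hypothetical, and the JSON branch is a placeholder for the real JsonNode conversion):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the suggested refactoring: per-format value builders, one send site.
class ProducerRefactorSketch {
    enum SerdeFormat { PLAINTEXT, JSON }

    // Stand-in for producer.send: records what would be sent.
    final List<String> sent = new ArrayList<>();

    String buildValue(SerdeFormat format, String json) {
        switch (format) {
            case JSON:
                return "{\"wrapped\":" + json + "}"; // placeholder for JsonNode conversion
            case PLAINTEXT:
            default:
                return json;
        }
    }

    // The single place that would call producer.send in the refactored design.
    void publish(String topic, String key, SerdeFormat format, String json) {
        sent.add(topic + "|" + key + "|" + buildValue(format, json));
    }
}
```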
    }
}

private boolean validateSchema(final String jsonData, final String schemaJson) throws IOException, ProcessingException {
Does this need to happen for every produced record?
Yes, this is implemented as per the requirement.
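Even if every record must be validated, the schema itself could be fetched from the registry once per topic and reused. A hedged sketch of that caching idea (all names are illustrative; `fetchSchemaFromRegistry` stands in for a real registry lookup):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: cache the schema per topic so per-record validation does not
// hit the registry every time.
class SchemaCacheSketch {
    private final Map<String, String> schemasByTopic = new ConcurrentHashMap<>();
    final AtomicInteger registryCalls = new AtomicInteger();

    // Stand-in for a real registry lookup.
    private String fetchSchemaFromRegistry(String topic) {
        registryCalls.incrementAndGet();
        return "{\"type\":\"object\"}"; // placeholder schema
    }

    String schemaFor(String topic) {
        // computeIfAbsent fetches at most once per topic.
        return schemasByTopic.computeIfAbsent(topic, this::fetchSchemaFromRegistry);
    }
}
```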
}
try {
    final KafkaSinkProducer producer = createProducer();
    records.forEach(record -> {
Do we need to create a producer worker for each record? Please check how the consumer thread is created.
Yes, this is as per the requirement.
Map propertiesMap = schemaProps;
return new CachedSchemaRegistryClient(
        kafkaSinkConfig.getSchemaConfig().getRegistryURL(),
        100, propertiesMap);
What is 100? Can you please define a constant?
We will incorporate this change in the next PR.
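The constant the reviewer asks for might look like this. In CachedSchemaRegistryClient the second constructor argument is the identity-map capacity (roughly, how many schemas the client caches per subject), so a name along those lines documents the intent; the constant name here is illustrative:

```java
// Illustrative home for the magic number from the snippet above.
class SchemaRegistryConstants {
    // The cache capacity passed to CachedSchemaRegistryClient; value from the PR.
    static final int SCHEMA_REGISTRY_CACHE_CAPACITY = 100;
}
```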
Left a few comments. I am approving it, but some items in the produce-record code path require refactoring/optimizations.
Description
Kafka Sink with SASL_PLAINTEXT, OAuth, schema registry, multithreaded producer and DLQ.
Issues Resolved
GitHub issue #1986
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.