Support for kafka-sink #2999

Merged: 10 commits into opensearch-project:main on Aug 7, 2023

Conversation

rajeshLovesToCode (Contributor):

Description

Kafka sink with SASL_PLAINTEXT, OAuth, schema registry, multithreaded producer, and DLQ support.

Issues Resolved

GitHub issue #1986

Check List

  • New functionality includes testing.
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

properties.put(SASL_SECURITY_PROTOCOL, SASL_SSL_PROTOCOL);
properties.put("ssl.endpoint.identification.algorithm",
        kafkaSinkConfig.getAuthConfig().getSslConfig().getSslEndpointAlgorithm());
if ("USER_INFO".equalsIgnoreCase(kafkaSinkConfig.getAuthConfig().getSslConfig().getBasicAuthSource())) {
Contributor:

I am not quite getting the purpose of USER_INFO.

Contributor Author:

This is used when you want to connect to the Confluent schema registry.
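
For context, here is a minimal Java sketch of what USER_INFO means to the Confluent schema registry client. The property names come from the Confluent client; registryUrl, apiKey, and apiSecret are assumed variables:

import java.util.Properties;

// With basic.auth.credentials.source=USER_INFO, the registry client reads its
// credentials from the single basic.auth.user.info property (assumed variables below).
Properties schemaRegistryProps = new Properties();
schemaRegistryProps.put("schema.registry.url", registryUrl);
schemaRegistryProps.put("basic.auth.credentials.source", "USER_INFO");
schemaRegistryProps.put("basic.auth.user.info", apiKey + ":" + apiSecret);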

region: "ap-south-1"

serde_format: plaintext
topics:
Contributor:

What's the use case for having more than one topic in a sink? Are we going to write the same message to multiple topics?

Contributor Author:

Yes, it's as per the HLD.
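
For context, a minimal sketch of writing one message to every configured topic; the getTopics() accessor and the surrounding variables are assumptions:

// Send the same serialized event to each topic configured on the sink.
for (TopicConfig topic : kafkaSinkConfig.getTopics()) {
    producer.send(new ProducerRecord<>(topic.getName(), key, event.toJsonString()),
            callBack(dataForDlq));
}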

sasl_ssl:
ssl_endpoint_identification_algorithm: https
basic_auth_credentials_source: USER_INFO
api_key: XXXXXXXXXXX
Contributor:

What's the difference between api_key/secret and username/password?

Contributor Author:

The api_key and api_secret are for the schema registry. client_key and client_secret are comparable to a username and password.
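
A sketch of that distinction; the property names come from the Kafka and Confluent clients, and the variables are assumptions:

// api_key/api_secret authenticate against the schema registry.
schemaProps.put("basic.auth.user.info", apiKey + ":" + apiSecret);

// client_key/client_secret (the username/password analog) authenticate the broker
// connection via SASL/PLAIN.
properties.put("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required"
        + " username=\"" + clientKey + "\" password=\"" + clientSecret + "\";");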

schema:
#registry_url: https://psrc-znpo0.ap-southeast-2.aws.confluent.cloud
version: 1
authentication:
Contributor:

There is an authentication block in the source as well. It should align with what we accept in the source config; please check with Krishna on the authentication block.

Contributor Author:

OK, we will make the changes.

Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
@rajeshLovesToCode (Contributor Author):

Updated changes.

kkondaka previously approved these changes on Jul 28, 2023.
@JsonProperty("ssl_endpoint_identification_algorithm")
private String sslEndpointAlgorithm;

@JsonProperty("plain_config")
Collaborator:

Why this? The plaintext config above has exactly the same config.

Contributor Author:

Yes, we have incorporated the changes.

@@ -0,0 +1,26 @@
package org.opensearch.dataprepper.plugins.kafka.configuration;

Collaborator:

This is pretty much the same as PlainTextConfig.

Contributor Author:

Yes, we have incorporated the changes.

}

private void publishPlaintextMessage(Record<Event> record, TopicConfig topic, String key, Object dataForDlq) {
    producer.send(new ProducerRecord(topic.getName(), key, record.getData().toJsonString()), callBack(dataForDlq));
Collaborator:

I just realized that you are not handling "tagsTargetKey". I think Umair/Omkar is familiar with this. You can also see the OpenSearchSink code for an example.

Contributor Author:

Yes, we have incorporated the changes.
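
For reference, a sketch of tagsTargetKey handling modeled on OpenSearchSink, assuming the Data Prepper Event API's jsonBuilder().includeTags(...); the helper name is hypothetical:

// Serialize the event, folding its tags under the configured key when one is set.
private String serializeEvent(final Event event, final String tagsTargetKey) {
    if (tagsTargetKey != null) {
        return event.jsonBuilder().includeTags(tagsTargetKey).toJsonString();
    }
    return event.toJsonString();
}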

Signed-off-by: rajeshLovesToCode <rajesh.dharamdasani3021@gmail.com>
}

public int getRetries() {
    if (retries == 0) {
Contributor:

Might it be better to assign a default value?

Contributor Author:

Kafka already has default values.
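
One way to defer to the client's defaults, sketched under the assumption that the configured value is nullable:

// Only override retries when the user configured a value; otherwise the Kafka
// producer's built-in default applies.
if (kafkaSinkConfig.getRetries() != null) {
    properties.put(ProducerConfig.RETRIES_CONFIG, kafkaSinkConfig.getRetries());
}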

if (record.getData().getEventHandle() != null) {
    bufferedEventHandles.add(record.getData().getEventHandle());
}
TopicConfig topic = kafkaSinkConfig.getTopic();
Contributor:

This can be a class-level property.

Contributor Author:

We will incorporate this change in the next PR.

Event event = getEvent(record);
final String key = event.formatString(kafkaSinkConfig.getPartitionKey(), expressionEvaluator);
Object dataForDlq = event.toJsonString();
LOG.info("Producing record " + dataForDlq);
Contributor:

This has to be removed.

Contributor Author:

We will incorporate this change in the next PR.

Object dataForDlq = event.toJsonString();
LOG.info("Producing record " + dataForDlq);
try {
    final String serdeFormat = kafkaSinkConfig.getSerdeFormat();
Contributor:

This also seems to be a good candidate for a class-level property.

Contributor Author:

We will incorporate this change in the next PR.
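
A sketch of the two class-level-property suggestions above; the class and constructor shape are assumptions:

// Resolve these once instead of on every produced record.
private final TopicConfig topic;
private final String serdeFormat;

public KafkaSinkProducer(final KafkaSinkConfig kafkaSinkConfig) {
    this.topic = kafkaSinkConfig.getTopic();
    this.serdeFormat = kafkaSinkConfig.getSerdeFormat();
}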

private void publishJsonMessage(Record<Event> record, TopicConfig topic, String key, Object dataForDlq) throws IOException, RestClientException, ProcessingException {
    final JsonNode dataNode = new ObjectMapper().convertValue(record.getData().toJsonString(), JsonNode.class);
    if (validateJson(topic.getName(), dataForDlq)) {
        producer.send(new ProducerRecord(topic.getName(), key, dataNode), callBack(dataForDlq));
Contributor:

producer.send is repeated in all of these functions. They can be refactored into getJsonMessage etc., with producer.send in one place.

Contributor Author:

We will incorporate this change in the next PR.
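
A sketch of the suggested refactoring; getJsonMessage and getPlaintextMessage are hypothetical helpers that only build the record value:

private void publishMessage(final Record<Event> record, final TopicConfig topic,
        final String key, final Object dataForDlq)
        throws IOException, RestClientException, ProcessingException {
    final Object value = "json".equals(serdeFormat)
            ? getJsonMessage(record)        // hypothetical helper
            : getPlaintextMessage(record);  // hypothetical helper
    // producer.send now lives in exactly one place.
    producer.send(new ProducerRecord<>(topic.getName(), key, value), callBack(dataForDlq));
}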

}
}

private boolean validateSchema(final String jsonData, final String schemaJson) throws IOException, ProcessingException {
Contributor:

Does this need to happen for every produced record?

Contributor Author:

Yes, this is implemented as per the requirement.
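
For context, a minimal sketch of such per-record validation with the java-json-tools json-schema-validator library, which the ProcessingException in the signature suggests; whether the PR's implementation matches this exactly is not shown here:

import java.io.IOException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.fge.jsonschema.core.exceptions.ProcessingException;
import com.github.fge.jsonschema.main.JsonSchema;
import com.github.fge.jsonschema.main.JsonSchemaFactory;

private boolean validateSchema(final String jsonData, final String schemaJson)
        throws IOException, ProcessingException {
    final ObjectMapper mapper = new ObjectMapper();
    final JsonNode schemaNode = mapper.readTree(schemaJson);
    final JsonNode dataNode = mapper.readTree(jsonData);
    // Compile the schema and report whether the event passes it.
    final JsonSchema schema = JsonSchemaFactory.byDefault().getJsonSchema(schemaNode);
    return schema.validate(dataNode).isSuccess();
}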

}
try {
    final KafkaSinkProducer producer = createProducer();
    records.forEach(record -> {
Contributor:

Do we need to create a producer worker for each record? Please check how the consumer thread is created.

Contributor Author:

Yes, this is as per the requirement.

Map propertiesMap = schemaProps;
return new CachedSchemaRegistryClient(
kafkaSinkConfig.getSchemaConfig().getRegistryURL(),
100, propertiesMap);
Contributor:

What is 100? Can you please define a constant?

Contributor Author:

We will incorporate this change in the next PR.
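
The change could look like this sketch; the constant name is an assumption:

// Name the schema cache capacity instead of passing a bare 100.
private static final int SCHEMA_REGISTRY_CACHE_CAPACITY = 100;

return new CachedSchemaRegistryClient(
        kafkaSinkConfig.getSchemaConfig().getRegistryURL(),
        SCHEMA_REGISTRY_CACHE_CAPACITY, propertiesMap);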

@hshardeesi (Contributor) left a comment:

Left a few comments. I am approving it, but some of the items in the produce-record code path require refactoring/optimization.

kkondaka merged commit 0582788 into opensearch-project:main on Aug 7, 2023.
24 checks passed