Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated Kafka security configuration #2994

Merged
merged 4 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions data-prepper-plugins/kafka-plugins/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies {
implementation 'io.confluent:kafka-avro-serializer:7.3.3'
implementation 'io.confluent:kafka-schema-registry-client:7.3.3'
implementation 'io.confluent:kafka-schema-registry:7.3.3:tests'
implementation 'software.amazon.msk:aws-msk-iam-auth:1.1.6'
testImplementation 'org.mockito:mockito-inline:4.1.0'
testImplementation 'org.yaml:snakeyaml:2.0'
testImplementation testLibs.spring.test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,73 @@

package org.opensearch.dataprepper.plugins.kafka.configuration;


import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.Valid;

import java.util.stream.Stream;

/**
* * A helper class that helps to read auth related configuration values from
* A helper class that helps to read auth related configuration values from
* pipelines.yaml
*/
public class AuthConfig {
@JsonProperty("sasl_plaintext")
private PlainTextAuthConfig plainTextAuthConfig;

@JsonProperty("sasl_oauth")
private OAuthConfig oAuthConfig;
public static class SaslAuthConfig {
@JsonProperty("plaintext")
private PlainTextAuthConfig plainTextAuthConfig;

@JsonProperty("oauth")
private OAuthConfig oAuthConfig;

@JsonProperty("aws_iam")
private AwsIamAuthConfig awsIamAuthConfig;

public AwsIamAuthConfig getAwsIamAuthConfig() {
return awsIamAuthConfig;
}

public PlainTextAuthConfig getPlainTextAuthConfig() {
return plainTextAuthConfig;
}

public OAuthConfig getOAuthConfig() {
return oAuthConfig;
}

@AssertTrue(message = "Only one of AwsIam or oAuth or PlainText auth config must be specified")
public boolean hasOnlyOneConfig() {
return Stream.of(awsIamAuthConfig, plainTextAuthConfig, oAuthConfig).filter(n -> n!=null).count() == 1;
}

public OAuthConfig getoAuthConfig() {
return oAuthConfig;
}

public void setoAuthConfig(OAuthConfig oAuthConfig) {
this.oAuthConfig = oAuthConfig;
public static class SslAuthConfig {
// TODO Add Support for SSL authentication types like
// one-way or two-way authentication

public SslAuthConfig() {
}
}

public void setPlainTextAuthConfig(PlainTextAuthConfig plainTextAuthConfig) {
this.plainTextAuthConfig = plainTextAuthConfig;
@JsonProperty("ssl")
private SslAuthConfig sslAuthConfig;

@Valid
@JsonProperty("sasl")
private SaslAuthConfig saslAuthConfig;

public SslAuthConfig getSslAuthConfig() {
return sslAuthConfig;
}

public PlainTextAuthConfig getPlainTextAuthConfig() {
return plainTextAuthConfig;
public SaslAuthConfig getSaslAuthConfig() {
return saslAuthConfig;
}

@AssertTrue(message = "Only one of SSL or SASL auth config must be specified")
public boolean hasSaslOrSslConfig() {
return Stream.of(sslAuthConfig, saslAuthConfig).filter(n -> n!=null).count() == 1;
Copy link
Contributor

@hshardeesi hshardeesi Jul 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we have a similar check on sasl_mechanism as well? I guess only one of the mechanisms should be specified.

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,50 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import jakarta.validation.Valid;

public class AwsConfig {
@JsonProperty("msk_arn")
@Size(min = 20, max = 2048, message = "mskArn length should be between 20 and 2048 characters")
private String awsMskArn;

public String getAwsMskArn() {
return awsMskArn;
public static class AwsMskConfig {
@Valid
@Size(min = 20, max = 2048, message = "msk_arn length should be between 20 and 2048 characters")
@JsonProperty("arn")
private String arn;

@JsonProperty("broker_connection_type")
private MskBrokerConnectionType brokerConnectionType;

public String getArn() {
return arn;
}

public MskBrokerConnectionType getBrokerConnectionType() {
return brokerConnectionType;
}
}

@JsonProperty("msk")
private AwsMskConfig awsMskConfig;

@Valid
@Size(min = 1, message = "Region cannot be empty string")
@JsonProperty("region")
private String region;

@Valid
@Size(min = 20, max = 2048, message = "sts_role_arn length should be between 20 and 2048 characters")
@JsonProperty("sts_role_arn")
private String stsRoleArn;

public AwsMskConfig getAwsMskConfig() {
return awsMskConfig;
}

public String getRegion() {
return region;
}

public String getStsRoleArn() {
return stsRoleArn;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.configuration;

import com.fasterxml.jackson.annotation.JsonCreator;
import java.util.Map;
import java.util.Arrays;
import java.util.stream.Collectors;

public enum AwsIamAuthConfig {
ROLE("role"),
DEFAULT("default");
//TODO add "PROFILE" option

private static final Map<String, AwsIamAuthConfig> OPTIONS_MAP = Arrays.stream(AwsIamAuthConfig.values())
.collect(Collectors.toMap(
value -> value.option,
value -> value
));

private final String option;

AwsIamAuthConfig(final String option) {
this.option = option;
}

@JsonCreator
static AwsIamAuthConfig fromOptionValue(final String option) {
return OPTIONS_MAP.get(option.toLowerCase());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.configuration;

import com.fasterxml.jackson.annotation.JsonCreator;

import java.util.Map;
import java.util.Arrays;
import java.util.stream.Collectors;

public enum EncryptionType {
SSL("ssl"),
PLAINTEXT("plaintext");

private static final Map<String, EncryptionType> OPTIONS_MAP = Arrays.stream(EncryptionType.values())
.collect(Collectors.toMap(
value -> value.type,
value -> value
));

private final String type;

EncryptionType(final String type) {
this.type = type;
}

@JsonCreator
static EncryptionType fromTypeValue(final String type) {
return OPTIONS_MAP.get(type.toLowerCase());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,15 @@ public class KafkaSourceConfig {
@Valid
private SchemaConfig schemaConfig;

@Valid
@JsonProperty("authentication")
private AuthConfig authConfig;

@JsonProperty("encryption")
private EncryptionType encryptionType = EncryptionType.SSL;

@JsonProperty("aws")
@Valid
private AwsConfig awsConfig;

@JsonProperty("acknowledgments")
Expand Down Expand Up @@ -83,6 +88,14 @@ public AuthConfig getAuthConfig() {
return authConfig;
}

public EncryptionType getEncryptionType() {
return encryptionType;
}

public AwsConfig getAwsConfig() {
return awsConfig;
}

public void setAuthConfig(AuthConfig authConfig) {
this.authConfig = authConfig;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.kafka.configuration;

import com.fasterxml.jackson.annotation.JsonCreator;

import java.util.Map;
import java.util.Arrays;
import java.util.stream.Collectors;

public enum MskBrokerConnectionType {
PUBLIC("public"),
SINGLE_VPC("single_vpc"),
MULTI_VPC("multi_vpc");

private static final Map<String, MskBrokerConnectionType> OPTIONS_MAP = Arrays.stream(MskBrokerConnectionType.values())
.collect(Collectors.toMap(
value -> value.type,
value -> value
));

private final String type;

MskBrokerConnectionType(final String type) {
this.type = type;
}

@JsonCreator
static MskBrokerConnectionType fromTypeValue(final String type) {
return OPTIONS_MAP.get(type.toLowerCase());
}
}
Loading
Loading