Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,16 @@ final class Offset {
KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS),
@TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.SASL_PLAIN),
@TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.TLS),
@TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.SASL_SCRAM_512),
@TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.NONE),
},
description = "Kafka Source Authentication Mode",
helpText =
"The mode of authentication to use with the Kafka cluster. "
+ "Use `KafkaAuthenticationMethod.NONE` for no authentication, `KafkaAuthenticationMethod.SASL_PLAIN` for SASL/PLAIN username and password, "
+ "and `KafkaAuthenticationMethod.TLS` for certificate-based authentication. `KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS` "
+ "should be used only for Google Cloud Apache Kafka for BigQuery cluster, it allows to authenticate using application default credentials.")
+ "`KafkaAuthenticationMethod.SASL_SCRAM_512` for SASL_SCRAM_512 authentication and `KafkaAuthenticationMethod.TLS` for certificate-based "
+ "authentication. `KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS` should be used only for Google Cloud Apache Kafka for BigQuery cluster, "
+ "it allows to authenticate using application default credentials.")
@Default.String(KafkaAuthenticationMethod.SASL_PLAIN)
String getKafkaReadAuthenticationMode();

Expand Down Expand Up @@ -215,4 +217,63 @@ final class Offset {
String getKafkaReadKeyPasswordSecretId();

void setKafkaReadKeyPasswordSecretId(String sourceKeyPasswordSecretId);

@TemplateParameter.Text(
order = 13,
groupName = "Source",
parentName = "kafkaReadAuthenticationMode",
parentTriggerValues = {KafkaAuthenticationMethod.SASL_SCRAM_512},
optional = true,
description = "Secret Version ID For Kafka SASL_SCRAM Username",
helpText =
"The Google Cloud Secret Manager secret ID that contains the Kafka username "
+ "to use with `SASL_SCRAM` authentication.",
example = "projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>")
String getKafkaReadSaslScramUsernameSecretId();

void setKafkaReadSaslScramUsernameSecretId(String value);

@TemplateParameter.Text(
order = 14,
groupName = "Source",
parentName = "kafkaReadAuthenticationMode",
parentTriggerValues = KafkaAuthenticationMethod.SASL_SCRAM_512,
optional = true,
description = "Secret Version ID For Kafka SASL_SCRAM Password",
helpText =
"The Google Cloud Secret Manager secret ID that contains the Kafka password to use with `SASL_SCRAM` authentication.",
example = "projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>")
String getKafkaReadSaslScramPasswordSecretId();

void setKafkaReadSaslScramPasswordSecretId(String value);

@TemplateParameter.GcsReadFile(
order = 15,
optional = true,
groupName = "Source",
parentName = "kafkaReadAuthenticationMode",
parentTriggerValues = {KafkaAuthenticationMethod.SASL_SCRAM_512},
description = "Truststore File Location",
helpText =
"The Google Cloud Storage path to the Java TrustStore (JKS) file that contains"
+ " the trusted certificates to use to verify the identity of the Kafka broker.")
String getKafkaReadSaslScramTruststoreLocation();

void setKafkaReadSaslScramTruststoreLocation(String sourceSaslScramTruststoreLocation);

@TemplateParameter.Text(
order = 16,
optional = true,
groupName = "Source",
parentName = "kafkaReadAuthenticationMode",
parentTriggerValues = {KafkaAuthenticationMethod.SASL_SCRAM_512},
description = "Secret Version ID for Truststore Password",
helpText =
"The Google Cloud Secret Manager secret ID that contains the password to "
+ "use to access the Java TrustStore (JKS) file for Kafka SASL_SCRAM authentication",
example = "projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>")
String getKafkaReadSaslScramTruststorePasswordSecretId();

void setKafkaReadSaslScramTruststorePasswordSecretId(
String sourceSaslScramTruststorePasswordSecretId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@ public interface KafkaWriteOptions extends PipelineOptions {
@TemplateParameter.TemplateEnumOption(
KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS),
@TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.SASL_PLAIN),
@TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.SASL_SCRAM_512),
@TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.TLS),
@TemplateParameter.TemplateEnumOption(KafkaAuthenticationMethod.NONE),
},
helpText =
"The mode of authentication to use with the Kafka cluster. "
+ "Use NONE for no authentication, SASL_PLAIN for SASL/PLAIN username and password, and"
+ "Use NONE for no authentication, SASL_PLAIN for SASL/PLAIN username and password, "
+ " SASL_SCRAM_512 for SASL_SCRAM_512 based authentication and"
+ " TLS for certificate-based authentication.")
@Default.String(KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS)
String getKafkaWriteAuthenticationMethod();
Expand Down Expand Up @@ -160,4 +162,63 @@ public interface KafkaWriteOptions extends PipelineOptions {
String getKafkaWriteKeyPasswordSecretId();

void setKafkaWriteKeyPasswordSecretId(String destinationKeyPasswordSecretId);

@TemplateParameter.Text(
order = 13,
groupName = "Destination",
parentName = "kafkaReadAuthenticationMode",
parentTriggerValues = {KafkaAuthenticationMethod.SASL_SCRAM_512},
optional = true,
description = "Secret Version ID For Kafka SASL_SCRAM Username",
helpText =
"The Google Cloud Secret Manager secret ID that contains the Kafka username "
+ "to use with `SASL_SCRAM` authentication.",
example = "projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>")
String getKafkaReadSaslScramUsernameSecretId();

void setKafkaReadSaslScramUsernameSecretId(String value);

@TemplateParameter.Text(
order = 14,
groupName = "Destination",
parentName = "kafkaReadAuthenticationMode",
parentTriggerValues = KafkaAuthenticationMethod.SASL_SCRAM_512,
optional = true,
description = "Secret Version ID For Kafka SASL_SCRAM Password",
helpText =
"The Google Cloud Secret Manager secret ID that contains the Kafka password to use with `SASL_SCRAM` authentication.",
example = "projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>")
String getKafkaReadSaslScramPasswordSecretId();

void setKafkaReadSaslScramPasswordSecretId(String value);

@TemplateParameter.GcsReadFile(
order = 15,
optional = true,
groupName = "Destination",
parentName = "kafkaReadAuthenticationMode",
parentTriggerValues = {KafkaAuthenticationMethod.SASL_SCRAM_512},
description = "Truststore File Location",
helpText =
"The Google Cloud Storage path to the Java TrustStore (JKS) file that contains"
+ " the trusted certificates to use to verify the identity of the Kafka broker.")
String getKafkaReadSaslScramTruststoreLocation();

void setKafkaReadSaslScramTruststoreLocation(String sourceSaslScramTruststoreLocation);

@TemplateParameter.Text(
order = 16,
optional = true,
groupName = "Destination",
parentName = "kafkaReadAuthenticationMode",
parentTriggerValues = {KafkaAuthenticationMethod.SASL_SCRAM_512},
description = "Secret Version ID for Truststore Password",
helpText =
"The Google Cloud Secret Manager secret ID that contains the password to "
+ "use to access the Java TrustStore (JKS) file for Kafka SASL_SCRAM authentication",
example = "projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>")
String getKafkaReadSaslScramTruststorePasswordSecretId();

void setKafkaReadSaslScramTruststorePasswordSecretId(
String sourceSaslScramTruststorePasswordSecretId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@
options.getKafkaReadKeystorePasswordSecretId(),
options.getKafkaReadKeyPasswordSecretId(),
options.getKafkaReadUsernameSecretId(),
options.getKafkaReadPasswordSecretId());
options.getKafkaReadPasswordSecretId(),
options.getKafkaReadSaslScramUsernameSecretId(),
options.getKafkaReadSaslScramPasswordSecretId(),
options.getKafkaReadSaslScramTruststoreLocation(),
options.getKafkaReadSaslScramTruststorePasswordSecretId());

Check warning on line 51 in v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/utils/KafkaConfig.java

View check run for this annotation

Codecov / codecov/patch

v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/utils/KafkaConfig.java#L47-L51

Added lines #L47 - L51 were not covered by tests

properties.putAll(KafkaCommonUtils.configureKafkaOffsetCommit(options));

Expand All @@ -60,7 +64,11 @@
options.getKafkaWriteKeystorePasswordSecretId(),
options.getKafkaWriteKeyPasswordSecretId(),
options.getKafkaWriteUsernameSecretId(),
options.getKafkaWritePasswordSecretId());
options.getKafkaWritePasswordSecretId(),
options.getKafkaReadSaslScramUsernameSecretId(),
options.getKafkaReadSaslScramPasswordSecretId(),
options.getKafkaReadSaslScramTruststoreLocation(),
options.getKafkaReadSaslScramTruststorePasswordSecretId());

Check warning on line 71 in v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/utils/KafkaConfig.java

View check run for this annotation

Codecov / codecov/patch

v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/utils/KafkaConfig.java#L67-L71

Added lines #L67 - L71 were not covered by tests
}

public static Map<String, Object> fromSchemaRegistryOptions(SchemaRegistryOptions options) {
Expand Down Expand Up @@ -122,7 +130,11 @@
String keystorePasswordSecretId,
String keyPasswordSecretId,
String usernameSecretId,
String passwordSecretId) {
String passwordSecretId,
String scramUsernameSecretId,
String scramPasswordSecretId,
String scramTruststoreLocation,
String scramTruststorePasswordSecretId) {
Map<String, Object> properties = new HashMap<>();
if (authMode == null || authMode.equals(KafkaAuthenticationMethod.NONE)) {
return properties;
Expand Down Expand Up @@ -154,6 +166,22 @@
+ " password=\'"
+ SecretManagerUtils.getSecret(passwordSecretId)
+ "\';");
} else if (authMode.equals(KafkaAuthenticationMethod.SASL_SCRAM_512)) {
properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
properties.put(

Check warning on line 172 in v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/utils/KafkaConfig.java

View check run for this annotation

Codecov / codecov/patch

v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/utils/KafkaConfig.java#L170-L172

Added lines #L170 - L172 were not covered by tests
SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.scram.ScramLoginModule required"
+ " username=\'"
+ SecretManagerUtils.getSecret(scramUsernameSecretId)

Check warning on line 176 in v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/utils/KafkaConfig.java

View check run for this annotation

Codecov / codecov/patch

v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/utils/KafkaConfig.java#L176

Added line #L176 was not covered by tests
+ "\'"
+ " password=\'"
+ SecretManagerUtils.getSecret(scramPasswordSecretId)

Check warning on line 179 in v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/utils/KafkaConfig.java

View check run for this annotation

Codecov / codecov/patch

v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/utils/KafkaConfig.java#L179

Added line #L179 was not covered by tests
+ "\';");
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, scramTruststoreLocation);
properties.put(

Check warning on line 182 in v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/utils/KafkaConfig.java

View check run for this annotation

Codecov / codecov/patch

v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/utils/KafkaConfig.java#L181-L182

Added lines #L181 - L182 were not covered by tests
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
FileAwareFactoryFn.SECRET_MANAGER_VALUE_PREFIX + scramTruststorePasswordSecretId);
} else if (authMode.equals(KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS)) {
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
properties.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class KafkaAuthenticationMethod {
public static final String NONE = "NONE";
public static final String TLS = "TLS";
public static final String SASL_PLAIN = "SASL_PLAIN";
public static final String SASL_SCRAM_512 = "SASL_SCRAM_512";
public static final String APPLICATION_DEFAULT_CREDENTIALS = "APPLICATION_DEFAULT_CREDENTIALS";
public static final String OAUTH = "OAUTH";
}
Loading