Skip to content

Integrate OAuth 0.17.0 grantType configuration #11757

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@
"readTimeoutSeconds", "httpRetries", "httpRetryPauseMs", "clientSecret", "passwordSecret", "accessToken",
"refreshToken", "tlsTrustedCertificates", "disableTlsHostnameVerification", "maxTokenExpirySeconds",
"accessTokenIsJwt", "enableMetrics", "includeAcceptHeader", "accessTokenLocation",
"clientAssertion", "clientAssertionLocation", "clientAssertionType", "saslExtensions"})
"clientAssertion", "clientAssertionLocation", "clientAssertionType", "saslExtensions", "grantType"})
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class KafkaClientAuthenticationOAuth extends KafkaClientAuthentication {
public static final String TYPE_OAUTH = "oauth";

private String clientId;
private String grantType;
private String username;
private String scope;
private String audience;
Expand Down Expand Up @@ -80,6 +81,16 @@ public void setClientId(String clientId) {
this.clientId = clientId;
}

@Description("OAuth grant type to use when authenticating against the authorization server. This value defaults to `client_credentials` when `clientId` and `clientSecret` are specified.")
@JsonInclude(JsonInclude.Include.NON_NULL)
public String getGrantType() {
return grantType;
}

public void setGrantType(String grantType) {
this.grantType = grantType;
}

@Description("OAuth scope to use when authenticating against the authorization server. Some authorization servers require this to be set. "
+ "The possible values depend on how authorization server is configured. By default `scope` is not specified when doing the token endpoint request.")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
"accessTokenIsJwt", "tlsTrustedCertificates", "disableTlsHostnameVerification", "enableECDSA",
"maxSecondsWithoutReauthentication", "enablePlain", "tokenEndpointUri", "enableOauthBearer", "customClaimCheck",
"connectTimeoutSeconds", "readTimeoutSeconds", "httpRetries", "httpRetryPauseMs", "clientScope", "clientAudience",
"enableMetrics", "failFast", "includeAcceptHeader", "serverBearerTokenLocation", "userNamePrefix"})
"clientGrantType", "enableMetrics", "failFast", "includeAcceptHeader", "serverBearerTokenLocation", "userNamePrefix"})
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class KafkaListenerAuthenticationOAuth extends KafkaListenerAuthentication {
Expand Down Expand Up @@ -81,6 +81,7 @@ public class KafkaListenerAuthenticationOAuth extends KafkaListenerAuthenticatio
private Integer httpRetryPauseMs;
private String clientScope = null;
private String clientAudience = null;
private String clientGrantType = null;
private boolean enableMetrics = false;
private boolean failFast = true;
private boolean includeAcceptHeader = true;
Expand Down Expand Up @@ -217,6 +218,16 @@ public void setClientAudience(String audience) {
this.clientAudience = audience;
}

@Description("The grant type to use when making requests to the authorization server's token endpoint. Used for inter-broker authentication and for configuring OAuth 2.0 over PLAIN using the `clientId` and `secret` method.")
@JsonInclude(JsonInclude.Include.NON_NULL)
public String getClientGrantType() {
return clientGrantType;
}

public void setClientGrantType(String grantType) {
this.clientGrantType = grantType;
}

@Description("URI of the JWKS certificate endpoint, which can be used for local JWT validation.")
@JsonInclude(JsonInclude.Include.NON_NULL)
public String getJwksEndpointUri() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ public static Map<String, String> oauthJaasOptions(KafkaClientAuthenticationOAut
addOption(options, ClientConfig.OAUTH_CLIENT_ID, oauth.getClientId());
addOption(options, ClientConfig.OAUTH_PASSWORD_GRANT_USERNAME, oauth.getUsername());
addOption(options, ClientConfig.OAUTH_TOKEN_ENDPOINT_URI, oauth.getTokenEndpointUri());
addOption(options, ClientConfig.OAUTH_CLIENT_CREDENTIALS_GRANT_TYPE, oauth.getGrantType());
addOption(options, ClientConfig.OAUTH_SCOPE, oauth.getScope());
addOption(options, ClientConfig.OAUTH_AUDIENCE, oauth.getAudience());
if (oauth.isDisableTlsHostnameVerification()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ private String getSecurityProtocol(boolean tls, boolean sasl) {
addBooleanOptionIfFalse(options, ServerConfig.OAUTH_CHECK_ISSUER, oauth.isCheckIssuer());
addBooleanOptionIfTrue(options, ServerConfig.OAUTH_CHECK_AUDIENCE, oauth.isCheckAudience());
addOptionIfNotNull(options, ServerConfig.OAUTH_CUSTOM_CLAIM_CHECK, oauth.getCustomClaimCheck());
addOptionIfNotNull(options, ServerConfig.OAUTH_CLIENT_CREDENTIALS_GRANT_TYPE, oauth.getClientGrantType());
addOptionIfNotNull(options, ServerConfig.OAUTH_SCOPE, oauth.getClientScope());
addOptionIfNotNull(options, ServerConfig.OAUTH_AUDIENCE, oauth.getClientAudience());
addOptionIfNotNull(options, ServerConfig.OAUTH_JWKS_ENDPOINT_URI, oauth.getJwksEndpointUri());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,7 @@ public void testGenerateDeploymentWithOAuthWithClientSecret() {
.withTokenEndpointUri("http://my-oauth-server")
.withAudience("kafka")
.withScope("all")
.withGrantType("custom_client_credentials")
.withNewClientSecret()
.withSecretName("my-secret-secret")
.withKey("my-secret-key")
Expand All @@ -1079,6 +1080,7 @@ public void testGenerateDeploymentWithOAuthWithClientSecret() {
assertThat(bridgeConfigurations, containsString("kafka.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " +
"oauth.client.id=\"my-client-id\" " +
"oauth.token.endpoint.uri=\"http://my-oauth-server\" " +
"oauth.client.credentials.grant.type=\"custom_client_credentials\" " +
"oauth.scope=\"all\" " +
"oauth.audience=\"kafka\" " +
"oauth.client.secret=\"${strimzidir:/opt/strimzi/oauth/my-secret-secret:my-secret-key}\";"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1979,6 +1979,7 @@ public void testOauthConfiguration() {
.withMaxSecondsWithoutReauthentication(3600)
.withJwksMinRefreshPauseSeconds(5)
.withEnablePlain(true)
.withClientGrantType("custom_client_credentials")
.withTokenEndpointUri("http://token")
.withConnectTimeoutSeconds(30)
.withReadTimeoutSeconds(30)
Expand Down Expand Up @@ -2014,9 +2015,9 @@ public void testOauthConfiguration() {
"ssl.endpoint.identification.algorithm=HTTPS",
"principal.builder.class=io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder",
"listener.name.plain-9092.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler",
"listener.name.plain-9092.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"thePrincipalName\" oauth.valid.issuer.uri=\"http://valid-issuer\" oauth.jwks.endpoint.uri=\"http://jwks\" oauth.jwks.refresh.min.pause.seconds=\"5\" oauth.server.bearer.token.location=\"/var/run/secrets/kubernetes.io/serviceaccount/token\" oauth.username.claim=\"preferred_username\" oauth.username.prefix=\"user-\" oauth.fallback.username.claim=\"client_id\" oauth.fallback.username.prefix=\"service-account-\" oauth.groups.claim=\"$.groups\" oauth.groups.claim.delimiter=\";\" oauth.connect.timeout.seconds=\"30\" oauth.read.timeout.seconds=\"30\" oauth.enable.metrics=\"true\" oauth.include.accept.header=\"false\" oauth.config.id=\"PLAIN-9092\";",
"listener.name.plain-9092.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"thePrincipalName\" oauth.valid.issuer.uri=\"http://valid-issuer\" oauth.client.credentials.grant.type=\"custom_client_credentials\" oauth.jwks.endpoint.uri=\"http://jwks\" oauth.jwks.refresh.min.pause.seconds=\"5\" oauth.server.bearer.token.location=\"/var/run/secrets/kubernetes.io/serviceaccount/token\" oauth.username.claim=\"preferred_username\" oauth.username.prefix=\"user-\" oauth.fallback.username.claim=\"client_id\" oauth.fallback.username.prefix=\"service-account-\" oauth.groups.claim=\"$.groups\" oauth.groups.claim.delimiter=\";\" oauth.connect.timeout.seconds=\"30\" oauth.read.timeout.seconds=\"30\" oauth.enable.metrics=\"true\" oauth.include.accept.header=\"false\" oauth.config.id=\"PLAIN-9092\";",
"listener.name.plain-9092.plain.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler",
"listener.name.plain-9092.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required oauth.valid.issuer.uri=\"http://valid-issuer\" oauth.jwks.endpoint.uri=\"http://jwks\" oauth.jwks.refresh.min.pause.seconds=\"5\" oauth.server.bearer.token.location=\"/var/run/secrets/kubernetes.io/serviceaccount/token\" oauth.username.claim=\"preferred_username\" oauth.username.prefix=\"user-\" oauth.fallback.username.claim=\"client_id\" oauth.fallback.username.prefix=\"service-account-\" oauth.groups.claim=\"$.groups\" oauth.groups.claim.delimiter=\";\" oauth.connect.timeout.seconds=\"30\" oauth.read.timeout.seconds=\"30\" oauth.enable.metrics=\"true\" oauth.include.accept.header=\"false\" oauth.config.id=\"PLAIN-9092\" oauth.token.endpoint.uri=\"http://token\";",
"listener.name.plain-9092.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required oauth.valid.issuer.uri=\"http://valid-issuer\" oauth.client.credentials.grant.type=\"custom_client_credentials\" oauth.jwks.endpoint.uri=\"http://jwks\" oauth.jwks.refresh.min.pause.seconds=\"5\" oauth.server.bearer.token.location=\"/var/run/secrets/kubernetes.io/serviceaccount/token\" oauth.username.claim=\"preferred_username\" oauth.username.prefix=\"user-\" oauth.fallback.username.claim=\"client_id\" oauth.fallback.username.prefix=\"service-account-\" oauth.groups.claim=\"$.groups\" oauth.groups.claim.delimiter=\";\" oauth.connect.timeout.seconds=\"30\" oauth.read.timeout.seconds=\"30\" oauth.enable.metrics=\"true\" oauth.include.accept.header=\"false\" oauth.config.id=\"PLAIN-9092\" oauth.token.endpoint.uri=\"http://token\";",
"listener.name.plain-9092.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN",
"listener.name.plain-9092.connections.max.reauth.ms=3600000"));
}
Expand All @@ -2039,6 +2040,7 @@ public void testOauthConfigurationWithPlainOnly() {
.withTokenEndpointUri("http://token")
.withClientAudience("kafka")
.withClientScope("messaging")
.withClientGrantType("custom_client_credentials")
.withConnectTimeoutSeconds(30)
.withEnableMetrics(true)
.endKafkaListenerAuthenticationOAuth()
Expand Down Expand Up @@ -2071,7 +2073,7 @@ public void testOauthConfigurationWithPlainOnly() {
"ssl.endpoint.identification.algorithm=HTTPS",
"principal.builder.class=io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder",
"listener.name.plain-9092.plain.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler",
"listener.name.plain-9092.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required oauth.valid.issuer.uri=\"http://valid-issuer\" oauth.scope=\"messaging\" oauth.audience=\"kafka\" oauth.jwks.endpoint.uri=\"http://jwks\" oauth.jwks.refresh.min.pause.seconds=\"5\" oauth.username.claim=\"preferred_username\" oauth.connect.timeout.seconds=\"30\" oauth.enable.metrics=\"true\" oauth.config.id=\"PLAIN-9092\" oauth.token.endpoint.uri=\"http://token\";",
"listener.name.plain-9092.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required oauth.valid.issuer.uri=\"http://valid-issuer\" oauth.client.credentials.grant.type=\"custom_client_credentials\" oauth.scope=\"messaging\" oauth.audience=\"kafka\" oauth.jwks.endpoint.uri=\"http://jwks\" oauth.jwks.refresh.min.pause.seconds=\"5\" oauth.username.claim=\"preferred_username\" oauth.connect.timeout.seconds=\"30\" oauth.enable.metrics=\"true\" oauth.config.id=\"PLAIN-9092\" oauth.token.endpoint.uri=\"http://token\";",
"listener.name.plain-9092.sasl.enabled.mechanisms=PLAIN",
"listener.name.plain-9092.connections.max.reauth.ms=3600000"));
}
Expand Down Expand Up @@ -2254,6 +2256,7 @@ public void testOAuthOptions() {
.withReadTimeoutSeconds(60)
.withHttpRetries(2)
.withHttpRetryPauseMs(500)
.withClientGrantType("custom_client_credentials")
.withClientAudience("kafka")
.withClientScope("messaging")
.withEnableMetrics(true)
Expand All @@ -2267,6 +2270,7 @@ public void testOAuthOptions() {
expectedOptions.put(ServerConfig.OAUTH_CHECK_ISSUER, String.valueOf(false));
expectedOptions.put(ServerConfig.OAUTH_CHECK_AUDIENCE, String.valueOf(true));
expectedOptions.put(ServerConfig.OAUTH_CUSTOM_CLAIM_CHECK, "@.aud && @.aud == 'something'");
expectedOptions.put(ServerConfig.OAUTH_CLIENT_CREDENTIALS_GRANT_TYPE, "custom_client_credentials");
expectedOptions.put(ServerConfig.OAUTH_SCOPE, "messaging");
expectedOptions.put(ServerConfig.OAUTH_AUDIENCE, "kafka");
expectedOptions.put(ServerConfig.OAUTH_JWKS_ENDPOINT_URI, "http://jwks-endpoint");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1727,6 +1727,7 @@ public void testPodSetWithOAuthWithClientSecret() {
.withTokenEndpointUri("http://my-oauth-server")
.withAudience("kafka")
.withScope("all")
.withGrantType("custom-client-credentials")
.withNewClientSecret()
.withSecretName("my-secret-secret")
.withKey("my-secret-key")
Expand All @@ -1742,7 +1743,7 @@ public void testPodSetWithOAuthWithClientSecret() {
String connectConfigurations = configMap.getData().get(KafkaConnectCluster.KAFKA_CONNECT_CONFIGURATION_FILENAME);
assertThat(connectConfigurations, containsString("security.protocol=SASL_PLAINTEXT"));
assertThat(connectConfigurations, containsString("sasl.mechanism=OAUTHBEARER"));
assertThat(connectConfigurations, containsString("sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required oauth.client.id=\"my-client-id\" oauth.token.endpoint.uri=\"http://my-oauth-server\" oauth.scope=\"all\" oauth.audience=\"kafka\" oauth.client.secret=\"${strimzidir:/opt/kafka/oauth/my-secret-secret:my-secret-key}\";"));
assertThat(connectConfigurations, containsString("sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required oauth.client.id=\"my-client-id\" oauth.token.endpoint.uri=\"http://my-oauth-server\" oauth.client.credentials.grant.type=\"custom-client-credentials\" oauth.scope=\"all\" oauth.audience=\"kafka\" oauth.client.secret=\"${strimzidir:/opt/kafka/oauth/my-secret-secret:my-secret-key}\";"));
assertThat(connectConfigurations, containsString("sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1818,6 +1818,8 @@ public void testPodSetWithOAuthWithClientSecret() {
new KafkaClientAuthenticationOAuthBuilder()
.withClientId("my-client-id")
.withTokenEndpointUri("http://my-oauth-server")
.withScope("all")
.withGrantType("custom_client_credentials")
.withNewClientSecret()
.withSecretName("my-secret-secret")
.withKey("my-secret-key")
Expand All @@ -1839,6 +1841,8 @@ public void testPodSetWithOAuthWithClientSecret() {
assertThat(connectConfigurations, containsString("sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " +
"oauth.client.id=\"my-client-id\" " +
"oauth.token.endpoint.uri=\"http://my-oauth-server\" " +
"oauth.client.credentials.grant.type=\"custom_client_credentials\" " +
"oauth.scope=\"all\" " +
"oauth.client.secret=\"${strimzidir:/opt/kafka/oauth/my-secret-secret:my-secret-key}\";"));

// Check PodSet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,8 @@ public void testAddClusterToMirrorMaker2ConnectorConfigWithOauth() {
.withKey("clientAssertionKey")
.withSecretName("clientAssertionSecretName")
.endClientAssertion()
.withScope("all")
.withGrantType("custom_client_credentials")
.withTlsTrustedCertificates(new CertSecretSourceBuilder().withCertificate("ca.crt").withSecretName("my-oauth-secret").build())
.endKafkaClientAuthenticationOAuth()
.build();
Expand All @@ -770,6 +772,8 @@ public void testAddClusterToMirrorMaker2ConnectorConfigWithOauth() {
"oauth.refresh.token", "${strimzidir:/opt/kafka/mm2-oauth/sourceClusterAlias/refreshTokenSecretName:refreshTokenKey}",
"oauth.password.grant.password", "${strimzidir:/opt/kafka/mm2-oauth/sourceClusterAlias/passwordSecretSecretName:passwordSecretPassword}",
"oauth.client.assertion", "${strimzidir:/opt/kafka/mm2-oauth/sourceClusterAlias/clientAssertionSecretName:clientAssertionKey}",
"oauth.scope", "all",
"oauth.client.credentials.grant.type", "custom_client_credentials",
"oauth.ssl.truststore.location", "/tmp/kafka/clusters/sourceClusterAlias-oauth.truststore.p12",
"oauth.ssl.truststore.type", "PKCS12",
"oauth.ssl.truststore.password", PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR)));
Expand Down
6 changes: 5 additions & 1 deletion docker-images/artifacts/kafka-thirdparty-libs/3.9.x/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
</licenses>

<properties>
<strimzi-oauth.version>0.16.2</strimzi-oauth.version>
<strimzi-oauth.version>0.17.0-pre</strimzi-oauth.version>
<strimzi-metrics-reporter.version>0.2.0</strimzi-metrics-reporter.version>
<prometheus.version>1.3.6</prometheus.version>
<cruise-control.version>2.5.143</cruise-control.version>
Expand All @@ -37,6 +37,10 @@
<id>cruise-control</id>
<url>https://linkedin.jfrog.io/artifactory/cruise-control/</url>
</repository>
<repository>
<id>oauth-pre</id>
<url>https://raw.githubusercontent.com/mstruk/strimzi-kafka-oauth/m2repo/m2repo</url>
</repository>
</repositories>

<dependencyManagement>
Expand Down
6 changes: 5 additions & 1 deletion docker-images/artifacts/kafka-thirdparty-libs/4.0.x/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
</licenses>

<properties>
<strimzi-oauth.version>0.16.2</strimzi-oauth.version>
<strimzi-oauth.version>0.17.0-pre</strimzi-oauth.version>
<strimzi-metrics-reporter.version>0.2.0</strimzi-metrics-reporter.version>
<prometheus.version>1.3.6</prometheus.version>
<cruise-control.version>2.5.143</cruise-control.version>
Expand All @@ -36,6 +36,10 @@
<id>cruise-control</id>
<url>https://linkedin.jfrog.io/artifactory/cruise-control/</url>
</repository>
<repository>
<id>oauth-pre</id>
<url>https://raw.githubusercontent.com/mstruk/strimzi-kafka-oauth/m2repo/m2repo</url>
</repository>
</repositories>

<dependencyManagement>
Expand Down
Loading
Loading