Skip to content

Commit

Permalink
[feat][misc] PIP-264: Add OpenTelemetry authentication and token metr…
Browse files Browse the repository at this point in the history
…ics (apache#23016)
  • Loading branch information
dragosvictor authored Aug 29, 2024
1 parent e2bbb4b commit 587af85
Show file tree
Hide file tree
Showing 26 changed files with 674 additions and 217 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public class AuthenticationProviderAthenz implements AuthenticationProvider {
private List<String> domainNameList = null;
private int allowedOffset = 30;

private AuthenticationMetrics authenticationMetrics;

public enum ErrorCode {
UNKNOWN,
NO_CLIENT,
Expand All @@ -54,6 +56,14 @@ public enum ErrorCode {

@Override
public void initialize(ServiceConfiguration config) throws IOException {
initialize(Context.builder().config(config).build());
}

@Override
public void initialize(Context context) throws IOException {
authenticationMetrics = new AuthenticationMetrics(context.getOpenTelemetry(),
getClass().getSimpleName(), getAuthMethodName());
var config = context.getConfig();
String domainNames;
if (config.getProperty(DOMAIN_NAME_LIST) != null) {
domainNames = (String) config.getProperty(DOMAIN_NAME_LIST);
Expand Down Expand Up @@ -86,6 +96,11 @@ public String getAuthMethodName() {
return "athenz";
}

@Override
public void incrementFailureMetric(Enum<?> errorCode) {
authenticationMetrics.recordFailure(errorCode);
}

@Override
public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
SocketAddress clientAddress;
Expand Down Expand Up @@ -141,7 +156,7 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat

if (token.validate(ztsPublicKey, allowedOffset, false, null)) {
log.debug("Athenz Role Token : {}, Authenticated for Client: {}", roleToken, clientAddress);
AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), getAuthMethodName());
authenticationMetrics.recordSuccess();
return token.getPrincipal();
} else {
errorCode = ErrorCode.INVALID_TOKEN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,16 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;

import com.yahoo.athenz.auth.token.RoleToken;
import com.yahoo.athenz.zpe.ZpeConsts;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import javax.naming.AuthenticationException;

import org.apache.pulsar.broker.ServiceConfiguration;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand All @@ -55,7 +51,7 @@ public void setup() throws Exception {

// Initialize authentication provider
provider = new AuthenticationProviderAthenz();
provider.initialize(config);
provider.initialize(AuthenticationProvider.Context.builder().config(config).build());

// Specify Athenz configuration file for AuthZpeClient which is used in AuthenticationProviderAthenz
System.setProperty(ZpeConsts.ZPE_PROP_ATHENZ_CONF, "./src/test/resources/athenz.conf.test");
Expand All @@ -69,7 +65,7 @@ public void testInitilizeFromSystemPropeties() {
emptyConf.setProperties(emptyProp);
AuthenticationProviderAthenz sysPropProvider1 = new AuthenticationProviderAthenz();
try {
sysPropProvider1.initialize(emptyConf);
sysPropProvider1.initialize(AuthenticationProvider.Context.builder().config(emptyConf).build());
assertEquals(sysPropProvider1.getAllowedOffset(), 30); // default allowed offset is 30 sec
} catch (Exception e) {
fail("Fail to Read pulsar.athenz.domain.names from System Properties");
Expand All @@ -78,7 +74,7 @@ public void testInitilizeFromSystemPropeties() {
System.setProperty("pulsar.athenz.role.token_allowed_offset", "0");
AuthenticationProviderAthenz sysPropProvider2 = new AuthenticationProviderAthenz();
try {
sysPropProvider2.initialize(config);
sysPropProvider2.initialize(AuthenticationProvider.Context.builder().config(config).build());
assertEquals(sysPropProvider2.getAllowedOffset(), 0);
} catch (Exception e) {
fail("Failed to get allowed offset from system property");
Expand All @@ -87,15 +83,15 @@ public void testInitilizeFromSystemPropeties() {
System.setProperty("pulsar.athenz.role.token_allowed_offset", "invalid");
AuthenticationProviderAthenz sysPropProvider3 = new AuthenticationProviderAthenz();
try {
sysPropProvider3.initialize(config);
sysPropProvider3.initialize(AuthenticationProvider.Context.builder().config(config).build());
fail("Invalid allowed offset should not be specified");
} catch (IOException e) {
}

System.setProperty("pulsar.athenz.role.token_allowed_offset", "-1");
AuthenticationProviderAthenz sysPropProvider4 = new AuthenticationProviderAthenz();
try {
sysPropProvider4.initialize(config);
sysPropProvider4.initialize(AuthenticationProvider.Context.builder().config(config).build());
fail("Negative allowed offset should not be specified");
} catch (IOException e) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@
public class AuthenticationProviderOpenID implements AuthenticationProvider {
private static final Logger log = LoggerFactory.getLogger(AuthenticationProviderOpenID.class);

private static final String SIMPLE_NAME = AuthenticationProviderOpenID.class.getSimpleName();

// Must match the value used by the OAuth2 Client Plugin.
private static final String AUTH_METHOD_NAME = "token";

Expand Down Expand Up @@ -148,8 +146,18 @@ public class AuthenticationProviderOpenID implements AuthenticationProvider {
private String[] allowedAudiences;
private ApiClient k8sApiClient;

private AuthenticationMetrics authenticationMetrics;

@Override
public void initialize(ServiceConfiguration config) throws IOException {
initialize(Context.builder().config(config).build());
}

@Override
public void initialize(Context context) throws IOException {
authenticationMetrics = new AuthenticationMetrics(context.getOpenTelemetry(),
getClass().getSimpleName(), getAuthMethodName());
var config = context.getConfig();
this.allowedAudiences = validateAllowedAudiences(getConfigValueAsSet(config, ALLOWED_AUDIENCES));
this.roleClaim = getConfigValueAsString(config, ROLE_CLAIM, ROLE_CLAIM_DEFAULT);
this.isRoleClaimNotSubject = !ROLE_CLAIM_DEFAULT.equals(roleClaim);
Expand Down Expand Up @@ -181,15 +189,20 @@ public void initialize(ServiceConfiguration config) throws IOException {
.build();
httpClient = new DefaultAsyncHttpClient(clientConfig);
k8sApiClient = fallbackDiscoveryMode != FallbackDiscoveryMode.DISABLED ? Config.defaultClient() : null;
this.openIDProviderMetadataCache = new OpenIDProviderMetadataCache(config, httpClient, k8sApiClient);
this.jwksCache = new JwksCache(config, httpClient, k8sApiClient);
this.openIDProviderMetadataCache = new OpenIDProviderMetadataCache(this, config, httpClient, k8sApiClient);
this.jwksCache = new JwksCache(this, config, httpClient, k8sApiClient);
}

@Override
public String getAuthMethodName() {
return AUTH_METHOD_NAME;
}

@Override
public void incrementFailureMetric(Enum<?> errorCode) {
authenticationMetrics.recordFailure(errorCode);
}

/**
* Authenticate the parameterized {@link AuthenticationDataSource} by verifying the issuer is an allowed issuer,
* then retrieving the JWKS URI from the issuer, then retrieving the Public key from the JWKS URI, and finally
Expand Down Expand Up @@ -219,7 +232,7 @@ CompletableFuture<DecodedJWT> authenticateTokenAsync(AuthenticationDataSource au
return authenticateToken(token)
.whenComplete((jwt, e) -> {
if (jwt != null) {
AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), getAuthMethodName());
authenticationMetrics.recordSuccess();
}
// Failure metrics are incremented within methods above
});
Expand Down Expand Up @@ -463,10 +476,6 @@ DecodedJWT verifyJWT(PublicKey publicKey,
}
}

static void incrementFailureMetric(AuthenticationExceptionCode code) {
AuthenticationMetrics.authenticateFailure(SIMPLE_NAME, AUTH_METHOD_NAME, code);
}

/**
* Validate the configured allow list of allowedIssuers. The allowedIssuers set must be nonempty in order for
* the plugin to authenticate any token. Thus, it fails initialization if the configuration is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.authentication.oidc;

import static org.apache.pulsar.broker.authentication.oidc.AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY;
import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_EXPIRATION_SECONDS;
import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_EXPIRATION_SECONDS_DEFAULT;
import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_REFRESH_AFTER_WRITE_SECONDS;
Expand All @@ -26,7 +27,6 @@
import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_SIZE_DEFAULT;
import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.KEY_ID_CACHE_MISS_REFRESH_SECONDS;
import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.KEY_ID_CACHE_MISS_REFRESH_SECONDS_DEFAULT;
import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.incrementFailureMetric;
import static org.apache.pulsar.broker.authentication.oidc.ConfigUtils.getConfigValueAsInt;
import com.auth0.jwk.Jwk;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -49,6 +49,7 @@
import java.util.concurrent.TimeUnit;
import javax.naming.AuthenticationException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.asynchttpclient.AsyncHttpClient;

public class JwksCache {
Expand All @@ -60,8 +61,11 @@ public class JwksCache {
private final ObjectReader reader = new ObjectMapper().readerFor(HashMap.class);
private final AsyncHttpClient httpClient;
private final OpenidApi openidApi;
private final AuthenticationProvider authenticationProvider;

JwksCache(ServiceConfiguration config, AsyncHttpClient httpClient, ApiClient apiClient) throws IOException {
JwksCache(AuthenticationProvider authenticationProvider, ServiceConfiguration config,
AsyncHttpClient httpClient, ApiClient apiClient) throws IOException {
this.authenticationProvider = authenticationProvider;
// Store the clients
this.httpClient = httpClient;
this.openidApi = apiClient != null ? new OpenidApi(apiClient) : null;
Expand Down Expand Up @@ -91,7 +95,7 @@ public class JwksCache {

CompletableFuture<Jwk> getJwk(String jwksUri, String keyId) {
if (jwksUri == null) {
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
return CompletableFuture.failedFuture(new IllegalArgumentException("jwksUri must not be null."));
}
return getJwkAndMaybeReload(Optional.of(jwksUri), keyId, false);
Expand Down Expand Up @@ -139,10 +143,10 @@ private CompletableFuture<List<Jwk>> getJwksFromJwksUri(String jwksUri) {
reader.readValue(result.getResponseBodyAsBytes());
future.complete(convertToJwks(jwksUri, jwks));
} catch (AuthenticationException e) {
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
future.completeExceptionally(e);
} catch (Exception e) {
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
future.completeExceptionally(new AuthenticationException(
"Error retrieving public key at " + jwksUri + ": " + e.getMessage()));
}
Expand All @@ -152,7 +156,7 @@ private CompletableFuture<List<Jwk>> getJwksFromJwksUri(String jwksUri) {

CompletableFuture<Jwk> getJwkFromKubernetesApiServer(String keyId) {
if (openidApi == null) {
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
return CompletableFuture.failedFuture(new AuthenticationException(
"Failed to retrieve public key from Kubernetes API server: Kubernetes fallback is not enabled."));
}
Expand All @@ -165,7 +169,7 @@ private CompletableFuture<List<Jwk>> getJwksFromKubernetesApiServer() {
openidApi.getServiceAccountIssuerOpenIDKeysetAsync(new ApiCallback<String>() {
@Override
public void onFailure(ApiException e, int statusCode, Map<String, List<String>> responseHeaders) {
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
// We want the message and responseBody here: https://github.com/kubernetes-client/java/issues/2066.
future.completeExceptionally(
new AuthenticationException("Failed to retrieve public key from Kubernetes API server. "
Expand All @@ -178,10 +182,10 @@ public void onSuccess(String result, int statusCode, Map<String, List<String>> r
HashMap<String, Object> jwks = reader.readValue(result);
future.complete(convertToJwks("Kubernetes API server", jwks));
} catch (AuthenticationException e) {
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
future.completeExceptionally(e);
} catch (Exception e) {
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
future.completeExceptionally(new AuthenticationException(
"Error retrieving public key at Kubernetes API server: " + e.getMessage()));
}
Expand All @@ -198,7 +202,7 @@ public void onDownloadProgress(long bytesRead, long contentLength, boolean done)
}
});
} catch (ApiException e) {
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
future.completeExceptionally(
new AuthenticationException("Failed to retrieve public key from Kubernetes API server: "
+ e.getMessage()));
Expand All @@ -212,7 +216,7 @@ private Jwk getJwkForKID(Optional<String> maybeJwksUri, List<Jwk> jwks, String k
return jwk;
}
}
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
throw new IllegalArgumentException("No JWK found for Key ID " + keyId);
}

Expand Down
Loading

0 comments on commit 587af85

Please sign in to comment.