Skip to content

Commit

Permalink
[feat][broker] Update AuthenticationProvider to simplify HTTP Authn (a…
Browse files Browse the repository at this point in the history
…pache#19197)

PIP: apache#12105 

### Motivation

This is the first of several PRs to implement [PIP 97](apache#12105).

This PR seeks to start to solve the fact that the `AuthenticationState` class currently authenticates `authData` twice instead of just once. This change is important to make before we are able to utilize the async methods introduced in apache#12104.

Historical context: apache#14044 introduced the `AuthenticationProvider#newHttpAuthState`  method. The only use case for this method in the pulsar code base is to let custom providers specify the `AuthenticationDataSource` on http request attributes. The primary problem with that implementation is that the `AuthenticationState` class currently involves authenticating the `authData` passed in to the `newHttpAuthState`. As such, this code is sub-optimal, and creates a confusing flow.

I propose we deprecate the `newHttpAuthState` method and instead start using the `authenticateHttpRequestAsync` and `authenticateHttpRequest` methods to allow custom implementations to configure the desired `AuthenticationDataSource` on the request attributes.

In order to simplify the diff for reviewers, this PR uses the deprecated `AuthenticationProvider#authenticateHttpRequest` method. I plan to follow up and switch to use the `AuthenticationProvider#authenticateHttpRequestAsync` method.

Note that these changes are completely backwards compatible. The only risk is to users that have custom code loaded into the broker that calls the `AuthenticationProvider#authenticateHttpRequest` method.

### Modifications

* Deprecate `AuthenticationService#authenticateHttpRequest(HttpServletRequest request, AuthenticationDataSource authData)`. It is no longer used.
* Deprecate `AuthenticationProvider#newHttpAuthState(HttpServletRequest request)`. It is no longer used outside of the `AuthenticationProvider` interface itself.
* Remove `@InterfaceStability.Unstable` annotation from the `authenticateHttpRequestAsync` method. When I introduced that annotation, I was under the impression that we didn't need it. However, in order to meet the requirements introduced in apache#14044, we need to let custom `AuthenticationProviders` add their own attributes.
* Update the default implementation of `authenticateHttpRequest`. Because the previous implementation was unreachable by all auth providers except for the SASL implementation, this is a backwards compatible change.
* Implement changes for the `AuthenticationProviderToken` so that it no longer relies on `newHttpAuthState`.

### Verifying this change

I added new tests.

### Does this pull request potentially affect one of the following parts:

- [x] The public API

This changes the public API within the broker by marking some methods as `@Deprecated`.

### Documentation

- [x] `doc-not-needed`

We document the `AuthenticationProvider` interface in the code. I added these docs. There is currently no where else to update docs.

### Matching PR in forked repository

PR in forked repository: #12

### Additional motivation from PR discussion

My primary motivation for this PR is to implement PIP 97. If we want authentication to be asynchronous, we cannot assume that when the `AuthenticationState` object is initialized, the `authenticationDataSource` and the `authRole` are present because the authentication might not yet be completed. My goal with this PR is to break an unnecessary relationship between `AuthenticationState` and http request authentication that was created by apache#14044.
  • Loading branch information
michaeljmarshall authored Jan 19, 2023
1 parent ea4f7eb commit 3c38ed5
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.authentication;

import static org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedDataAttributeName;
import static org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedRoleAttributeName;
import java.io.Closeable;
import java.io.IOException;
import java.net.SocketAddress;
Expand All @@ -28,7 +30,6 @@
import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.common.util.FutureUtil;

/**
Expand Down Expand Up @@ -103,7 +104,16 @@ default AuthenticationState newAuthState(AuthData authData,

/**
* Create an http authentication data State use passed in AuthenticationDataSource.
* @deprecated implementations that previously relied on this should update their implementation of
* {@link #authenticateHttpRequest(HttpServletRequest, HttpServletResponse)} or of
* {@link #authenticateHttpRequestAsync(HttpServletRequest, HttpServletResponse)} so that the desired attributes
* are added in those methods.
*
* <p>Note: this method was only ever used to generate an {@link AuthenticationState} object in order to generate
* an {@link AuthenticationDataSource} that was added as the {@link AuthenticatedDataAttributeName} attribute to
* the http request. Removing this method removes an unnecessary step in the authentication flow.</p>
*/
@Deprecated(since = "2.12.0")
default AuthenticationState newHttpAuthState(HttpServletRequest request)
throws AuthenticationException {
return new OneStageAuthenticationState(request, this);
Expand All @@ -112,20 +122,17 @@ default AuthenticationState newHttpAuthState(HttpServletRequest request)
/**
* Validate the authentication for the given credentials with the specified authentication data.
*
* <p>Implementations of this method MUST modify the request by adding the {@link AuthenticatedRoleAttributeName}
* and the {@link AuthenticatedDataAttributeName} attributes.</p>
*
* <p>Warning: the calling thread is an IO thread. Any implementations that rely on blocking behavior
* must ensure that the execution is completed on using a separate thread pool to ensure IO threads
* are never blocked.</p>
*
* <p>Note: this method is marked as unstable because the Pulsar code base only calls it for the
* Pulsar Broker Auth SASL plugin. All non SASL HTTP requests are authenticated using the
* {@link AuthenticationProvider#authenticateAsync(AuthenticationDataSource)} method. As such,
* this method might be removed in favor of the SASL provider implementing the
* {@link AuthenticationProvider#authenticateAsync(AuthenticationDataSource)} method.</p>
*
* @return Set response, according to passed in request.
* @return Set response, according to passed in request, and return whether we should do following chain.doFilter.
* @throws Exception when authentication failed
* and return whether we should do following chain.doFilter or not.
*/
@InterfaceStability.Unstable
default CompletableFuture<Boolean> authenticateHttpRequestAsync(HttpServletRequest request,
HttpServletResponse response) {
try {
Expand All @@ -138,10 +145,20 @@ default CompletableFuture<Boolean> authenticateHttpRequestAsync(HttpServletReque
/**
* Set response, according to passed in request.
* and return whether we should do following chain.doFilter or not.
*
* <p>Implementations of this method MUST modify the request by adding the {@link AuthenticatedRoleAttributeName}
* and the {@link AuthenticatedDataAttributeName} attributes.</p>
*
* @return Set response, according to passed in request, and return whether we should do following chain.doFilter.
* @throws Exception when authentication failed
* @deprecated use and implement {@link AuthenticationProvider#authenticateHttpRequestAsync} instead.
*/
@Deprecated
default boolean authenticateHttpRequest(HttpServletRequest request, HttpServletResponse response) throws Exception {
throw new AuthenticationException("Not supported");
AuthenticationState authenticationState = newHttpAuthState(request);
String role = authenticate(authenticationState.getAuthDataSource());
request.setAttribute(AuthenticatedRoleAttributeName, role);
request.setAttribute(AuthenticatedDataAttributeName, authenticationState.getAuthDataSource());
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.broker.authentication;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedDataAttributeName;
import static org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedRoleAttributeName;
import com.google.common.annotations.VisibleForTesting;
import io.jsonwebtoken.Claims;
import io.jsonwebtoken.ExpiredJwtException;
Expand All @@ -39,6 +41,7 @@
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics;
Expand Down Expand Up @@ -160,6 +163,20 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
}
}

@Override
public boolean authenticateHttpRequest(HttpServletRequest request, HttpServletResponse response) throws Exception {
HttpServletRequestWrapper wrappedRequest = new HttpServletRequestWrapper(request);
String httpHeaderValue = wrappedRequest.getHeader(HTTP_HEADER_NAME);
if (httpHeaderValue == null || !httpHeaderValue.startsWith(HTTP_HEADER_VALUE_PREFIX)) {
throw new AuthenticationException("Invalid HTTP Authorization header");
}
AuthenticationDataSource authenticationDataSource = new AuthenticationDataHttps(wrappedRequest);
String role = authenticate(authenticationDataSource);
request.setAttribute(AuthenticatedRoleAttributeName, role);
request.setAttribute(AuthenticatedDataAttributeName, authenticationDataSource);
return true;
}

@Override
public AuthenticationState newAuthState(AuthData authData, SocketAddress remoteAddress, SSLSession sslSession)
throws AuthenticationException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.authentication;

import static org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedDataAttributeName;
import static org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedRoleAttributeName;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -28,10 +30,12 @@
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.common.sasl.SaslConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -85,16 +89,77 @@ public AuthenticationService(ServiceConfiguration conf) throws PulsarServerExcep
}
}

private String getAuthMethodName(HttpServletRequest request) {
return request.getHeader(AuthenticationFilter.PULSAR_AUTH_METHOD_NAME);
}

private AuthenticationProvider getAuthProvider(String authMethodName) throws AuthenticationException {
AuthenticationProvider providerToUse = providers.get(authMethodName);
if (providerToUse == null) {
throw new AuthenticationException(
String.format("Unsupported authentication method: [%s].", authMethodName));
}
return providerToUse;
}

public boolean authenticateHttpRequest(HttpServletRequest request, HttpServletResponse response)
throws Exception {
String authMethodName = getAuthMethodName(request);
if (authMethodName == null
&& SaslConstants.SASL_TYPE_VALUE.equalsIgnoreCase(request.getHeader(SaslConstants.SASL_HEADER_TYPE))) {
// This edge case must be handled because the Pulsar SASL implementation does not add the
// X-Pulsar-Auth-Method-Name header.
authMethodName = SaslConstants.AUTH_METHOD_NAME;
}
if (authMethodName != null) {
AuthenticationProvider providerToUse = getAuthProvider(authMethodName);
try {
return providerToUse.authenticateHttpRequest(request, response);
} catch (AuthenticationException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Authentication failed for provider " + providerToUse.getAuthMethodName() + " : "
+ e.getMessage(), e);
}
throw e;
}
} else {
for (AuthenticationProvider provider : providers.values()) {
try {
return provider.authenticateHttpRequest(request, response);
} catch (AuthenticationException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Authentication failed for provider " + provider.getAuthMethodName() + ": "
+ e.getMessage(), e);
}
// Ignore the exception because we don't know which authentication method is expected here.
}
}
// No authentication provided
if (!providers.isEmpty()) {
if (StringUtils.isNotBlank(anonymousUserRole)) {
request.setAttribute(AuthenticatedRoleAttributeName, anonymousUserRole);
request.setAttribute(AuthenticatedDataAttributeName, new AuthenticationDataHttps(request));
return true;
}
// If at least a provider was configured, then the authentication needs to be provider
throw new AuthenticationException("Authentication required");
} else {
// No authentication required
return true;
}
}
}

/**
* @deprecated use {@link #authenticateHttpRequest(HttpServletRequest, HttpServletResponse)}
*/
@Deprecated(since = "2.12.0")
public String authenticateHttpRequest(HttpServletRequest request, AuthenticationDataSource authData)
throws AuthenticationException {
String authMethodName = request.getHeader(AuthenticationFilter.PULSAR_AUTH_METHOD_NAME);
String authMethodName = getAuthMethodName(request);

if (authMethodName != null) {
AuthenticationProvider providerToUse = providers.get(authMethodName);
if (providerToUse == null) {
throw new AuthenticationException(
String.format("Unsupported authentication method: [%s].", authMethodName));
}
AuthenticationProvider providerToUse = getAuthProvider(authMethodName);
try {
if (authData == null) {
AuthenticationState authenticationState = providerToUse.newHttpAuthState(request);
Expand Down Expand Up @@ -140,10 +205,11 @@ public String authenticateHttpRequest(HttpServletRequest request, Authentication
/**
* Mark this function as deprecated, it is recommended to use a method with the AuthenticationDataSource
* signature to implement it.
* @deprecated use {@link #authenticateHttpRequest(HttpServletRequest, HttpServletResponse)}.
*/
@Deprecated
public String authenticateHttpRequest(HttpServletRequest request) throws AuthenticationException {
return authenticateHttpRequest(request, null);
return authenticateHttpRequest(request, (AuthenticationDataSource) null);
}

public AuthenticationProvider getAuthenticationProvider(String authMethodName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.common.sasl.SaslConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -52,54 +49,12 @@ public AuthenticationFilter(AuthenticationService authenticationService) {
this.authenticationService = authenticationService;
}

private boolean isSaslRequest(HttpServletRequest request) {
if (request.getHeader(SaslConstants.SASL_HEADER_TYPE) == null
|| request.getHeader(SaslConstants.SASL_HEADER_TYPE).isEmpty()) {
return false;
}
if (request.getHeader(SaslConstants.SASL_HEADER_TYPE)
.equalsIgnoreCase(SaslConstants.SASL_TYPE_VALUE)) {
return true;
} else {
return false;
}
}

@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
try {
HttpServletRequest httpRequest = (HttpServletRequest) request;
HttpServletResponse httpResponse = (HttpServletResponse) response;

if (!isSaslRequest(httpRequest)) {
// not sasl type, return role directly.
String authMethodName = httpRequest.getHeader(PULSAR_AUTH_METHOD_NAME);
String role;
if (authMethodName != null && authenticationService.getAuthenticationProvider(authMethodName) != null) {
AuthenticationState authenticationState = authenticationService
.getAuthenticationProvider(authMethodName).newHttpAuthState(httpRequest);
request.setAttribute(AuthenticatedDataAttributeName, authenticationState.getAuthDataSource());
role = authenticationService.authenticateHttpRequest(
(HttpServletRequest) request, authenticationState.getAuthDataSource());
} else {
request.setAttribute(AuthenticatedDataAttributeName,
new AuthenticationDataHttps((HttpServletRequest) request));
role = authenticationService.authenticateHttpRequest((HttpServletRequest) request);
}
request.setAttribute(AuthenticatedRoleAttributeName, role);

if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Authenticated HTTP request with role {}", request.getRemoteAddr(), role);
}
chain.doFilter(request, response);
return;
}

boolean doFilter = authenticationService
.getAuthenticationProvider(SaslConstants.AUTH_METHOD_NAME)
.authenticateHttpRequest(httpRequest, httpResponse);

.authenticateHttpRequest((HttpServletRequest) request, (HttpServletResponse) response);
if (doFilter) {
chain.doFilter(request, response);
}
Expand All @@ -111,7 +66,6 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
} else {
LOG.error("[{}] Error performing authentication for HTTP", request.getRemoteAddr(), e);
}
return;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -885,8 +885,8 @@ public void testTokenFromHttpParams() throws Exception {
doReturn("127.0.0.1").when(servletRequest).getRemoteAddr();
doReturn(0).when(servletRequest).getRemotePort();

AuthenticationState authState = provider.newHttpAuthState(servletRequest);
provider.authenticate(authState.getAuthDataSource());
boolean doFilter = provider.authenticateHttpRequest(servletRequest, null);
assertTrue(doFilter, "Authentication should have passed");
}

@Test
Expand All @@ -910,8 +910,8 @@ public void testTokenFromHttpHeaders() throws Exception {
doReturn("127.0.0.1").when(servletRequest).getRemoteAddr();
doReturn(0).when(servletRequest).getRemotePort();

AuthenticationState authState = provider.newHttpAuthState(servletRequest);
provider.authenticate(authState.getAuthDataSource());
boolean doFilter = provider.authenticateHttpRequest(servletRequest, null);
assertTrue(doFilter, "Authentication should have passed");
}

private static String createTokenWithAudience(Key signingKey, String audienceClaim, List<String> audience) {
Expand Down
Loading

0 comments on commit 3c38ed5

Please sign in to comment.