Skip to content

Commit

Permalink
[feat][proxy] PIP 97: Implement for ProxyConnection (apache#19292)
Browse files Browse the repository at this point in the history
PIP: apache#12105 

### Motivation

Implement asynchronous auth for the proxy connection. This is one of the core PRs for implementing apache#12105. 

### Modifications

* Update `ProxyConnection` class to asynchronously handle the authentication result. The result is handled on the handler's event loop to ensure correctness.
* Update `ProxyAuthenticationTest` class to implement async auth methods and to make authentication asynchronous to test that code path.

### Verifying this change

There is an updated test, but it doesn't cover all code paths in this PR.

### Documentation

- [x] `doc-not-needed`

We do not need to document this portion of PIP 97.

### Matching PR in forked repository

PR in forked repository: michaeljmarshall#16
  • Loading branch information
michaeljmarshall authored Feb 1, 2023
1 parent 0273f31 commit fa6af43
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -311,13 +311,9 @@ protected static boolean isTlsChannel(Channel channel) {
return channel.pipeline().get(ServiceChannelInitializer.TLS_HANDLER) != null;
}

private synchronized void completeConnect(AuthData clientData) throws PulsarClientException {
private synchronized void completeConnect() throws PulsarClientException {
Supplier<ClientCnx> clientCnxSupplier;
if (service.getConfiguration().isAuthenticationEnabled()) {
if (service.getConfiguration().isForwardAuthorizationCredentials()) {
this.clientAuthData = clientData;
this.clientAuthMethod = authMethod;
}
clientCnxSupplier = () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole,
clientAuthData, clientAuthMethod, protocolVersionToAdvertise,
service.getConfiguration().isForwardAuthorizationCredentials(), this);
Expand Down Expand Up @@ -423,29 +419,51 @@ public void brokerConnected(DirectProxyHandler directProxyHandler, CommandConnec
// According to auth result, send newConnected or newAuthChallenge command.
private void doAuthentication(AuthData clientData)
throws Exception {
AuthData brokerData = authState.authenticate(clientData);
// authentication has completed, will send newConnected command.
if (authState.isComplete()) {
clientAuthRole = authState.getAuthRole();
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Client successfully authenticated with {} role {}",
remoteAddress, authMethod, clientAuthRole);
}
authState
.authenticateAsync(clientData)
.whenCompleteAsync((authChallenge, throwable) -> {
if (throwable == null) {
authChallengeSuccessCallback(authChallenge);
} else {
authenticationFailedCallback(throwable);
}
}, ctx.executor());
}

protected void authenticationFailedCallback(Throwable t) {
LOG.warn("[{}] Unable to authenticate: ", remoteAddress, t);
final ByteBuf msg = Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate");
writeAndFlushAndClose(msg);
}

// First connection
if (this.connectionPool == null || state == State.Connecting) {
// authentication has completed, will send newConnected command.
completeConnect(clientData);
// Always run in this class's event loop.
protected void authChallengeSuccessCallback(AuthData authChallenge) {
try {
// authentication has completed, will send newConnected command.
if (authChallenge == null) {
clientAuthRole = authState.getAuthRole();
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Client successfully authenticated with {} role {}",
remoteAddress, authMethod, clientAuthRole);
}

// First connection
if (this.connectionPool == null || state == State.Connecting) {
// authentication has completed, will send newConnected command.
completeConnect();
}
return;
}
return;
}

// auth not complete, continue auth with client side.
final ByteBuf msg = Commands.newAuthChallenge(authMethod, brokerData, protocolVersionToAdvertise);
writeAndFlush(msg);
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Authentication in progress client by method {}.",
remoteAddress, authMethod);
// auth not complete, continue auth with client side.
final ByteBuf msg = Commands.newAuthChallenge(authMethod, authChallenge, protocolVersionToAdvertise);
writeAndFlush(msg);
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Authentication in progress client by method {}.",
remoteAddress, authMethod);
}
} catch (Exception e) {
authenticationFailedCallback(e);
}
}

Expand Down Expand Up @@ -479,7 +497,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),

// authn not enabled, complete
if (!service.getConfiguration().isAuthenticationEnabled()) {
completeConnect(null);
completeConnect();
return;
}

Expand All @@ -493,6 +511,14 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
authMethod = "none";
}

if (service.getConfiguration().isForwardAuthorizationCredentials()) {
// We store the first clientData here. Before this commit, we stored the last clientData.
// Since this only works when forwarding single staged authentication, first == last is true.
// Here is an issue to fix the protocol: https://github.com/apache/pulsar/issues/19291.
this.clientAuthData = clientData;
this.clientAuthMethod = authMethod;
}

authenticationProvider = service
.getAuthenticationService()
.getAuthenticationProvider(authMethod);
Expand All @@ -504,7 +530,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
.orElseThrow(() ->
new AuthenticationException("No anonymous role, and no authentication provider configured"));

completeConnect(clientData);
completeConnect();
return;
}

Expand All @@ -518,9 +544,7 @@ remoteAddress, protocolVersionToAdvertise, getRemoteEndpointProtocolVersion(),
authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession);
doAuthentication(clientData);
} catch (Exception e) {
LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e);
final ByteBuf msg = Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate");
writeAndFlushAndClose(msg);
authenticationFailedCallback(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import javax.naming.AuthenticationException;

Expand Down Expand Up @@ -136,7 +137,7 @@ public String getAuthMethodName() {
}

@Override
public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
public CompletableFuture<String> authenticateAsync(AuthenticationDataSource authData) {
String commandData = null;
if (authData.hasDataFromCommand()) {
commandData = authData.getCommandData();
Expand All @@ -150,9 +151,12 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
long currentTimeInMillis = System.currentTimeMillis();
if (expiryTimeInMillis < currentTimeInMillis) {
log.warn("Auth failed due to timeout");
throw new AuthenticationException("Authentication data has been expired");
return CompletableFuture
.failedFuture(new AuthenticationException("Authentication data has been expired"));
}
return element.get("entityType").getAsString();
final String result = element.get("entityType").getAsString();
// Run in another thread to attempt to test the async logic
return CompletableFuture.supplyAsync(() -> result);
}
}

Expand Down

0 comments on commit fa6af43

Please sign in to comment.