Skip to content

HBASE-26553 OAuth Bearer authentication mech plugin for SASL (initial commit) #4019

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security.oauthbearer;

import javax.security.auth.callback.Callback;
import org.apache.commons.lang3.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;

/**
* A {@code Callback} for use by the {@code SaslClient} and {@code Login}
* implementations when they require an OAuth 2 bearer token. Callback handlers
* should use the {@link #error(String, String, String)} method to communicate
* errors returned by the authorization server as per
* <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
* 2.0 Authorization Framework</a>. Callback handlers should communicate other
* problems by raising an {@code IOException}.
* <p>
* This class was introduced in 3.0.0 and, while it feels stable, it could
* evolve. We will try to evolve the API in a compatible manner, but we reserve
* the right to make breaking changes in minor releases, if necessary. We will
* update the {@code InterfaceStability} annotation and this notice once the API
* is considered stable.
*/
@InterfaceAudience.Private
public class OAuthBearerTokenCallback implements Callback {
private OAuthBearerToken token = null;
private String errorCode = null;
private String errorDescription = null;
private String errorUri = null;

/**
* Return the (potentially null) token
*
* @return the (potentially null) token
*/
public OAuthBearerToken token() {
return token;
}

/**
* Return the optional (but always non-empty if not null) error code as per
* <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
* 2.0 Authorization Framework</a>.
*
* @return the optional (but always non-empty if not null) error code
*/
public String errorCode() {
return errorCode;
}

/**
* Return the (potentially null) error description as per
* <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
* 2.0 Authorization Framework</a>.
*
* @return the (potentially null) error description
*/
public String errorDescription() {
return errorDescription;
}

/**
* Return the (potentially null) error URI as per
* <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
* 2.0 Authorization Framework</a>.
*
* @return the (potentially null) error URI
*/
public String errorUri() {
return errorUri;
}

/**
* Set the token. All error-related values are cleared.
*
* @param token
* the optional token to set
*/
public void token(OAuthBearerToken token) {
this.token = token;
this.errorCode = null;
this.errorDescription = null;
this.errorUri = null;
}

/**
* Set the error values as per
* <a href="https://tools.ietf.org/html/rfc6749#section-5.2">RFC 6749: The OAuth
* 2.0 Authorization Framework</a>. Any token is cleared.
*
* @param errorCode
* the mandatory error code to set
* @param errorDescription
* the optional error description to set
* @param errorUri
* the optional error URI to set
*/
public void error(String errorCode, String errorDescription, String errorUri) {
if (StringUtils.isEmpty(errorCode)) {
throw new IllegalArgumentException("error code must not be empty");
}
this.errorCode = errorCode;
this.errorDescription = errorDescription;
this.errorUri = errorUri;
this.token = null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security.oauthbearer.internals;

import static org.apache.hadoop.hbase.security.oauthbearer.OAuthBearerUtils.OAUTHBEARER_MECHANISM;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslClientFactory;
import javax.security.sasl.SaslException;
import org.apache.hadoop.hbase.exceptions.IllegalSaslStateException;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.auth.AuthenticateCallbackHandler;
import org.apache.hadoop.hbase.security.oauthbearer.OAuthBearerToken;
import org.apache.hadoop.hbase.security.oauthbearer.OAuthBearerTokenCallback;
import org.apache.hadoop.hbase.security.oauthbearer.OAuthBearerUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@code SaslClient} implementation for SASL/OAUTHBEARER in Kafka. This
* implementation requires an instance of {@code AuthenticateCallbackHandler}
* that can handle an instance of {@link OAuthBearerTokenCallback} and return
* the {@link OAuthBearerToken} generated by the {@code login()} event on the
* {@code LoginContext}.
*
* @see <a href="https://tools.ietf.org/html/rfc6750#section-2.1">RFC 6750 Section 2.1</a>
*
* This class has been copy-and-pasted from Kafka codebase.
*/
@InterfaceAudience.Public
public class OAuthBearerSaslClient implements SaslClient {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we just need to extend the HBase base SASL client in order to get the RPC encryption. That's probably why it didn't work OOTB.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HBaseSaslRpcClient uses to composition to extend the functionality of SaslClient and it's already being used in the OAuthBearer case. I don't think inheriting it would help here.

However I don't know how property negotiation works in SASL and need dig into Kafka's code again for our usecase. Maybe they haven't implemented that either. So, in my understanding the negotiated QoP indicates what transport level protection is active and we should require "auth-conf" here to get wire encryption.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some further investigation: I need to set "hbase.rpc.protection" to "privacy" which will be translated to "auth-conf" QoP in Sasl properties. These properties will be set both in Sasl client and server. I think this is accurate so far.

But where is the negotiation part?

I mean this is certainly an explicit setting inside Hbase, where does it become "negotiated"?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to set "hbase.rpc.protection" to "privacy" which will be translated to "auth-conf" QoP in Sasl properties

This has been something I've never fully looked into, but, IIRC, the quality of protection (QOP) ultimately defers back to the mechanism in use to give that protection. I can't point to any documentation so the easiest thing is to try this out.

I had used Wireshark just to listen to the RegionServer traffic on my local host when running your standalone tool.

I don't think inheriting it would help here.

See the code here.

public void initCryptoCipher(RPCProtos.CryptoCipherMeta cryptoCipherMeta,
Configuration conf) throws IOException {
// create SaslAES for client
cryptoAES = EncryptionUtil.createCryptoAES(cryptoCipherMeta, conf);
cryptoAesEnable = true;
}
/**
* Get a SASL wrapped InputStream. Can be called only after saslConnect() has been called.
* @return a SASL wrapped InputStream
* @throws IOException
*/
public InputStream getInputStream() throws IOException {
if (!saslClient.isComplete()) {
throw new IOException("Sasl authentication exchange hasn't completed yet");
}
// If Crypto AES is enabled, return cryptoInputStream which unwrap the data with Crypto AES.
if (cryptoAesEnable && cryptoInputStream != null) {
return cryptoInputStream;
}
return saslInputStream;
}
class WrappedInputStream extends FilterInputStream {
private ByteBuffer unwrappedRpcBuffer = ByteBuffer.allocate(0);
public WrappedInputStream(InputStream in) throws IOException {
super(in);
}
@Override
public int read() throws IOException {
byte[] b = new byte[1];
int n = read(b, 0, 1);
return (n != -1) ? b[0] : -1;
}
@Override
public int read(byte b[]) throws IOException {
return read(b, 0, b.length);
}
@Override
public synchronized int read(byte[] buf, int off, int len) throws IOException {
// fill the buffer with the next RPC message
if (unwrappedRpcBuffer.remaining() == 0) {
readNextRpcPacket();
}
// satisfy as much of the request as possible
int readLen = Math.min(len, unwrappedRpcBuffer.remaining());
unwrappedRpcBuffer.get(buf, off, readLen);
return readLen;
}
// unwrap messages with Crypto AES
private void readNextRpcPacket() throws IOException {
LOG.debug("reading next wrapped RPC packet");
DataInputStream dis = new DataInputStream(in);
int rpcLen = dis.readInt();
byte[] rpcBuf = new byte[rpcLen];
dis.readFully(rpcBuf);
// unwrap with Crypto AES
rpcBuf = cryptoAES.unwrap(rpcBuf, 0, rpcBuf.length);
if (LOG.isDebugEnabled()) {
LOG.debug("unwrapping token of length:" + rpcBuf.length);
}
unwrappedRpcBuffer = ByteBuffer.wrap(rpcBuf);
}
}
/**
* Get a SASL wrapped OutputStream. Can be called only after saslConnect() has been called.
* @return a SASL wrapped OutputStream
* @throws IOException
*/
public OutputStream getOutputStream() throws IOException {
if (!saslClient.isComplete()) {
throw new IOException("Sasl authentication exchange hasn't completed yet");
}
// If Crypto AES is enabled, return cryptoOutputStream which wrap the data with Crypto AES.
if (cryptoAesEnable && cryptoOutputStream != null) {
return cryptoOutputStream;
}
return saslOutputStream;
}
class WrappedOutputStream extends FilterOutputStream {
public WrappedOutputStream(OutputStream out) throws IOException {
super(out);
}
@Override
public void write(byte[] buf, int off, int len) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("wrapping token of length:" + len);
}
// wrap with Crypto AES
byte[] wrapped = cryptoAES.wrap(buf, off, len);
DataOutputStream dob = new DataOutputStream(out);
dob.writeInt(wrapped.length);
dob.write(wrapped, 0, wrapped.length);
dob.flush();
}
}

This is the commons-crypto logic I'm referring to. This wrapping and unwrapping will be encrypting the traffic after the SASL handshake is done.

Either way, let me try this out with setting the QOP.

But where is the negotiation part?

The negotiation is partially happening with the JDK itself and partially happening by the Callback implementation's you've written. SASL is just a series of challenge/response messages being passed between client and server.

static final byte BYTE_CONTROL_A = (byte) 0x01;
private static final Logger LOG = LoggerFactory.getLogger(OAuthBearerSaslClient.class);
private final CallbackHandler callbackHandler;

enum State {
SEND_CLIENT_FIRST_MESSAGE, RECEIVE_SERVER_FIRST_MESSAGE, RECEIVE_SERVER_MESSAGE_AFTER_FAILURE,
COMPLETE, FAILED
}

private State state;

public OAuthBearerSaslClient(AuthenticateCallbackHandler callbackHandler) {
this.callbackHandler = Objects.requireNonNull(callbackHandler);
setState(State.SEND_CLIENT_FIRST_MESSAGE);
}

public CallbackHandler callbackHandler() {
return callbackHandler;
}

@Override
public String getMechanismName() {
return OAUTHBEARER_MECHANISM;
}

@Override
public boolean hasInitialResponse() {
return true;
}

@Override
public byte[] evaluateChallenge(byte[] challenge) throws SaslException {
try {
OAuthBearerTokenCallback callback = new OAuthBearerTokenCallback();
switch (state) {
case SEND_CLIENT_FIRST_MESSAGE:
if (challenge != null && challenge.length != 0) {
throw new SaslException("Expected empty challenge");
}
callbackHandler().handle(new Callback[] {callback});
setState(State.RECEIVE_SERVER_FIRST_MESSAGE);
return new OAuthBearerClientInitialResponse(callback.token().value()).toBytes();
case RECEIVE_SERVER_FIRST_MESSAGE:
if (challenge != null && challenge.length != 0) {
String jsonErrorResponse = new String(challenge, StandardCharsets.UTF_8);
if (LOG.isDebugEnabled()) {
LOG.debug("Sending %%x01 response to server after receiving an error: {}",
jsonErrorResponse);
}
setState(State.RECEIVE_SERVER_MESSAGE_AFTER_FAILURE);
return new byte[] {BYTE_CONTROL_A};
}
callbackHandler().handle(new Callback[] {callback});
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully authenticated as {}", callback.token().principalName());
}
setState(State.COMPLETE);
return null;
default:
throw new IllegalSaslStateException("Unexpected challenge in Sasl client state " + state);
}
} catch (SaslException e) {
setState(State.FAILED);
throw e;
} catch (IOException | UnsupportedCallbackException e) {
setState(State.FAILED);
throw new SaslException(e.getMessage(), e);
}
}

@Override
public boolean isComplete() {
return state == State.COMPLETE;
}

@Override
public byte[] unwrap(byte[] incoming, int offset, int len) {
if (!isComplete()) {
throw new IllegalStateException("Authentication exchange has not completed");
}
return Arrays.copyOfRange(incoming, offset, offset + len);
}

@Override
public byte[] wrap(byte[] outgoing, int offset, int len) {
if (!isComplete()) {
throw new IllegalStateException("Authentication exchange has not completed");
}
return Arrays.copyOfRange(outgoing, offset, offset + len);
}

@Override
public Object getNegotiatedProperty(String propName) {
if (!isComplete()) {
throw new IllegalStateException("Authentication exchange has not completed");
}
if (Sasl.QOP.equals(propName)) {
return SaslUtil.QualityOfProtection.AUTHENTICATION.getSaslQop();
}
return null;
}

@Override
public void dispose() {
}

private void setState(State state) {
LOG.debug("Setting SASL/{} client state to {}", OAUTHBEARER_MECHANISM, state);
this.state = state;
}

public static class OAuthBearerSaslClientFactory implements SaslClientFactory {
@Override
public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol,
String serverName, Map<String, ?> props, CallbackHandler callbackHandler) {
String[] mechanismNamesCompatibleWithPolicy = getMechanismNames(props);
for (String mechanism : mechanisms) {
for (String s : mechanismNamesCompatibleWithPolicy) {
if (s.equals(mechanism)) {
if (!(Objects.requireNonNull(callbackHandler) instanceof AuthenticateCallbackHandler)) {
throw new IllegalArgumentException(
String.format("Callback handler must be castable to %s: %s",
AuthenticateCallbackHandler.class.getName(),
callbackHandler.getClass().getName()));
}
return new OAuthBearerSaslClient((AuthenticateCallbackHandler) callbackHandler);
}
}
}
return null;
}

@Override
public String[] getMechanismNames(Map<String, ?> props) {
return OAuthBearerUtils.mechanismNamesCompatibleWithPolicy(props);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security.oauthbearer.internals;

import static org.apache.hadoop.hbase.security.oauthbearer.OAuthBearerUtils.OAUTHBEARER_MECHANISM;
import java.security.Provider;
import java.security.Security;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Public
public class OAuthBearerSaslClientProvider extends Provider {
private static final long serialVersionUID = 1L;

protected OAuthBearerSaslClientProvider() {
super("SASL/OAUTHBEARER Client Provider", 1.0, "SASL/OAUTHBEARER Client Provider for HBase");
put("SaslClientFactory." + OAUTHBEARER_MECHANISM,
OAuthBearerSaslClient.OAuthBearerSaslClientFactory.class.getName());
}

public static void initialize() {
Security.addProvider(new OAuthBearerSaslClientProvider());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we actually need to register a provider with the JCA security provider framework because we don't actually use it. IIRC, Kafka relies on JAAS to automatically perform the logins for them, whereas HBase explicitly sets up the implementation to use (via the AuthenticationProviderSelector). I think we can drop this class entirely.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't work without it, because we don't instantiate the client and the server directly, but asking the Sasl framework to create them for us. In the client auth provider we call Sasl.createSaslClient() to create the client which doesn't work without the client registered for the mechanism.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the client auth provider we call Sasl.createSaslClient() to create the client which doesn't work without the client registered for the mechanism.

Ah, ok. I remember that when we were messing around with other custom Providers. Thanks for clarifying!

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security.provider;

import static org.apache.hadoop.hbase.security.token.OAuthBearerTokenUtil.TOKEN_KIND;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Base client for client/server implementations for the OAuth Bearer (JWT) token auth'n method.
*/
@InterfaceAudience.Private
public class OAuthBearerSaslAuthenticationProvider extends BuiltInSaslAuthenticationProvider {

public static final SaslAuthMethod SASL_AUTH_METHOD = new SaslAuthMethod(
"OAUTHBEARER", (byte)83, "OAUTHBEARER", UserGroupInformation.AuthenticationMethod.TOKEN);

@Override
public SaslAuthMethod getSaslAuthMethod() {
return SASL_AUTH_METHOD;
}
Comment on lines +34 to +36
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For clarity, this SaslAuthMethod (HBase class) is how HBase's RPC classes are figuring out the correct client-side logic to use given a token (secret material).


@Override
public String getTokenKind() {
return TOKEN_KIND;
}
}
Loading