-
Notifications
You must be signed in to change notification settings - Fork 3.4k
HBASE-26553 OAuth Bearer authentication mech plugin for SASL (initial commit) #4019
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
💔 -1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another round of review. Thanks for all of the cleanup you did already.
Moving ahead under the idea that we'll do some iteration on a feature branch -- I think this is pretty close. I would very much like to see an end to end unit test with some kind of embedded JWT authorization server before committing this. You mentioned offline that you were already doing this, so I'll wait patiently :) (I think you mentioned that nimbus might have some testing utility. Spring might also have something. Shout if you want me to dig in and look).
We definitely have some extra work to be done before this would be security-ready for others to consume, but it will be good to work through them separately.
* This class has been copy-and-pasted from Kafka codebase. | ||
*/ | ||
@InterfaceAudience.Public | ||
public class OAuthBearerSaslClient implements SaslClient { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we just need to extend the HBase base SASL client in order to get the RPC encryption. That's probably why it didn't work OOTB.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
HBaseSaslRpcClient
uses to composition to extend the functionality of SaslClient and it's already being used in the OAuthBearer case. I don't think inheriting it would help here.
However I don't know how property negotiation works in SASL and need dig into Kafka's code again for our usecase. Maybe they haven't implemented that either. So, in my understanding the negotiated QoP indicates what transport level protection is active and we should require "auth-conf" here to get wire encryption.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some further investigation: I need to set "hbase.rpc.protection" to "privacy" which will be translated to "auth-conf" QoP in Sasl properties. These properties will be set both in Sasl client and server. I think this is accurate so far.
But where is the negotiation part?
I mean this is certainly an explicit setting inside Hbase, where does it become "negotiated"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need to set "hbase.rpc.protection" to "privacy" which will be translated to "auth-conf" QoP in Sasl properties
This has been something I've never fully looked into, but, IIRC, the quality of protection (QOP) ultimately defers back to the mechanism in use to give that protection. I can't point to any documentation so the easiest thing is to try this out.
I had used Wireshark just to listen to the RegionServer traffic on my local host when running your standalone tool.
I don't think inheriting it would help here.
See the code here.
hbase/hbase-client/src/main/java/org/apache/hadoop/hbase/security/HBaseSaslRpcClient.java
Lines 192 to 295 in 202b17f
public void initCryptoCipher(RPCProtos.CryptoCipherMeta cryptoCipherMeta, | |
Configuration conf) throws IOException { | |
// create SaslAES for client | |
cryptoAES = EncryptionUtil.createCryptoAES(cryptoCipherMeta, conf); | |
cryptoAesEnable = true; | |
} | |
/** | |
* Get a SASL wrapped InputStream. Can be called only after saslConnect() has been called. | |
* @return a SASL wrapped InputStream | |
* @throws IOException | |
*/ | |
public InputStream getInputStream() throws IOException { | |
if (!saslClient.isComplete()) { | |
throw new IOException("Sasl authentication exchange hasn't completed yet"); | |
} | |
// If Crypto AES is enabled, return cryptoInputStream which unwrap the data with Crypto AES. | |
if (cryptoAesEnable && cryptoInputStream != null) { | |
return cryptoInputStream; | |
} | |
return saslInputStream; | |
} | |
class WrappedInputStream extends FilterInputStream { | |
private ByteBuffer unwrappedRpcBuffer = ByteBuffer.allocate(0); | |
public WrappedInputStream(InputStream in) throws IOException { | |
super(in); | |
} | |
@Override | |
public int read() throws IOException { | |
byte[] b = new byte[1]; | |
int n = read(b, 0, 1); | |
return (n != -1) ? b[0] : -1; | |
} | |
@Override | |
public int read(byte b[]) throws IOException { | |
return read(b, 0, b.length); | |
} | |
@Override | |
public synchronized int read(byte[] buf, int off, int len) throws IOException { | |
// fill the buffer with the next RPC message | |
if (unwrappedRpcBuffer.remaining() == 0) { | |
readNextRpcPacket(); | |
} | |
// satisfy as much of the request as possible | |
int readLen = Math.min(len, unwrappedRpcBuffer.remaining()); | |
unwrappedRpcBuffer.get(buf, off, readLen); | |
return readLen; | |
} | |
// unwrap messages with Crypto AES | |
private void readNextRpcPacket() throws IOException { | |
LOG.debug("reading next wrapped RPC packet"); | |
DataInputStream dis = new DataInputStream(in); | |
int rpcLen = dis.readInt(); | |
byte[] rpcBuf = new byte[rpcLen]; | |
dis.readFully(rpcBuf); | |
// unwrap with Crypto AES | |
rpcBuf = cryptoAES.unwrap(rpcBuf, 0, rpcBuf.length); | |
if (LOG.isDebugEnabled()) { | |
LOG.debug("unwrapping token of length:" + rpcBuf.length); | |
} | |
unwrappedRpcBuffer = ByteBuffer.wrap(rpcBuf); | |
} | |
} | |
/** | |
* Get a SASL wrapped OutputStream. Can be called only after saslConnect() has been called. | |
* @return a SASL wrapped OutputStream | |
* @throws IOException | |
*/ | |
public OutputStream getOutputStream() throws IOException { | |
if (!saslClient.isComplete()) { | |
throw new IOException("Sasl authentication exchange hasn't completed yet"); | |
} | |
// If Crypto AES is enabled, return cryptoOutputStream which wrap the data with Crypto AES. | |
if (cryptoAesEnable && cryptoOutputStream != null) { | |
return cryptoOutputStream; | |
} | |
return saslOutputStream; | |
} | |
class WrappedOutputStream extends FilterOutputStream { | |
public WrappedOutputStream(OutputStream out) throws IOException { | |
super(out); | |
} | |
@Override | |
public void write(byte[] buf, int off, int len) throws IOException { | |
if (LOG.isDebugEnabled()) { | |
LOG.debug("wrapping token of length:" + len); | |
} | |
// wrap with Crypto AES | |
byte[] wrapped = cryptoAES.wrap(buf, off, len); | |
DataOutputStream dob = new DataOutputStream(out); | |
dob.writeInt(wrapped.length); | |
dob.write(wrapped, 0, wrapped.length); | |
dob.flush(); | |
} | |
} |
This is the commons-crypto logic I'm referring to. This wrapping and unwrapping will be encrypting the traffic after the SASL handshake is done.
Either way, let me try this out with setting the QOP.
But where is the negotiation part?
The negotiation is partially happening with the JDK itself and partially happening by the Callback implementation's you've written. SASL is just a series of challenge/response messages being passed between client and server.
|
||
@Override | ||
public String[] getMechanismNames(Map<String, ?> props) { | ||
return OAuthBearerSaslClient.mechanismNamesCompatibleWithPolicy(props); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this code is being executed, it is because someone explicitly activated it via configuration. We should throw a RuntimeException if the configuration disallows us to use this implementation (e.g. if Sasl.POLICY_NOPLAINTEXT == true
). We should fail in this manner until we have RPC encryption tested/working.
} | ||
|
||
public static void initialize() { | ||
Security.addProvider(new OAuthBearerSaslClientProvider()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we actually need to register a provider with the JCA security provider framework because we don't actually use it. IIRC, Kafka relies on JAAS to automatically perform the logins for them, whereas HBase explicitly sets up the implementation to use (via the AuthenticationProviderSelector). I think we can drop this class entirely.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't work without it, because we don't instantiate the client and the server directly, but asking the Sasl framework to create them for us. In the client auth provider we call Sasl.createSaslClient()
to create the client which doesn't work without the client registered for the mechanism.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the client auth provider we call Sasl.createSaslClient() to create the client which doesn't work without the client registered for the mechanism.
Ah, ok. I remember that when we were messing around with other custom Providers. Thanks for clarifying!
public SaslAuthMethod getSaslAuthMethod() { | ||
return SASL_AUTH_METHOD; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For clarity, this SaslAuthMethod (HBase class) is how HBase's RPC classes are figuring out the correct client-side logic to use given a token (secret material).
if (privateCredentials.size() == 1) { | ||
LOG.debug("Found 1 OAuthBearer token"); | ||
callback.token(privateCredentials.iterator().next()); | ||
} else { | ||
/* | ||
* There a very small window of time upon token refresh (on the order of milliseconds) | ||
* where both an old and a new token appear on the Subject's private credentials. | ||
* Rather than implement a lock to eliminate this window, we will deal with it by | ||
* checking for the existence of multiple tokens and choosing the one that has the | ||
* longest lifetime. It is also possible that a bug could cause multiple tokens to | ||
* exist (e.g. KAFKA-7902), so dealing with the unlikely possibility that occurs | ||
* during normal operation also allows us to deal more robustly with potential bugs. | ||
*/ | ||
SortedSet<OAuthBearerToken> sortedByLifetime = | ||
new TreeSet<>( | ||
new Comparator<OAuthBearerToken>() { | ||
@Override | ||
public int compare(OAuthBearerToken o1, OAuthBearerToken o2) { | ||
return Long.compare(o1.lifetimeMs(), o2.lifetimeMs()); | ||
} | ||
}); | ||
sortedByLifetime.addAll(privateCredentials); | ||
if (LOG.isWarnEnabled()) { | ||
LOG.warn("Found {} OAuth Bearer tokens in Subject's private credentials; " + | ||
"the oldest expires at {}, will use the newest, which expires at {}", | ||
sortedByLifetime.size(), new Date(sortedByLifetime.first().lifetimeMs()), | ||
new Date(sortedByLifetime.last().lifetimeMs())); | ||
} | ||
callback.token(sortedByLifetime.last()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to move choosing the private credential to its own method and simplifying this to be:
callback.token(choosePrivateCredential(privateCredentials));
...va/org/apache/hadoop/hbase/security/oauthbearer/internals/OAuthBearerSaslServerProvider.java
Show resolved
Hide resolved
* This class has been copy-and-pasted from Kafka codebase. | ||
*/ | ||
@InterfaceAudience.Public | ||
public class OAuthBearerClientInitialResponse { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not a fan of the regex-based parsing happening here. I think we could just replace this with some protobuf code which is what the rest of HBase RPCs use consistently.
If we are going the feature branch route, I'm happy to defer making that change until after we have some known-good test cases to start with.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm happy to do the refactoring, but will deliver it in a separate patch.
public byte[] evaluateResponse(byte[] response) | ||
throws SaslException, SaslAuthenticationException { | ||
try { | ||
if (response.length == 1 && response[0] == OAuthBearerSaslClient.BYTE_CONTROL_A && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using protobuf will also have the benefit of being able to strongly represent state (rather than relying on special bytes, like this is doing).
...va/org/apache/hadoop/hbase/security/oauthbearer/internals/knox/OAuthBearerSignedJwtTest.java
Show resolved
Hide resolved
hbase-client/src/main/java/org/apache/hadoop/hbase/security/AccessDeniedException.java
Outdated
Show resolved
Hide resolved
🎊 +1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After talking with Andor and breaking out some tasks into HBASE-26553, I'm OK to merge this into the feature-branch to get some iteration. +1 to merge to the feature branch, assuming precommit looks happy.
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
Could you fix up the two automated check outputs, Andor?
Then, I think we're good for the feature branch? |
🎊 +1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
Fixing the javadoc warning as I merge this. I think javadoc is wrong in its warning, but I'm just removing the |
Merged into HBASE-26553 |
HBASE-26655 Initial commit with basic functionality and example code
Based on previous PR #3934