-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implementing authentication for Pulsar Functions #3735
Conversation
rerun java8 tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jiazhai since you are working on kerberos authentication, it would be good for you to take a look at the interface here, to make sure if people want to implement kerberos authentication using this interface, is it straightforward to implement one?
} | ||
|
||
message FunctionAuthenticationSpec { | ||
string data = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering why are you using string
, not bytes
. my concern of string
is that encoding with locales make things very tricky and hard to debug.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't know what kind of params/data would need to be stored so I just based this off of the authParams String format we already have in Pulsar. We can change this to bytes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also you can always base64 encode a binary to be stored as a string
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jerrypeng bytes is a more natural representation :-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sijie i have changed
import org.apache.pulsar.functions.instance.AuthenticationConfig; | ||
import org.apache.pulsar.functions.proto.Function; | ||
|
||
public interface FunctionAuthProvider { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you please add javadoc comments in the interface? otherwise other people will have difficulties on understanding how to implement an Auth provider.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
|
||
void configureAuthenticationConfig(AuthenticationConfig authConfig, Function.FunctionAuthenticationSpec functionAuthenticationSpec); | ||
|
||
Function.FunctionAuthenticationSpec cacheAuthData(String tenant, String namespace, String name, AuthenticationDataSource authenticationDataSource) throws Exception; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we avoid exposing protobuf structures in the interface?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup we can have a wrapper for this
import org.apache.pulsar.functions.instance.AuthenticationConfig; | ||
import org.apache.pulsar.functions.proto.Function; | ||
|
||
public interface FunctionAuthProvider { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a very high level question here - why do you need a separate AuthProvider? why can you use the AuthenticationProvider interface at the broker? what are the real differences between these two interfaces?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the AuthenticationProvider is interface to simply do authentication. The interface here for functions is a little bit more involved. It needs to have to ability to:
- We need to be able to cache some data in the function meta data topic. This could be the auth data itself or a pointer the auth data.
- Based on the auth data of the function, we need to be able to manipulate the function runtime somehow (depends on runtime)
This interface doesn't actually authenticate a user but facilitates the authentication process for functions. Whether the name of the interface is appropriate can be up for discussion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok. A better name or a javadoc for this class is good to have for this class.
public Function.FunctionAuthenticationSpec cacheAuthData(String tenant, String namespace, String name, | ||
AuthenticationDataSource authenticationDataSource) | ||
throws Exception { | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we really need return a null here? or shall returning a spec with a data
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't quite follow. We can return whatever we want there as long as it doesn't trigger anything
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean returning null usually results in NPE if you don't take care of it at the callers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I use null to gate whether we should do anything or not:
https://github.com/apache/pulsar/pull/3735/files#diff-d906ae3d0366fbf1866c0334f075b32eR394
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From an API perspective, it's better to use Optional<X>
whenever the returned object might not be applicable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok we can change this method to return Optional<Function.FunctionAuthenticationSpe>. Is that what you would recommend?
try { | ||
setupClient(); | ||
} catch (Exception e) { | ||
throw new RuntimeException(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please add a logging statement before throwing a RuntimeException?
RuntimeException is very hard to debug.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will add
@Override | ||
public void close() { | ||
// cancel liveness checker before terminating runtime. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure the change code here is related to Authentication. If there are multiple changes coupled in this pull request, can we try to separate them into multiple different pull requests in future? or at least highlight them at the description?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was a change that was required in a previous version of the implementation. I don't think its required anymore
try { | ||
runtime.terminate(); | ||
} catch (Exception e) { | ||
log.warn("Failed to terminate function runtime: {}", e, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you are logging two e
.
String fqfn = FunctionDetailsUtils.getFullyQualifiedName(details); | ||
log.info("{}-{} Terminating function...", fqfn,functionRuntimeInfo.getFunctionInstance().getInstanceId()); | ||
|
||
if (functionRuntimeInfo.getRuntimeSpawner() != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems that the logic here are not related.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic here is relevant since in terminate we have to call cleanUpAuthData before we set the runtimespawner to be null
.run(); | ||
|
||
if (!success.get()) { | ||
throw new RuntimeException(String.format("Failed to create service account %s for function %s", serviceAccountName, fqfn)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logging before throwing runtime exception
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it will be logged else where
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it will be logged else where
sorry, at where? if you don't catch RuntimeException, isn't the context lost?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
its caught and logged here:
https://github.com/apache/pulsar/pull/3735/files#diff-c2c5e0719db635b36beb4cd6d816abdeL226
rerun java8 tests |
rerun java8 tests |
@jiazhai can you review this please? Thanks. |
rerun java8 tests |
rerun java8 tests |
@merlimat @sijie @srkukarni @jiazhai please review. Thanks! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs some unit testing. I know it's hard for k8s, but at least for plaintext, it would be good to seem something exercising the interfaces.
The change seems to assume token auth in a couple of places. This should be explicitly checked, as in, it shouldn't even try to distribute the auth data if anything other than token is used for now.
@@ -35,7 +35,7 @@ | |||
public class AuthenticationProviderToken implements AuthenticationProvider { | |||
|
|||
final static String HTTP_HEADER_NAME = "Authorization"; | |||
final static String HTTP_HEADER_VALUE_PREFIX = "Bearer "; | |||
public final static String HTTP_HEADER_VALUE_PREFIX = "Bearer "; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than making this public, you should make getToken(AuthenticationDataSource) static and call it from the other locations.
} | ||
|
||
message FunctionAuthenticationSpec { | ||
bytes data = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should also include method, which the function runner can use to figure out which function auth provider to use.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function auth provider should be a cluster wide setting and not on a per function basis right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add comment to clarify the content and usage of "data" field, to summarize the below discussion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@merlimat i have add comments
/** | ||
* Kubernetes runtime specific functions authentication provider | ||
*/ | ||
public interface KubernetesFunctionAuthProvider extends FunctionAuthProvider { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a bad sign if for the first serious use of an abstraction you need to create another abstraction to work work around shortcomings in the first abstraction. However, I don't think it's even needed in this case.
The functionAuthData returned by cacheAuthData should contain the actual token. The caller of cacheAuthData can then create a secret, and then mount that secret on each of the instance pods. When the instance runs, it can always check for this secret and if it is mounted, the data can be read in and passed to configureAuthenticationConfig.
I don't see why service accounts are needed at all here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a bad sign if for the first serious use of an abstraction you need to create another abstraction to work work around shortcomings in the first abstraction. However, I don't think it's even needed in this case.
Why is that bad? I designed this so that it is cleaner compared to AuthenticationDataSource where you have a mixture of interfaces to support different authentication methods. I imagine in the future, different runtimes will have different requirements / interfaces needed to support authentication. It doesn't make sense to clutter them all together.
The functionAuthData returned by cacheAuthData should contain the actual token.
Why should functionAuthData contain the actual token? This is implementation specific data. Different implementations of FunctionAuthenticationProvider should have the flexibility to use it in a way that makes sense for the implementation.
The caller of cacheAuthData can then create a secret, and then mount that secret on each of the instance pods.
I think there is some misunderstanding here. The architecture of Pulsar Function decouples submitting functions and running functions. The worker that the user actually submits the function do is not necessarily going to be the same worker that is going to run the function. However, the auth data will only be passed to the worker that the user at first submits his or her function to. That is why that worker needs to be able to distribute that auth data or data based on that auth data to the rest of the workers that will potentially need to run a function instance. The interface cacheAuthData returns the data that needs to be distributed to the other workers via the function metadata topic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there is some misunderstanding here.
Ya, I needed to map out the interactions. See my top level comment.
@ivankelly I don't think I am assuming a token based authentication. The current implementation of the interfaces only accept token based authentication. Do you have a specific location you can point me to that only will work with token based auth? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a couple of comments on the new interfaces.
Also, can you add a description of the workflow for the management of credentials when the functions are submitted or scheduled?
} | ||
|
||
message FunctionAuthenticationSpec { | ||
bytes data = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does data
includes the credential? Or a pointer to the credentials? Can you add comment to clarify the field meaning?
If it includes credentials, should this be included in the function metadata?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So it can include the actual credentials (not secure) or a pointer to the credentials. This all depends on the underlying implementation of the interface. I left this field open ended so that there can be flexibility of what can be stored in here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it should always include the credentials. If you want to make it secure, make it secure using the caller of cacheAuthData, not within cacheAuthData
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ivankelly I don't quite follow what you are proposing. This protobuf message will be stored in the function metadata topic. This message was designed with a generic purpose to store some data for the function authentication provider so that it can distribute it to all the workers. What that data is is up to the function authentication provider implementation. This gives us flexibility in the types of authentication we can support in the future.
* @return | ||
* @throws Exception | ||
*/ | ||
FunctionAuthData cacheAuthData(String tenant, String namespace, String name, AuthenticationDataSource authenticationDataSource) throws Exception; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the purpose of caching auth data? If there are credentials being submitted, we should immediately store them in the secrets backend.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a mechanism to distribute auth credentials or pointers to auth credentials based on implementtion to workers that need to run an instance of the function. Since in the function architecture, submitting the function and running the function are decoupled and might not happen on the same machine we need a mechanism to distribute some information to workers about how to configure authentication for individual function instances
public Function.FunctionAuthenticationSpec cacheAuthData(String tenant, String namespace, String name, | ||
AuthenticationDataSource authenticationDataSource) | ||
throws Exception { | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From an API perspective, it's better to use Optional<X>
whenever the returned object might not be applicable.
@ivankelly I have also added some unit tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good to me.
I have only one comment around functions depending on broker module, this might potentially create some weird recurring dependency when function worker is running as part of broker in future.
I would suggest @jiazhai taking a closer look at the interface since he is working on kerberos authentication to make sure it is easy to add kerberos to functions in future.
|
||
import java.util.Optional; | ||
|
||
import static org.apache.pulsar.broker.authentication.AuthenticationProviderToken.getToken; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure it is a good idea to reference a method in broker module. this would produce potentially very bad dependency tree. If it is common enough, it should be in pulsar-common, otherwise I would suggest just redoing the logic in functions module.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ya @merlimat and I discussed this as well. We could just move the get token logic to some utils class in pulsar-common. Originally I actually was just redoing the logic in the functions module, but @ivankelly suggested to reuse the code in the AuthenticationProviderToken class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ya, break it into common
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ivankelly its kind of messy extracting the getToken method from AuthenticationProviderToken since it relies on the on many package local String declared in the class.
I think the cleanest way is just duplicate some of the code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would also be a big move since AuthenticationDataSource would also need to move the pulsar-common and all the things that AuthenticationDataSource depends on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sijie I think pulsar-function-runtime is going to have to depend on pulsar-broker-common for the time being since AuthenticationDataSource interface in part of pulsar-broker-common and pulsar-function-worker already depends on pulsar-broker-common anyways
rerun java8 tests |
@jerrypeng So, in the sequence you have, the service account bit is entirely unnecessary. Rather than specifying a service account for the StatefulSet(?), you can just attach the secret as a volumeMount or as a env variable. |
|
||
// adjust the auth config to support auth | ||
if (instanceConfig.getFunctionAuthenticationSpec() != null) { | ||
getAuthProvider().configureAuthenticationConfig(authConfig, getFunctionAuthData(instanceConfig.getFunctionAuthenticationSpec())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand what this call is for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This call is to configure the authConfig appropriately to support auth because it will get passed into KubernetesRuntime and eventually in the the function instance. And the fields in the authConfig is what the function instance or the pulsar client in the instance will use for authentication
* @param authConfig authentication configs passed to the function instance | ||
* @param functionAuthData function authentication data that is provider specific | ||
*/ | ||
void configureAuthenticationConfig(AuthenticationConfig authConfig, FunctionAuthData functionAuthData); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
configureAuthenticationConfig sounds weird, why not just configureAuth?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure I can do that but the reason I named the method configureAuthenticationConfig
is because we are literally configuring AuthenticationConfig
based on FunctionAuthData
* A wrapper for authentication data for functions | ||
*/ | ||
public class FunctionAuthData { | ||
private byte[] data; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a string member here to hold which functions auth provider created the auth data. this can be used by the k8s runtime provider at least validate the auth data is of the type expected. I know this will be configured at a cluster level, but people screw up configuration all the time. Better to be defensive.
@ivankelly responding to your comments
let me experiment with not having a service account and see if that works
I don't believe that is the correct design choice because now the runtime is making assumptions on what the data in FunctionAuthData is. The current implementation |
What you are doing now in the case of KubernetesFunctionAuthProvider is premature generalization. We only have one use case for it, and that's very clear in what methods are in the interface. Something for vault with dynamic tokens (i think i figured out the workload for this btw) would need something very different. Generalization for this interface doesn't make sense even. The AuthProvider interface is a way for the runtime to allow common rest code code to pass information to the common instance code. But between these two points, everything is k8s specific. So we should treat it as such.
We can make this assumption for all k8s based auth providers, because secrets is the way to pass around sensitive information in k8s. |
@ivankelly you are correct service accounts are not needed to mount secrets in pods. I have remove the service account related code |
rerun java8 tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
} | ||
|
||
message FunctionAuthenticationSpec { | ||
bytes data = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add comment to clarify the content and usage of "data" field, to summarize the below discussion.
"--path", | ||
bkPath, | ||
"--destination-file", | ||
userCodeFilePath); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(not for this PR) instead of passing all these as CLI options, we should instead place these into client.conf
which is already automatically picked up by pulsar-admin
(and potentially we can reuse also for functions).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
rerun java8 tests |
rerun integration tests |
run integration tests |
Motivation
Currently authentication for functions can only work with static credentials which makes every function have the same credentials as any other function which is not practical. Since functions should have the same credentials as the users that submit them.
This is an initial implementation that solves the issue
Currently, users will not be able to specify which function auth provider to use, but I can add that in a subsequent PR once everyone is onboard with this
Master issue: #3763
The workflow of the current API is as such:
Below is a diagram that demonstrates the end-to-end process of function authentication for a possible implementation on function authentication in Kubernetes using tokens issued by Hashicorp's Vault:
The KubernetesFunctionAuthenticationProvider implementation is based on the above diagram