From 9f9f1976716e1259e456631956d66960eefd6cf4 Mon Sep 17 00:00:00 2001 From: Niels Bantilan Date: Thu, 5 Sep 2024 18:19:20 -0400 Subject: [PATCH] Update eager: make sure client secret can be specified as env var (#2720) * fix eager mode Signed-off-by: Niels Bantilan * bind secret to env var Signed-off-by: Niels Bantilan * add remote creation error handling Signed-off-by: Niels Bantilan * update new arg to client_secret_env_var Signed-off-by: Niels Bantilan * fix lint Signed-off-by: Niels Bantilan * fix bug Signed-off-by: Niels Bantilan * fix kwargs Signed-off-by: Niels Bantilan * try creating secret Signed-off-by: Niels Bantilan * add event loop if needed Signed-off-by: Niels Bantilan * debug Signed-off-by: Niels Bantilan * debug Signed-off-by: Niels Bantilan * update error Signed-off-by: Niels Bantilan * pass default domain and project to new remote Signed-off-by: Niels Bantilan --------- Signed-off-by: Niels Bantilan --- flytekit/experimental/eager_function.py | 43 +++++++++++++++++++++---- 1 file changed, 36 insertions(+), 7 deletions(-) diff --git a/flytekit/experimental/eager_function.py b/flytekit/experimental/eager_function.py index 7eec791726..f5c0051de2 100644 --- a/flytekit/experimental/eager_function.py +++ b/flytekit/experimental/eager_function.py @@ -1,5 +1,6 @@ import asyncio import inspect +import os import signal from contextlib import asynccontextmanager from datetime import datetime, timedelta, timezone @@ -382,6 +383,7 @@ def eager( timeout: Optional[timedelta] = None, poll_interval: Optional[timedelta] = None, local_entrypoint: bool = False, + client_secret_env_var: Optional[str] = None, **kwargs, ): """Eager workflow decorator. @@ -396,6 +398,8 @@ def eager( :param local_entrypoint: If True, the eager workflow will can be executed locally but use the provided :py:func:`~flytekit.remote.FlyteRemote` object to create task/workflow executions. This is useful for local testing against a remote Flyte cluster. + :param client_secret_env_var: if specified, binds the client secret to the specified environment variable for + remote authentication. :param kwargs: keyword-arguments forwarded to :py:func:`~flytekit.task`. This type of workflow will execute all flyte entities within it eagerly, meaning that all python constructs can be @@ -488,7 +492,10 @@ async def eager_workflow(x: int) -> int: remote=remote, client_secret_group=client_secret_group, client_secret_key=client_secret_key, + timeout=timeout, + poll_interval=poll_interval, local_entrypoint=local_entrypoint, + client_secret_env_var=client_secret_env_var, **kwargs, ) @@ -510,7 +517,9 @@ async def wrapper(*args, **kws): execution_id = exec_params.execution_id async_stack = AsyncStack(task_id, execution_id) - _remote = _prepare_remote(_remote, ctx, client_secret_group, client_secret_key, local_entrypoint) + _remote = _prepare_remote( + _remote, ctx, client_secret_group, client_secret_key, local_entrypoint, client_secret_env_var + ) # make sure sub-nodes as cleaned up on termination signal loop = asyncio.get_event_loop() @@ -533,8 +542,10 @@ async def wrapper(*args, **kws): await cleanup_fn() secret_requests = kwargs.pop("secret_requests", None) or [] - if client_secret_group is not None and client_secret_key is not None: + try: secret_requests.append(Secret(group=client_secret_group, key=client_secret_key)) + except ValueError: + pass return task( wrapper, @@ -551,6 +562,7 @@ def _prepare_remote( client_secret_group: Optional[str] = None, client_secret_key: Optional[str] = None, local_entrypoint: bool = False, + client_secret_env_var: Optional[str] = None, ) -> Optional[FlyteRemote]: """Prepare FlyteRemote object for accessing Flyte cluster in a task running on the same cluster.""" @@ -576,7 +588,7 @@ def _prepare_remote( if remote.config.platform.endpoint.startswith("localhost"): # replace sandbox endpoints with internal dns, since localhost won't exist within the Flyte cluster return _internal_demo_remote(remote) - return _internal_remote(remote, client_secret_group, client_secret_key) + return _internal_remote(remote, client_secret_group, client_secret_key, client_secret_env_var) def _internal_demo_remote(remote: FlyteRemote) -> FlyteRemote: @@ -605,16 +617,33 @@ def _internal_demo_remote(remote: FlyteRemote) -> FlyteRemote: def _internal_remote( remote: FlyteRemote, - client_secret_group: str, - client_secret_key: str, + client_secret_group: Optional[str], + client_secret_key: Optional[str], + client_secret_env_var: Optional[str], ) -> FlyteRemote: """Derives a FlyteRemote object from a yaml configuration file, modifying parts to make it work internally.""" - assert client_secret_group is not None, "secret_group must be defined when using a remote cluster" - assert client_secret_key is not None, "secret_key must be defined a remote cluster" secrets_manager = current_context().secrets + + assert ( + client_secret_group is not None or client_secret_key is not None + ), "One of client_secret_group or client_secret_key must be defined when using a remote cluster" + client_secret = secrets_manager.get(client_secret_group, client_secret_key) # get the raw output prefix from the context that's set from the pyflyte-execute entrypoint # (see flytekit/bin/entrypoint.py) + + if client_secret_env_var is not None: + # this creates a remote client where the env var client secret is sufficient for authentication + os.environ[client_secret_env_var] = client_secret + try: + remote_cls = type(remote) + return remote_cls( + default_domain=remote.default_domain, + default_project=remote.default_project, + ) + except Exception as exc: + raise TypeError(f"Unable to authenticate remote class {remote_cls} with client secret") from exc + ctx = FlyteContextManager.current_context() return FlyteRemote( config=remote.config.with_params(