Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update eager: make sure client secret can be specified as env var #2720

Merged
merged 14 commits into from
Sep 5, 2024
43 changes: 36 additions & 7 deletions flytekit/experimental/eager_function.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import inspect
import os
import signal
from contextlib import asynccontextmanager
from datetime import datetime, timedelta, timezone
Expand Down Expand Up @@ -382,6 +383,7 @@ def eager(
timeout: Optional[timedelta] = None,
poll_interval: Optional[timedelta] = None,
local_entrypoint: bool = False,
client_secret_env_var: Optional[str] = None,
**kwargs,
):
"""Eager workflow decorator.
Expand All @@ -396,6 +398,8 @@ def eager(
:param local_entrypoint: If True, the eager workflow will can be executed locally but use the provided
:py:func:`~flytekit.remote.FlyteRemote` object to create task/workflow executions. This is useful for local
testing against a remote Flyte cluster.
:param client_secret_env_var: if specified, binds the client secret to the specified environment variable for
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay with this for now, but I think we should see if we can't move this to some higher level in the future. the secret here we're talking about is the secret for hitting admin itself right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be implemented in Secret but we haven't moved on that for a while: #1726

remote authentication.
:param kwargs: keyword-arguments forwarded to :py:func:`~flytekit.task`.

This type of workflow will execute all flyte entities within it eagerly, meaning that all python constructs can be
Expand Down Expand Up @@ -488,7 +492,10 @@ async def eager_workflow(x: int) -> int:
remote=remote,
client_secret_group=client_secret_group,
client_secret_key=client_secret_key,
timeout=timeout,
poll_interval=poll_interval,
local_entrypoint=local_entrypoint,
client_secret_env_var=client_secret_env_var,
**kwargs,
)

Expand All @@ -510,7 +517,9 @@ async def wrapper(*args, **kws):
execution_id = exec_params.execution_id

async_stack = AsyncStack(task_id, execution_id)
_remote = _prepare_remote(_remote, ctx, client_secret_group, client_secret_key, local_entrypoint)
_remote = _prepare_remote(
_remote, ctx, client_secret_group, client_secret_key, local_entrypoint, client_secret_env_var
)

# make sure sub-nodes as cleaned up on termination signal
loop = asyncio.get_event_loop()
Expand All @@ -533,8 +542,10 @@ async def wrapper(*args, **kws):
await cleanup_fn()

secret_requests = kwargs.pop("secret_requests", None) or []
if client_secret_group is not None and client_secret_key is not None:
try:
secret_requests.append(Secret(group=client_secret_group, key=client_secret_key))
except ValueError:
pass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if there's nothing in the client_secret_env_var environment variable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

client_secret_env_var doesn't figure into this try except block, or am I missing something?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no yeah i know, i mean, what happens if all three (client_secret_env_var, client_secret_group and client_secret_key) are None?


return task(
wrapper,
Expand All @@ -551,6 +562,7 @@ def _prepare_remote(
client_secret_group: Optional[str] = None,
client_secret_key: Optional[str] = None,
local_entrypoint: bool = False,
client_secret_env_var: Optional[str] = None,
) -> Optional[FlyteRemote]:
"""Prepare FlyteRemote object for accessing Flyte cluster in a task running on the same cluster."""

Expand All @@ -576,7 +588,7 @@ def _prepare_remote(
if remote.config.platform.endpoint.startswith("localhost"):
# replace sandbox endpoints with internal dns, since localhost won't exist within the Flyte cluster
return _internal_demo_remote(remote)
return _internal_remote(remote, client_secret_group, client_secret_key)
return _internal_remote(remote, client_secret_group, client_secret_key, client_secret_env_var)


def _internal_demo_remote(remote: FlyteRemote) -> FlyteRemote:
Expand Down Expand Up @@ -605,16 +617,33 @@ def _internal_demo_remote(remote: FlyteRemote) -> FlyteRemote:

def _internal_remote(
remote: FlyteRemote,
client_secret_group: str,
client_secret_key: str,
client_secret_group: Optional[str],
client_secret_key: Optional[str],
client_secret_env_var: Optional[str],
) -> FlyteRemote:
"""Derives a FlyteRemote object from a yaml configuration file, modifying parts to make it work internally."""
assert client_secret_group is not None, "secret_group must be defined when using a remote cluster"
assert client_secret_key is not None, "secret_key must be defined a remote cluster"
secrets_manager = current_context().secrets

assert (
client_secret_group is not None or client_secret_key is not None
), "One of client_secret_group or client_secret_key must be defined when using a remote cluster"

client_secret = secrets_manager.get(client_secret_group, client_secret_key)
# get the raw output prefix from the context that's set from the pyflyte-execute entrypoint
# (see flytekit/bin/entrypoint.py)

if client_secret_env_var is not None:
# this creates a remote client where the env var client secret is sufficient for authentication
os.environ[client_secret_env_var] = client_secret
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

confused what's happening here. if we're removing the asserts, then that means the group/key might be None. If they're none, won't the secrets_manager.get fail? So then what is the client_secret? Or is there some default in the secrets manager that somehow gets returned?
I don't get why we're setting the environment variable here to the value of client secret.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is to make it easy to authenticate with UnionRemote on serverless (via UNION_SERVERLESS_API_KEY env var). Secrets on serverless only needs a secret key (no group).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just wondering, in the following scenario:

  • client_secret_key is None
  • client_secret_group is None
    (and this scenario is now valid because the asserts are now removed)
    after line 626 client_secret = secrets_manager.get(client_secret_group, client_secret_key)
    what is the value of client_secret?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

going through the secretsmanager, it looks like it raises an IndexError if you call get(group=None, key=None), so I don't think it ever gets past line 626. Could we move the assertion that at least one of those has be be non-None into the get function?

try:
remote_cls = type(remote)
return remote_cls(
default_domain=remote.default_domain,
default_project=remote.default_project,
)
except Exception as exc:
raise TypeError(f"Unable to authenticate remote class {remote_cls} with client secret") from exc

ctx = FlyteContextManager.current_context()
return FlyteRemote(
config=remote.config.with_params(
Expand Down
Loading