Update eager: make sure client secret can be specified as env var #2720

Merged
merged 14 commits into from
Sep 5, 2024
7 changes: 7 additions & 0 deletions flytekit/bin/entrypoint.py
@@ -177,6 +177,13 @@ def _dispatch_execute(
for k, v in output_file_dict.items():
utils.write_proto_to_file(v.to_flyte_idl(), os.path.join(ctx.execution_state.engine_dir, k))

# make sure an event loop exists for data persistence step
Contributor Author

This issue comes up not just with eager. Frameworks like LangChain that have async functionality can cause the event loop to end before the Flyte task runs its async data persistence operation.

Collaborator

Could we use a separate event loop just for flytekit?

Contributor Author

sounds right, I think someone more well-versed in async can help with that

try:
    asyncio.get_event_loop()
except Exception:
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
cosmicBboy marked this conversation as resolved.

ctx.file_access.put_data(ctx.execution_state.engine_dir, output_prefix, is_multipart=True)
logger.info(f"Engine folder written successfully to the output prefix {output_prefix}")
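
The two options from the review thread in this hunk can be sketched with the standard library alone: (a) the guard this PR adds, which makes sure a usable event loop exists before the persistence step, and (b) the reviewer's idea of a separate loop used only for that step. The helper names `ensure_event_loop` and `run_in_private_loop` are hypothetical and are not part of flytekit. Unlike the hunk above, this sketch catches only `RuntimeError` and also replaces a loop that is already closed; both are assumptions made for illustration, not the merged behavior.

```python
import asyncio


def ensure_event_loop() -> asyncio.AbstractEventLoop:
    """Return this thread's event loop, creating a new one if none is usable."""
    try:
        loop = asyncio.get_event_loop()
        if loop.is_closed():
            # A closed loop (e.g. closed by another framework) cannot run anything.
            raise RuntimeError("event loop is closed")
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop


def run_in_private_loop(coro):
    """Run a coroutine on a fresh loop that is closed afterwards.

    This is the 'separate event loop' idea: the loop is never installed as the
    thread's current loop, so other frameworks cannot close it by accident.
    """
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()
```

Note that `run_in_private_loop` only works from synchronous code: calling `run_until_complete` while another loop is running in the same thread raises `RuntimeError`.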

35 changes: 28 additions & 7 deletions flytekit/experimental/eager_function.py
@@ -1,5 +1,6 @@
import asyncio
import inspect
import os
import signal
from contextlib import asynccontextmanager
from datetime import datetime, timedelta, timezone
@@ -382,6 +383,7 @@
timeout: Optional[timedelta] = None,
poll_interval: Optional[timedelta] = None,
local_entrypoint: bool = False,
client_secret_env_var: Optional[str] = None,
**kwargs,
):
"""Eager workflow decorator.
@@ -396,6 +398,8 @@
:param local_entrypoint: If True, the eager workflow can be executed locally but use the provided
:py:func:`~flytekit.remote.FlyteRemote` object to create task/workflow executions. This is useful for local
testing against a remote Flyte cluster.
:param client_secret_env_var: if specified, binds the client secret to the specified environment variable for
Contributor

okay with this for now, but I think we should see if we can't move this to some higher level in the future. the secret here we're talking about is the secret for hitting admin itself right?

Contributor Author

this should be implemented in Secret but we haven't moved on that for a while: #1726

remote authentication.
:param kwargs: keyword-arguments forwarded to :py:func:`~flytekit.task`.

This type of workflow will execute all flyte entities within it eagerly, meaning that all python constructs can be
@@ -488,7 +492,10 @@
remote=remote,
client_secret_group=client_secret_group,
client_secret_key=client_secret_key,
timeout=timeout,
poll_interval=poll_interval,
local_entrypoint=local_entrypoint,
client_secret_env_var=client_secret_env_var,
**kwargs,
)

@@ -510,7 +517,9 @@
execution_id = exec_params.execution_id

async_stack = AsyncStack(task_id, execution_id)
_remote = _prepare_remote(_remote, ctx, client_secret_group, client_secret_key, local_entrypoint)
_remote = _prepare_remote(

_remote, ctx, client_secret_group, client_secret_key, local_entrypoint, client_secret_env_var
)

# make sure sub-nodes are cleaned up on termination signal
loop = asyncio.get_event_loop()
@@ -533,8 +542,10 @@
await cleanup_fn()

secret_requests = kwargs.pop("secret_requests", None) or []
if client_secret_group is not None and client_secret_key is not None:
try:
secret_requests.append(Secret(group=client_secret_group, key=client_secret_key))
except ValueError:
pass
Contributor

what if there's nothing in the client_secret_env_var environment variable?

Contributor Author

client_secret_env_var doesn't figure into this try except block, or am I missing something?

Contributor

no yeah i know, i mean, what happens if all three (client_secret_env_var, client_secret_group and client_secret_key) are None?
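
One way to make the "all three are None" case explicit is to skip the secret request entirely when neither a group nor a key is given, rather than relying on a swallowed `ValueError`. The sketch below is only an illustration of that idea: `SecretRequest` is a hypothetical stand-in for flytekit's `Secret` (it performs no validation of its own), and `collect_secret_requests` is not a function in this PR.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class SecretRequest:
    """Hypothetical stand-in for flytekit's Secret; NOT the real class."""

    group: Optional[str] = None
    key: Optional[str] = None


def collect_secret_requests(
    existing: Optional[List[SecretRequest]],
    client_secret_group: Optional[str],
    client_secret_key: Optional[str],
) -> List[SecretRequest]:
    """Return the secret requests for the task, adding the client secret if one is named."""
    requests = list(existing or [])
    if client_secret_group is None and client_secret_key is None:
        # Nothing identifies a client secret, so no request is added.
        return requests
    requests.append(SecretRequest(group=client_secret_group, key=client_secret_key))
    return requests
```

With this shape, a key-only secret (the serverless case mentioned later in the review) still produces a request, while the all-`None` case returns the caller's list unchanged.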


return task(
wrapper,
@@ -551,6 +562,7 @@
client_secret_group: Optional[str] = None,
client_secret_key: Optional[str] = None,
local_entrypoint: bool = False,
client_secret_env_var: Optional[str] = None,
) -> Optional[FlyteRemote]:
"""Prepare FlyteRemote object for accessing Flyte cluster in a task running on the same cluster."""

@@ -576,7 +588,7 @@
if remote.config.platform.endpoint.startswith("localhost"):
# replace sandbox endpoints with internal dns, since localhost won't exist within the Flyte cluster
return _internal_demo_remote(remote)
return _internal_remote(remote, client_secret_group, client_secret_key)
return _internal_remote(remote, client_secret_group, client_secret_key, client_secret_env_var)



def _internal_demo_remote(remote: FlyteRemote) -> FlyteRemote:
@@ -605,16 +617,25 @@

def _internal_remote(
remote: FlyteRemote,
client_secret_group: str,
client_secret_key: str,
client_secret_group: Optional[str],
client_secret_key: Optional[str],
client_secret_env_var: Optional[str],
) -> FlyteRemote:
"""Derives a FlyteRemote object from a yaml configuration file, modifying parts to make it work internally."""
assert client_secret_group is not None, "secret_group must be defined when using a remote cluster"
assert client_secret_key is not None, "secret_key must be defined a remote cluster"
secrets_manager = current_context().secrets
client_secret = secrets_manager.get(client_secret_group, client_secret_key)
# get the raw output prefix from the context that's set from the pyflyte-execute entrypoint
# (see flytekit/bin/entrypoint.py)

if client_secret_env_var is not None:
    # this creates a remote client where the env var client secret is sufficient for authentication
    os.environ[client_secret_env_var] = client_secret
Contributor

confused what's happening here. if we're removing the asserts, then that means the group/key might be None. If they're none, won't the secrets_manager.get fail? So then what is the client_secret? Or is there some default in the secrets manager that somehow gets returned?
I don't get why we're setting the environment variable here to the value of client secret.

Contributor Author

this is to make it easy to authenticate with UnionRemote on serverless (via the UNION_SERVERLESS_API_KEY env var). Secrets on serverless only need a secret key (no group).

Contributor

just wondering, in the following scenario:

  • client_secret_key is None
  • client_secret_group is None

(this scenario is now valid because the asserts are removed), after line 626, client_secret = secrets_manager.get(client_secret_group, client_secret_key), what is the value of client_secret?

Contributor

going through the secrets manager, it looks like it raises an IndexError if you call get(group=None, key=None), so I don't think it ever gets past line 626. Could we move the assertion that at least one of those has to be non-None into the get function?
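
The check requested in this thread, followed by the env var binding from the hunk, might look roughly like the sketch below. It is written against a plain callable rather than flytekit's secrets manager: `get_secret` and `bind_client_secret` are hypothetical names, and the `ValueError` is an assumption about how the failure could be surfaced, not what the PR merged.

```python
import os
from typing import Callable, Optional

# Hypothetical lookup signature: (group, key) -> secret value.
SecretGetter = Callable[[Optional[str], Optional[str]], str]


def bind_client_secret(
    get_secret: SecretGetter,
    client_secret_group: Optional[str],
    client_secret_key: Optional[str],
    client_secret_env_var: Optional[str],
) -> str:
    """Look up the client secret and optionally expose it through an environment variable."""
    if client_secret_group is None and client_secret_key is None:
        # Fail early with a clear message instead of an error deep inside the lookup.
        raise ValueError("one of client_secret_group or client_secret_key must be defined")
    client_secret = get_secret(client_secret_group, client_secret_key)
    if client_secret_env_var is not None:
        # A remote client that reads this variable can now authenticate without a config file.
        os.environ[client_secret_env_var] = client_secret
    return client_secret
```

Validating before the lookup answers the question in the thread directly: with both group and key set to `None`, the caller gets a descriptive error and the environment is left untouched.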

    try:
        remote_cls = type(remote)
        return remote_cls()
Member

This assumes that the other FlyteRemote object can be created without a Config and uses an environment variable. Is this okay for OSS?

Also, should this carry over the default_domain and default_project from the original remote obj? Or should the other remote object figure out the domain and project from the execution environment?

Contributor Author

ooo yeah it should probably carry over those args.

admittedly this code path is basically: "tell me you only run on serverless without saying you run on serverless" 🙃

Contributor Author

@cosmicBboy, Aug 30, 2024

Basically client_secret_env_var only really applies to serverless (this will fail with Flyte), unless at some point down the road Flyte provides a single API key UX.

    except Exception as exc:
        raise TypeError(f"Unable to authenticate remote class {remote_cls} with client secret") from exc
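
The reviewer's suggestion to carry over the default project and domain when re-instantiating the remote could be sketched as follows. `DemoRemote` is a hypothetical stand-in for a remote class that authenticates from an environment variable; whether a given `FlyteRemote` subclass accepts these keyword arguments without a config is an assumption here, and `rebuild_remote` is not a function in this PR.

```python
from typing import Optional


class DemoRemote:
    """Hypothetical remote class; stands in for one that authenticates via an env var."""

    def __init__(self, default_project: Optional[str] = None, default_domain: Optional[str] = None):
        self.default_project = default_project
        self.default_domain = default_domain


def rebuild_remote(remote):
    """Create a fresh instance of the remote's class, keeping its default project and domain."""
    remote_cls = type(remote)
    try:
        return remote_cls(
            default_project=remote.default_project,
            default_domain=remote.default_domain,
        )
    except Exception as exc:
        # Same error shape as the hunk above: surface a TypeError with the original cause attached.
        raise TypeError(f"Unable to authenticate remote class {remote_cls} with client secret") from exc
```

A class whose constructor rejects these arguments ends up in the `except` branch, which matches the comment in the thread that this path effectively targets serverless-style remotes.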


ctx = FlyteContextManager.current_context()
return FlyteRemote(
config=remote.config.with_params(