-
Notifications
You must be signed in to change notification settings - Fork 297
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update eager: make sure client secret can be specified as env var #2720
Changes from 11 commits
9b65742
4e04b82
5abc9ad
9b9dd83
f8f97b5
54c4471
b526f6f
66c6bd4
9ec46fb
d21559f
aec338a
f0b5d67
351a8c6
2425ed4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
import asyncio | ||
import inspect | ||
import os | ||
import signal | ||
from contextlib import asynccontextmanager | ||
from datetime import datetime, timedelta, timezone | ||
|
@@ -382,6 +383,7 @@ | |
timeout: Optional[timedelta] = None, | ||
poll_interval: Optional[timedelta] = None, | ||
local_entrypoint: bool = False, | ||
client_secret_env_var: Optional[str] = None, | ||
**kwargs, | ||
): | ||
"""Eager workflow decorator. | ||
|
@@ -396,6 +398,8 @@ | |
:param local_entrypoint: If True, the eager workflow will can be executed locally but use the provided | ||
:py:func:`~flytekit.remote.FlyteRemote` object to create task/workflow executions. This is useful for local | ||
testing against a remote Flyte cluster. | ||
:param client_secret_env_var: if specified, binds the client secret to the specified environment variable for | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. okay with this for now, but I think we should see if we can't move this to some higher level in the future. the secret here we're talking about is the secret for hitting admin itself right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should be implemented in |
||
remote authentication. | ||
:param kwargs: keyword-arguments forwarded to :py:func:`~flytekit.task`. | ||
|
||
This type of workflow will execute all flyte entities within it eagerly, meaning that all python constructs can be | ||
|
@@ -488,7 +492,10 @@ | |
remote=remote, | ||
client_secret_group=client_secret_group, | ||
client_secret_key=client_secret_key, | ||
timeout=timeout, | ||
poll_interval=poll_interval, | ||
local_entrypoint=local_entrypoint, | ||
client_secret_env_var=client_secret_env_var, | ||
**kwargs, | ||
) | ||
|
||
|
@@ -510,7 +517,9 @@ | |
execution_id = exec_params.execution_id | ||
|
||
async_stack = AsyncStack(task_id, execution_id) | ||
_remote = _prepare_remote(_remote, ctx, client_secret_group, client_secret_key, local_entrypoint) | ||
_remote = _prepare_remote( | ||
_remote, ctx, client_secret_group, client_secret_key, local_entrypoint, client_secret_env_var | ||
) | ||
|
||
# make sure sub-nodes as cleaned up on termination signal | ||
loop = asyncio.get_event_loop() | ||
|
@@ -533,8 +542,10 @@ | |
await cleanup_fn() | ||
|
||
secret_requests = kwargs.pop("secret_requests", None) or [] | ||
if client_secret_group is not None and client_secret_key is not None: | ||
try: | ||
secret_requests.append(Secret(group=client_secret_group, key=client_secret_key)) | ||
except ValueError: | ||
pass | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what if there's nothing in the client_secret_env_var environment variable? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no yeah i know, i mean, what happens if all three ( |
||
|
||
return task( | ||
wrapper, | ||
|
@@ -551,6 +562,7 @@ | |
client_secret_group: Optional[str] = None, | ||
client_secret_key: Optional[str] = None, | ||
local_entrypoint: bool = False, | ||
client_secret_env_var: Optional[str] = None, | ||
) -> Optional[FlyteRemote]: | ||
"""Prepare FlyteRemote object for accessing Flyte cluster in a task running on the same cluster.""" | ||
|
||
|
@@ -576,7 +588,7 @@ | |
if remote.config.platform.endpoint.startswith("localhost"): | ||
# replace sandbox endpoints with internal dns, since localhost won't exist within the Flyte cluster | ||
return _internal_demo_remote(remote) | ||
return _internal_remote(remote, client_secret_group, client_secret_key) | ||
return _internal_remote(remote, client_secret_group, client_secret_key, client_secret_env_var) | ||
|
||
|
||
def _internal_demo_remote(remote: FlyteRemote) -> FlyteRemote: | ||
|
@@ -605,16 +617,25 @@ | |
|
||
def _internal_remote( | ||
remote: FlyteRemote, | ||
client_secret_group: str, | ||
client_secret_key: str, | ||
client_secret_group: Optional[str], | ||
client_secret_key: Optional[str], | ||
client_secret_env_var: Optional[str], | ||
) -> FlyteRemote: | ||
"""Derives a FlyteRemote object from a yaml configuration file, modifying parts to make it work internally.""" | ||
assert client_secret_group is not None, "secret_group must be defined when using a remote cluster" | ||
assert client_secret_key is not None, "secret_key must be defined a remote cluster" | ||
secrets_manager = current_context().secrets | ||
client_secret = secrets_manager.get(client_secret_group, client_secret_key) | ||
# get the raw output prefix from the context that's set from the pyflyte-execute entrypoint | ||
# (see flytekit/bin/entrypoint.py) | ||
|
||
if client_secret_env_var is not None: | ||
# this creates a remote client where the env var client secret is sufficient for authentication | ||
os.environ[client_secret_env_var] = client_secret | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. confused what's happening here. if we're removing the asserts, then that means the group/key might be None. If they're none, won't the secrets_manager.get fail? So then what is the client_secret? Or is there some default in the secrets manager that somehow gets returned? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is to make it easy to authenticate with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just wondering, in the following scenario:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. going through the secretsmanager, it looks like it raises an IndexError if you call |
||
try: | ||
remote_cls = type(remote) | ||
return remote_cls() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This assumes that the other Also, should this carry over the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ooo yeah it should probably carry over those args. admittedly this code path is basically: "tell me you only run on serverless without saying you run on serverless" 🙃 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Basically |
||
except Exception as exc: | ||
raise TypeError(f"Unable to authenticate remote class {remote_cls} with client secret") from exc | ||
|
||
ctx = FlyteContextManager.current_context() | ||
return FlyteRemote( | ||
config=remote.config.with_params( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this issue comes up not just with eager. Frameworks like Langchain that have async functionality will cause the event loop to end before the flyte task can run the async data persistence operation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we use a separate event loop just for flytekit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds right, I think someone more well-versed in async can help with that