-
Notifications
You must be signed in to change notification settings - Fork 297
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update eager: make sure client secret can be specified as env var #2720
Changes from all commits
9b65742
4e04b82
5abc9ad
9b9dd83
f8f97b5
54c4471
b526f6f
66c6bd4
9ec46fb
d21559f
aec338a
f0b5d67
351a8c6
2425ed4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
import asyncio | ||
import inspect | ||
import os | ||
import signal | ||
from contextlib import asynccontextmanager | ||
from datetime import datetime, timedelta, timezone | ||
|
@@ -382,6 +383,7 @@ def eager( | |
timeout: Optional[timedelta] = None, | ||
poll_interval: Optional[timedelta] = None, | ||
local_entrypoint: bool = False, | ||
client_secret_env_var: Optional[str] = None, | ||
**kwargs, | ||
): | ||
"""Eager workflow decorator. | ||
|
@@ -396,6 +398,8 @@ def eager( | |
:param local_entrypoint: If True, the eager workflow will can be executed locally but use the provided | ||
:py:func:`~flytekit.remote.FlyteRemote` object to create task/workflow executions. This is useful for local | ||
testing against a remote Flyte cluster. | ||
:param client_secret_env_var: if specified, binds the client secret to the specified environment variable for | ||
remote authentication. | ||
:param kwargs: keyword-arguments forwarded to :py:func:`~flytekit.task`. | ||
|
||
This type of workflow will execute all flyte entities within it eagerly, meaning that all python constructs can be | ||
|
@@ -488,7 +492,10 @@ async def eager_workflow(x: int) -> int: | |
remote=remote, | ||
client_secret_group=client_secret_group, | ||
client_secret_key=client_secret_key, | ||
timeout=timeout, | ||
poll_interval=poll_interval, | ||
local_entrypoint=local_entrypoint, | ||
client_secret_env_var=client_secret_env_var, | ||
**kwargs, | ||
) | ||
|
||
|
@@ -510,7 +517,9 @@ async def wrapper(*args, **kws): | |
execution_id = exec_params.execution_id | ||
|
||
async_stack = AsyncStack(task_id, execution_id) | ||
_remote = _prepare_remote(_remote, ctx, client_secret_group, client_secret_key, local_entrypoint) | ||
_remote = _prepare_remote( | ||
_remote, ctx, client_secret_group, client_secret_key, local_entrypoint, client_secret_env_var | ||
) | ||
|
||
# make sure sub-nodes as cleaned up on termination signal | ||
loop = asyncio.get_event_loop() | ||
|
@@ -533,8 +542,10 @@ async def wrapper(*args, **kws): | |
await cleanup_fn() | ||
|
||
secret_requests = kwargs.pop("secret_requests", None) or [] | ||
if client_secret_group is not None and client_secret_key is not None: | ||
try: | ||
secret_requests.append(Secret(group=client_secret_group, key=client_secret_key)) | ||
except ValueError: | ||
pass | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what if there's nothing in the client_secret_env_var environment variable? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no yeah i know, i mean, what happens if all three ( |
||
|
||
return task( | ||
wrapper, | ||
|
@@ -551,6 +562,7 @@ def _prepare_remote( | |
client_secret_group: Optional[str] = None, | ||
client_secret_key: Optional[str] = None, | ||
local_entrypoint: bool = False, | ||
client_secret_env_var: Optional[str] = None, | ||
) -> Optional[FlyteRemote]: | ||
"""Prepare FlyteRemote object for accessing Flyte cluster in a task running on the same cluster.""" | ||
|
||
|
@@ -576,7 +588,7 @@ def _prepare_remote( | |
if remote.config.platform.endpoint.startswith("localhost"): | ||
# replace sandbox endpoints with internal dns, since localhost won't exist within the Flyte cluster | ||
return _internal_demo_remote(remote) | ||
return _internal_remote(remote, client_secret_group, client_secret_key) | ||
return _internal_remote(remote, client_secret_group, client_secret_key, client_secret_env_var) | ||
|
||
|
||
def _internal_demo_remote(remote: FlyteRemote) -> FlyteRemote: | ||
|
@@ -605,16 +617,33 @@ def _internal_demo_remote(remote: FlyteRemote) -> FlyteRemote: | |
|
||
def _internal_remote( | ||
remote: FlyteRemote, | ||
client_secret_group: str, | ||
client_secret_key: str, | ||
client_secret_group: Optional[str], | ||
client_secret_key: Optional[str], | ||
client_secret_env_var: Optional[str], | ||
) -> FlyteRemote: | ||
"""Derives a FlyteRemote object from a yaml configuration file, modifying parts to make it work internally.""" | ||
assert client_secret_group is not None, "secret_group must be defined when using a remote cluster" | ||
assert client_secret_key is not None, "secret_key must be defined a remote cluster" | ||
secrets_manager = current_context().secrets | ||
|
||
assert ( | ||
client_secret_group is not None or client_secret_key is not None | ||
), "One of client_secret_group or client_secret_key must be defined when using a remote cluster" | ||
|
||
client_secret = secrets_manager.get(client_secret_group, client_secret_key) | ||
# get the raw output prefix from the context that's set from the pyflyte-execute entrypoint | ||
# (see flytekit/bin/entrypoint.py) | ||
|
||
if client_secret_env_var is not None: | ||
# this creates a remote client where the env var client secret is sufficient for authentication | ||
os.environ[client_secret_env_var] = client_secret | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. confused what's happening here. if we're removing the asserts, then that means the group/key might be None. If they're none, won't the secrets_manager.get fail? So then what is the client_secret? Or is there some default in the secrets manager that somehow gets returned? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is to make it easy to authenticate with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just wondering, in the following scenario:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. going through the secretsmanager, it looks like it raises an IndexError if you call |
||
try: | ||
remote_cls = type(remote) | ||
return remote_cls( | ||
default_domain=remote.default_domain, | ||
default_project=remote.default_project, | ||
) | ||
except Exception as exc: | ||
raise TypeError(f"Unable to authenticate remote class {remote_cls} with client secret") from exc | ||
|
||
ctx = FlyteContextManager.current_context() | ||
return FlyteRemote( | ||
config=remote.config.with_params( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay with this for now, but I think we should see if we can't move this to some higher level in the future. the secret here we're talking about is the secret for hitting admin itself right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be implemented in
Secret
but we haven't moved on that for a while: #1726