Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions src/flyte/storage/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,16 @@ class S3(Storage):
credential provider so profile-based auth can be used. This requires that the `boto3` library
is installed.

3. If neither of the above applies, obstore uses the default AWS credential chain
(for remote runs this commonly resolves via workload identity / IAM attached to
the service account and then IMDS fallbacks where applicable).
3. If `AWS_WEB_IDENTITY_TOKEN_FILE` is set (EKS / IRSA), Flyte configures a
boto3-backed obstore credential provider so botocore's standard chain
handles AssumeRoleWithWebIdentity. Requires `boto3`. obstore's own
`AmazonS3Builder::from_env()` only reads static keys, so without this the
builder falls back to IMDS and picks up the node instance role instead of
the pod's service-account role.

4. If none of the above applies, obstore uses the default AWS credential chain
(for remote runs this commonly resolves to the IAM role attached to the EC2
instance, retrieved through IMDS).
"""

endpoint: typing.Optional[str] = None
Expand Down Expand Up @@ -166,6 +173,12 @@ def _build_s3_credential_provider_from_config_file(
)
return Boto3CredentialProvider(session=boto3_session)

def _build_s3_credential_provider_from_web_identity(self, region: str | None) -> typing.Any:
    """
    Build an obstore credential provider backed by a default boto3 session.

    The session is created without explicit credentials, so credential
    resolution is left entirely to boto3/botocore. `region` is forwarded as the
    session's `region_name` and may be None.

    Raises whatever the imports raise when `boto3` or `obstore.auth.boto3` is
    not importable.
    """
    # Imports stay local so boto3 is only required when this path is taken.
    from boto3.session import Session
    from obstore.auth.boto3 import Boto3CredentialProvider

    web_identity_session = Session(region_name=region)
    return Boto3CredentialProvider(session=web_identity_session)

def get_fsspec_kwargs(self, anonymous: bool = False, **kwargs) -> typing.Dict[str, typing.Any]:
kwargs = super().get_fsspec_kwargs(anonymous=anonymous, **kwargs)

Expand All @@ -191,12 +204,14 @@ def get_fsspec_kwargs(self, anonymous: bool = False, **kwargs) -> typing.Dict[st
if not anonymous and not has_static_credentials:
aws_profile = os.getenv("AWS_PROFILE", None)
aws_config_file = os.getenv("AWS_CONFIG_FILE", None)
aws_web_identity_token_file = os.getenv("AWS_WEB_IDENTITY_TOKEN_FILE", None)
region = self.region or os.getenv("AWS_REGION", None)
if aws_profile is not None and aws_config_file is not None:
try:
kwargs["credential_provider"] = self._build_s3_credential_provider_from_config_file(
aws_profile=aws_profile,
aws_config_file=aws_config_file,
region=self.region or os.getenv("AWS_REGION", None),
region=region,
)
logger.debug(
"Using S3 credentials from AWS config file with profile %s at %s",
Expand All @@ -209,6 +224,23 @@ def get_fsspec_kwargs(self, anonymous: bool = False, **kwargs) -> typing.Dict[st
"Falling back to default AWS credential resolution.",
e,
)
elif aws_web_identity_token_file is not None:
# obstore's AmazonS3Builder::from_env() only reads static keys and ignores
# AWS_WEB_IDENTITY_TOKEN_FILE / AWS_ROLE_ARN, so without an explicit
# credential_provider the builder falls back to IMDS and picks up the EC2
# node role instead of the pod's IRSA role. Route through boto3 so
# botocore's standard chain runs AssumeRoleWithWebIdentity.
try:
kwargs["credential_provider"] = self._build_s3_credential_provider_from_web_identity(
region=region,
)
logger.debug("Using S3 credentials from IRSA web identity token via boto3 session")
except Exception as e:
logger.warning(
"Unable to initialize S3 web-identity credential provider (%s). "
"Falling back to default AWS credential resolution.",
e,
)

if config:
kwargs["config"] = config
Expand Down
78 changes: 78 additions & 0 deletions tests/internal/storage/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,84 @@ def test_get_fsspec_kwargs_profile_provider_failure_falls_back(self, monkeypatch

assert "credential_provider" not in result

def test_get_fsspec_kwargs_with_web_identity_uses_credential_provider(self, monkeypatch):
    """With only the web identity token set, the web-identity provider is used."""
    # EKS / IRSA scenario: the token file variable is present while profile,
    # config file and static credentials are all absent. The request has to go
    # through boto3 so AssumeRoleWithWebIdentity runs; otherwise obstore falls
    # back to IMDS (node role) and returns 403.
    storage = S3(region="us-east-2")
    token_path = "/var/run/secrets/eks.amazonaws.com/serviceaccount/token"

    for unwanted in ("AWS_PROFILE", "AWS_CONFIG_FILE"):
        monkeypatch.delenv(unwanted, raising=False)
    monkeypatch.setenv("AWS_WEB_IDENTITY_TOKEN_FILE", token_path)
    monkeypatch.setenv("AWS_ROLE_ARN", "arn:aws:iam::123:role/sa-role")

    def _stub_builder(self, region):
        # The region configured on the S3 object must be forwarded unchanged.
        assert region == "us-east-2"
        return "provider"

    monkeypatch.setattr(S3, "_build_s3_credential_provider_from_web_identity", _stub_builder)

    fsspec_kwargs = storage.get_fsspec_kwargs()

    assert fsspec_kwargs["credential_provider"] == "provider"

def test_get_fsspec_kwargs_web_identity_not_used_when_profile_present(self, monkeypatch):
    """Profile-based auth takes precedence over the web identity token."""
    # When AWS_PROFILE + AWS_CONFIG_FILE and the web identity variable are all
    # set, the profile path must win. This mirrors the existing precedence, so
    # users who explicitly opted into profile-based auth see no regression.
    storage = S3()
    env = {
        "AWS_PROFILE": "dev-profile",
        "AWS_CONFIG_FILE": "/tmp/config",
        "AWS_WEB_IDENTITY_TOKEN_FILE": "/var/run/secrets/eks.amazonaws.com/serviceaccount/token",
    }
    for key, value in env.items():
        monkeypatch.setenv(key, value)

    def _profile_builder(self, aws_profile, aws_config_file, region):
        return "profile-provider"

    def _forbidden_web_identity_builder(self, region):
        raise AssertionError("web identity should not be called")

    monkeypatch.setattr(S3, "_build_s3_credential_provider_from_config_file", _profile_builder)
    monkeypatch.setattr(
        S3,
        "_build_s3_credential_provider_from_web_identity",
        _forbidden_web_identity_builder,
    )

    fsspec_kwargs = storage.get_fsspec_kwargs()

    assert fsspec_kwargs["credential_provider"] == "profile-provider"

def test_get_fsspec_kwargs_web_identity_not_used_when_static_credentials_present(self, monkeypatch):
    """Static keys on the S3 object suppress the web-identity credential provider."""
    storage = S3(access_key_id="test-key", secret_access_key="test-secret")
    token_path = "/var/run/secrets/eks.amazonaws.com/serviceaccount/token"
    monkeypatch.setenv("AWS_WEB_IDENTITY_TOKEN_FILE", token_path)

    def _forbidden_builder(self, region):
        # Reaching this stub means the static-credential guard was bypassed.
        raise AssertionError("provider should not be called")

    monkeypatch.setattr(S3, "_build_s3_credential_provider_from_web_identity", _forbidden_builder)

    fsspec_kwargs = storage.get_fsspec_kwargs()

    assert "credential_provider" not in fsspec_kwargs

def test_get_fsspec_kwargs_anonymous_does_not_use_web_identity_provider(self, monkeypatch):
    """Anonymous access skips signing and never builds a web-identity provider."""
    storage = S3()
    token_path = "/var/run/secrets/eks.amazonaws.com/serviceaccount/token"
    monkeypatch.setenv("AWS_WEB_IDENTITY_TOKEN_FILE", token_path)

    def _forbidden_builder(self, region):
        # Anonymous requests must not trigger any credential lookup.
        raise AssertionError("provider should not be called for anonymous")

    monkeypatch.setattr(S3, "_build_s3_credential_provider_from_web_identity", _forbidden_builder)

    fsspec_kwargs = storage.get_fsspec_kwargs(anonymous=True)

    assert fsspec_kwargs["config"]["skip_signature"] is True
    assert "credential_provider" not in fsspec_kwargs

def test_get_fsspec_kwargs_web_identity_provider_failure_falls_back(self, monkeypatch):
    """A failing web-identity provider build must leave no credential_provider set."""
    s3 = S3()
    # Clear profile settings so an ambient AWS_PROFILE / AWS_CONFIG_FILE cannot
    # divert execution into the profile branch, which is checked first.
    monkeypatch.delenv("AWS_PROFILE", raising=False)
    monkeypatch.delenv("AWS_CONFIG_FILE", raising=False)
    monkeypatch.setenv("AWS_WEB_IDENTITY_TOKEN_FILE", "/var/run/secrets/eks.amazonaws.com/serviceaccount/token")

    attempts = []

    def _failing_builder(self, region):
        # Record the call so the test cannot pass without reaching this branch.
        attempts.append(region)
        raise ModuleNotFoundError("boto3 missing")

    monkeypatch.setattr(S3, "_build_s3_credential_provider_from_web_identity", _failing_builder)
    result = s3.get_fsspec_kwargs()

    assert attempts, "web identity provider builder was never invoked"
    assert "credential_provider" not in result


class TestGCSConfig:
def test_get_fsspec_kwargs_default(self):
Expand Down
Loading