Skip to content

feat: Add trust boundary support for service accounts and impersonation. #1778

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
40 changes: 40 additions & 0 deletions google/auth/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import datetime
from email.message import Message
import hashlib
import os
import json
import logging
import sys
Expand Down Expand Up @@ -287,6 +288,45 @@ def unpadded_urlsafe_b64encode(value):
return base64.urlsafe_b64encode(value).rstrip(b"=")


def get_bool_from_env(variable_name, default=False):
"""Gets a boolean value from an environment variable.

The environment variable is interpreted as a boolean with the following
(case-insensitive) rules:
- "true", "1" are considered true.
- "false", "0" are considered false.

Args:
variable_name (str): The name of the environment variable.
default (bool): The default value if the environment variable is not
set.

Returns:
bool: The boolean value of the environment variable.

Raises:
google.auth.exceptions.InvalidValue: If the environment variable is
set to a value that can not be interpreted as a boolean.
"""
value = os.environ.get(variable_name)

if value is None:
return default

value = value.lower()

if value in ("true", "1"):
return True
elif value in ("false", "0"):
return False
else:
raise exceptions.InvalidValue(
'Environment variable "{}" must be one of "true", "false", "1", or "0".'.format(
variable_name
)
)


def is_python_3():
"""Check if the Python interpreter is Python 2 or 3.

Expand Down
55 changes: 53 additions & 2 deletions google/auth/compute_engine/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,16 @@
from google.auth.compute_engine import _metadata
from google.oauth2 import _client

_TRUST_BOUNDARY_LOOKUP_ENDPOINT = (
"https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations"
)


class Credentials(
credentials.Scoped,
credentials.CredentialsWithQuotaProject,
credentials.CredentialsWithUniverseDomain,
credentials.CredentialsWithTrustBoundary,
):
"""Compute Engine Credentials.

Expand All @@ -61,6 +66,7 @@ def __init__(
scopes=None,
default_scopes=None,
universe_domain=None,
trust_boundary=None,
):
"""
Args:
Expand All @@ -76,6 +82,7 @@ def __init__(
provided or None, credential will attempt to fetch the value
from metadata server. If metadata server doesn't have universe
domain endpoint, then the default googleapis.com will be used.
trust_boundary (Mapping[str,str]): A credential trust boundary.
"""
super(Credentials, self).__init__()
self._service_account_email = service_account_email
Expand All @@ -86,6 +93,7 @@ def __init__(
if universe_domain:
self._universe_domain = universe_domain
self._universe_domain_cached = True
self._trust_boundary = trust_boundary

def _metric_header_for_usage(self):
return metrics.CRED_TYPE_SA_MDS
Expand All @@ -111,6 +119,33 @@ def refresh(self, request):
new_exc = exceptions.RefreshError(caught_exc)
raise new_exc from caught_exc

self._refresh_trust_boundary(request)

def _build_trust_boundary_lookup_url(self):
"""Builds and returns the URL for the trust boundary lookup API for GCE."""
# If the service account email is 'default', we need to get the
# actual email address from the metadata server.
if self._service_account_email == "default":
from google.auth.transport import requests as google_auth_requests

request = google_auth_requests.Request()
try:
info = _metadata.get_service_account_info(request, "default")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@harkamaljot you had worked on optimizing this call somewhere else in the code recently. Can you PTAL at this method and see if there are any issues in doing this call or something can be optimized.

# Cache the fetched email so we don't have to do this again.
self._service_account_email = info["email"]

except exceptions.TransportError as e:
# If fetching the service account email fails due to a transport error,
# it means we cannot build the trust boundary lookup URL.
# Wrap this in a RefreshError so it's caught by _refresh_trust_boundary.
raise exceptions.RefreshError(
f"Failed to get service account email for trust boundary lookup: {e}"
) from e

return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format(
self.universe_domain, self.service_account_email
)

@property
def service_account_email(self):
"""The service account email.
Expand Down Expand Up @@ -152,8 +187,9 @@ def with_quota_project(self, quota_project_id):
quota_project_id=quota_project_id,
scopes=self._scopes,
default_scopes=self._default_scopes,
universe_domain=self._universe_domain,
trust_boundary=self._trust_boundary,
)
creds._universe_domain = self._universe_domain
creds._universe_domain_cached = self._universe_domain_cached
return creds

Expand All @@ -167,8 +203,9 @@ def with_scopes(self, scopes, default_scopes=None):
default_scopes=default_scopes,
service_account_email=self._service_account_email,
quota_project_id=self._quota_project_id,
universe_domain=self._universe_domain,
trust_boundary=self._trust_boundary,
)
creds._universe_domain = self._universe_domain
creds._universe_domain_cached = self._universe_domain_cached
return creds

Expand All @@ -179,9 +216,23 @@ def with_universe_domain(self, universe_domain):
default_scopes=self._default_scopes,
service_account_email=self._service_account_email,
quota_project_id=self._quota_project_id,
trust_boundary=self._trust_boundary,
universe_domain=universe_domain,
)

@_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary)
def with_trust_boundary(self, trust_boundary):
creds = self.__class__(
service_account_email=self._service_account_email,
quota_project_id=self._quota_project_id,
scopes=self._scopes,
default_scopes=self._default_scopes,
universe_domain=self._universe_domain,
trust_boundary=trust_boundary,
)
creds._universe_domain_cached = self._universe_domain_cached
return creds


_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
_DEFAULT_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"
Expand Down
144 changes: 126 additions & 18 deletions google/auth/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@

import abc
from enum import Enum
import json
import os
import typing

from google.auth import _helpers, environment_vars
from google.auth import exceptions
from google.auth import metrics
from google.auth._credentials_base import _BaseCredentials
from google.auth._default import _LOGGER
from google.auth._refresh_worker import RefreshThreadManager

DEFAULT_UNIVERSE_DOMAIN = "googleapis.com"
NO_OP_TRUST_BOUNDARY_LOCATIONS: "typing.Tuple[str]" = ()
NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS = "0x0"


class Credentials(_BaseCredentials):
Expand Down Expand Up @@ -178,22 +183,7 @@ def apply(self, headers, token=None):
token (Optional[str]): If specified, overrides the current access
token.
"""
self._apply(headers, token=token)
"""Trust boundary value will be a cached value from global lookup.

The response of trust boundary will be a list of regions and a hex
encoded representation.

An example of global lookup response:
{
"locations": [
"us-central1", "us-east1", "europe-west1", "asia-east1"
]
"encoded_locations": "0xA30"
}
"""
if self._trust_boundary is not None:
headers["x-allowed-locations"] = self._trust_boundary["encoded_locations"]
self._apply(headers, token)
if self.quota_project_id:
headers["x-goog-user-project"] = self.quota_project_id

Expand Down Expand Up @@ -299,6 +289,125 @@ def with_universe_domain(self, universe_domain):
)


class CredentialsWithTrustBoundary(Credentials):
"""Abstract base for credentials supporting ``with_trust_boundary`` factory"""

def with_trust_boundary(self, trust_boundary):
"""Returns a copy of these credentials with a modified trust boundary.

Args:
trust_boundary Mapping[str, str]: The trust boundary to use for the
credential. This should be a map with a "locations" key that maps to
a list of GCP regions, and a "encodedLocations" key that maps to a
hex string.

Returns:
google.auth.credentials.Credentials: A new credentials instance.
"""
raise NotImplementedError("This credential does not support trust boundaries.")

def apply(self, headers, token=None):
"""Apply the token to the authentication header."""
super().apply(headers, token)
if self._trust_boundary is not None:
if self._has_no_op_trust_boundary:
# STS expects an empty string if the trust boundary value is no-op.
headers["x-allowed-locations"] = ""
else:
headers["x-allowed-locations"] = self._trust_boundary[
"encodedLocations"
]

def _refresh_trust_boundary(self, request):
"""Triggers a refresh of the trust boundary and updates the cache if necessary.

Args:
request (google.auth.transport.Request): The object used to make
HTTP requests.

Raises:
google.auth.exceptions.RefreshError: If the trust boundary could
not be refreshed and no cached value is available.
"""
# Do not trigger refresh if credential has a cached no-op trust boundary.
if self._has_no_op_trust_boundary():
return
new_trust_boundary = {}
try:
new_trust_boundary = self._lookup_trust_boundary(request)
except exceptions.RefreshError as error:
# If the call to the lookup API failed, check if there is a trust boundary
# already cached. If there is, do nothing. If not, then throw the error.
if self._trust_boundary is None:
raise error
if _helpers.is_logging_enabled(_LOGGER):
_LOGGER.debug(
"Using cached trust boundary due to refresh error: %s", error
)
return
else:
self._trust_boundary = new_trust_boundary

def _lookup_trust_boundary(self, request):
"""Calls the trust boundary lookup API to refresh the trust boundary cache.

Args:
request (google.auth.transport.Request): The object used to make
HTTP requests.

Returns:
trust_boundary (dict): The trust boundary object returned by the lookup API.

Raises:
google.auth.exceptions.RefreshError: If the trust boundary could not be
retrieved.
"""
from google.oauth2 import _client

# Verify the trust boundary feature flag is enabled.
if not _helpers.get_bool_from_env(
environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED, default=False
):
# Skip the lookup and return early if it's not explicitly enabled.
return None

# Skip trust boundary flow for non-gdu universe domain.
if self.universe_domain != DEFAULT_UNIVERSE_DOMAIN:
return None

url = self._build_trust_boundary_lookup_url()
if not url:
raise exceptions.InvalidValue("Failed to build trust boundary lookup URL.")
return _client.lookup_trust_boundary(request, url, self.token)

@abc.abstractmethod
def _build_trust_boundary_lookup_url(self):
"""
Builds and returns the URL for the trust boundary lookup API.

This method should be implemented by subclasses to provide the
specific URL based on the credential type and its properties.

Returns:
str: The URL for the trust boundary lookup endpoint, or None
if lookup should be skipped (e.g., for non-applicable universe domains).
"""
raise NotImplementedError(
"_build_trust_boundary_lookup_url must be implemented"
)

def _has_no_op_trust_boundary(self):
# A no-op trust boundary is indicated by encodedLocations being "0x0".
# The "locations" list may or may not be present as an empty list.
if (
self._trust_boundary is not None
and self._trust_boundary["encodedLocations"]
== NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS
):
return True
return False


class AnonymousCredentials(Credentials):
"""Credentials that do not provide any authentication information.

Expand Down Expand Up @@ -382,8 +491,7 @@ def default_scopes(self):

@abc.abstractproperty
def requires_scopes(self):
"""True if these credentials require scopes to obtain an access token.
"""
"""True if these credentials require scopes to obtain an access token."""
return False

def has_scopes(self, scopes):
Expand Down
4 changes: 4 additions & 0 deletions google/auth/environment_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,7 @@
AWS_SESSION_TOKEN = "AWS_SESSION_TOKEN"
AWS_REGION = "AWS_REGION"
AWS_DEFAULT_REGION = "AWS_DEFAULT_REGION"

GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED = "GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED"
"""Environment variable controlling whether to enable trust boundary feature.
The default value is false. Users have to explicitly set this value to true."""
Loading