Skip to content

Commit

Permalink
feat: make sign and idtoken endpooints universe aware
Browse files Browse the repository at this point in the history
  • Loading branch information
TimurSadykov committed Oct 11, 2024
1 parent 116903a commit ca31a66
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 8 deletions.
10 changes: 3 additions & 7 deletions google/auth/iam.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@
http_client.GATEWAY_TIMEOUT,
}

_DEFAULT_UNIVERSE_DOMAIN = ["googleapis.com"]

_IAM_SCOPE = ["https://www.googleapis.com/auth/iam"]

_IAM_SCOPE = ["https://www.googleapis.com/auth/iam"]

_IAM_ENDPOINT = (
Expand All @@ -47,12 +43,12 @@
)

_IAM_SIGN_ENDPOINT = (
"https://iamcredentials.googleapis.com/v1/projects/-"
"https://iamcredentials.{}/v1/projects/-"
+ "/serviceAccounts/{}:signBlob"
)

_IAM_IDTOKEN_ENDPOINT = (
"https://iamcredentials.googleapis.com/v1/"
"https://iamcredentials.{}/v1/"
+ "projects/-/serviceAccounts/{}:generateIdToken"
)

Expand Down Expand Up @@ -93,7 +89,7 @@ def _make_signing_request(self, message):
message = _helpers.to_bytes(message)

method = "POST"
url = _IAM_SIGN_ENDPOINT.format(self._service_account_email)
url = _IAM_SIGN_ENDPOINT.format(self._credentials.universe_domain, self._service_account_email)
headers = {"Content-Type": "application/json"}
body = json.dumps(
{"payload": base64.b64encode(message).decode("utf-8")}
Expand Down
6 changes: 5 additions & 1 deletion google/auth/impersonated_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,11 +278,14 @@ def _update_token(self, request):
body=body,
iam_endpoint_override=self._iam_endpoint_override,
)

def get_iam_sign_endpoint(self):
return iam._IAM_SIGN_ENDPOINT.format(self.universe_domain, self._target_principal)

def sign_bytes(self, message):
from google.auth.transport.requests import AuthorizedSession

iam_sign_endpoint = iam._IAM_SIGN_ENDPOINT.format(self._target_principal)
iam_sign_endpoint = get_iam_sign_endpoint(self)

body = {
"payload": base64.b64encode(message).decode("utf-8"),
Expand Down Expand Up @@ -433,6 +436,7 @@ def refresh(self, request):
from google.auth.transport.requests import AuthorizedSession

iam_sign_endpoint = iam._IAM_IDTOKEN_ENDPOINT.format(
self._target_credentials.universe_domain,
self._target_credentials.signer_email
)

Expand Down
22 changes: 22 additions & 0 deletions tests/test_impersonated_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def make_credentials(
lifetime=LIFETIME,
target_principal=TARGET_PRINCIPAL,
iam_endpoint_override=None,
universe_domain=credentials.DEFAULT_UNIVERSE_DOMAIN
):

return Credentials(
Expand All @@ -133,6 +134,7 @@ def make_credentials(
delegates=self.DELEGATES,
lifetime=lifetime,
iam_endpoint_override=iam_endpoint_override,
universe_domain=universe_domain,
)

def test_get_cred_info(self):
Expand All @@ -144,6 +146,21 @@ def test_get_cred_info(self):
"credential_source": "/path/to/file",
"credential_type": "impersonated credentials",
"principal": "impersonated@project.iam.gserviceaccount.com",
"iam_endpoint_override": None,
}

def test_get_cred_info_universe_domain(self):
credentials = self.make_credentials(universe_domain="foo.bar")
assert not credentials.get_cred_info()

credentials._cred_file_path = "/path/to/file"
assert credentials.get_cred_info() == {
"credential_source": "/path/to/file",
"credential_type": "impersonated credentials",
"principal": "impersonated@project.iam.gserviceaccount.com",
"universe_domain": "foo.bar",
"iam_endpoint_override": "https://iamcredentials.foo.bar/v1/projects/-"
+ "/serviceAccounts/impersonated@project.iam.gserviceaccount.com:generateAccessToken"
}

def test__make_copy_get_cred_info(self):
Expand Down Expand Up @@ -390,6 +407,11 @@ def test_signer(self):
def test_signer_email(self):
credentials = self.make_credentials(target_principal=self.TARGET_PRINCIPAL)
assert credentials.signer_email == self.TARGET_PRINCIPAL

def test_sign_endpoint(self):
credentials = self.make_credentials(universe_domain="foo.bar")
assert credentials.get_iam_sign_endpoint == "https://iamcredentials.foo.bar/v1/projects/-"
+ "/serviceAccounts/impersonated@project.iam.gserviceaccount.com:signBlob"

def test_service_account_email(self):
credentials = self.make_credentials(target_principal=self.TARGET_PRINCIPAL)
Expand Down

0 comments on commit ca31a66

Please sign in to comment.