Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/azure-cli-core/azure/cli/core/auth/identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ def _msal_app_kwargs(self):
return {
"authority": self._msal_authority,
"token_cache": Identity._msal_token_cache,
"http_cache": Identity._msal_http_cache
"http_cache": Identity._msal_http_cache,
# CP1 means we can handle claims challenges (CAE)
"client_capabilities": None if "AZURE_IDENTITY_DISABLE_CP1" in os.environ else ["CP1"]
}

@property
Expand Down
11 changes: 7 additions & 4 deletions src/azure-cli-core/azure/cli/core/auth/msal_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,19 @@ def __init__(self, client_id, username, **kwargs):

self._account = accounts[0]

def get_token(self, *scopes, **kwargs):
def get_token(self, *scopes, claims=None, **kwargs):
# scopes = ['https://pas.windows.net/CheckMyAccess/Linux/.default']
logger.debug("UserCredential.get_token: scopes=%r, kwargs=%r", scopes, kwargs)
logger.debug("UserCredential.get_token: scopes=%r, claims=%r, kwargs=%r", scopes, claims, kwargs)

result = self.acquire_token_silent_with_error(list(scopes), self._account, **kwargs)
if claims:
logger.warning('Acquiring new access token silently for tenant %s with claims challenge: %s',
self.authority.tenant, claims)
result = self.acquire_token_silent_with_error(list(scopes), self._account, claims_challenge=claims, **kwargs)

from azure.cli.core.azclierror import AuthenticationError
try:
# Check if an access token is returned.
check_result(result, scopes=scopes)
check_result(result, scopes=scopes, claims=claims)
except AuthenticationError as ex:
# For VM SSH ('data' is passed), if getting access token fails because
# Conditional Access MFA step-up or compliance check is required, re-launch
Expand Down
9 changes: 7 additions & 2 deletions src/azure-cli-core/azure/cli/core/auth/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,24 @@ def aad_error_handler(error, **kwargs):
# Cloud Shell uses IMDS-like interface for implicit login. If getting token/cert failed,
# we let the user explicitly log in to AAD with MSAL.
"Please explicitly log in with:\n{}" if error.get('error') == 'broker_error'
else "To re-authenticate, please run:\n{}").format(login_command)
else "Interactive authentication is needed. Please run:\n{}").format(login_command)

from azure.cli.core.azclierror import AuthenticationError
raise AuthenticationError(error_description, recommendation=login_message)


def _generate_login_command(scopes=None):
def _generate_login_command(scopes=None, claims=None):
login_command = ['az login']

# Rejected by Conditional Access policy, like MFA
if scopes:
login_command.append('--scope {}'.format(' '.join(scopes)))

# Rejected by CAE
if claims:
# Explicit logout is needed: https://github.com/AzureAD/microsoft-authentication-library-for-python/issues/335
return 'az logout\n' + ' '.join(login_command)

return ' '.join(login_command)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,11 @@ def _prepare_mgmt_client_kwargs_track2(cli_ctx, cred):
client_kwargs = _prepare_client_kwargs_track2(cli_ctx)

# Enable CAE support in mgmt SDK
from azure.core.pipeline.policies import BearerTokenCredentialPolicy
from azure.mgmt.core.policies import ARMChallengeAuthenticationPolicy

# Track 2 SDK maintains `scopes` and passes `scopes` to get_token.
scopes = resource_to_scopes(cli_ctx.cloud.endpoints.active_directory_resource_id)
policy = BearerTokenCredentialPolicy(cred, *scopes)
policy = ARMChallengeAuthenticationPolicy(cred, *scopes)

client_kwargs['credential_scopes'] = scopes
client_kwargs['authentication_policy'] = policy
Expand Down
8 changes: 8 additions & 0 deletions src/azure-cli-core/azure/cli/core/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ def handle_exception(ex): # pylint: disable=too-many-locals, too-many-statement
elif isinstance(ex, ValidationError):
az_error = azclierror.ValidationError(error_msg)

elif isinstance(ex, azclierror.HTTPError):
# For resources that don't support CAE - 401 can't be handled
if ex.response.status_code == 401 and 'WWW-Authenticate' in ex.response.headers:
az_error = azclierror.AuthenticationError(ex)
az_error.set_recommendation("Interactive authentication is needed. Please run:\naz logout\naz login")
else:
az_error = azclierror.UnclassifiedUserFault(ex)

elif isinstance(ex, CLIError):
# TODO: Fine-grained analysis here
az_error = azclierror.UnclassifiedUserFault(error_msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from time import sleep

from azure.cli.core.auth.util import decode_access_token
from azure.cli.core.azclierror import AuthenticationError
from azure.cli.testsdk import LiveScenarioTest
Expand Down Expand Up @@ -80,3 +82,51 @@ def test_conditional_access_mfa(self):

self.cmd('logout')
# endregion


class CAEScenarioTest(LiveScenarioTest):

def setUp(self):
super().setUp()
# Clear MSAL cache to avoid unexpected tokens from cache
self.cmd('az account clear')

def tearDown(self):
self.cmd('az account clear')

def _retry_until_error(self, cmd):
remaining_reties = ARM_MAX_RETRY
while remaining_reties > 0:
remaining_reties -= 1
sleep(ARM_RETRY_INTERVAL)
self.cmd(cmd)
raise AssertionError("Retry chance exhausted.")

def test_client_capabilities(self):
# Verify the access token has CAE enabled
result = self.cmd('account get-access-token').get_output_in_json()
access_token = result['accessToken']
decoded = decode_access_token(access_token)
self.assertEqual(decoded['xms_cc'], ['CP1']) # xms_cc: extension microsoft client capabilities
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems xms_ssm claims is dropped from GA version of CAE, so this test is no longer needed:

self.assertEqual(decoded['xms_ssm'], '1')  # xms_ssm: extension microsoft smart session management


def test_cae_scenario(self):
arm_command = "group list"

self.cmd('login')
self.test_client_capabilities()

# Test access token is working
self.cmd(arm_command)

self._revoke_sign_in_sessions()

# Keep trying until failure
with self.assertRaises(AuthenticationError) as cm:
self._retry_until_error(arm_command)

assert 'AADSTS50173' in cm.exception.error_msg
assert 'az login' in cm.exception.recommendations[0]

def _revoke_sign_in_sessions(self):
# Manually revoke sign-in sessions
self.cmd('rest -m POST -u https://graph.microsoft.com/v1.0/me/revokeSignInSessions')