Skip to content

Commit

Permalink
Refactor to reuse CIAM test cases for CIAM CUD
Browse files Browse the repository at this point in the history
  • Loading branch information
rayluo committed Aug 28, 2024
1 parent c6595d3 commit 094ce75
Showing 1 changed file with 42 additions and 13 deletions.
55 changes: 42 additions & 13 deletions tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ def _build_app(cls,
client_id,
client_credential=None,
authority="https://login.microsoftonline.com/common",
oidc_authority=None,
scopes=["https://graph.microsoft.com/.default"], # Microsoft Graph
http_client=None,
azure_region=None,
Expand All @@ -181,6 +182,7 @@ def _build_app(cls,
client_id,
client_credential=client_credential,
authority=authority,
oidc_authority=oidc_authority,
azure_region=azure_region,
http_client=http_client or MinimalHttpClient(),
)
Expand All @@ -194,21 +196,24 @@ def _build_app(cls,
return msal.PublicClientApplication(
client_id,
authority=authority,
oidc_authority=oidc_authority,
http_client=http_client or MinimalHttpClient(),
enable_broker_on_windows=broker_available,
enable_broker_on_mac=broker_available,
)

def _test_username_password(self,
authority=None, client_id=None, username=None, password=None, scope=None,
oidc_authority=None,
client_secret=None, # Since MSAL 1.11, confidential client has ROPC too
azure_region=None,
http_client=None,
auth_scheme=None,
**ignored):
assert authority and client_id and username and password and scope
assert client_id and username and password and scope and (
authority or oidc_authority)
self.app = self._build_app(
client_id, authority=authority,
client_id, authority=authority, oidc_authority=oidc_authority,
http_client=http_client,
azure_region=azure_region, # Regional endpoint does not support ROPC.
# Here we just use it to test a regional app won't break ROPC.
Expand All @@ -229,9 +234,14 @@ def _test_username_password(self,
os.getenv("TRAVIS"), # It is set when running on TravisCI or Github Actions
"Although it is doable, we still choose to skip device flow to save time")
def _test_device_flow(
self, client_id=None, authority=None, scope=None, **ignored):
assert client_id and authority and scope
self.app = self._build_app(client_id, authority=authority)
self,
*,
client_id=None, authority=None, oidc_authority=None, scope=None,
**ignored
):
assert client_id and scope and (authority or oidc_authority)
self.app = self._build_app(
client_id, authority=authority, oidc_authority=oidc_authority)
flow = self.app.initiate_device_flow(scopes=scope)
assert "user_code" in flow, "DF does not seem to be provisioned: %s".format(
json.dumps(flow, indent=4))
Expand All @@ -255,16 +265,18 @@ def _test_device_flow(

@unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented")
def _test_acquire_token_interactive(
self, client_id=None, authority=None, scope=None, port=None,
self, *, client_id=None, authority=None, scope=None, port=None,
oidc_authority=None,
username=None, lab_name=None,
username_uri="", # Unnecessary if you provided username and lab_name
data=None, # Needed by ssh-cert feature
prompt=None,
enable_msa_passthrough=None,
auth_scheme=None,
**ignored):
assert client_id and authority and scope
self.app = self._build_app(client_id, authority=authority)
assert client_id and scope and (authority or oidc_authority)
self.app = self._build_app(
client_id, authority=authority, oidc_authority=oidc_authority)
logger.info(_get_hint( # Useful when testing broker which shows no welcome_template
username=username, lab_name=lab_name, username_uri=username_uri))
result = self.app.acquire_token_interactive(
Expand Down Expand Up @@ -682,10 +694,13 @@ def _test_acquire_token_obo(self, config_pca, config_cca,

def _test_acquire_token_by_client_secret(
self, client_id=None, client_secret=None, authority=None, scope=None,
oidc_authority=None,
**ignored):
assert client_id and client_secret and authority and scope
assert client_id and client_secret and scope and (
authority or oidc_authority)
self.app = msal.ConfidentialClientApplication(
client_id, client_credential=client_secret, authority=authority,
oidc_authority=oidc_authority,
http_client=MinimalHttpClient())
result = self.app.acquire_token_for_client(scope)
self.assertIsNotNone(result.get("access_token"), "Got %s instead" % result)
Expand Down Expand Up @@ -1016,7 +1031,8 @@ def setUpClass(cls):

def test_ciam_acquire_token_interactive(self):
self._test_acquire_token_interactive(
authority=self.app_config["authority"],
authority=self.app_config.get("authority"),
oidc_authority=self.app_config.get("oidc_authority"),
client_id=self.app_config["appId"],
scope=self.app_config["scopes"],
username=self.user["username"],
Expand All @@ -1034,7 +1050,8 @@ def test_ciam_acquire_token_for_client(self):
self._test_acquire_token_by_client_secret(
client_id=self.app_config["appId"],
client_secret=self.get_lab_user_secret(secret_name),
authority=self.app_config["authority"],
authority=self.app_config.get("authority"),
oidc_authority=self.app_config.get("oidc_authority"),
scope=self.app_config["scopes"], # It shall ends with "/.default"
)

Expand All @@ -1047,7 +1064,8 @@ def test_ciam_acquire_token_by_ropc(self):
# and enabling "Allow public client flows".
# Otherwise it would hit AADSTS7000218.
self._test_username_password(
authority=self.app_config["authority"],
authority=self.app_config.get("authority"),
oidc_authority=self.app_config.get("oidc_authority"),
client_id=self.app_config["appId"],
username=self.user["username"],
password=self.get_lab_user_secret(self.user["lab_name"]),
Expand All @@ -1058,12 +1076,23 @@ def test_ciam_acquire_token_by_ropc(self):
AADSTS500208: The domain is not a valid login domain for the account type.""")
def test_ciam_device_flow(self):
self._test_device_flow(
authority=self.app_config["authority"],
authority=self.app_config.get("authority"),
oidc_authority=self.app_config.get("oidc_authority"),
client_id=self.app_config["appId"],
scope=self.app_config["scopes"],
)


class CiamCudTestCase(CiamTestCase):
@classmethod
def setUpClass(cls):
super(CiamCudTestCase, cls).setUpClass()
cls.app_config["authority"] = None
cls.app_config["oidc_authority"] = (
# Derived from https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.63.0/tests/Microsoft.Identity.Test.Integration.netcore/HeadlessTests/CiamIntegrationTests.cs#L156
"https://login.msidlabsciam.com/fe362aec-5d43-45d1-b730-9755e60dc3b9/v2.0")


class WorldWideRegionalEndpointTestCase(LabBasedTestCase):
region = "westus"
timeout = 2 # Short timeout makes this test case responsive on non-VM
Expand Down

0 comments on commit 094ce75

Please sign in to comment.