@@ -94,19 +94,23 @@ def test_username_password(self):
94
94
self .skipUnlessWithConfig (["client_id" , "username" , "password" , "scope" ])
95
95
self ._test_username_password (** self .config )
96
96
97
- def test_auth_code (self ):
98
- self .skipUnlessWithConfig (["client_id" , "scope" ])
97
+ def _get_app_and_auth_code (self ):
99
98
from msal .oauth2cli .authcode import obtain_auth_code
100
- self . app = msal .ClientApplication (
99
+ app = msal .ClientApplication (
101
100
self .config ["client_id" ],
102
101
client_credential = self .config .get ("client_secret" ),
103
102
authority = self .config .get ("authority" ))
104
103
port = self .config .get ("listen_port" , 44331 )
105
104
redirect_uri = "http://localhost:%s" % port
106
- auth_request_uri = self . app .get_authorization_request_url (
105
+ auth_request_uri = app .get_authorization_request_url (
107
106
self .config ["scope" ], redirect_uri = redirect_uri )
108
107
ac = obtain_auth_code (port , auth_uri = auth_request_uri )
109
108
self .assertNotEqual (ac , None )
109
+ return (app , ac , redirect_uri )
110
+
111
+ def test_auth_code (self ):
112
+ self .skipUnlessWithConfig (["client_id" , "scope" ])
113
+ (self .app , ac , redirect_uri ) = self ._get_app_and_auth_code ()
110
114
111
115
result = self .app .acquire_token_by_authorization_code (
112
116
ac , self .config ["scope" ], redirect_uri = redirect_uri )
@@ -120,6 +124,46 @@ def test_auth_code(self):
120
124
error_description = result .get ("error_description" )))
121
125
self .assertCacheWorksForUser (result , self .config ["scope" ], username = None )
122
126
127
+
128
+ def test_ssh_cert (self ):
129
+ self .skipUnlessWithConfig (["client_id" , "scope" ])
130
+
131
+ JWK1 = """{"kty":"RSA", "n":"2tNr73xwcj6lH7bqRZrFzgSLj7OeLfbn8216uOMDHuaZ6TEUBDN8Uz0ve8jAlKsP9CQFCSVoSNovdE-fs7c15MxEGHjDcNKLWonznximj8pDGZQjVdfK-7mG6P6z-lgVcLuYu5JcWU_PeEqIKg5llOaz-qeQ4LEDS4T1D2qWRGpAra4rJX1-kmrWmX_XIamq30C9EIO0gGuT4rc2hJBWQ-4-FnE1NXmy125wfT3NdotAJGq5lMIfhjfglDbJCwhc8Oe17ORjO3FsB5CLuBRpYmP7Nzn66lRY3Fe11Xz8AEBl3anKFSJcTvlMnFtu3EpD-eiaHfTgRBU7CztGQqVbiQ", "e":"AQAB"}"""
132
+ JWK2 = """{"kty":"RSA", "n":"72u07mew8rw-ssw3tUs9clKstGO2lvD7ZNxJU7OPNKz5PGYx3gjkhUmtNah4I4FP0DuF1ogb_qSS5eD86w10Wb1ftjWcoY8zjNO9V3ph-Q2tMQWdDW5kLdeU3-EDzc0HQeou9E0udqmfQoPbuXFQcOkdcbh3eeYejs8sWn3TQprXRwGh_TRYi-CAurXXLxQ8rp-pltUVRIr1B63fXmXhMeCAGwCPEFX9FRRs-YHUszUJl9F9-E0nmdOitiAkKfCC9LhwB9_xKtjmHUM9VaEC9jWOcdvXZutwEoW2XPMOg0Ky-s197F9rfpgHle2gBrXsbvVMvS0D-wXg6vsq6BAHzQ", "e":"AQAB"}"""
133
+ data1 = {"token_type" : "ssh-cert" , "key_id" : "key1" , "req_cnf" : JWK1 }
134
+ ssh_test_slice = {
135
+ "dc" : "prod-wst-test1" ,
136
+ "slice" : "test" ,
137
+ "sshcrt" : "true" ,
138
+ }
139
+
140
+ (self .app , ac , redirect_uri ) = self ._get_app_and_auth_code ()
141
+
142
+ result = self .app .acquire_token_by_authorization_code (
143
+ ac , self .config ["scope" ], redirect_uri = redirect_uri , data = data1 ,
144
+ params = ssh_test_slice )
145
+ self .assertEqual ("ssh-cert" , result ["token_type" ])
146
+ logger .debug ("%s.cache = %s" ,
147
+ self .id (), json .dumps (self .app .token_cache ._cache , indent = 4 ))
148
+
149
+ # acquire_token_silent() needs to be passed the same key to work
150
+ account = self .app .get_accounts ()[0 ]
151
+ result_from_cache = self .app .acquire_token_silent (
152
+ self .config ["scope" ], account = account , data = data1 )
153
+ self .assertIsNotNone (result_from_cache )
154
+ self .assertEqual (
155
+ result ['access_token' ], result_from_cache ['access_token' ],
156
+ "We should get the cached SSH-cert" )
157
+
158
+ # refresh_token grant can fetch an ssh-cert bound to a different key
159
+ refreshed_ssh_cert = self .app .acquire_token_silent (
160
+ self .config ["scope" ], account = account , params = ssh_test_slice ,
161
+ data = {"token_type" : "ssh-cert" , "key_id" : "key2" , "req_cnf" : JWK2 })
162
+ self .assertIsNotNone (refreshed_ssh_cert )
163
+ self .assertEqual (refreshed_ssh_cert ["token_type" ], "ssh-cert" )
164
+ self .assertNotEqual (result ["access_token" ], refreshed_ssh_cert ['access_token' ])
165
+
166
+
123
167
def test_client_secret (self ):
124
168
self .skipUnlessWithConfig (["client_id" , "client_secret" ])
125
169
self .app = msal .ConfidentialClientApplication (
0 commit comments