|
1 | 1 | import json
|
| 2 | +import urllib.parse |
| 3 | + |
| 4 | +from mock import Mock |
2 | 5 |
|
3 | 6 | import synapse.rest.admin
|
4 | 7 | from synapse.rest.client.v1 import login
|
@@ -252,3 +255,85 @@ def _delete_device(self, access_token, user_id, password, device_id):
|
252 | 255 | )
|
253 | 256 | self.render(request)
|
254 | 257 | self.assertEquals(channel.code, 200, channel.result)
|
| 258 | + |
| 259 | + |
| 260 | +class CASRedirectConfirmTestCase(unittest.HomeserverTestCase): |
| 261 | + |
| 262 | + servlets = [ |
| 263 | + login.register_servlets, |
| 264 | + ] |
| 265 | + |
| 266 | + def make_homeserver(self, reactor, clock): |
| 267 | + self.base_url = "https://matrix.goodserver.com/" |
| 268 | + self.redirect_path = "_synapse/client/login/sso/redirect/confirm" |
| 269 | + |
| 270 | + config = self.default_config() |
| 271 | + config["enable_registration"] = True |
| 272 | + config["cas_config"] = { |
| 273 | + "enabled": True, |
| 274 | + "server_url": "https://fake.test", |
| 275 | + "service_url": "https://matrix.goodserver.com:8448", |
| 276 | + } |
| 277 | + config["public_baseurl"] = self.base_url |
| 278 | + |
| 279 | + async def get_raw(uri, args): |
| 280 | + """Return an example response payload from a call to the `/proxyValidate` |
| 281 | + endpoint of a CAS server, copied from |
| 282 | + https://apereo.github.io/cas/5.0.x/protocol/CAS-Protocol-V2-Specification.html#26-proxyvalidate-cas-20 |
| 283 | +
|
| 284 | + This needs to be returned by an async function (as opposed to set as the |
| 285 | + mock's return value) because the corresponding Synapse code awaits on it. |
| 286 | + """ |
| 287 | + return """ |
| 288 | + <cas:serviceResponse xmlns:cas='http://www.yale.edu/tp/cas'> |
| 289 | + <cas:authenticationSuccess> |
| 290 | + <cas:user>username</cas:user> |
| 291 | + <cas:proxyGrantingTicket>PGTIOU-84678-8a9d...</cas:proxyGrantingTicket> |
| 292 | + <cas:proxies> |
| 293 | + <cas:proxy>https://proxy2/pgtUrl</cas:proxy> |
| 294 | + <cas:proxy>https://proxy1/pgtUrl</cas:proxy> |
| 295 | + </cas:proxies> |
| 296 | + </cas:authenticationSuccess> |
| 297 | + </cas:serviceResponse> |
| 298 | + """ |
| 299 | + |
| 300 | + mocked_http_client = Mock(spec=["get_raw"]) |
| 301 | + mocked_http_client.get_raw.side_effect = get_raw |
| 302 | + |
| 303 | + self.hs = self.setup_test_homeserver( |
| 304 | + config=config, proxied_http_client=mocked_http_client, |
| 305 | + ) |
| 306 | + |
| 307 | + return self.hs |
| 308 | + |
| 309 | + def test_cas_redirect_confirm(self): |
| 310 | + """Tests that the SSO login flow serves a confirmation page before redirecting a |
| 311 | + user to the redirect URL. |
| 312 | + """ |
| 313 | + base_url = "/login/cas/ticket?redirectUrl" |
| 314 | + redirect_url = "https://dodgy-site.com/" |
| 315 | + |
| 316 | + url_parts = list(urllib.parse.urlparse(base_url)) |
| 317 | + query = dict(urllib.parse.parse_qsl(url_parts[4])) |
| 318 | + query.update({"redirectUrl": redirect_url}) |
| 319 | + query.update({"ticket": "ticket"}) |
| 320 | + url_parts[4] = urllib.parse.urlencode(query) |
| 321 | + cas_ticket_url = urllib.parse.urlunparse(url_parts) |
| 322 | + |
| 323 | + # Get Synapse to call the fake CAS and serve the template. |
| 324 | + request, channel = self.make_request("GET", cas_ticket_url) |
| 325 | + self.render(request) |
| 326 | + |
| 327 | + # Test that the response is HTML. |
| 328 | + content_type_header_value = "" |
| 329 | + for header in channel.result.get("headers", []): |
| 330 | + if header[0] == b"Content-Type": |
| 331 | + content_type_header_value = header[1].decode("utf8") |
| 332 | + |
| 333 | + self.assertTrue(content_type_header_value.startswith("text/html")) |
| 334 | + |
| 335 | + # Test that the body isn't empty. |
| 336 | + self.assertTrue(len(channel.result["body"]) > 0) |
| 337 | + |
| 338 | + # And that it contains our redirect link |
| 339 | + self.assertIn(redirect_url, channel.result["body"].decode("UTF-8")) |
0 commit comments