|
14 | 14 | from rest_framework import exceptions |
15 | 15 | from rest_framework_simplejwt.tokens import UntypedToken |
16 | 16 |
|
17 | | -from src.shipchain_common.authentication import EngineRequest, passive_credentials_auth, PermissionedTokenUser,\ |
18 | | - TransmissionRequest |
| 17 | +from src.shipchain_common.authentication import EngineRequest, passive_credentials_auth, PermissionedTokenUser, \ |
| 18 | + TransmissionRequest, LambdaRequest |
19 | 19 | from src.shipchain_common.test_utils import get_jwt |
20 | 20 | from src.shipchain_common.utils import random_id |
21 | 21 |
|
@@ -43,6 +43,11 @@ def transmission_request(): |
43 | 43 | return TransmissionRequest() |
44 | 44 |
|
45 | 45 |
|
| 46 | +@pytest.fixture() |
| 47 | +def lambda_request(): |
| 48 | + return LambdaRequest() |
| 49 | + |
| 50 | + |
46 | 51 | def test_passive_jwt_auth(username): |
47 | 52 | with pytest.raises(exceptions.AuthenticationFailed): |
48 | 53 | passive_credentials_auth('') |
@@ -118,6 +123,35 @@ def test_transmission_auth_requires_header(transmission_request): |
118 | 123 | assert transmission_request.has_permission(request, {}) |
119 | 124 |
|
120 | 125 |
|
| 126 | +def test_lambda_auth_requires_header(lambda_request): |
| 127 | + request = HttpRequest() |
| 128 | + |
| 129 | + assert not lambda_request.has_permission(request, {}) |
| 130 | + |
| 131 | + request.META['X_NGINX_SOURCE'] = 'alb' |
| 132 | + assert not lambda_request.has_permission(request, {}) |
| 133 | + |
| 134 | + request.META['X_NGINX_SOURCE'] = 'internal' |
| 135 | + with pytest.raises(KeyError): |
| 136 | + lambda_request.has_permission(request, {}) |
| 137 | + |
| 138 | + request.META['X_SSL_CLIENT_VERIFY'] = 'NONE' |
| 139 | + assert not lambda_request.has_permission(request, {}) |
| 140 | + |
| 141 | + request.META['X_SSL_CLIENT_VERIFY'] = 'SUCCESS' |
| 142 | + with pytest.raises(KeyError): |
| 143 | + lambda_request.has_permission(request, {}) |
| 144 | + |
| 145 | + request.META['X_SSL_CLIENT_DN'] = '/CN=lambda.h4ck3d' |
| 146 | + assert not lambda_request.has_permission(request, {}) |
| 147 | + |
| 148 | + request.META['X_SSL_CLIENT_DN'] = '/CN=profiles.test-internal' |
| 149 | + assert not lambda_request.has_permission(request, {}) |
| 150 | + |
| 151 | + request.META['X_SSL_CLIENT_DN'] = '/CN=lambda.test-internal' |
| 152 | + assert lambda_request.has_permission(request, {}) |
| 153 | + |
| 154 | + |
121 | 155 | def test_token_user_jti_cache_key(): |
122 | 156 | """By default, the jti is included in get_jwt and is used as cache key""" |
123 | 157 | jwt = get_jwt() |
|
0 commit comments