|  | 
|  | 1 | +import pytest | 
|  | 2 | +import requests | 
|  | 3 | +import responses | 
|  | 4 | +from requests import Timeout | 
|  | 5 | + | 
|  | 6 | +from cyclient.auth_client import AuthClient | 
|  | 7 | +from cyclient.models import AuthenticationSession, ApiTokenGenerationPollingResponse, \ | 
|  | 8 | +    ApiTokenGenerationPollingResponseSchema | 
|  | 9 | +from cli.exceptions.custom_exceptions import CycodeError | 
|  | 10 | + | 
|  | 11 | + | 
|  | 12 | +@pytest.fixture(scope='module') | 
|  | 13 | +def code_challenge() -> str: | 
|  | 14 | +    from cli.auth.auth_manager import AuthManager | 
|  | 15 | +    code_challenge, _ = AuthManager()._generate_pkce_code_pair() | 
|  | 16 | +    return code_challenge | 
|  | 17 | + | 
|  | 18 | + | 
|  | 19 | +@pytest.fixture(scope='module') | 
|  | 20 | +def code_verifier() -> str: | 
|  | 21 | +    from cli.auth.auth_manager import AuthManager | 
|  | 22 | +    _, code_verifier = AuthManager()._generate_pkce_code_pair() | 
|  | 23 | +    return code_verifier | 
|  | 24 | + | 
|  | 25 | + | 
|  | 26 | +@pytest.fixture(scope='module', name='client') | 
|  | 27 | +def auth_client() -> AuthClient: | 
|  | 28 | +    return AuthClient() | 
|  | 29 | + | 
|  | 30 | + | 
|  | 31 | +@pytest.fixture(scope='module', name='start_url') | 
|  | 32 | +def auth_start_url(client: AuthClient) -> str: | 
|  | 33 | +    # TODO(MarshalX): create database of constants of endpoints. remove hardcoded paths | 
|  | 34 | +    return client.cycode_client.build_full_url( | 
|  | 35 | +        client.cycode_client.api_url, | 
|  | 36 | +        f'{client.AUTH_CONTROLLER_PATH}/start' | 
|  | 37 | +    ) | 
|  | 38 | + | 
|  | 39 | + | 
|  | 40 | +@pytest.fixture(scope='module', name='token_url') | 
|  | 41 | +def auth_token_url(client: AuthClient) -> str: | 
|  | 42 | +    return client.cycode_client.build_full_url( | 
|  | 43 | +        client.cycode_client.api_url, | 
|  | 44 | +        f'{client.AUTH_CONTROLLER_PATH}/token' | 
|  | 45 | +    ) | 
|  | 46 | + | 
|  | 47 | + | 
|  | 48 | +_SESSION_ID = '4cff1234-a209-47ed-ab2f-85676912345c' | 
|  | 49 | + | 
|  | 50 | + | 
|  | 51 | +@responses.activate | 
|  | 52 | +def test_start_session_success(client: AuthClient, start_url: str, code_challenge: str): | 
|  | 53 | +    responses.add( | 
|  | 54 | +        responses.POST, | 
|  | 55 | +        start_url, | 
|  | 56 | +        json={'session_id': _SESSION_ID}, | 
|  | 57 | +        status=200, | 
|  | 58 | +    ) | 
|  | 59 | + | 
|  | 60 | +    session_response = client.start_session(code_challenge) | 
|  | 61 | +    assert isinstance(session_response, AuthenticationSession) | 
|  | 62 | +    assert session_response.session_id == _SESSION_ID | 
|  | 63 | + | 
|  | 64 | + | 
|  | 65 | +@responses.activate | 
|  | 66 | +def test_start_session_timeout(client: AuthClient, start_url: str, code_challenge: str): | 
|  | 67 | +    responses.add(responses.POST, start_url, status=504) | 
|  | 68 | + | 
|  | 69 | +    timeout_response = requests.post(start_url, timeout=5) | 
|  | 70 | +    if timeout_response.status_code == 504: | 
|  | 71 | +        """bypass SAST""" | 
|  | 72 | + | 
|  | 73 | +    responses.reset() | 
|  | 74 | + | 
|  | 75 | +    timeout_error = Timeout() | 
|  | 76 | +    timeout_error.response = timeout_response | 
|  | 77 | + | 
|  | 78 | +    responses.add(responses.POST, start_url, body=timeout_error) | 
|  | 79 | + | 
|  | 80 | +    with pytest.raises(CycodeError) as e_info: | 
|  | 81 | +        client.start_session(code_challenge) | 
|  | 82 | + | 
|  | 83 | +    assert e_info.value.status_code == 504 | 
|  | 84 | + | 
|  | 85 | + | 
|  | 86 | +@responses.activate | 
|  | 87 | +def test_start_session_http_error(client: AuthClient, start_url: str, code_challenge: str): | 
|  | 88 | +    responses.add(responses.POST, start_url, status=401) | 
|  | 89 | + | 
|  | 90 | +    with pytest.raises(CycodeError) as e_info: | 
|  | 91 | +        client.start_session(code_challenge) | 
|  | 92 | + | 
|  | 93 | +    assert e_info.value.status_code == 401 | 
|  | 94 | + | 
|  | 95 | + | 
|  | 96 | +@responses.activate | 
|  | 97 | +def test_get_api_token_success_pending(client: AuthClient, token_url: str, code_verifier: str): | 
|  | 98 | +    expected_status = 'Pending' | 
|  | 99 | +    expected_api_token = None | 
|  | 100 | + | 
|  | 101 | +    responses.add( | 
|  | 102 | +        responses.POST, | 
|  | 103 | +        token_url, | 
|  | 104 | +        json={'status': expected_status, 'api_token': expected_api_token}, | 
|  | 105 | +        status=200, | 
|  | 106 | +    ) | 
|  | 107 | + | 
|  | 108 | +    api_token_polling_response = client.get_api_token(_SESSION_ID, code_verifier) | 
|  | 109 | +    assert isinstance(api_token_polling_response, ApiTokenGenerationPollingResponse) | 
|  | 110 | +    assert api_token_polling_response.status == expected_status | 
|  | 111 | +    assert api_token_polling_response.api_token == expected_api_token | 
|  | 112 | + | 
|  | 113 | + | 
|  | 114 | +@responses.activate | 
|  | 115 | +def test_get_api_token_success_completed(client: AuthClient, token_url: str, code_verifier: str): | 
|  | 116 | +    expected_status = 'Completed' | 
|  | 117 | +    expected_json = { | 
|  | 118 | +        'status': expected_status, | 
|  | 119 | +        'api_token': { | 
|  | 120 | +            'clientId': 'b123458-0eaa-4010-beb4-6f0c54612345', | 
|  | 121 | +            'secret': 'a123450a-42b2-4ad5-8bdd-c0130123456', | 
|  | 122 | +            'description': 'cycode cli api token', | 
|  | 123 | +            'createdByUserId': None, | 
|  | 124 | +            'createdAt': '2023-04-26T11:38:54+00:00' | 
|  | 125 | +        } | 
|  | 126 | +    } | 
|  | 127 | +    expected_response = ApiTokenGenerationPollingResponseSchema().load(expected_json) | 
|  | 128 | + | 
|  | 129 | +    responses.add( | 
|  | 130 | +        responses.POST, | 
|  | 131 | +        token_url, | 
|  | 132 | +        json=expected_json, | 
|  | 133 | +        status=200, | 
|  | 134 | +    ) | 
|  | 135 | + | 
|  | 136 | +    api_token_polling_response = client.get_api_token(_SESSION_ID, code_verifier) | 
|  | 137 | +    assert isinstance(api_token_polling_response, ApiTokenGenerationPollingResponse) | 
|  | 138 | +    assert api_token_polling_response.status == expected_status | 
|  | 139 | +    assert api_token_polling_response.api_token.client_id == expected_response.api_token.client_id | 
|  | 140 | +    assert api_token_polling_response.api_token.secret == expected_response.api_token.secret | 
|  | 141 | +    assert api_token_polling_response.api_token.description == expected_response.api_token.description | 
|  | 142 | + | 
|  | 143 | + | 
|  | 144 | +@responses.activate | 
|  | 145 | +def test_get_api_token_http_error_valid_response(client: AuthClient, token_url: str, code_verifier: str): | 
|  | 146 | +    # TODO(MarshalX): ask Michal about such cases or dive into code of platform | 
|  | 147 | +    expected_status = 'Pending' | 
|  | 148 | +    expected_api_token = None | 
|  | 149 | + | 
|  | 150 | +    responses.add( | 
|  | 151 | +        responses.POST, | 
|  | 152 | +        token_url, | 
|  | 153 | +        json={'status': expected_status, 'api_token': expected_api_token}, | 
|  | 154 | +        status=418,  # any code between 400 and 600 | 
|  | 155 | +    ) | 
|  | 156 | + | 
|  | 157 | +    api_token_polling_response = client.get_api_token(_SESSION_ID, code_verifier) | 
|  | 158 | +    assert isinstance(api_token_polling_response, ApiTokenGenerationPollingResponse) | 
|  | 159 | +    assert api_token_polling_response.status == expected_status | 
|  | 160 | +    assert api_token_polling_response.api_token == expected_api_token | 
|  | 161 | + | 
|  | 162 | + | 
|  | 163 | +@responses.activate | 
|  | 164 | +def test_get_api_token_http_error_invalid_response(client: AuthClient, token_url: str, code_verifier: str): | 
|  | 165 | +    responses.add( | 
|  | 166 | +        responses.POST, | 
|  | 167 | +        token_url, | 
|  | 168 | +        body='Invalid body', | 
|  | 169 | +        status=418,  # any code between 400 and 600 | 
|  | 170 | +    ) | 
|  | 171 | + | 
|  | 172 | +    api_token_polling_response = client.get_api_token(_SESSION_ID, code_verifier) | 
|  | 173 | +    assert api_token_polling_response is None | 
|  | 174 | + | 
|  | 175 | + | 
|  | 176 | +@responses.activate | 
|  | 177 | +def test_get_api_token_not_excepted_exception(client: AuthClient, token_url: str, code_verifier: str): | 
|  | 178 | +    responses.add(responses.POST, token_url, body=Timeout()) | 
|  | 179 | + | 
|  | 180 | +    api_token_polling_response = client.get_api_token(_SESSION_ID, code_verifier) | 
|  | 181 | +    assert api_token_polling_response is None | 
0 commit comments