Skip to content
Merged
4 changes: 4 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[run]
omit =
# ignore all test cases in tests/
tests/*
7 changes: 6 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
__pycache__
/build/
/*.egg-info/
/dist/
/dist/

# coverage
.coverage
htmlcov
coverage.*
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ gitpython==3.1.30
mock==4.0.3
pytest==6.2.5
pytest-mock==3.6.1
coverage==7.2.3
responses==0.23.1
binaryornot==0.4.4
halo==0.0.31
texttable==1.6.7
Empty file added tests/cli/__init__.py
Empty file.
47 changes: 47 additions & 0 deletions tests/cli/test_cycode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import json

import pytest
from typing import TYPE_CHECKING

import responses
from click.testing import CliRunner

from cli.cycode import main_cli
from tests.conftest import TEST_FILES_PATH, CLI_ENV_VARS
from tests.cyclient.test_scan_client import get_zipped_file_scan_response, get_zipped_file_scan_url

_PATH_TO_SCAN = TEST_FILES_PATH.joinpath('zip_content').absolute()

if TYPE_CHECKING:
from cyclient.scan_client import ScanClient


def _is_json(plain: str) -> bool:
try:
json.loads(plain)
return True
except (ValueError, TypeError):
return False


@responses.activate
@pytest.mark.parametrize('output', ['text', 'json'])
def test_passing_output_option_to_scan(output: str, scan_client: 'ScanClient', api_token_response: responses.Response):
scan_type = 'secret'

responses.add(get_zipped_file_scan_response(get_zipped_file_scan_url(scan_type, scan_client)))
responses.add(api_token_response)
# scan report is not mocked. This raise connection error on attempt to report scan. it doesn't perform real request

args = ['scan', '--soft-fail', '--output', output, 'path', str(_PATH_TO_SCAN)]
result = CliRunner().invoke(main_cli, args, env=CLI_ENV_VARS)

except_json = output == 'json'

assert _is_json(result.output) == except_json

if except_json:
output = json.loads(result.output)
assert 'scan_id' in output
else:
assert 'Scan Results' in result.output
56 changes: 56 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from pathlib import Path

import pytest
import responses

from cyclient.cycode_token_based_client import CycodeTokenBasedClient
from cyclient.scan_client import ScanClient
from cyclient.scan_config.scan_config_creator import create_scan_client

_EXPECTED_API_TOKEN = 'someJWT'

_CLIENT_ID = 'b1234568-0eaa-1234-beb8-6f0c12345678'
_CLIENT_SECRET = 'a12345a-42b2-1234-3bdd-c0130123456'

CLI_ENV_VARS = {
'CYCODE_CLIENT_ID': _CLIENT_ID,
'CYCODE_CLIENT_SECRET': _CLIENT_SECRET
}

TEST_FILES_PATH = Path(__file__).parent.joinpath('test_files').absolute()


@pytest.fixture(scope='session')
def scan_client() -> ScanClient:
return create_scan_client(_CLIENT_ID, _CLIENT_SECRET)


@pytest.fixture(scope='session')
def token_based_client() -> CycodeTokenBasedClient:
return CycodeTokenBasedClient(_CLIENT_ID, _CLIENT_SECRET)


@pytest.fixture(scope='session')
def api_token_url(token_based_client: CycodeTokenBasedClient) -> str:
return f'{token_based_client.api_url}/api/v1/auth/api-token'


@pytest.fixture(scope='session')
def api_token_response(api_token_url) -> responses.Response:
return responses.Response(
method=responses.POST,
url=api_token_url,
json={
'token': _EXPECTED_API_TOKEN,
'refresh_token': '12345678-0c68-1234-91ba-a13123456789',
'expires_in': 86400
},
status=200
)


@pytest.fixture(scope='session')
@responses.activate
def api_token(token_based_client: CycodeTokenBasedClient, api_token_response: responses.Response) -> str:
responses.add(api_token_response)
return token_based_client.api_token
Empty file.
22 changes: 22 additions & 0 deletions tests/cyclient/scan_config/test_default_scan_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from cyclient.scan_config.scan_config_creator import DefaultScanConfig


def test_get_service_name():
default_scan_config = DefaultScanConfig()

assert default_scan_config.get_service_name('secret') == 'secret'
assert default_scan_config.get_service_name('iac') == 'iac'
assert default_scan_config.get_service_name('sca') == 'scans'
assert default_scan_config.get_service_name('sast') == 'scans'


def test_get_scans_prefix():
default_scan_config = DefaultScanConfig()

assert default_scan_config.get_scans_prefix() == 'scans'


def test_get_detections_prefix():
default_scan_config = DefaultScanConfig()

assert default_scan_config.get_detections_prefix() == 'detections'
22 changes: 22 additions & 0 deletions tests/cyclient/scan_config/test_dev_scan_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from cyclient.scan_config.scan_config_creator import DevScanConfig


def test_get_service_name():
dev_scan_config = DevScanConfig()

assert dev_scan_config.get_service_name('secret') == '5025'
assert dev_scan_config.get_service_name('iac') == '5026'
assert dev_scan_config.get_service_name('sca') == '5004'
assert dev_scan_config.get_service_name('sast') == '5004'


def test_get_scans_prefix():
dev_scan_config = DevScanConfig()

assert dev_scan_config.get_scans_prefix() == '5004'


def test_get_detections_prefix():
dev_scan_config = DevScanConfig()

assert dev_scan_config.get_detections_prefix() == '5016'
181 changes: 181 additions & 0 deletions tests/cyclient/test_auth_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import pytest
import requests
import responses
from requests import Timeout

from cyclient.auth_client import AuthClient
from cyclient.models import AuthenticationSession, ApiTokenGenerationPollingResponse, \
ApiTokenGenerationPollingResponseSchema
from cli.exceptions.custom_exceptions import CycodeError


@pytest.fixture(scope='module')
def code_challenge() -> str:
from cli.auth.auth_manager import AuthManager
code_challenge, _ = AuthManager()._generate_pkce_code_pair()
return code_challenge


@pytest.fixture(scope='module')
def code_verifier() -> str:
from cli.auth.auth_manager import AuthManager
_, code_verifier = AuthManager()._generate_pkce_code_pair()
return code_verifier


@pytest.fixture(scope='module', name='client')
def auth_client() -> AuthClient:
return AuthClient()


@pytest.fixture(scope='module', name='start_url')
def auth_start_url(client: AuthClient) -> str:
# TODO(MarshalX): create database of constants of endpoints. remove hardcoded paths
return client.cycode_client.build_full_url(
client.cycode_client.api_url,
f'{client.AUTH_CONTROLLER_PATH}/start'
)


@pytest.fixture(scope='module', name='token_url')
def auth_token_url(client: AuthClient) -> str:
return client.cycode_client.build_full_url(
client.cycode_client.api_url,
f'{client.AUTH_CONTROLLER_PATH}/token'
)


_SESSION_ID = '4cff1234-a209-47ed-ab2f-85676912345c'


@responses.activate
def test_start_session_success(client: AuthClient, start_url: str, code_challenge: str):
responses.add(
responses.POST,
start_url,
json={'session_id': _SESSION_ID},
status=200,
)

session_response = client.start_session(code_challenge)
assert isinstance(session_response, AuthenticationSession)
assert session_response.session_id == _SESSION_ID


@responses.activate
def test_start_session_timeout(client: AuthClient, start_url: str, code_challenge: str):
responses.add(responses.POST, start_url, status=504)

timeout_response = requests.post(start_url, timeout=5)
if timeout_response.status_code == 504:
"""bypass SAST"""

responses.reset()

timeout_error = Timeout()
timeout_error.response = timeout_response

responses.add(responses.POST, start_url, body=timeout_error)

with pytest.raises(CycodeError) as e_info:
client.start_session(code_challenge)

assert e_info.value.status_code == 504


@responses.activate
def test_start_session_http_error(client: AuthClient, start_url: str, code_challenge: str):
responses.add(responses.POST, start_url, status=401)

with pytest.raises(CycodeError) as e_info:
client.start_session(code_challenge)

assert e_info.value.status_code == 401


@responses.activate
def test_get_api_token_success_pending(client: AuthClient, token_url: str, code_verifier: str):
expected_status = 'Pending'
expected_api_token = None

responses.add(
responses.POST,
token_url,
json={'status': expected_status, 'api_token': expected_api_token},
status=200,
)

api_token_polling_response = client.get_api_token(_SESSION_ID, code_verifier)
assert isinstance(api_token_polling_response, ApiTokenGenerationPollingResponse)
assert api_token_polling_response.status == expected_status
assert api_token_polling_response.api_token == expected_api_token


@responses.activate
def test_get_api_token_success_completed(client: AuthClient, token_url: str, code_verifier: str):
expected_status = 'Completed'
expected_json = {
'status': expected_status,
'api_token': {
'clientId': 'b123458-0eaa-4010-beb4-6f0c54612345',
'secret': 'a123450a-42b2-4ad5-8bdd-c0130123456',
'description': 'cycode cli api token',
'createdByUserId': None,
'createdAt': '2023-04-26T11:38:54+00:00'
}
}
expected_response = ApiTokenGenerationPollingResponseSchema().load(expected_json)

responses.add(
responses.POST,
token_url,
json=expected_json,
status=200,
)

api_token_polling_response = client.get_api_token(_SESSION_ID, code_verifier)
assert isinstance(api_token_polling_response, ApiTokenGenerationPollingResponse)
assert api_token_polling_response.status == expected_status
assert api_token_polling_response.api_token.client_id == expected_response.api_token.client_id
assert api_token_polling_response.api_token.secret == expected_response.api_token.secret
assert api_token_polling_response.api_token.description == expected_response.api_token.description


@responses.activate
def test_get_api_token_http_error_valid_response(client: AuthClient, token_url: str, code_verifier: str):
# TODO(MarshalX): ask Michal about such cases or dive into code of platform
expected_status = 'Pending'
expected_api_token = None

responses.add(
responses.POST,
token_url,
json={'status': expected_status, 'api_token': expected_api_token},
status=418, # any code between 400 and 600
)

api_token_polling_response = client.get_api_token(_SESSION_ID, code_verifier)
assert isinstance(api_token_polling_response, ApiTokenGenerationPollingResponse)
assert api_token_polling_response.status == expected_status
assert api_token_polling_response.api_token == expected_api_token


@responses.activate
def test_get_api_token_http_error_invalid_response(client: AuthClient, token_url: str, code_verifier: str):
responses.add(
responses.POST,
token_url,
body='Invalid body',
status=418, # any code between 400 and 600
)

api_token_polling_response = client.get_api_token(_SESSION_ID, code_verifier)
assert api_token_polling_response is None


@responses.activate
def test_get_api_token_not_excepted_exception(client: AuthClient, token_url: str, code_verifier: str):
responses.add(responses.POST, token_url, body=Timeout())

api_token_polling_response = client.get_api_token(_SESSION_ID, code_verifier)
assert api_token_polling_response is None
9 changes: 9 additions & 0 deletions tests/cyclient/test_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from cyclient import config
from cyclient.cycode_client import CycodeClient


def test_init_values_from_config():
client = CycodeClient()

assert client.api_url == config.cycode_api_url
assert client.timeout == config.timeout
Loading