Skip to content

Commit 44df1d6

Browse files
authored
CM-26891 - Add cycode status command (#268)
1 parent 56a773b commit 44df1d6

File tree

7 files changed

+211
-33
lines changed

7 files changed

+211
-33
lines changed

cycode/cli/commands/auth/auth_command.py

Lines changed: 10 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,15 @@
11
import click
22

33
from cycode.cli.commands.auth.auth_manager import AuthManager
4+
from cycode.cli.commands.auth_common import get_authorization_info
45
from cycode.cli.exceptions.custom_exceptions import (
56
KNOWN_USER_FRIENDLY_REQUEST_ERRORS,
67
AuthProcessError,
7-
HttpUnauthorizedError,
8-
RequestHttpError,
98
)
109
from cycode.cli.models import CliError, CliErrors, CliResult
1110
from cycode.cli.printers import ConsolePrinter
1211
from cycode.cli.sentry import add_breadcrumb, capture_exception
13-
from cycode.cli.user_settings.credentials_manager import CredentialsManager
14-
from cycode.cli.utils.jwt_utils import get_user_and_tenant_ids_from_access_token
1512
from cycode.cyclient import logger
16-
from cycode.cyclient.cycode_token_based_client import CycodeTokenBasedClient
1713

1814

1915
@click.group(
@@ -49,35 +45,18 @@ def authorization_check(context: click.Context) -> None:
4945
add_breadcrumb('check')
5046

5147
printer = ConsolePrinter(context)
52-
53-
failed_auth_check_res = CliResult(success=False, message='Cycode authentication failed')
54-
55-
client_id, client_secret = CredentialsManager().get_credentials()
56-
if not client_id or not client_secret:
57-
printer.print_result(failed_auth_check_res)
48+
auth_info = get_authorization_info(context)
49+
if auth_info is None:
50+
printer.print_result(CliResult(success=False, message='Cycode authentication failed'))
5851
return
5952

60-
try:
61-
access_token = CycodeTokenBasedClient(client_id, client_secret).get_access_token()
62-
if not access_token:
63-
printer.print_result(failed_auth_check_res)
64-
return
65-
66-
user_id, tenant_id = get_user_and_tenant_ids_from_access_token(access_token)
67-
printer.print_result(
68-
CliResult(
69-
success=True,
70-
message='Cycode authentication verified',
71-
data={'user_id': user_id, 'tenant_id': tenant_id},
72-
)
53+
printer.print_result(
54+
CliResult(
55+
success=True,
56+
message='Cycode authentication verified',
57+
data={'user_id': auth_info.user_id, 'tenant_id': auth_info.tenant_id},
7358
)
74-
75-
return
76-
except (RequestHttpError, HttpUnauthorizedError):
77-
ConsolePrinter(context).print_exception()
78-
79-
printer.print_result(failed_auth_check_res)
80-
return
59+
)
8160

8261

8362
def _handle_exception(context: click.Context, e: Exception) -> None:

cycode/cli/commands/auth_common.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from typing import NamedTuple, Optional
2+
3+
import click
4+
5+
from cycode.cli.exceptions.custom_exceptions import HttpUnauthorizedError, RequestHttpError
6+
from cycode.cli.printers import ConsolePrinter
7+
from cycode.cli.user_settings.credentials_manager import CredentialsManager
8+
from cycode.cli.utils.jwt_utils import get_user_and_tenant_ids_from_access_token
9+
from cycode.cyclient.cycode_token_based_client import CycodeTokenBasedClient
10+
11+
12+
class AuthInfo(NamedTuple):
13+
user_id: str
14+
tenant_id: str
15+
16+
17+
def get_authorization_info(context: Optional[click.Context] = None) -> Optional[AuthInfo]:
18+
client_id, client_secret = CredentialsManager().get_credentials()
19+
if not client_id or not client_secret:
20+
return None
21+
22+
try:
23+
access_token = CycodeTokenBasedClient(client_id, client_secret).get_access_token()
24+
if not access_token:
25+
return None
26+
27+
user_id, tenant_id = get_user_and_tenant_ids_from_access_token(access_token)
28+
return AuthInfo(user_id=user_id, tenant_id=tenant_id)
29+
except (RequestHttpError, HttpUnauthorizedError):
30+
if context:
31+
ConsolePrinter(context).print_exception()
32+
33+
return None

cycode/cli/commands/main_cli.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from cycode.cli.commands.ignore.ignore_command import ignore_command
99
from cycode.cli.commands.report.report_command import report_command
1010
from cycode.cli.commands.scan.scan_command import scan_command
11+
from cycode.cli.commands.status.status_command import status_command
1112
from cycode.cli.commands.version.version_command import version_command
1213
from cycode.cli.consts import (
1314
CLI_CONTEXT_SETTINGS,
@@ -28,6 +29,7 @@
2829
'ignore': ignore_command,
2930
'auth': auth_command,
3031
'version': version_command,
32+
'status': status_command,
3133
},
3234
context_settings=CLI_CONTEXT_SETTINGS,
3335
)

cycode/cli/commands/status/__init__.py

Whitespace-only changes.
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import dataclasses
2+
import json
3+
import platform
4+
from typing import Dict
5+
6+
import click
7+
8+
from cycode import __version__
9+
from cycode.cli.commands.auth_common import get_authorization_info
10+
from cycode.cli.consts import PROGRAM_NAME
11+
from cycode.cli.user_settings.configuration_manager import ConfigurationManager
12+
from cycode.cli.utils.get_api_client import get_scan_cycode_client
13+
from cycode.cyclient import logger
14+
15+
16+
class CliStatusBase:
17+
def as_dict(self) -> Dict[str, any]:
18+
return dataclasses.asdict(self)
19+
20+
def _get_text_message_part(self, key: str, value: any, intent: int = 0) -> str:
21+
message_parts = []
22+
23+
intent_prefix = ' ' * intent * 2
24+
human_readable_key = key.replace('_', ' ').capitalize()
25+
26+
if isinstance(value, dict):
27+
message_parts.append(f'{intent_prefix}{human_readable_key}:')
28+
for sub_key, sub_value in value.items():
29+
message_parts.append(self._get_text_message_part(sub_key, sub_value, intent=intent + 1))
30+
elif isinstance(value, (list, set, tuple)):
31+
message_parts.append(f'{intent_prefix}{human_readable_key}:')
32+
for index, sub_value in enumerate(value):
33+
message_parts.append(self._get_text_message_part(f'#{index}', sub_value, intent=intent + 1))
34+
else:
35+
message_parts.append(f'{intent_prefix}{human_readable_key}: {value}')
36+
37+
return '\n'.join(message_parts)
38+
39+
def as_text(self) -> str:
40+
message_parts = []
41+
for key, value in self.as_dict().items():
42+
message_parts.append(self._get_text_message_part(key, value))
43+
44+
return '\n'.join(message_parts)
45+
46+
def as_json(self) -> str:
47+
return json.dumps(self.as_dict())
48+
49+
50+
@dataclasses.dataclass
51+
class CliSupportedModulesStatus(CliStatusBase):
52+
secret_scanning: bool = False
53+
sca_scanning: bool = False
54+
iac_scanning: bool = False
55+
sast_scanning: bool = False
56+
ai_large_language_model: bool = False
57+
58+
59+
@dataclasses.dataclass
60+
class CliStatus(CliStatusBase):
61+
program: str
62+
version: str
63+
os: str
64+
arch: str
65+
python_version: str
66+
installation_id: str
67+
app_url: str
68+
api_url: str
69+
is_authenticated: bool
70+
user_id: str = None
71+
tenant_id: str = None
72+
supported_modules: CliSupportedModulesStatus = None
73+
74+
75+
def get_cli_status() -> CliStatus:
76+
configuration_manager = ConfigurationManager()
77+
78+
auth_info = get_authorization_info()
79+
is_authenticated = auth_info is not None
80+
81+
supported_modules_status = CliSupportedModulesStatus()
82+
if is_authenticated:
83+
try:
84+
client = get_scan_cycode_client()
85+
supported_modules_preferences = client.get_supported_modules_preferences()
86+
87+
supported_modules_status.secret_scanning = supported_modules_preferences.secret_scanning
88+
supported_modules_status.sca_scanning = supported_modules_preferences.sca_scanning
89+
supported_modules_status.iac_scanning = supported_modules_preferences.iac_scanning
90+
supported_modules_status.sast_scanning = supported_modules_preferences.sast_scanning
91+
supported_modules_status.ai_large_language_model = supported_modules_preferences.ai_large_language_model
92+
except Exception as e:
93+
logger.debug('Failed to get supported modules preferences', exc_info=e)
94+
95+
return CliStatus(
96+
program=PROGRAM_NAME,
97+
version=__version__,
98+
os=platform.system(),
99+
arch=platform.machine(),
100+
python_version=platform.python_version(),
101+
installation_id=configuration_manager.get_or_create_installation_id(),
102+
app_url=configuration_manager.get_cycode_app_url(),
103+
api_url=configuration_manager.get_cycode_api_url(),
104+
is_authenticated=is_authenticated,
105+
user_id=auth_info.user_id if auth_info else None,
106+
tenant_id=auth_info.tenant_id if auth_info else None,
107+
supported_modules=supported_modules_status,
108+
)
109+
110+
111+
@click.command(short_help='Show the CLI status and exit.')
112+
@click.pass_context
113+
def status_command(context: click.Context) -> None:
114+
output = context.obj['output']
115+
116+
status = get_cli_status()
117+
message = status.as_text()
118+
if output == 'json':
119+
message = status.as_json()
120+
121+
click.echo(message, color=context.color)
122+
context.exit()

cycode/cyclient/models.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,3 +478,41 @@ class Meta:
478478
@post_load
479479
def build_dto(self, data: Dict[str, Any], **_) -> ScanResultsSyncFlow:
480480
return ScanResultsSyncFlow(**data)
481+
482+
483+
@dataclass
484+
class SupportedModulesPreferences:
485+
secret_scanning: bool
486+
leak_scanning: bool
487+
iac_scanning: bool
488+
sca_scanning: bool
489+
ci_cd_scanning: bool
490+
sast_scanning: bool
491+
container_scanning: bool
492+
access_review: bool
493+
asoc: bool
494+
cimon: bool
495+
ai_machine_learning: bool
496+
ai_large_language_model: bool
497+
498+
499+
class SupportedModulesPreferencesSchema(Schema):
500+
class Meta:
501+
unknown = EXCLUDE
502+
503+
secret_scanning = fields.Boolean()
504+
leak_scanning = fields.Boolean()
505+
iac_scanning = fields.Boolean()
506+
sca_scanning = fields.Boolean()
507+
ci_cd_scanning = fields.Boolean()
508+
sast_scanning = fields.Boolean()
509+
container_scanning = fields.Boolean()
510+
access_review = fields.Boolean()
511+
asoc = fields.Boolean()
512+
cimon = fields.Boolean()
513+
ai_machine_learning = fields.Boolean()
514+
ai_large_language_model = fields.Boolean()
515+
516+
@post_load
517+
def build_dto(self, data: Dict[str, Any], **_) -> 'SupportedModulesPreferences':
518+
return SupportedModulesPreferences(**data)

cycode/cyclient/scan_client.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def __init__(
2727
self._DETECTIONS_SERVICE_CONTROLLER_PATH = 'api/v1/detections'
2828
self._DETECTIONS_SERVICE_CLI_CONTROLLER_PATH = 'api/v1/detections/cli'
2929

30-
self.POLICIES_SERVICE_CONTROLLER_PATH_V3 = 'api/v3/policies'
30+
self._POLICIES_SERVICE_CONTROLLER_PATH_V3 = 'api/v3/policies'
3131

3232
self._hide_response_log = hide_response_log
3333

@@ -198,10 +198,14 @@ def get_scan_details(self, scan_type: str, scan_id: str) -> models.ScanDetailsRe
198198
def get_detection_rules_path(self) -> str:
199199
return (
200200
f'{self.scan_config.get_detections_prefix()}/'
201-
f'{self.POLICIES_SERVICE_CONTROLLER_PATH_V3}/'
201+
f'{self._POLICIES_SERVICE_CONTROLLER_PATH_V3}/'
202202
f'detection_rules/byIds'
203203
)
204204

205+
def get_supported_modules_preferences(self) -> models.SupportedModulesPreferences:
206+
response = self.scan_cycode_client.get(url_path='preferences/api/v1/supportedmodules')
207+
return models.SupportedModulesPreferencesSchema().load(response.json())
208+
205209
@staticmethod
206210
def _get_policy_type_by_scan_type(scan_type: str) -> str:
207211
scan_type_to_policy_type = {

0 commit comments

Comments
 (0)