Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 10 additions & 31 deletions cycode/cli/commands/auth/auth_command.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
import click

from cycode.cli.commands.auth.auth_manager import AuthManager
from cycode.cli.commands.auth_common import get_authorization_info
from cycode.cli.exceptions.custom_exceptions import (
KNOWN_USER_FRIENDLY_REQUEST_ERRORS,
AuthProcessError,
HttpUnauthorizedError,
RequestHttpError,
)
from cycode.cli.models import CliError, CliErrors, CliResult
from cycode.cli.printers import ConsolePrinter
from cycode.cli.sentry import add_breadcrumb, capture_exception
from cycode.cli.user_settings.credentials_manager import CredentialsManager
from cycode.cli.utils.jwt_utils import get_user_and_tenant_ids_from_access_token
from cycode.cyclient import logger
from cycode.cyclient.cycode_token_based_client import CycodeTokenBasedClient


@click.group(
Expand Down Expand Up @@ -49,35 +45,18 @@ def authorization_check(context: click.Context) -> None:
add_breadcrumb('check')

printer = ConsolePrinter(context)

failed_auth_check_res = CliResult(success=False, message='Cycode authentication failed')

client_id, client_secret = CredentialsManager().get_credentials()
if not client_id or not client_secret:
printer.print_result(failed_auth_check_res)
auth_info = get_authorization_info(context)
if auth_info is None:
printer.print_result(CliResult(success=False, message='Cycode authentication failed'))
return

try:
access_token = CycodeTokenBasedClient(client_id, client_secret).get_access_token()
if not access_token:
printer.print_result(failed_auth_check_res)
return

user_id, tenant_id = get_user_and_tenant_ids_from_access_token(access_token)
printer.print_result(
CliResult(
success=True,
message='Cycode authentication verified',
data={'user_id': user_id, 'tenant_id': tenant_id},
)
printer.print_result(
CliResult(
success=True,
message='Cycode authentication verified',
data={'user_id': auth_info.user_id, 'tenant_id': auth_info.tenant_id},
)

return
except (RequestHttpError, HttpUnauthorizedError):
ConsolePrinter(context).print_exception()

printer.print_result(failed_auth_check_res)
return
)


def _handle_exception(context: click.Context, e: Exception) -> None:
Expand Down
33 changes: 33 additions & 0 deletions cycode/cli/commands/auth_common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from typing import NamedTuple, Optional

import click

from cycode.cli.exceptions.custom_exceptions import HttpUnauthorizedError, RequestHttpError
from cycode.cli.printers import ConsolePrinter
from cycode.cli.user_settings.credentials_manager import CredentialsManager
from cycode.cli.utils.jwt_utils import get_user_and_tenant_ids_from_access_token
from cycode.cyclient.cycode_token_based_client import CycodeTokenBasedClient


class AuthInfo(NamedTuple):
user_id: str
tenant_id: str


def get_authorization_info(context: Optional[click.Context] = None) -> Optional[AuthInfo]:
client_id, client_secret = CredentialsManager().get_credentials()
if not client_id or not client_secret:
return None

try:
access_token = CycodeTokenBasedClient(client_id, client_secret).get_access_token()
if not access_token:
return None

user_id, tenant_id = get_user_and_tenant_ids_from_access_token(access_token)
return AuthInfo(user_id=user_id, tenant_id=tenant_id)
except (RequestHttpError, HttpUnauthorizedError):
if context:
ConsolePrinter(context).print_exception()

return None
2 changes: 2 additions & 0 deletions cycode/cli/commands/main_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from cycode.cli.commands.ignore.ignore_command import ignore_command
from cycode.cli.commands.report.report_command import report_command
from cycode.cli.commands.scan.scan_command import scan_command
from cycode.cli.commands.status.status_command import status_command
from cycode.cli.commands.version.version_command import version_command
from cycode.cli.consts import (
CLI_CONTEXT_SETTINGS,
Expand All @@ -28,6 +29,7 @@
'ignore': ignore_command,
'auth': auth_command,
'version': version_command,
'status': status_command,
},
context_settings=CLI_CONTEXT_SETTINGS,
)
Expand Down
Empty file.
122 changes: 122 additions & 0 deletions cycode/cli/commands/status/status_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import dataclasses
import json
import platform
from typing import Dict

import click

from cycode import __version__
from cycode.cli.commands.auth_common import get_authorization_info
from cycode.cli.consts import PROGRAM_NAME
from cycode.cli.user_settings.configuration_manager import ConfigurationManager
from cycode.cli.utils.get_api_client import get_scan_cycode_client
from cycode.cyclient import logger


class CliStatusBase:
def as_dict(self) -> Dict[str, any]:
return dataclasses.asdict(self)

def _get_text_message_part(self, key: str, value: any, intent: int = 0) -> str:
message_parts = []

intent_prefix = ' ' * intent * 2
human_readable_key = key.replace('_', ' ').capitalize()

if isinstance(value, dict):
message_parts.append(f'{intent_prefix}{human_readable_key}:')
for sub_key, sub_value in value.items():
message_parts.append(self._get_text_message_part(sub_key, sub_value, intent=intent + 1))
elif isinstance(value, (list, set, tuple)):
message_parts.append(f'{intent_prefix}{human_readable_key}:')
for index, sub_value in enumerate(value):
message_parts.append(self._get_text_message_part(f'#{index}', sub_value, intent=intent + 1))
else:
message_parts.append(f'{intent_prefix}{human_readable_key}: {value}')

return '\n'.join(message_parts)

def as_text(self) -> str:
message_parts = []
for key, value in self.as_dict().items():
message_parts.append(self._get_text_message_part(key, value))

return '\n'.join(message_parts)

def as_json(self) -> str:
return json.dumps(self.as_dict())


@dataclasses.dataclass
class CliSupportedModulesStatus(CliStatusBase):
secret_scanning: bool = False
sca_scanning: bool = False
iac_scanning: bool = False
sast_scanning: bool = False
ai_large_language_model: bool = False


@dataclasses.dataclass
class CliStatus(CliStatusBase):
program: str
version: str
os: str
arch: str
python_version: str
installation_id: str
app_url: str
api_url: str
is_authenticated: bool
user_id: str = None
tenant_id: str = None
supported_modules: CliSupportedModulesStatus = None


def get_cli_status() -> CliStatus:
configuration_manager = ConfigurationManager()

auth_info = get_authorization_info()
is_authenticated = auth_info is not None

supported_modules_status = CliSupportedModulesStatus()
if is_authenticated:
try:
client = get_scan_cycode_client()
supported_modules_preferences = client.get_supported_modules_preferences()

supported_modules_status.secret_scanning = supported_modules_preferences.secret_scanning
supported_modules_status.sca_scanning = supported_modules_preferences.sca_scanning
supported_modules_status.iac_scanning = supported_modules_preferences.iac_scanning
supported_modules_status.sast_scanning = supported_modules_preferences.sast_scanning
supported_modules_status.ai_large_language_model = supported_modules_preferences.ai_large_language_model
except Exception as e:
logger.debug('Failed to get supported modules preferences', exc_info=e)

return CliStatus(
program=PROGRAM_NAME,
version=__version__,
os=platform.system(),
arch=platform.machine(),
python_version=platform.python_version(),
installation_id=configuration_manager.get_or_create_installation_id(),
app_url=configuration_manager.get_cycode_app_url(),
api_url=configuration_manager.get_cycode_api_url(),
is_authenticated=is_authenticated,
user_id=auth_info.user_id if auth_info else None,
tenant_id=auth_info.tenant_id if auth_info else None,
supported_modules=supported_modules_status,
)


@click.command(short_help='Show the CLI status and exit.')
@click.pass_context
def status_command(context: click.Context) -> None:
output = context.obj['output']

status = get_cli_status()
message = status.as_text()
if output == 'json':
message = status.as_json()

click.echo(message, color=context.color)
context.exit()
38 changes: 38 additions & 0 deletions cycode/cyclient/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,3 +478,41 @@ class Meta:
@post_load
def build_dto(self, data: Dict[str, Any], **_) -> ScanResultsSyncFlow:
return ScanResultsSyncFlow(**data)


@dataclass
class SupportedModulesPreferences:
secret_scanning: bool
leak_scanning: bool
iac_scanning: bool
sca_scanning: bool
ci_cd_scanning: bool
sast_scanning: bool
container_scanning: bool
access_review: bool
asoc: bool
cimon: bool
ai_machine_learning: bool
ai_large_language_model: bool


class SupportedModulesPreferencesSchema(Schema):
class Meta:
unknown = EXCLUDE

secret_scanning = fields.Boolean()
leak_scanning = fields.Boolean()
iac_scanning = fields.Boolean()
sca_scanning = fields.Boolean()
ci_cd_scanning = fields.Boolean()
sast_scanning = fields.Boolean()
container_scanning = fields.Boolean()
access_review = fields.Boolean()
asoc = fields.Boolean()
cimon = fields.Boolean()
ai_machine_learning = fields.Boolean()
ai_large_language_model = fields.Boolean()

@post_load
def build_dto(self, data: Dict[str, Any], **_) -> 'SupportedModulesPreferences':
return SupportedModulesPreferences(**data)
8 changes: 6 additions & 2 deletions cycode/cyclient/scan_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(
self._DETECTIONS_SERVICE_CONTROLLER_PATH = 'api/v1/detections'
self._DETECTIONS_SERVICE_CLI_CONTROLLER_PATH = 'api/v1/detections/cli'

self.POLICIES_SERVICE_CONTROLLER_PATH_V3 = 'api/v3/policies'
self._POLICIES_SERVICE_CONTROLLER_PATH_V3 = 'api/v3/policies'

self._hide_response_log = hide_response_log

Expand Down Expand Up @@ -198,10 +198,14 @@ def get_scan_details(self, scan_type: str, scan_id: str) -> models.ScanDetailsRe
def get_detection_rules_path(self) -> str:
return (
f'{self.scan_config.get_detections_prefix()}/'
f'{self.POLICIES_SERVICE_CONTROLLER_PATH_V3}/'
f'{self._POLICIES_SERVICE_CONTROLLER_PATH_V3}/'
f'detection_rules/byIds'
)

def get_supported_modules_preferences(self) -> models.SupportedModulesPreferences:
response = self.scan_cycode_client.get(url_path='preferences/api/v1/supportedmodules')
return models.SupportedModulesPreferencesSchema().load(response.json())

@staticmethod
def _get_policy_type_by_scan_type(scan_type: str) -> str:
scan_type_to_policy_type = {
Expand Down
Loading