Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 39 additions & 7 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,9 +1,41 @@
__pycache__
/build/
/*.egg-info/
/dist/
.idea
*.iml
.env

# coverage
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
htmlcov
coverage.*
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
67 changes: 33 additions & 34 deletions cycode/cli/auth/auth_command.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import json

import click
import traceback

from cycode.cli.models import CliResult, CliErrors, CliError
from cycode.cli.printers import ConsolePrinter
from cycode.cli.auth.auth_manager import AuthManager
from cycode.cli.user_settings.credentials_manager import CredentialsManager
from cycode.cli.exceptions.custom_exceptions import AuthProcessError, NetworkError, HttpUnauthorizedError
Expand All @@ -19,10 +19,13 @@ def authenticate(context: click.Context):
return

try:
logger.debug("starting authentication process")
logger.debug('Starting authentication process')

auth_manager = AuthManager()
auth_manager.authenticate()
click.echo("Successfully logged into cycode")

result = CliResult(success=True, message='Successfully logged into cycode')
ConsolePrinter(context).print_result(result)
except Exception as e:
_handle_exception(context, e)

Expand All @@ -31,49 +34,45 @@ def authenticate(context: click.Context):
@click.pass_context
def authorization_check(context: click.Context):
""" Check your machine associating CLI with your cycode account """
passed_auth_check_args = {'context': context, 'content': {
'success': True,
'message': 'You are authorized'
}, 'color': 'green'}
failed_auth_check_args = {'context': context, 'content': {
'success': False,
'message': 'You are not authorized'
}, 'color': 'red'}
printer = ConsolePrinter(context)

passed_auth_check_res = CliResult(success=True, message='You are authorized')
failed_auth_check_res = CliResult(success=False, message='You are not authorized')

client_id, client_secret = CredentialsManager().get_credentials()
if not client_id or not client_secret:
return _print_result(**failed_auth_check_args)
return printer.print_result(failed_auth_check_res)

try:
# TODO(MarshalX): This property performs HTTP request to refresh the token. This must be the method.
if CycodeTokenBasedClient(client_id, client_secret).api_token:
return _print_result(**passed_auth_check_args)
return printer.print_result(passed_auth_check_res)
except (NetworkError, HttpUnauthorizedError):
if context.obj['verbose']:
click.secho(f'Error: {traceback.format_exc()}', fg='red', nl=False)

return _print_result(**failed_auth_check_args)
return printer.print_result(failed_auth_check_res)


def _print_result(context: click.Context, content: dict, color: str) -> None:
# the current impl of printers supports only results of scans
if context.obj['output'] == 'text':
return click.secho(content['message'], fg=color)
def _handle_exception(context: click.Context, e: Exception):
if context.obj['verbose']:
click.secho(f'Error: {traceback.format_exc()}', fg='red', nl=False)

return click.echo(json.dumps({'result': content['success'], 'message': content['message']}))
errors: CliErrors = {
AuthProcessError: CliError(
code='auth_error',
message='Authentication failed. Please try again later using the command `cycode auth`'
),
NetworkError: CliError(
code='cycode_error',
message='Authentication failed. Please try again later using the command `cycode auth`'
),
}

error = errors.get(type(e))
if error:
return ConsolePrinter(context).print_error(error)

def _handle_exception(context: click.Context, e: Exception):
verbose = context.obj["verbose"]
if verbose:
click.secho(f'Error: {traceback.format_exc()}', fg='red', nl=False)
if isinstance(e, AuthProcessError):
click.secho('Authentication failed. Please try again later using the command `cycode auth`',
fg='red', nl=False)
elif isinstance(e, NetworkError):
click.secho('Authentication failed. Please try again later using the command `cycode auth`',
fg='red', nl=False)
elif isinstance(e, click.ClickException):
if isinstance(e, click.ClickException):
raise e
else:
raise click.ClickException(str(e))

raise click.ClickException(str(e))
51 changes: 16 additions & 35 deletions cycode/cli/code_scanner.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from typing import Type, NamedTuple

import click
import json
import logging
Expand All @@ -14,8 +12,8 @@

from halo import Halo

from cycode.cli.printers import ResultsPrinter
from cycode.cli.models import Document, DocumentDetections, Severity
from cycode.cli.printers import ConsolePrinter
from cycode.cli.models import Document, DocumentDetections, Severity, CliError, CliErrors
from cycode.cli.ci_integrations import get_commit_range
from cycode.cli.consts import *
from cycode.cli.config import configuration_manager
Expand Down Expand Up @@ -446,14 +444,15 @@ def print_scan_details(scan_details_response: ScanDetailsResponse):
if scan_details_response.message is not None:
logger.info(f"Scan message: {scan_details_response.message}")

def print_results(context: click.Context, document_detections_list: List[DocumentDetections]):
output_type = context.obj['output']
printer = ResultsPrinter()
printer.print_results(context, document_detections_list, output_type)

def print_results(context: click.Context, document_detections_list: List[DocumentDetections]) -> None:
printer = ConsolePrinter(context)
printer.print_scan_results(document_detections_list)


def enrich_scan_result(scan_result: ZippedFileScanResult, documents_to_scan: List[Document]) -> \
List[DocumentDetections]:
def enrich_scan_result(
scan_result: ZippedFileScanResult, documents_to_scan: List[Document]
) -> List[DocumentDetections]:
logger.debug('enriching scan result')
document_detections_list = []
for detections_per_file in scan_result.detections_per_file:
Expand Down Expand Up @@ -819,49 +818,39 @@ def _is_subpath_of_cycode_configuration_folder(filename: str) -> bool:
or filename.endswith(ConfigFileManager.get_config_file_route())


class CliScanError(NamedTuple):
soft_fail: bool
code: str
message: str


CliScanErrors = Dict[Type[Exception], CliScanError]


def _handle_exception(context: click.Context, e: Exception):
context.obj['did_fail'] = True

if context.obj['verbose']:
click.secho(f'Error: {traceback.format_exc()}', fg='red', nl=False)

# TODO(MarshalX): Create global CLI errors database and move this
errors: CliScanErrors = {
NetworkError: CliScanError(
errors: CliErrors = {
NetworkError: CliError(
soft_fail=True,
code='cycode_error',
message='Cycode was unable to complete this scan. '
'Please try again by executing the `cycode scan` command'
),
ScanAsyncError: CliScanError(
ScanAsyncError: CliError(
soft_fail=True,
code='scan_error',
message='Cycode was unable to complete this scan. '
'Please try again by executing the `cycode scan` command'
),
HttpUnauthorizedError: CliScanError(
HttpUnauthorizedError: CliError(
soft_fail=True,
code='auth_error',
message='Unable to authenticate to Cycode, your token is either invalid or has expired. '
'Please re-generate your token and reconfigure it by running the `cycode configure` command'
),
ZipTooLargeError: CliScanError(
ZipTooLargeError: CliError(
soft_fail=True,
code='zip_too_large_error',
message='The path you attempted to scan exceeds the current maximum scanning size cap (10MB). '
'Please try ignoring irrelevant paths using the ‘cycode ignore --by-path’ command '
'and execute the scan again'
),
InvalidGitRepositoryError: CliScanError(
InvalidGitRepositoryError: CliError(
soft_fail=False,
code='invalid_git_error',
message='The path you supplied does not correlate to a git repository. '
Expand All @@ -875,22 +864,14 @@ def _handle_exception(context: click.Context, e: Exception):
if error.soft_fail is True:
context.obj['soft_fail'] = True

return _print_error(context, error)
return ConsolePrinter(context).print_error(error)

if isinstance(e, click.ClickException):
raise e

raise click.ClickException(str(e))


def _print_error(context: click.Context, error: CliScanError) -> None:
# TODO(MarshalX): Extend functionality of CLI printers and move this
if context.obj['output'] == 'text':
click.secho(error.message, fg='red', nl=False)
elif context.obj['output'] == 'json':
click.echo(json.dumps({'error': error.code, 'message': error.message}, ensure_ascii=False))


def _report_scan_status(context: click.Context, scan_type: str, scan_id: str, scan_completed: bool,
output_detections_count: int, all_detections_count: int, files_to_scan_count: int,
zip_size: int, command_scan_type: str, error_message: Optional[str]):
Expand Down
17 changes: 16 additions & 1 deletion cycode/cli/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from enum import Enum
from typing import List
from typing import List, NamedTuple, Dict, Type

from cycode.cyclient.models import Detection


Expand Down Expand Up @@ -43,3 +44,17 @@ def try_get_value(name: str) -> any:
return None

return Severity[name].value


class CliError(NamedTuple):
code: str
message: str
soft_fail: bool = False


CliErrors = Dict[Type[Exception], CliError]


class CliResult(NamedTuple):
success: bool
message: str
11 changes: 2 additions & 9 deletions cycode/cli/printers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
from .json_printer import JsonPrinter
from .text_printer import TextPrinter
from .results_printer import ResultsPrinter
from cycode.cli.printers.console_printer import ConsolePrinter


__all__ = [
'JsonPrinter',
'TextPrinter',
'ResultsPrinter'
]
__all__ = ['ConsolePrinter']
13 changes: 10 additions & 3 deletions cycode/cli/printers/base_printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,23 @@

import click

from cycode.cli.models import DocumentDetections
from cycode.cli.models import DocumentDetections, CliResult, CliError


class BasePrinter(ABC):

context: click.Context

def __init__(self, context: click.Context):
self.context = context

@abstractmethod
def print_results(self, results: List[DocumentDetections]):
def print_scan_results(self, results: List[DocumentDetections]) -> None:
pass

@abstractmethod
def print_result(self, result: CliResult) -> None:
pass

@abstractmethod
def print_error(self, error: CliError) -> None:
pass
47 changes: 47 additions & 0 deletions cycode/cli/printers/console_printer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import click
from typing import List, TYPE_CHECKING

from cycode.cli.consts import SCA_SCAN_TYPE
from cycode.cli.exceptions.custom_exceptions import CycodeError
from cycode.cli.models import DocumentDetections, CliResult, CliError
from cycode.cli.printers.table_printer import TablePrinter
from cycode.cli.printers.json_printer import JsonPrinter
from cycode.cli.printers.text_printer import TextPrinter

if TYPE_CHECKING:
from cycode.cli.printers.base_printer import BasePrinter


class ConsolePrinter:
_AVAILABLE_PRINTERS = {
'text': TextPrinter,
'json': JsonPrinter,
'text_sca': TablePrinter
}

def __init__(self, context: click.Context):
self.context = context
self.output_type = self.context.obj.get('output')

self._printer_class = self._AVAILABLE_PRINTERS.get(self.output_type)
if self._printer_class is None:
raise CycodeError(f'"{self.output_type}" output type is not supported.')

def print_scan_results(self, detections_results_list: List[DocumentDetections]) -> None:
printer = self._get_scan_printer()
printer.print_scan_results(detections_results_list)

def _get_scan_printer(self) -> 'BasePrinter':
scan_type = self.context.obj.get('scan_type')

printer_class = self._printer_class
if scan_type == SCA_SCAN_TYPE and self.output_type == 'text':
printer_class = TablePrinter

return printer_class(self.context)

def print_result(self, result: CliResult) -> None:
self._printer_class(self.context).print_result(result)

def print_error(self, error: CliError) -> None:
self._printer_class(self.context).print_error(error)
Loading