Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
462 changes: 57 additions & 405 deletions cycode/cli/code_scanner.py

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions cycode/cli/commands/report/report_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import click

from cycode.cli.commands.report.sbom.sbom_command import sbom_command
from cycode.cli.utils.get_api_client import get_report_cycode_client
from cycode.cli.utils.progress_bar import SBOM_REPORT_PROGRESS_BAR_SECTIONS, get_progress_bar


@click.group(
commands={
'sbom': sbom_command,
},
short_help='Generate report. You`ll need to specify which report type to perform.',
)
@click.pass_context
def report_command(
context: click.Context,
) -> int:
"""Generate report."""

context.obj['client'] = get_report_cycode_client(hide_response_log=False) # TODO disable log
context.obj['progress_bar'] = get_progress_bar(hidden=False, sections=SBOM_REPORT_PROGRESS_BAR_SECTIONS)

return 1
94 changes: 94 additions & 0 deletions cycode/cli/commands/report/sbom/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import pathlib
import time
from platform import platform
from typing import TYPE_CHECKING, Optional

from cycode.cli import consts
from cycode.cli.commands.report.sbom.sbom_report_file import SbomReportFile
from cycode.cli.config import configuration_manager
from cycode.cli.exceptions.custom_exceptions import ReportAsyncError
from cycode.cli.utils.progress_bar import SbomReportProgressBarSection
from cycode.cyclient import logger
from cycode.cyclient.models import ReportExecutionSchema

if TYPE_CHECKING:
from cycode.cli.utils.progress_bar import BaseProgressBar
from cycode.cyclient.report_client import ReportClient


def _poll_report_execution_until_completed(
progress_bar: 'BaseProgressBar',
client: 'ReportClient',
report_execution_id: int,
polling_timeout: Optional[int] = None,
) -> ReportExecutionSchema:
if polling_timeout is None:
polling_timeout = configuration_manager.get_report_polling_timeout_in_seconds()

end_polling_time = time.time() + polling_timeout
while time.time() < end_polling_time:
report_execution = client.get_report_execution(report_execution_id)
report_label = report_execution.error_message or report_execution.status_message

progress_bar.update_label(report_label)

if report_execution.status == consts.REPORT_STATUS_COMPLETED:
return report_execution

if report_execution.status == consts.REPORT_STATUS_ERROR:
raise ReportAsyncError(f'Error occurred while trying to generate report: {report_label}')

time.sleep(consts.REPORT_POLLING_WAIT_INTERVAL_IN_SECONDS)

raise ReportAsyncError(f'Timeout exceeded while waiting for report to complete. Timeout: {polling_timeout} sec.')


def send_report_feedback(
client: 'ReportClient',
start_scan_time: float,
report_type: str,
report_command_type: str,
request_report_parameters: dict,
report_execution_id: int,
error_message: Optional[str] = None,
request_zip_file_size: Optional[int] = None,
**kwargs,
) -> None:
try:
request_report_parameters.update(kwargs)

end_scan_time = time.time()
scan_status = {
'report_type': report_type,
'report_command_type': report_command_type,
'request_report_parameters': request_report_parameters,
'operation_system': platform(),
'error_message': error_message,
'execution_time': int(end_scan_time - start_scan_time),
'request_zip_file_size': request_zip_file_size,
}

client.report_status(report_execution_id, scan_status)
except Exception as e:
logger.debug(f'Failed to send report feedback: {e}')


def create_sbom_report(
progress_bar: 'BaseProgressBar',
client: 'ReportClient',
report_execution_id: int,
output_file: Optional[pathlib.Path],
output_format: str,
) -> None:
report_execution = _poll_report_execution_until_completed(progress_bar, client, report_execution_id)

progress_bar.set_section_length(SbomReportProgressBarSection.GENERATION)

report_path = report_execution.storage_details.path
report_content = client.get_file_content(report_path)

progress_bar.set_section_length(SbomReportProgressBarSection.RECEIVE_REPORT)
progress_bar.stop()

sbom_report = SbomReportFile(report_path, output_format, output_file)
sbom_report.write(report_content)
47 changes: 47 additions & 0 deletions cycode/cli/commands/report/sbom/handle_errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import traceback
from typing import Optional

import click

from cycode.cli.exceptions import custom_exceptions
from cycode.cli.models import CliError, CliErrors
from cycode.cli.printers import ConsolePrinter


def handle_report_exception(context: click.Context, err: Exception) -> Optional[CliError]:
if context.obj['verbose']:
click.secho(f'Error: {traceback.format_exc()}', fg='red')

errors: CliErrors = {
custom_exceptions.NetworkError: CliError(
code='cycode_error',
message='Cycode was unable to complete this report. '
'Please try again by executing the `cycode report` command',
),
custom_exceptions.ScanAsyncError: CliError(
code='report_error',
message='Cycode was unable to complete this report. '
'Please try again by executing the `cycode report` command',
),
custom_exceptions.ReportAsyncError: CliError(
code='report_error',
message='Cycode was unable to complete this report. '
'Please try again by executing the `cycode report` command',
),
custom_exceptions.HttpUnauthorizedError: CliError(
code='auth_error',
message='Unable to authenticate to Cycode, your token is either invalid or has expired. '
'Please re-generate your token and reconfigure it by running the `cycode configure` command',
),
}

if type(err) in errors:
error = errors[type(err)]

ConsolePrinter(context).print_error(error)
return None

if isinstance(err, click.ClickException):
raise err

raise click.ClickException(str(err))
84 changes: 84 additions & 0 deletions cycode/cli/commands/report/sbom/sbom_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import pathlib
from typing import Optional

import click

from cycode.cli.commands.report.sbom.sbom_path_command import sbom_path_command
from cycode.cli.commands.report.sbom.sbom_repository_url_command import sbom_repository_url_command
from cycode.cli.config import config
from cycode.cyclient.report_client import ReportParameters


@click.group(
commands={
'path': sbom_path_command,
'repository_url': sbom_repository_url_command,
},
short_help='Generate SBOM report for remote repository by url or local directory by path.',
)
@click.option(
'--format',
'-f',
help='SBOM format.',
type=click.Choice(config['scans']['supported_sbom_formats']),
required=True,
)
@click.option(
'--output-format',
'-o',
default='json',
help='Specify the output file format (the default is json).',
type=click.Choice(['json']),
required=False,
)
@click.option(
'--output-file',
help='Output file (the default is autogenerated filename saved to the current directory).',
default=None,
type=click.Path(resolve_path=True, writable=True, path_type=pathlib.Path),
required=False,
)
@click.option(
'--include-vulnerabilities',
is_flag=True,
default=False,
help='Include vulnerabilities.',
type=bool,
required=False,
)
@click.option(
'--include-dev-dependencies',
is_flag=True,
default=False,
help='Include dev dependencies.',
type=bool,
required=False,
)
@click.pass_context
def sbom_command(
context: click.Context,
format: str,
output_format: Optional[str],
output_file: Optional[pathlib.Path],
include_vulnerabilities: bool,
include_dev_dependencies: bool,
) -> int:
"""Generate SBOM report."""
sbom_format_parts = format.split('-')
if len(sbom_format_parts) != 2:
raise click.ClickException('Invalid SBOM format.')

sbom_format, sbom_format_version = sbom_format_parts

report_parameters = ReportParameters(
entity_type='SbomCli',
sbom_report_type=sbom_format,
sbom_version=sbom_format_version,
output_format=output_format,
include_vulnerabilities=include_vulnerabilities,
include_dev_dependencies=include_dev_dependencies,
)
context.obj['report_parameters'] = report_parameters
context.obj['output_file'] = output_file

return 1
65 changes: 65 additions & 0 deletions cycode/cli/commands/report/sbom/sbom_path_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import time

import click

from cycode.cli import consts
from cycode.cli.commands.report.sbom.common import create_sbom_report, send_report_feedback
from cycode.cli.commands.report.sbom.handle_errors import handle_report_exception
from cycode.cli.files_collector.path_documents import get_relevant_document
from cycode.cli.files_collector.sca.sca_code_scanner import perform_pre_scan_documents_actions
from cycode.cli.files_collector.zip_documents import zip_documents
from cycode.cli.utils.progress_bar import SbomReportProgressBarSection


@click.command(short_help='Generate SBOM report for provided path in the command.')
@click.argument('path', nargs=1, type=click.Path(exists=True, resolve_path=True), required=True)
@click.pass_context
def sbom_path_command(context: click.Context, path: str) -> None:
client = context.obj['client']
report_parameters = context.obj['report_parameters']
output_format = report_parameters.output_format
output_file = context.obj['output_file']

progress_bar = context.obj['progress_bar']
progress_bar.start()

start_scan_time = time.time()
report_execution_id = -1

try:
documents = get_relevant_document(
progress_bar, SbomReportProgressBarSection.PREPARE_LOCAL_FILES, consts.SCA_SCAN_TYPE, path
)
# TODO(MarshalX): combine perform_pre_scan_documents_actions with get_relevant_document.
# unhardcode usage of context in perform_pre_scan_documents_actions
perform_pre_scan_documents_actions(context, consts.SCA_SCAN_TYPE, documents)

zipped_documents = zip_documents(consts.SCA_SCAN_TYPE, documents)
report_execution = client.request_sbom_report_execution(report_parameters, zip_file=zipped_documents)
report_execution_id = report_execution.id

create_sbom_report(progress_bar, client, report_execution_id, output_file, output_format)

send_report_feedback(
client=client,
start_scan_time=start_scan_time,
report_type='SBOM',
report_command_type='path',
request_report_parameters=report_parameters.to_dict(without_entity_type=False),
report_execution_id=report_execution_id,
request_zip_file_size=zipped_documents.size,
)
except Exception as e:
progress_bar.stop()

send_report_feedback(
client=client,
start_scan_time=start_scan_time,
report_type='SBOM',
report_command_type='path',
request_report_parameters=report_parameters.to_dict(without_entity_type=False),
report_execution_id=report_execution_id,
error_message=str(e),
)

handle_report_exception(context, e)
49 changes: 49 additions & 0 deletions cycode/cli/commands/report/sbom/sbom_report_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import os
import pathlib
import re
from typing import Optional

import click


class SbomReportFile:
def __init__(self, storage_path: str, output_format: str, output_file: Optional[pathlib.Path]) -> None:
if output_file is None:
output_file = pathlib.Path(storage_path)

output_ext = f'.{output_format}'
if output_file.suffix != output_ext:
output_file = output_file.with_suffix(output_ext)

self._file_path = output_file

def is_exists(self) -> bool:
return self._file_path.exists()

def _prompt_overwrite(self) -> bool:
return click.confirm(f'File {self._file_path} already exists. Save with a different filename?', default=True)

def _write(self, content: str) -> None:
with open(self._file_path, 'w', encoding='UTF-8') as f:
f.write(content)

def _notify_about_saved_file(self) -> None:
click.echo(f'Report saved to {self._file_path}')

def _find_and_set_unique_filename(self) -> None:
attempt_no = 0
while self.is_exists():
attempt_no += 1

base, ext = os.path.splitext(self._file_path)
# Remove previous suffix
base = re.sub(r'-\d+$', '', base)

self._file_path = pathlib.Path(f'{base}-{attempt_no}{ext}')

def write(self, content: str) -> None:
if self.is_exists() and self._prompt_overwrite():
self._find_and_set_unique_filename()

self._write(content)
self._notify_about_saved_file()
Loading