Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions cycode/cli/commands/scan/code_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,35 @@ def set_issue_detected_by_scan_results(context: click.Context, scan_results: Lis
set_issue_detected(context, any(scan_result.issue_detected for scan_result in scan_results))


def _enrich_scan_result_with_data_from_detection_rules(
cycode_client: 'ScanClient', scan_type: str, scan_result: ZippedFileScanResult
) -> None:
# TODO(MarshalX): remove scan_type arg after migration to new backend filter
if scan_type != consts.SECRET_SCAN_TYPE:
# not yet
return

detection_rule_ids = set()
for detections_per_file in scan_result.detections_per_file:
for detection in detections_per_file.detections:
detection_rule_ids.add(detection.detection_rule_id)

detection_rules = cycode_client.get_detection_rules(scan_type, detection_rule_ids)
detection_rules_by_id = {detection_rule.detection_rule_id: detection_rule for detection_rule in detection_rules}

for detections_per_file in scan_result.detections_per_file:
for detection in detections_per_file.detections:
detection_rule = detection_rules_by_id.get(detection.detection_rule_id)
if not detection_rule:
# we want to make sure that BE returned it. better to not map data instead of failed scan
continue

# TODO(MarshalX): here we can also map severity without migrating secrets to async flow

# detection_details never was typed properly. so not a problem for now
detection.detection_details['custom_remediation_guidelines'] = detection_rule.custom_remediation_guidelines


def _get_scan_documents_thread_func(
context: click.Context, is_git_diff: bool, is_commit_range: bool, scan_parameters: dict
) -> Callable[[List[Document]], Tuple[str, CliError, LocalScanResult]]:
Expand All @@ -123,6 +152,8 @@ def _scan_batch_thread_func(batch: List[Document]) -> Tuple[str, CliError, Local
cycode_client, zipped_documents, scan_type, scan_id, is_git_diff, is_commit_range, scan_parameters
)

_enrich_scan_result_with_data_from_detection_rules(cycode_client, scan_type, scan_result)

local_scan_result = create_local_scan_result(
scan_result, batch, command_scan_type, scan_type, severity_threshold
)
Expand Down
14 changes: 12 additions & 2 deletions cycode/cli/printers/text_printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def _print_detection_summary(
self, detection: Detection, document_path: str, scan_id: str, report_url: Optional[str]
) -> None:
detection_name = detection.type if self.scan_type == SECRET_SCAN_TYPE else detection.message
detection_name_styled = click.style(detection_name, fg='bright_red', bold=True)

detection_sha = detection.detection_details.get('sha512')
detection_sha_message = f'\nSecret SHA: {detection_sha}' if detection_sha else ''
Expand All @@ -78,10 +79,19 @@ def _print_detection_summary(
detection_commit_id = detection.detection_details.get('commit_id')
detection_commit_id_message = f'\nCommit SHA: {detection_commit_id}' if detection_commit_id else ''

company_guidelines = detection.detection_details.get('custom_remediation_guidelines')
company_guidelines_message = f'\nCompany Guideline: {company_guidelines}' if company_guidelines else ''

click.echo(
f'⛔ Found issue of type: {click.style(detection_name, fg="bright_red", bold=True)} '
f'⛔ '
f'Found issue of type: {detection_name_styled} '
f'(rule ID: {detection.detection_rule_id}) in file: {click.format_filename(document_path)} '
f'{detection_sha_message}{scan_id_message}{report_url_message}{detection_commit_id_message} ⛔'
f'{detection_sha_message}'
f'{scan_id_message}'
f'{report_url_message}'
f'{detection_commit_id_message}'
f'{company_guidelines_message}'
f' ⛔'
)

def _print_detection_code_segment(self, detection: Detection, document: Document, code_segment_size: int) -> None:
Expand Down
36 changes: 36 additions & 0 deletions cycode/cyclient/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,3 +404,39 @@ class Meta:
@post_load
def build_dto(self, data: Dict[str, Any], **_) -> SbomReport:
return SbomReport(**data)


@dataclass
class ClassificationData:
severity: str


class ClassificationDataSchema(Schema):
class Meta:
unknown = EXCLUDE

severity = fields.String()

@post_load
def build_dto(self, data: Dict[str, Any], **_) -> ClassificationData:
return ClassificationData(**data)


@dataclass
class DetectionRule:
classification_data: List[ClassificationData]
detection_rule_id: str
custom_remediation_guidelines: Optional[str] = None


class DetectionRuleSchema(Schema):
class Meta:
unknown = EXCLUDE

classification_data = fields.Nested(ClassificationDataSchema, many=True)
detection_rule_id = fields.String()
custom_remediation_guidelines = fields.String(allow_none=True)

@post_load
def build_dto(self, data: Dict[str, Any], **_) -> DetectionRule:
return DetectionRule(**data)
57 changes: 56 additions & 1 deletion cycode/cyclient/scan_client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import json
from typing import TYPE_CHECKING, List, Optional
from typing import TYPE_CHECKING, List, Optional, Set, Union

from requests import Response

from cycode.cli import consts
from cycode.cli.exceptions.custom_exceptions import CycodeError
from cycode.cli.files_collector.models.in_memory_zip import InMemoryZip
from cycode.cyclient import models
from cycode.cyclient.cycode_client_base import CycodeClientBase
Expand All @@ -20,6 +22,7 @@ def __init__(

self.SCAN_CONTROLLER_PATH = 'api/v1/scan'
self.DETECTIONS_SERVICE_CONTROLLER_PATH = 'api/v1/detections'
self.POLICIES_SERVICE_CONTROLLER_PATH_V3 = 'api/v3/policies'

self._hide_response_log = hide_response_log

Expand Down Expand Up @@ -95,6 +98,58 @@ def get_scan_details(self, scan_id: str) -> models.ScanDetailsResponse:
response = self.scan_cycode_client.get(url_path=self.get_scan_details_path(scan_id))
return models.ScanDetailsResponseSchema().load(response.json())

def get_detection_rules_path(self) -> str:
return (
f'{self.scan_config.get_detections_prefix()}/'
f'{self.POLICIES_SERVICE_CONTROLLER_PATH_V3}/'
f'detection_rules'
)

@staticmethod
def _get_policy_type_by_scan_type(scan_type: str) -> str:
scan_type_to_policy_type = {
consts.INFRA_CONFIGURATION_SCAN_TYPE: 'IaC',
consts.SCA_SCAN_TYPE: 'SCA',
consts.SECRET_SCAN_TYPE: 'SecretDetection',
consts.SAST_SCAN_TYPE: 'SAST',
}

if scan_type not in scan_type_to_policy_type:
raise CycodeError('Invalid scan type')

return scan_type_to_policy_type[scan_type]

@staticmethod
def _filter_detection_rules_by_ids(
detection_rules: List[models.DetectionRule], detection_rules_ids: Union[Set[str], List[str]]
) -> List[models.DetectionRule]:
ids = set(detection_rules_ids) # cast to set to perform faster search
return [rule for rule in detection_rules if rule.detection_rule_id in ids]

@staticmethod
def parse_detection_rules_response(response: Response) -> List[models.DetectionRule]:
return models.DetectionRuleSchema().load(response.json(), many=True)

def get_detection_rules(
self, scan_type: str, detection_rules_ids: Union[Set[str], List[str]]
) -> List[models.DetectionRule]:
# TODO(MarshalX): use filter by list of IDs instead of policy_type when BE will be ready
params = {
'include_hidden': False,
'include_only_enabled_detection_rules': True,
'page_number': 0,
'page_size': 5000,
'policy_types_v2': self._get_policy_type_by_scan_type(scan_type),
}
response = self.scan_cycode_client.get(
url_path=self.get_detection_rules_path(),
params=params,
hide_response_content_log=self._hide_response_log,
)

# we are filtering rules by ids in-place for smooth migration when backend will be ready
return self._filter_detection_rules_by_ids(self.parse_detection_rules_response(response), detection_rules_ids)

def get_scan_detections_path(self) -> str:
return f'{self.scan_config.get_detections_prefix()}/{self.DETECTIONS_SERVICE_CONTROLLER_PATH}'

Expand Down
20 changes: 20 additions & 0 deletions tests/cyclient/mocked_responses/data/detection_rules.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[
{
"classification_data": [
{
"severity": "High",
"classification_rule_id": "e4e826bd-5820-4cc9-ae5b-bbfceb500a21"
}
],
"detection_rule_id": "26ab3395-2522-4061-a50a-c69c2d622ca1"
},
{
"classification_data": [
{
"severity": "High",
"classification_rule_id": "e4e826bd-5820-4cc9-ae5b-bbfceb500a22"
}
],
"detection_rule_id": "12345678-aea1-4304-a6e9-012345678901"
}
]
15 changes: 15 additions & 0 deletions tests/cyclient/mocked_responses/scan_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,27 @@ def get_report_scan_status_response(url: str) -> responses.Response:
return responses.Response(method=responses.POST, url=url, status=200)


def get_detection_rules_url(scan_client: ScanClient) -> str:
api_url = scan_client.scan_cycode_client.api_url
service_url = scan_client.get_detection_rules_path()
return f'{api_url}/{service_url}'


def get_detection_rules_response(url: str) -> responses.Response:
with open(MOCKED_RESPONSES_PATH.joinpath('detection_rules.json'), encoding='UTF-8') as f:
json_response = json.load(f)

return responses.Response(method=responses.GET, url=url, json=json_response, status=200)


def mock_scan_async_responses(
responses_module: responses, scan_type: str, scan_client: ScanClient, scan_id: UUID, zip_content_path: Path
) -> None:
responses_module.add(
get_zipped_file_scan_async_response(get_zipped_file_scan_async_url(scan_type, scan_client), scan_id)
)
responses_module.add(get_scan_details_response(get_scan_details_url(scan_id, scan_client), scan_id))
responses_module.add(get_detection_rules_response(get_detection_rules_url(scan_client)))
responses_module.add(get_scan_detections_count_response(get_scan_detections_count_url(scan_client)))
responses_module.add(get_scan_detections_response(get_scan_detections_url(scan_client), scan_id, zip_content_path))
responses_module.add(get_report_scan_status_response(get_report_scan_status_url(scan_type, scan_id, scan_client)))
Expand All @@ -164,4 +178,5 @@ def mock_scan_responses(
responses_module.add(
get_zipped_file_scan_response(get_zipped_file_scan_url(scan_type, scan_client), zip_content_path)
)
responses_module.add(get_detection_rules_response(get_detection_rules_url(scan_client)))
responses_module.add(get_report_scan_status_response(get_report_scan_status_url(scan_type, scan_id, scan_client)))