Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def build_dep_tree_path(path: str, generated_file_name: str) -> str:
return join_paths(get_file_dir(path), generated_file_name)


def execute_command(command: List, file_name: str, command_timeout: int) -> Optional[Dict]:
def execute_command(command: List[str], file_name: str, command_timeout: int) -> Optional[Dict]:
try:
dependencies = shell(command, command_timeout)
except Exception as e:
Expand Down
1 change: 1 addition & 0 deletions cycode/cli/helpers/maven/restore_maven_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def try_restore_dependencies(self, document: Document) -> Optional[Document]:
return restore_dependencies_document

def restore_from_secondary_command(self, document, manifest_file_path, restore_dependencies_document):
# TODO(MarshalX): does it even work? Missing argument
secondary_restore_command = create_secondary_restore_command(manifest_file_path)
backup_restore_content = execute_command(
secondary_restore_command,
Expand Down
16 changes: 14 additions & 2 deletions cycode/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import click
import sys

from typing import List
from typing import List, Optional

from cycode import __version__
from cycode.cli.models import Severity
Expand All @@ -15,6 +15,8 @@
from cycode.cli.auth.auth_command import authenticate
from cycode.cli.utils import scan_utils
from cycode.cyclient import logger
from cycode.cyclient.cycode_client_base import CycodeClientBase
from cycode.cyclient.models import UserAgentOptionScheme
from cycode.cyclient.scan_config.scan_config_creator import create_scan_client

CONTEXT = dict()
Expand Down Expand Up @@ -143,9 +145,15 @@ def finalize(context: click.Context, *args, **kwargs):
help='Specify the output (text/json), the default is text',
type=click.Choice(['text', 'json'])
)
@click.option(
'--user-agent',
default=None,
help='Characteristic JSON object that lets servers identify the application',
type=str,
)
@click.version_option(__version__, prog_name="cycode")
@click.pass_context
def main_cli(context: click.Context, verbose: bool, output: str):
def main_cli(context: click.Context, verbose: bool, output: str, user_agent: Optional[str]):
context.ensure_object(dict)
configuration_manager = ConfigurationManager()

Expand All @@ -156,6 +164,10 @@ def main_cli(context: click.Context, verbose: bool, output: str):

context.obj['output'] = output

if user_agent:
user_agent_option = UserAgentOptionScheme().loads(user_agent)
CycodeClientBase.enrich_user_agent(user_agent_option.user_agent_suffix)


def get_cycode_client(client_id, client_secret):
if not client_id or not client_secret:
Expand Down
12 changes: 12 additions & 0 deletions cycode/cli/user_settings/config_file_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class ConfigFileManager(BaseFileManager):
EXCLUSIONS_SECTION_NAME: str = 'exclusions'
SCAN_SECTION_NAME: str = 'scan'

INSTALLATION_ID_FIELD_NAME: str = 'installation_id'
API_URL_FIELD_NAME: str = 'cycode_api_url'
APP_URL_FIELD_NAME: str = 'cycode_app_url'
VERBOSE_FIELD_NAME: str = 'verbose'
Expand Down Expand Up @@ -56,6 +57,17 @@ def update_base_url(self, base_url: str):
}
self.write_content_to_file(update_data)

def get_installation_id(self) -> Optional[str]:
return self._get_value_from_environment_section(self.INSTALLATION_ID_FIELD_NAME)

def update_installation_id(self, installation_id: str) -> None:
update_data = {
self.ENVIRONMENT_SECTION_NAME: {
self.INSTALLATION_ID_FIELD_NAME: installation_id
}
}
self.write_content_to_file(update_data)

def add_exclusion(self, scan_type, exclusion_type, new_exclusion):
exclusions = self._get_exclusions_by_exclusion_type(scan_type, exclusion_type)
if new_exclusion in exclusions:
Expand Down
18 changes: 16 additions & 2 deletions cycode/cli/user_settings/configuration_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
from pathlib import Path
from typing import Optional, Dict
from uuid import uuid4

from cycode.cli.user_settings.config_file_manager import ConfigFileManager
from cycode.cli.consts import *
Expand Down Expand Up @@ -77,8 +78,21 @@ def update_base_url(self, base_url: str, scope: str = 'local'):
config_file_manager = self.get_config_file_manager(scope)
config_file_manager.update_base_url(base_url)

def get_config_file_manager(self, scope):
return self.local_config_file_manager if scope == 'local' else self.global_config_file_manager
def get_or_create_installation_id(self) -> str:
config_file_manager = self.get_config_file_manager()

installation_id = config_file_manager.get_installation_id()
if installation_id is None:
installation_id = uuid4().hex
config_file_manager.update_installation_id(installation_id)

return installation_id

def get_config_file_manager(self, scope: Optional[str] = None) -> ConfigFileManager:
if scope == 'local':
return self.local_config_file_manager

return self.global_config_file_manager

def get_scan_polling_timeout_in_seconds(self) -> int:
return int(self._get_value_from_environment_variables(SCAN_POLLING_TIMEOUT_IN_SECONDS_ENV_VAR_NAME,
Expand Down
31 changes: 18 additions & 13 deletions cycode/cli/utils/shell_executor.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,33 @@
from typing import List, Optional
import subprocess
from typing import List, Optional, Union

import click

from cycode.cyclient import logger

TIMEOUT = 60
_SUBPROCESS_DEFAULT_TIMEOUT_SEC = 60


def shell(
command: Union[str, List[str]], timeout: int = _SUBPROCESS_DEFAULT_TIMEOUT_SEC, execute_in_shell=False
) -> Optional[str]:
logger.debug(f'Executing shell command: {command}')

def shell(command: List[str], timeout: int = TIMEOUT) -> Optional[str]:
logger.debug(f"executing shell command: {' '.join(map(str, command))}")
try:
result = subprocess.run(
command,
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=timeout,
shell=execute_in_shell,
check=True,
capture_output=True,
)
return result.stdout.decode("utf-8").rstrip()

return result.stdout.decode('UTF-8').strip()
except subprocess.CalledProcessError as e:
logger.debug('Error occurred while running shell command. %s', {'exception': str(e.stderr)})
pass
logger.debug(f'Error occurred while running shell command. Exception: {e.stderr}')
except subprocess.TimeoutExpired:
raise click.Abort(f'Command {" ".join(map(str, command))} timed out')
except Exception as exc:
raise click.ClickException(f"Unhandled exception: {str(exc)}")
raise click.Abort(f'Command "{command}" timed out')
except Exception as e:
raise click.ClickException(f'Unhandled exception: {e}')

return None
36 changes: 31 additions & 5 deletions cycode/cyclient/cycode_client_base.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,46 @@
import platform
from typing import Dict

from requests import Response, request, exceptions

from cycode import __version__
from . import config
from cycode.cli.exceptions.custom_exceptions import NetworkError, HttpUnauthorizedError
from ..cli.exceptions.custom_exceptions import NetworkError, HttpUnauthorizedError
from ..cli.user_settings.configuration_manager import ConfigurationManager


class CycodeClientBase:
def get_cli_user_agent() -> str:
"""Return base User-Agent of CLI.

Example: CycodeCLI/0.2.3 (OS: Darwin; Arch: arm64; Python: 3.8.16; InstallID: *uuid4*)
"""
app_name = 'CycodeCLI'
version = __version__

os = platform.system()
arch = platform.machine()
python_version = platform.python_version()

install_id = ConfigurationManager().get_or_create_installation_id()

return f'{app_name}/{version} (OS: {os}; Arch: {arch}; Python: {python_version}; InstallID: {install_id})'

MANDATORY_HEADERS: dict = {
'User-Agent': f'cycode-cli_{__version__}',
}

class CycodeClientBase:
MANDATORY_HEADERS: Dict[str, str] = {'User-Agent': get_cli_user_agent()}

def __init__(self, api_url: str):
self.timeout = config.timeout
self.api_url = api_url

@staticmethod
def reset_user_agent() -> None:
CycodeClientBase.MANDATORY_HEADERS['User-Agent'] = get_cli_user_agent()

@staticmethod
def enrich_user_agent(user_agent_suffix: str) -> None:
CycodeClientBase.MANDATORY_HEADERS['User-Agent'] += f' {user_agent_suffix}'

def post(
self,
url_path: str,
Expand Down
32 changes: 32 additions & 0 deletions cycode/cyclient/models.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from dataclasses import dataclass
from typing import List, Dict, Optional
from marshmallow import Schema, fields, EXCLUDE, post_load

Expand Down Expand Up @@ -282,3 +283,34 @@ class Meta:
@post_load
def build_dto(self, data, **kwargs):
return ApiTokenGenerationPollingResponse(**data)


class UserAgentOptionScheme(Schema):
app_name = fields.String(required=True) # ex. vscode_extension
app_version = fields.String(required=True) # ex. 0.2.3
env_name = fields.String(required=True) # ex.: Visual Studio Code
env_version = fields.String(required=True) # ex. 1.78.2

@post_load
def build_dto(self, data: dict, **_) -> 'UserAgentOption':
return UserAgentOption(**data)


@dataclass
class UserAgentOption:
app_name: str
app_version: str
env_name: str
env_version: str

@property
def user_agent_suffix(self) -> str:
"""Returns suffix of User-Agent.

Example: vscode_extension (AppVersion: 0.1.2; EnvName: vscode; EnvVersion: 1.78.2)
"""
return f'{self.app_name} ' \
f'(' \
f'AppVersion: {self.app_version}; ' \
f'EnvName: {self.env_name}; EnvVersion: {self.env_version}' \
f')'
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ responses = ">=0.23.1,<0.24.0"
pyinstaller = ">=5.11.0,<5.12.0"
dunamai = ">=1.16.1,<1.17.0"

[tool.pytest.ini_options]
log_cli = true

[tool.poetry-dynamic-versioning]
# poetry self add "poetry-dynamic-versioning[plugin]"
enable = true
Expand Down
5 changes: 2 additions & 3 deletions tests/cyclient/test_client_base.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from cycode import __version__
from cycode.cyclient import config
from cycode.cyclient.cycode_client_base import CycodeClientBase
from cycode.cyclient.cycode_client_base import CycodeClientBase, get_cli_user_agent


def test_mandatory_headers():
expected_headers = {
'User-Agent': f'cycode-cli_{__version__}',
'User-Agent': get_cli_user_agent(),
}

client = CycodeClientBase(config.cycode_api_url)
Expand Down