|  | 
|  | 1 | +import os | 
|  | 2 | + | 
|  | 3 | +import click | 
|  | 4 | +from patch_ng import fromstring | 
|  | 5 | +from rich.console import Console | 
|  | 6 | +from rich.markdown import Markdown | 
|  | 7 | + | 
|  | 8 | +from cycode.cli.exceptions.handle_ai_remediation_errors import handle_ai_remediation_exception | 
|  | 9 | +from cycode.cli.models import CliResult | 
|  | 10 | +from cycode.cli.printers import ConsolePrinter | 
|  | 11 | +from cycode.cli.utils.get_api_client import get_scan_cycode_client | 
|  | 12 | + | 
|  | 13 | + | 
|  | 14 | +def _echo_remediation(context: click.Context, remediation_markdown: str, is_fix_available: bool) -> None: | 
|  | 15 | +    printer = ConsolePrinter(context) | 
|  | 16 | +    if printer.is_json_printer: | 
|  | 17 | +        data = {'remediation': remediation_markdown, 'is_fix_available': is_fix_available} | 
|  | 18 | +        printer.print_result(CliResult(success=True, message='Remediation fetched successfully', data=data)) | 
|  | 19 | +    else:  # text or table | 
|  | 20 | +        Console().print(Markdown(remediation_markdown)) | 
|  | 21 | + | 
|  | 22 | + | 
|  | 23 | +def _apply_fix(context: click.Context, diff: str, is_fix_available: bool) -> None: | 
|  | 24 | +    printer = ConsolePrinter(context) | 
|  | 25 | +    if not is_fix_available: | 
|  | 26 | +        printer.print_result(CliResult(success=False, message='Fix is not available for this violation')) | 
|  | 27 | +        return | 
|  | 28 | + | 
|  | 29 | +    patch = fromstring(diff.encode('UTF-8')) | 
|  | 30 | +    if patch is False: | 
|  | 31 | +        printer.print_result(CliResult(success=False, message='Failed to parse fix diff')) | 
|  | 32 | +        return | 
|  | 33 | + | 
|  | 34 | +    is_fix_applied = patch.apply(root=os.getcwd(), strip=0) | 
|  | 35 | +    if is_fix_applied: | 
|  | 36 | +        printer.print_result(CliResult(success=True, message='Fix applied successfully')) | 
|  | 37 | +    else: | 
|  | 38 | +        printer.print_result(CliResult(success=False, message='Failed to apply fix')) | 
|  | 39 | + | 
|  | 40 | + | 
|  | 41 | +@click.command(short_help='Get AI remediation (INTERNAL).', hidden=True) | 
|  | 42 | +@click.argument('detection_id', nargs=1, type=click.UUID, required=True) | 
|  | 43 | +@click.option( | 
|  | 44 | +    '--fix', | 
|  | 45 | +    is_flag=True, | 
|  | 46 | +    default=False, | 
|  | 47 | +    help='Apply fixes to resolve violations. Fix is not available for all violations.', | 
|  | 48 | +    type=click.BOOL, | 
|  | 49 | +    required=False, | 
|  | 50 | +) | 
|  | 51 | +@click.pass_context | 
|  | 52 | +def ai_remediation_command(context: click.Context, detection_id: str, fix: bool) -> None: | 
|  | 53 | +    client = get_scan_cycode_client() | 
|  | 54 | + | 
|  | 55 | +    try: | 
|  | 56 | +        remediation_markdown = client.get_ai_remediation(detection_id) | 
|  | 57 | +        fix_diff = client.get_ai_remediation(detection_id, fix=True) | 
|  | 58 | +        is_fix_available = bool(fix_diff)  # exclude empty string, None, etc. | 
|  | 59 | + | 
|  | 60 | +        if fix: | 
|  | 61 | +            _apply_fix(context, fix_diff, is_fix_available) | 
|  | 62 | +        else: | 
|  | 63 | +            _echo_remediation(context, remediation_markdown, is_fix_available) | 
|  | 64 | +    except Exception as err: | 
|  | 65 | +        handle_ai_remediation_exception(context, err) | 
|  | 66 | + | 
|  | 67 | +    context.exit() | 
0 commit comments