Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from __future__ import annotations

from typing import Any

from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck


class BedrockAgentFlowHasDescription(BaseResourceCheck):
def __init__(self) -> None:
name = "Ensure Bedrock Agent Flow has a description"
id = "CKV_AWS_393"
supported_resources = ("aws_bedrockagent_flow",)
categories = (CheckCategories.GENERAL_SECURITY,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
if conf.get("description"):
return CheckResult.PASSED
return CheckResult.FAILED

def get_evaluated_keys(self) -> list[str]:
return ["description"]


check = BedrockAgentFlowHasDescription()
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from __future__ import annotations

from typing import Any

from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck


class BedrockAgentFlowIsEncrypted(BaseResourceCheck):
def __init__(self) -> None:
name = "Ensure Bedrock Agent Flow is encrypted with a KMS key"
id = "CKV_AWS_394"
supported_resources = ("aws_bedrockagent_flow",)
categories = (CheckCategories.ENCRYPTION,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
if conf.get("customer_encryption_key_arn"):
return CheckResult.PASSED
return CheckResult.FAILED

def get_evaluated_keys(self) -> list[str]:
return ["customer_encryption_key_arn"]


check = BedrockAgentFlowIsEncrypted()
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from __future__ import annotations

from typing import Any

from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck


class FSXS3AccessPointAttachmentHasPolicy(BaseResourceCheck):
def __init__(self) -> None:
name = "Ensure FSx for OpenZFS S3 Access Point Attachment has a policy"
id = "CKV_AWS_395"
supported_resources = ("aws_fsx_s3_access_point_attachment",)
categories = (CheckCategories.IAM,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
if conf.get("s3_access_point") and conf.get("s3_access_point")[0].get("policy"):
return CheckResult.PASSED
return CheckResult.FAILED

def get_evaluated_keys(self) -> list[str]:
return ["s3_access_point/[0]/policy"]


check = FSXS3AccessPointAttachmentHasPolicy()
39 changes: 39 additions & 0 deletions docs/GPT Documentation/Adding Checks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
## Python Checks

Each Check in checkov is responsible for defining identifying ONE violation in the code which we want to prevent.
We NEVER define a check which is trying to identify several issues.
Each check also validates ONLY ONE ENTITY at a time, and CANNOT check for connections between multiple entities.
Each check inherits from the base class of `BaseCheck` under the `checkov/common` directory.
Each check should add the following fields in the constructor:
- `name` - one line of accurate description of the check for a human to read and understand.
- `id` - the check's id, always in one of the formats:
1. `CKV_<cloud-provider>_<number>` where `cloud-provider` would be `AWS`, `AZURE` or `GCP` based on cloud provider. We will choose this if the check is related to a specific cloud provider.
2. `CKV_<framework>_<number>` where `framework` is a 3 letter representation fo the framework like `K8S`. We will choose this option when we do not have a specific cloud provider we use.
- `supported_resources` - tuple of resource types the check is intended to run against.
- `categories` - tuple of categories this check relate to, based on `CheckCategories` object under `checkov/common`.

Note that we don't want to have 2 checks with the same id, and the `<number>` should follow. So if the last check for K8S is number 75, the next check for k8s should be `CKV_K8S_76`.
The full index of available checks is maintained under [Policy Index](../5.Policy%20Index/).

The main function which defines the check is `scan_entity_conf`, but this can be also overrided in specific frameworks to allow easier implementation, with some examples are `scan_resource_conf` in `terraform` design for scanning `resource` objects in terraform, and `get_inspected_key` which is implemented in various frameworks.
Main notes:
1. `scan_entity_conf` - getting as input a dict representation of the entity we want to scan with all attributes defined on it after the graph building. Uses the dict to understand if the entity is defined correctly. If we use it we should also implement `get_evaluated_keys` to specify the specific keys we tried to check, or we can override the `self.evaluated_keys` during the implementation of `scan_entity_conf`.
2. `get_inspected_key` - as opposed to `scan_entity_conf`, this function gets the same input but just returns the string which represents the jsonpath of the key it searches. If found, the policy passes and the `evaluated_keys` field is automatically assignes it.
NOTE - we shouldn't implement both `get_inspected_key` and `scan_entity_conf` together, only one of them.

## Graph Checks
Graph checks relate to checks which REQUIRE CONNECTIONS BETWEEN MULTIPLE ENTITIES.
For example, if we define a service account with certain write permissions in kubernetes, by itself it's not a violation. But if we use it in a k8s resource which should have minimal permissions, it might be an issue.
Graph checks are defined in `yaml` files instead of `python` files and use the BQL syntax which is defined under the [Yaml Custom Policies documentation](../3.Custom%20Policies/YAML%20Custom%20Policies.md).
The graph checks are usually written under the `checks` directory of the relevant IAC framework under a directory called `graph` or `graph_checks`.
All graph checks's ids start with `CKV2_` (as opposed to `CKV_` for python checks).
We define a connection for graph checks between 2 entities by the existence of an edge between the 2 objects which represent those resources.
The implementation of all operators which are available for graph checks are under the directory `checkov/common/graph/checks_infra/solvers`.

## Adding Tests
Each check added to `checkov` should have a test with a passing and a failing example to make sure the check is both valid and stays updated over time.
The tests for a check of framework `X` should be located under the same folder structure as the check location but under the `tests` directory. For example, the tests for the check under `checkov/terraform/checks/resource/aws/check.py` should be under `tests/terraform/checks/resource/aws/test_check.py`.
Each such test is expected to have:
- An IAC file (or multiple files under the same directory) of the same framework which contains examples of both passing and failing resources. If there are multiple use-cases to check, then we expect an example for each of them. The resources names should contain both a description of the example and wether they are a passing or failing resource. For example - `passing_example_1_with_variable_rendering`. It's also possible to separate the examples to files of failing/passing examples and remove the prefix of `passing/failing` from the names.
- If an `expected.yaml` file exists in the same folder as the examples, it should contain all passing enities' ids under the `pass` group and all failing ones under the `fail`. In that case, the test should parse this file to decide which entities should pass or fail and check it in the test.
- The test itself should try to run the matching framework's `Runner` only on the tested check and validate the results against the expected results from `expected.yaml`. An example to how we can do it is using `runner.run(root_folder=test_files_dir, runner_filter=RunnerFilter(checks=[check.id]))` when `runner` is the relevant `Runner` object for the framework, and the `check` is the relevant instance of the check that we want to test.
7 changes: 7 additions & 0 deletions docs/GPT Documentation/Checkov Structure.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Checkov is separated to different directories based on IAC FRAMEWORKS it supports.
This is shown under the "checkov" directory, where the "common" directory is responsible for shared code across all frameworks, and all other directories (like "terraform" or "cloudformation") describe the code related to each framework.
The main objects defined per framework are:
1. `Parser` - responsible for parsing the different code files related to the specific framework (for example `.tf` files in `terraform`) to python objects.
2. `Graph` - A graph representation being built from the python objects the `Parser` created. Each object being created is a represented as a graph vertex using the corresponding `Block` class implementation for the framework. Edges in the graph are being created based on the framework internal rules, for example in `terraform` we can connect 2 resources by referencing their `terraform-id` (`<resource_type>.<resource_name>`) in other resources. Each framework has other rules and not all of the rules are implemented in checkov yet. The code for the `Graph` usually sits under `checkov\<framework>\graph`.
3. `Checks` - all of the checks which were defined for the framework.
4. `Runner` - responsible for running the `Parser` and then build a `Graph` based on the result of the parser. After this process, it also scans the graph using the defined `Checks` and creates a report of all of the violations we have found. This is the main class of each framework and defines how it is being run.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
resource "aws_bedrockagent_flow" "pass" {
name = "example"
execution_role_arn = "arn:aws:iam::123456789012:role/service-role/AmazonBedrockExecutionRoleForFlows"
description = "This is an example flow."
}

resource "aws_bedrockagent_flow" "fail" {
name = "example"
execution_role_arn = "arn:aws:iam::123456789012:role/service-role/AmazonBedrockExecutionRoleForFlows"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
resource "aws_bedrockagent_flow" "pass" {
name = "example"
execution_role_arn = "arn:aws:iam::123456789012:role/service-role/AmazonBedrockExecutionRoleForFlows"
customer_encryption_key_arn = "arn:aws:kms:us-east-1:123456789012:key/aea0cafc-355a-40a3-84f8-d52855ed333e"
}

resource "aws_bedrockagent_flow" "fail" {
name = "example"
execution_role_arn = "arn:aws:iam::123456789012:role/service-role/AmazonBedrockExecutionRoleForFlows"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
resource "aws_fsx_s3_access_point_attachment" "pass" {
name = "example-attachment"
type = "OPENZFS"

openzfs_configuration {
volume_id = "fsvol-1234567890"

file_system_identity {
type = "POSIX"

posix_user {
uid = 1001
gid = 1001
}
}
}

s3_access_point {
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Principal = {
AWS = "*"
}
Action = "s3:GetObject"
Resource = "arn:aws:s3:::my-bucket/*"
}
]
})
}
}

resource "aws_fsx_s3_access_point_attachment" "fail" {
name = "example-attachment"
type = "OPENZFS"

openzfs_configuration {
volume_id = "fsvol-1234567890"

file_system_identity {
type = "POSIX"

posix_user {
uid = 1001
gid = 1001
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import os
import unittest

from checkov.runner_filter import RunnerFilter
from checkov.terraform.runner import Runner


class TestBedrockAgentFlowHasDescription(unittest.TestCase):
def test(self):
# given
test_files_dir = os.path.dirname(os.path.realpath(__file__))

# when
report = Runner().run(
root_folder=None,
files=[os.path.join(test_files_dir, "example_BedrockAgentFlowHasDescription.tf")],
runner_filter=RunnerFilter(checks=["CKV_AWS_393"]),
)

# then
summary = report.get_summary()

passing_resources = {
"aws_bedrockagent_flow.pass",
}
failing_resources = {
"aws_bedrockagent_flow.fail",
}

passed_check_resources = {c.resource for c in report.passed_checks}
failed_check_resources = {c.resource for c in report.failed_checks}

self.assertEqual(summary["passed"], 1)
self.assertEqual(summary["failed"], 1)
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import os
import unittest

from checkov.runner_filter import RunnerFilter
from checkov.terraform.runner import Runner


class TestBedrockAgentFlowIsEncrypted(unittest.TestCase):
def test(self):
# given
test_files_dir = os.path.dirname(os.path.realpath(__file__))

# when
report = Runner().run(
root_folder=None,
files=[os.path.join(test_files_dir, "example_BedrockAgentFlowIsEncrypted.tf")],
runner_filter=RunnerFilter(checks=["CKV_AWS_394"]),
)

# then
summary = report.get_summary()

passing_resources = {
"aws_bedrockagent_flow.pass",
}
failing_resources = {
"aws_bedrockagent_flow.fail",
}

passed_check_resources = {c.resource for c in report.passed_checks}
failed_check_resources = {c.resource for c in report.failed_checks}

self.assertEqual(summary["passed"], 1)
self.assertEqual(summary["failed"], 1)
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import os
import unittest

from checkov.runner_filter import RunnerFilter
from checkov.terraform.runner import Runner


class TestFSXS3AccessPointAttachmentHasPolicy(unittest.TestCase):
def test(self):
# given
test_files_dir = os.path.dirname(os.path.realpath(__file__))

# when
report = Runner().run(
root_folder=None,
files=[os.path.join(test_files_dir, "example_FSXS3AccessPointAttachmentHasPolicy.tf")],
runner_filter=RunnerFilter(checks=["CKV_AWS_395"]),
)

# then
summary = report.get_summary()

passing_resources = {
"aws_fsx_s3_access_point_attachment.pass",
}
failing_resources = {
"aws_fsx_s3_access_point_attachment.fail",
}

passed_check_resources = {c.resource for c in report.passed_checks}
failed_check_resources = {c.resource for c in report.failed_checks}

self.assertEqual(summary["passed"], 1)
self.assertEqual(summary["failed"], 1)
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)


if __name__ == "__main__":
unittest.main()
Loading