diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 8d0e50c0eb32..d81e1d1483b3 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -34,6 +34,7 @@ /composer/**/* @leahecole @rachael-ds @rafalbiegacz @GoogleCloudPlatform/python-samples-reviewers /compute/**/* @m-strzelczyk @GoogleCloudPlatform/dee-infra @GoogleCloudPlatform/python-samples-reviewers /container/**/* @GoogleCloudPlatform/dee-platform-ops @GoogleCloudPlatform/python-samples-reviewers +/containeranalysis/**/* @GoogleCloudPlatform/aap-dpes @GoogleCloudPlatform/python-samples-reviewers /data-science-onramp/ @leahecole @bradmiro @GoogleCloudPlatform/python-samples-reviewers /datacatalog/**/* @GoogleCloudPlatform/python-samples-reviewers /dataflow/**/* @davidcavazos @GoogleCloudPlatform/python-samples-reviewers diff --git a/.github/blunderbuss.yml b/.github/blunderbuss.yml index 70830901f191..cb3aa087fe7c 100644 --- a/.github/blunderbuss.yml +++ b/.github/blunderbuss.yml @@ -62,6 +62,10 @@ assign_issues_by: - 'api: texttospeech' to: - GoogleCloudPlatform/dee-platform-ops +- labels: + - 'api: containeranalysis' + to: + - GoogleCloudPlatform/aap-dpes - labels: - 'api: datascienceonramp' to: diff --git a/container_registry/container_analysis/README.md b/container_registry/container_analysis/README.md deleted file mode 100644 index 68aaa646aaa9..000000000000 --- a/container_registry/container_analysis/README.md +++ /dev/null @@ -1,3 +0,0 @@ -These samples have been moved. - -https://github.com/googleapis/python-containeranalysis/tree/main/samples diff --git a/containeranalysis/snippets/.gitignore b/containeranalysis/snippets/.gitignore new file mode 100644 index 000000000000..9e3d04c49501 --- /dev/null +++ b/containeranalysis/snippets/.gitignore @@ -0,0 +1 @@ +venv* diff --git a/containeranalysis/snippets/README.md b/containeranalysis/snippets/README.md new file mode 100644 index 000000000000..e5da544dda3c --- /dev/null +++ b/containeranalysis/snippets/README.md @@ -0,0 +1,54 @@ +Google
Cloud Platform logo + +# Google Cloud Container Analysis Samples + + +Container Analysis scans container images stored in Container Registry for vulnerabilities. +Continuous automated analysis of containers keeps you informed about known vulnerabilities so +that you can review and address issues before deployment. + +Additionally, third-party metadata providers can use Container Analysis to store and +retrieve additional metadata for their customers' images, such as packages installed in an image. + + +## Description + +These samples show how to use the [Google Cloud Container Analysis Client Library](https://cloud.google.com/container-registry/docs/reference/libraries). + +## Build and Run +1. **Enable APIs** + - [Enable the Container Analysis API](https://console.cloud.google.com/flows/enableapi?apiid=containeranalysis.googleapis.com) + and create a new project or select an existing project. +1. **Install and Initialize Cloud SDK** + - Follow instructions from the available [quickstarts](https://cloud.google.com/sdk/docs/quickstarts) +1. **Authenticate with GCP** + - Typically, you should authenticate using a [service account key](https://cloud.google.com/docs/authentication/getting-started) +1. **Clone the repo** and cd into this directory + + ``` + git clone https://github.com/GoogleCloudPlatform/python-docs-samples + cd python-docs-samples + ``` + +1. **Set Environment Variables** + + ``` + export GOOGLE_CLOUD_PROJECT="YOUR_PROJECT_ID" + ``` + +1. 
**Run Tests** + + ``` + nox -s "py36(sample='./containeranalysis')" + ``` + +## Contributing changes + +* See [CONTRIBUTING.md](../../CONTRIBUTING.md) + +## Licensing + +* See [LICENSE](../../LICENSE) + diff --git a/containeranalysis/snippets/requirements-test.txt b/containeranalysis/snippets/requirements-test.txt new file mode 100644 index 000000000000..49780e035690 --- /dev/null +++ b/containeranalysis/snippets/requirements-test.txt @@ -0,0 +1 @@ +pytest==7.2.0 diff --git a/containeranalysis/snippets/requirements.txt b/containeranalysis/snippets/requirements.txt new file mode 100644 index 000000000000..ad6c968e8cfd --- /dev/null +++ b/containeranalysis/snippets/requirements.txt @@ -0,0 +1,6 @@ +google-cloud-pubsub==2.13.10 +google-cloud-containeranalysis==2.9.3 +grafeas==1.6.1 +pytest==7.2.0 +flaky==3.7.0 +mock==4.0.3 diff --git a/containeranalysis/snippets/samples.py b/containeranalysis/snippets/samples.py new file mode 100644 index 000000000000..ecf28c43b837 --- /dev/null +++ b/containeranalysis/snippets/samples.py @@ -0,0 +1,373 @@ +#!/bin/python +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +# [START containeranalysis_create_note] +def create_note(note_id, project_id): + """Creates and returns a new vulnerability note.""" + # note_id = 'my-note' + # project_id = 'my-gcp-project' + + from grafeas.grafeas_v1 import Version + from google.cloud.devtools import containeranalysis_v1 + + client = containeranalysis_v1.ContainerAnalysisClient() + grafeas_client = client.get_grafeas_client() + project_name = f"projects/{project_id}" + note = { + 'vulnerability': { + 'details': [ + { + 'affected_cpe_uri': 'your-uri-here', + 'affected_package': 'your-package-here', + 'affected_version_start': { + 'kind': Version.VersionKind.MINIMUM + }, + 'fixed_version': { + 'kind': Version.VersionKind.MAXIMUM + } + } + ] + } + } + response = grafeas_client.create_note(parent=project_name, note_id=note_id, note=note) + return response +# [END containeranalysis_create_note] + + +# [START containeranalysis_delete_note] +def delete_note(note_id, project_id): + """Removes an existing note from the server.""" + # note_id = 'my-note' + # project_id = 'my-gcp-project' + + from google.cloud.devtools import containeranalysis_v1 + + client = containeranalysis_v1.ContainerAnalysisClient() + grafeas_client = client.get_grafeas_client() + note_name = f"projects/{project_id}/notes/{note_id}" + + grafeas_client.delete_note(name=note_name) +# [END containeranalysis_delete_note] + + +# [START containeranalysis_create_occurrence] +def create_occurrence(resource_url, note_id, occurrence_project, note_project): + """ Creates and returns a new occurrence of a previously + created vulnerability note.""" + # resource_url = 'https://gcr.io/my-project/my-image@sha256:123' + # note_id = 'my-note' + # occurrence_project = 'my-gcp-project' + # note_project = 'my-gcp-project' + + from grafeas.grafeas_v1 import Version + from google.cloud.devtools import containeranalysis_v1 + + client = containeranalysis_v1.ContainerAnalysisClient() + grafeas_client = client.get_grafeas_client() + formatted_note = 
f"projects/{note_project}/notes/{note_id}" + formatted_project = f"projects/{occurrence_project}" + + occurrence = { + 'note_name': formatted_note, + 'resource_uri': resource_url, + 'vulnerability': { + 'package_issue': [ + { + 'affected_cpe_uri': 'your-uri-here', + 'affected_package': 'your-package-here', + 'affected_version': { + 'kind': Version.VersionKind.MINIMUM + }, + 'fixed_version': { + 'kind': Version.VersionKind.MAXIMUM + } + } + ] + } + } + + return grafeas_client.create_occurrence(parent=formatted_project, occurrence=occurrence) +# [END containeranalysis_create_occurrence] + + +# [START containeranalysis_delete_occurrence] +def delete_occurrence(occurrence_id, project_id): + """Removes an existing occurrence from the server.""" + # occurrence_id = basename(occurrence.name) + # project_id = 'my-gcp-project' + + from google.cloud.devtools import containeranalysis_v1 + + client = containeranalysis_v1.ContainerAnalysisClient() + grafeas_client = client.get_grafeas_client() + parent = f"projects/{project_id}/occurrences/{occurrence_id}" + grafeas_client.delete_occurrence(name=parent) +# [END containeranalysis_delete_occurrence] + + +# [START containeranalysis_get_note] +def get_note(note_id, project_id): + """Retrieves and prints a specified note from the server.""" + # note_id = 'my-note' + # project_id = 'my-gcp-project' + + from google.cloud.devtools import containeranalysis_v1 + + client = containeranalysis_v1.ContainerAnalysisClient() + grafeas_client = client.get_grafeas_client() + note_name = f"projects/{project_id}/notes/{note_id}" + response = grafeas_client.get_note(name=note_name) + return response +# [END containeranalysis_get_note] + + +# [START containeranalysis_get_occurrence] +def get_occurrence(occurrence_id, project_id): + """retrieves and prints a specified occurrence from the server.""" + # occurrence_id = basename(occurrence.name) + # project_id = 'my-gcp-project' + + from google.cloud.devtools import containeranalysis_v1 + + client = 
containeranalysis_v1.ContainerAnalysisClient() + grafeas_client = client.get_grafeas_client() + parent = f"projects/{project_id}/occurrences/{occurrence_id}" + return grafeas_client.get_occurrence(name=parent) +# [END containeranalysis_get_occurrence] + + +# [START containeranalysis_discovery_info] +def get_discovery_info(resource_url, project_id): + """Retrieves and prints the discovery occurrence created for a specified + image. The discovery occurrence contains information about the initial + scan on the image.""" + # resource_url = 'https://gcr.io/my-project/my-image@sha256:123' + # project_id = 'my-gcp-project' + + from google.cloud.devtools import containeranalysis_v1 + + filter_str = 'kind="DISCOVERY" AND resourceUrl="{}"'.format(resource_url) + client = containeranalysis_v1.ContainerAnalysisClient() + grafeas_client = client.get_grafeas_client() + project_name = f"projects/{project_id}" + response = grafeas_client.list_occurrences(parent=project_name, + filter_=filter_str) + for occ in response: + print(occ) +# [END containeranalysis_discovery_info] + + +# [START containeranalysis_occurrences_for_note] +def get_occurrences_for_note(note_id, project_id): + """Retrieves all the occurrences associated with a specified Note. 
+ Here, all occurrences are printed and counted.""" + # note_id = 'my-note' + # project_id = 'my-gcp-project' + + from google.cloud.devtools import containeranalysis_v1 + + client = containeranalysis_v1.ContainerAnalysisClient() + grafeas_client = client.get_grafeas_client() + note_name = f"projects/{project_id}/notes/{note_id}" + + response = grafeas_client.list_note_occurrences(name=note_name) + count = 0 + for o in response: + # do something with the retrieved occurrence + # in this sample, we will simply count each one + count += 1 + return count +# [END containeranalysis_occurrences_for_note] + + +# [START containeranalysis_occurrences_for_image] +def get_occurrences_for_image(resource_url, project_id): + """Retrieves all the occurrences associated with a specified image. + Here, all occurrences are simply printed and counted.""" + # resource_url = 'https://gcr.io/my-project/my-image@sha256:123' + # project_id = 'my-gcp-project' + + from google.cloud.devtools import containeranalysis_v1 + + filter_str = 'resourceUrl="{}"'.format(resource_url) + client = containeranalysis_v1.ContainerAnalysisClient() + grafeas_client = client.get_grafeas_client() + project_name = f"projects/{project_id}" + + response = grafeas_client.list_occurrences(parent=project_name, + filter=filter_str) + count = 0 + for o in response: + # do something with the retrieved occurrence + # in this sample, we will simply count each one + count += 1 + return count +# [END containeranalysis_occurrences_for_image] + + +# [START containeranalysis_pubsub] +def pubsub(subscription_id, timeout_seconds, project_id): + """Respond to incoming occurrences using a Cloud Pub/Sub subscription.""" + # subscription_id := 'my-occurrences-subscription' + # timeout_seconds = 20 + # project_id = 'my-gcp-project' + + import time + from google.cloud.pubsub import SubscriberClient + + client = SubscriberClient() + subscription_name = client.subscription_path(project_id, subscription_id) + receiver = MessageReceiver() 
+ client.subscribe(subscription_name, receiver.pubsub_callback) + + # listen for 'timeout' seconds + for _ in range(timeout_seconds): + time.sleep(1) + # print and return the number of pubsub messages received + print(receiver.msg_count) + return receiver.msg_count + + +class MessageReceiver: + """Custom class to handle incoming Pub/Sub messages.""" + def __init__(self): + # initialize counter to 0 on initialization + self.msg_count = 0 + + def pubsub_callback(self, message): + # every time a pubsub message comes in, print it and count it + self.msg_count += 1 + print('Message {}: {}'.format(self.msg_count, message.data)) + message.ack() + + +def create_occurrence_subscription(subscription_id, project_id): + """Creates a new Pub/Sub subscription object listening to the + Container Analysis Occurrences topic.""" + # subscription_id := 'my-occurrences-subscription' + # project_id = 'my-gcp-project' + + from google.api_core.exceptions import AlreadyExists + from google.cloud.pubsub import SubscriberClient + + topic_id = 'container-analysis-occurrences-v1' + client = SubscriberClient() + topic_name = f"projects/{project_id}/topics/{topic_id}" + subscription_name = client.subscription_path(project_id, subscription_id) + success = True + try: + client.create_subscription({"name": subscription_name, "topic": topic_name}) + except AlreadyExists: + # if subscription already exists, do nothing + pass + else: + success = False + return success +# [END containeranalysis_pubsub] + + +# [START containeranalysis_poll_discovery_occurrence_finished] +def poll_discovery_finished(resource_url, timeout_seconds, project_id): + """Returns the discovery occurrence for a resource once it reaches a + terminal state.""" + # resource_url = 'https://gcr.io/my-project/my-image@sha256:123' + # timeout_seconds = 20 + # project_id = 'my-gcp-project' + + import time + from grafeas.grafeas_v1 import DiscoveryOccurrence + from google.cloud.devtools import containeranalysis_v1 + + deadline = 
time.time() + timeout_seconds + + client = containeranalysis_v1.ContainerAnalysisClient() + grafeas_client = client.get_grafeas_client() + project_name = f"projects/{project_id}" + + discovery_occurrence = None + while discovery_occurrence is None: + time.sleep(1) + filter_str = 'resourceUrl="{}" \ + AND noteProjectId="goog-analysis" \ + AND noteId="PACKAGE_VULNERABILITY"'.format(resource_url) + # [END containeranalysis_poll_discovery_occurrence_finished] + # The above filter isn't testable, since it looks for occurrences in a + # locked down project fall back to a more permissive filter for testing + filter_str = 'kind="DISCOVERY" AND resourceUrl="{}"'\ + .format(resource_url) + # [START containeranalysis_poll_discovery_occurrence_finished] + result = grafeas_client.list_occurrences(parent=project_name, filter=filter_str) + # only one occurrence should ever be returned by ListOccurrences + # and the given filter + for item in result: + discovery_occurrence = item + if time.time() > deadline: + raise RuntimeError('timeout while retrieving discovery occurrence') + + status = DiscoveryOccurrence.AnalysisStatus.PENDING + while status != DiscoveryOccurrence.AnalysisStatus.FINISHED_UNSUPPORTED \ + and status != DiscoveryOccurrence.AnalysisStatus.FINISHED_FAILED \ + and status != DiscoveryOccurrence.AnalysisStatus.FINISHED_SUCCESS: + time.sleep(1) + updated = grafeas_client.get_occurrence(name=discovery_occurrence.name) + status = updated.discovery.analysis_status + if time.time() > deadline: + raise RuntimeError('timeout while waiting for terminal state') + return discovery_occurrence +# [END containeranalysis_poll_discovery_occurrence_finished] + + +# [START containeranalysis_vulnerability_occurrences_for_image] +def find_vulnerabilities_for_image(resource_url, project_id): + """"Retrieves all vulnerability occurrences associated with a resource.""" + # resource_url = 'https://gcr.io/my-project/my-image@sha256:123' + # project_id = 'my-gcp-project' + + from 
google.cloud.devtools import containeranalysis_v1 + + client = containeranalysis_v1.ContainerAnalysisClient() + grafeas_client = client.get_grafeas_client() + project_name = f"projects/{project_id}" + + filter_str = 'kind="VULNERABILITY" AND resourceUrl="{}"'\ + .format(resource_url) + return list(grafeas_client.list_occurrences(parent=project_name, filter=filter_str)) +# [END containeranalysis_vulnerability_occurrences_for_image] + + +# [START containeranalysis_filter_vulnerability_occurrences] +def find_high_severity_vulnerabilities_for_image(resource_url, project_id): + """Retrieves a list of only high vulnerability occurrences associated + with a resource.""" + # resource_url = 'https://gcr.io/my-project/my-image@sha256:123' + # project_id = 'my-gcp-project' + + from grafeas.grafeas_v1 import Severity + from google.cloud.devtools import containeranalysis_v1 + + client = containeranalysis_v1.ContainerAnalysisClient() + grafeas_client = client.get_grafeas_client() + project_name = f"projects/{project_id}" + + filter_str = 'kind="VULNERABILITY" AND resourceUrl="{}"'\ + .format(resource_url) + vulnerabilities = grafeas_client.list_occurrences(parent=project_name, filter=filter_str) + filtered_list = [] + for v in vulnerabilities: + if v.vulnerability.effective_severity == Severity.HIGH or v.vulnerability.effective_severity == Severity.CRITICAL: + filtered_list.append(v) + return filtered_list +# [END containeranalysis_filter_vulnerability_occurrences] diff --git a/containeranalysis/snippets/samples_test.py b/containeranalysis/snippets/samples_test.py new file mode 100644 index 000000000000..dd9bce6c1771 --- /dev/null +++ b/containeranalysis/snippets/samples_test.py @@ -0,0 +1,317 @@ +#!/bin/python +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from os import environ +from os.path import basename +import threading +import time +import uuid + +from google.api_core.exceptions import AlreadyExists +from google.api_core.exceptions import InvalidArgument +from google.api_core.exceptions import NotFound +from google.cloud.devtools import containeranalysis_v1 +from google.cloud.pubsub import PublisherClient, SubscriberClient + +from grafeas.grafeas_v1 import DiscoveryOccurrence +from grafeas.grafeas_v1 import NoteKind +from grafeas.grafeas_v1 import Severity +from grafeas.grafeas_v1 import Version +import pytest + +import samples + +PROJECT_ID = environ['GOOGLE_CLOUD_PROJECT'] +SLEEP_TIME = 1 +TRY_LIMIT = 20 + + +class MessageReceiver: + """Custom class to handle incoming Pub/Sub messages.""" + def __init__(self, expected_msg_nums, done_event): + # initialize counter to 0 on initialization + self.msg_count = 0 + self.expected_msg_nums = expected_msg_nums + self.done_event = done_event + + def pubsub_callback(self, message): + # every time a pubsub message comes in, print it and count it + self.msg_count += 1 + print('Message {}: {}'.format(self.msg_count, message.data)) + message.ack() + if (self.msg_count == self.expected_msg_nums): + self.done_event.set() + + +class TestContainerAnalysisSamples: + + def setup_method(self, test_method): + print('SETUP {}'.format(test_method.__name__)) + self.note_id = 'note-{}'.format(uuid.uuid4()) + self.image_url = '{}.{}'.format(uuid.uuid4(), test_method.__name__) + self.note_obj = samples.create_note(self.note_id, PROJECT_ID) + + def teardown_method(self, 
test_method): + print('TEAR DOWN {}'.format(test_method.__name__)) + try: + samples.delete_note(self.note_id, PROJECT_ID) + except NotFound: + pass + + def test_create_note(self): + new_note = samples.get_note(self.note_id, PROJECT_ID) + assert new_note.name == self.note_obj.name + + def test_delete_note(self): + samples.delete_note(self.note_id, PROJECT_ID) + try: + samples.get_note(self.note_obj, PROJECT_ID) + except InvalidArgument: + pass + else: + # didn't raise exception we expected + assert (False) + + def test_create_occurrence(self): + created = samples.create_occurrence(self.image_url, + self.note_id, + PROJECT_ID, + PROJECT_ID) + retrieved = samples.get_occurrence(basename(created.name), PROJECT_ID) + assert created.name == retrieved.name + # clean up + samples.delete_occurrence(basename(created.name), PROJECT_ID) + + def test_delete_occurrence(self): + created = samples.create_occurrence(self.image_url, + self.note_id, + PROJECT_ID, + PROJECT_ID) + samples.delete_occurrence(basename(created.name), PROJECT_ID) + try: + samples.get_occurrence(basename(created.name), PROJECT_ID) + except NotFound: + pass + else: + # didn't raise exception we expected + assert False + + def test_occurrences_for_image(self): + orig_count = samples.get_occurrences_for_image(self.image_url, + PROJECT_ID) + occ = samples.create_occurrence(self.image_url, + self.note_id, + PROJECT_ID, + PROJECT_ID) + new_count = 0 + tries = 0 + while new_count != 1 and tries < TRY_LIMIT: + tries += 1 + new_count = samples.get_occurrences_for_image(self.image_url, + PROJECT_ID) + time.sleep(SLEEP_TIME) + assert new_count == 1 + assert orig_count == 0 + # clean up + samples.delete_occurrence(basename(occ.name), PROJECT_ID) + + def test_occurrences_for_note(self): + orig_count = samples.get_occurrences_for_note(self.note_id, + PROJECT_ID) + occ = samples.create_occurrence(self.image_url, + self.note_id, + PROJECT_ID, + PROJECT_ID) + new_count = 0 + tries = 0 + while new_count != 1 and tries < 
TRY_LIMIT: + tries += 1 + new_count = samples.get_occurrences_for_note(self.note_id, + PROJECT_ID) + time.sleep(SLEEP_TIME) + assert new_count == 1 + assert orig_count == 0 + # clean up + samples.delete_occurrence(basename(occ.name), PROJECT_ID) + + @pytest.mark.flaky(max_runs=3, min_passes=1) + def test_pubsub(self): + # create topic if needed + client = SubscriberClient() + try: + topic_id = 'container-analysis-occurrences-v1' + topic_name = {"name": f"projects/{PROJECT_ID}/topics/{topic_id}"} + publisher = PublisherClient() + publisher.create_topic(topic_name) + except AlreadyExists: + pass + + subscription_id = 'container-analysis-test-{}'.format(uuid.uuid4()) + subscription_name = client.subscription_path(PROJECT_ID, + subscription_id) + samples.create_occurrence_subscription(subscription_id, PROJECT_ID) + + # I can not make it pass with multiple messages. My guess is + # the server started to dedup? + message_count = 1 + try: + job_done = threading.Event() + receiver = MessageReceiver(message_count, job_done) + client.subscribe(subscription_name, receiver.pubsub_callback) + + for i in range(message_count): + occ = samples.create_occurrence( + self.image_url, self.note_id, PROJECT_ID, PROJECT_ID) + time.sleep(SLEEP_TIME) + samples.delete_occurrence(basename(occ.name), PROJECT_ID) + time.sleep(SLEEP_TIME) + # We saw occational failure with 60 seconds timeout, so we bumped it + # to 180 seconds. + # See also: python-docs-samples/issues/2894 + job_done.wait(timeout=180) + print('done. 
msg_count = {}'.format(receiver.msg_count)) + assert message_count <= receiver.msg_count + finally: + # clean up + client.delete_subscription({"subscription": subscription_name}) + + def test_poll_discovery_occurrence_fails(self): + # try with no discovery occurrence + try: + samples.poll_discovery_finished(self.image_url, 5, PROJECT_ID) + except RuntimeError: + pass + else: + # we expect timeout error + assert False + + @pytest.mark.flaky(max_runs=3, min_passes=1) + def test_poll_discovery_occurrence(self): + # create discovery occurrence + note_id = 'discovery-note-{}'.format(uuid.uuid4()) + client = containeranalysis_v1.ContainerAnalysisClient() + grafeas_client = client.get_grafeas_client() + note = { + 'discovery': { + 'analysis_kind': NoteKind.DISCOVERY + } + } + grafeas_client.\ + create_note(parent=f"projects/{PROJECT_ID}", note_id=note_id, note=note) + occurrence = { + 'note_name': f"projects/{PROJECT_ID}/notes/{note_id}", + 'resource_uri': self.image_url, + 'discovery': { + 'analysis_status': DiscoveryOccurrence.AnalysisStatus + .FINISHED_SUCCESS + } + } + created = grafeas_client.\ + create_occurrence(parent=f"projects/{PROJECT_ID}", + occurrence=occurrence) + + disc = samples.poll_discovery_finished(self.image_url, 10, PROJECT_ID) + status = disc.discovery.analysis_status + assert disc is not None + assert status == DiscoveryOccurrence.AnalysisStatus.FINISHED_SUCCESS + + # clean up + samples.delete_occurrence(basename(created.name), PROJECT_ID) + samples.delete_note(note_id, PROJECT_ID) + + def test_find_vulnerabilities_for_image(self): + occ_list = samples.find_vulnerabilities_for_image(self.image_url, + PROJECT_ID) + assert len(occ_list) == 0 + + created = samples.create_occurrence(self.image_url, + self.note_id, + PROJECT_ID, + PROJECT_ID) + tries = 0 + count = 0 + while count != 1 and tries < TRY_LIMIT: + tries += 1 + occ_list = samples.find_vulnerabilities_for_image(self.image_url, + PROJECT_ID) + count = len(occ_list) + time.sleep(SLEEP_TIME) + 
assert len(occ_list) == 1 + samples.delete_occurrence(basename(created.name), PROJECT_ID) + + def test_find_high_severity_vulnerabilities(self): + occ_list = samples.find_high_severity_vulnerabilities_for_image( + self.image_url, + PROJECT_ID) + assert len(occ_list) == 0 + + # create new high severity vulnerability + note_id = 'discovery-note-{}'.format(uuid.uuid4()) + client = containeranalysis_v1.ContainerAnalysisClient() + grafeas_client = client.get_grafeas_client() + note = { + 'vulnerability': { + 'severity': Severity.CRITICAL, + 'details': [ + { + 'affected_cpe_uri': 'your-uri-here', + 'affected_package': 'your-package-here', + 'affected_version_start': { + 'kind': Version.VersionKind.MINIMUM + }, + 'fixed_version': { + 'kind': Version.VersionKind.MAXIMUM + } + } + ] + } + } + grafeas_client.\ + create_note(parent=f"projects/{PROJECT_ID}", note_id=note_id, note=note) + occurrence = { + 'note_name': f"projects/{PROJECT_ID}/notes/{note_id}", + 'resource_uri': self.image_url, + 'vulnerability': { + 'effective_severity': Severity.CRITICAL, + 'package_issue': [ + { + 'affected_cpe_uri': 'your-uri-here', + 'affected_package': 'your-package-here', + 'affected_version': { + 'kind': Version.VersionKind.MINIMUM + }, + 'fixed_version': { + 'kind': Version.VersionKind.MAXIMUM + } + } + ] + } + } + created = grafeas_client.\ + create_occurrence(parent=f"projects/{PROJECT_ID}", + occurrence=occurrence) + # query again + tries = 0 + count = 0 + while count != 1 and tries < TRY_LIMIT: + tries += 1 + occ_list = samples.find_vulnerabilities_for_image(self.image_url, + PROJECT_ID) + count = len(occ_list) + time.sleep(SLEEP_TIME) + assert len(occ_list) == 1 + # clean up + samples.delete_occurrence(basename(created.name), PROJECT_ID) + samples.delete_note(note_id, PROJECT_ID)