From 4e49a394dabf38cc6ef0bdabcbdc3d94a26a08cc Mon Sep 17 00:00:00 2001 From: Sampath Kumar Date: Wed, 7 Dec 2022 18:26:26 +0100 Subject: [PATCH] chore: Python code samples migrated to python-doc-samples (#348) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Python code samples migrated to python-doc-samples For more details, please check https://github.com/GoogleCloudPlatform/python-docs-samples/pull/8529 and http://b/257074849 * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md Co-authored-by: Owl Bot --- .../samples/README.md | 4 + .../samples/snippets/.gitignore | 1 - .../samples/snippets/README.md | 54 --- .../samples/snippets/noxfile.py | 292 ------------- .../samples/snippets/requirements-test.txt | 1 - .../samples/snippets/requirements.txt | 6 - .../samples/snippets/samples.py | 402 ------------------ .../samples/snippets/samples_test.py | 292 ------------- 8 files changed, 4 insertions(+), 1048 deletions(-) create mode 100644 packages/google-cloud-containeranalysis/samples/README.md delete mode 100644 packages/google-cloud-containeranalysis/samples/snippets/.gitignore delete mode 100644 packages/google-cloud-containeranalysis/samples/snippets/README.md delete mode 100644 packages/google-cloud-containeranalysis/samples/snippets/noxfile.py delete mode 100644 packages/google-cloud-containeranalysis/samples/snippets/requirements-test.txt delete mode 100644 packages/google-cloud-containeranalysis/samples/snippets/requirements.txt delete mode 100644 packages/google-cloud-containeranalysis/samples/snippets/samples.py delete mode 100644 packages/google-cloud-containeranalysis/samples/snippets/samples_test.py diff --git a/packages/google-cloud-containeranalysis/samples/README.md b/packages/google-cloud-containeranalysis/samples/README.md new file mode 100644 index 000000000000..e6d225b1eb13 --- /dev/null +++ b/packages/google-cloud-containeranalysis/samples/README.md @@ -0,0 +1,4 @@ +Samples migrated +================ + +New location: https://github.com/GoogleCloudPlatform/python-docs-samples/tree/main/containeranalysis/snippets diff --git a/packages/google-cloud-containeranalysis/samples/snippets/.gitignore b/packages/google-cloud-containeranalysis/samples/snippets/.gitignore deleted file mode 100644 index 9e3d04c49501..000000000000 --- a/packages/google-cloud-containeranalysis/samples/snippets/.gitignore +++ /dev/null @@ -1 +0,0 @@ -venv* diff --git a/packages/google-cloud-containeranalysis/samples/snippets/README.md b/packages/google-cloud-containeranalysis/samples/snippets/README.md deleted file mode 100644 index 73c45c354782..000000000000 --- a/packages/google-cloud-containeranalysis/samples/snippets/README.md +++ /dev/null @@ -1,54 +0,0 @@ -Google
-Cloud Platform logo - -# Google Cloud Container Analysis Samples - - -Container Analysis scans container images stored in Container Registry for vulnerabilities. -Continuous automated analysis of containers keep you informed about known vulnerabilities so -that you can review and address issues before deployment. - -Additionally, third-party metadata providers can use Container Analysis to store and -retrieve additional metadata for their customers' images, such as packages installed in an image. - - -## Description - -These samples show how to use the [Google Cloud Container Analysis Client Library](https://cloud.google.com/container-registry/docs/reference/libraries). - -## Build and Run -1. **Enable APIs** - - [Enable the Container Analysis API](https://console.cloud.google.com/flows/enableapi?apiid=containeranalysis.googleapis.com) - and create a new project or select an existing project. -1. **Install and Initialize Cloud SDK** - - Follow instructions from the available [quickstarts](https://cloud.google.com/sdk/docs/quickstarts) -1. **Authenticate with GCP** - - Typically, you should authenticate using a [service account key](https://cloud.google.com/docs/authentication/getting-started) -1. **Clone the repo** and cd into this directory - - ``` - git clone https://github.com/GoogleCloudPlatform/python-docs-samples - cd python-docs-samples - ``` - -1. **Set Environment Variables** - - ``` - export GCLOUD_PROJECT="YOUR_PROJECT_ID" - ``` - -1. **Run Tests** - - ``` - nox -s "py36(sample='./container_registry/container_analysis')" - ``` - -## Contributing changes - -* See [CONTRIBUTING.md](../../CONTRIBUTING.md) - -## Licensing - -* See [LICENSE](../../LICENSE) - diff --git a/packages/google-cloud-containeranalysis/samples/snippets/noxfile.py b/packages/google-cloud-containeranalysis/samples/snippets/noxfile.py deleted file mode 100644 index 0577084695fc..000000000000 --- a/packages/google-cloud-containeranalysis/samples/snippets/noxfile.py +++ /dev/null @@ -1,292 +0,0 @@ -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import glob -import os -from pathlib import Path -import sys -from typing import Callable, Dict, Optional - -import nox - -# WARNING - WARNING - WARNING - WARNING - WARNING -# WARNING - WARNING - WARNING - WARNING - WARNING -# DO NOT EDIT THIS FILE EVER! -# WARNING - WARNING - WARNING - WARNING - WARNING -# WARNING - WARNING - WARNING - WARNING - WARNING - -BLACK_VERSION = "black==22.3.0" -ISORT_VERSION = "isort==5.10.1" - -# Copy `noxfile_config.py` to your directory and modify it instead. - -# `TEST_CONFIG` dict is a configuration hook that allows users to -# modify the test configurations. The values here should be in sync -# with `noxfile_config.py`. Users will copy `noxfile_config.py` into -# their directory and modify it. - -TEST_CONFIG = { - # You can opt out from the test for specific Python versions. - "ignored_versions": [], - # Old samples are opted out of enforcing Python type hints - # All new samples should feature them - "enforce_type_hints": False, - # An envvar key for determining the project id to use. Change it - # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a - # build specific Cloud project. You can also use your own string - # to use your own Cloud project. - "gcloud_project_env": "GOOGLE_CLOUD_PROJECT", - # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT', - # If you need to use a specific version of pip, - # change pip_version_override to the string representation - # of the version number, for example, "20.2.4" - "pip_version_override": None, - # A dictionary you want to inject into your test. Don't put any - # secrets here. These values will override predefined values. - "envs": {}, -} - - -try: - # Ensure we can import noxfile_config in the project's directory. - sys.path.append(".") - from noxfile_config import TEST_CONFIG_OVERRIDE -except ImportError as e: - print("No user noxfile_config found: detail: {}".format(e)) - TEST_CONFIG_OVERRIDE = {} - -# Update the TEST_CONFIG with the user supplied values. -TEST_CONFIG.update(TEST_CONFIG_OVERRIDE) - - -def get_pytest_env_vars() -> Dict[str, str]: - """Returns a dict for pytest invocation.""" - ret = {} - - # Override the GCLOUD_PROJECT and the alias. - env_key = TEST_CONFIG["gcloud_project_env"] - # This should error out if not set. - ret["GOOGLE_CLOUD_PROJECT"] = os.environ[env_key] - - # Apply user supplied envs. - ret.update(TEST_CONFIG["envs"]) - return ret - - -# DO NOT EDIT - automatically generated. -# All versions used to test samples. -ALL_VERSIONS = ["3.7", "3.8", "3.9", "3.10"] - -# Any default versions that should be ignored. -IGNORED_VERSIONS = TEST_CONFIG["ignored_versions"] - -TESTED_VERSIONS = sorted([v for v in ALL_VERSIONS if v not in IGNORED_VERSIONS]) - -INSTALL_LIBRARY_FROM_SOURCE = os.environ.get("INSTALL_LIBRARY_FROM_SOURCE", False) in ( - "True", - "true", -) - -# Error if a python version is missing -nox.options.error_on_missing_interpreters = True - -# -# Style Checks -# - - -# Linting with flake8. -# -# We ignore the following rules: -# E203: whitespace before ‘:’ -# E266: too many leading ‘#’ for block comment -# E501: line too long -# I202: Additional newline in a section of imports -# -# We also need to specify the rules which are ignored by default: -# ['E226', 'W504', 'E126', 'E123', 'W503', 'E24', 'E704', 'E121'] -FLAKE8_COMMON_ARGS = [ - "--show-source", - "--builtin=gettext", - "--max-complexity=20", - "--exclude=.nox,.cache,env,lib,generated_pb2,*_pb2.py,*_pb2_grpc.py", - "--ignore=E121,E123,E126,E203,E226,E24,E266,E501,E704,W503,W504,I202", - "--max-line-length=88", -] - - -@nox.session -def lint(session: nox.sessions.Session) -> None: - if not TEST_CONFIG["enforce_type_hints"]: - session.install("flake8") - else: - session.install("flake8", "flake8-annotations") - - args = FLAKE8_COMMON_ARGS + [ - ".", - ] - session.run("flake8", *args) - - -# -# Black -# - - -@nox.session -def blacken(session: nox.sessions.Session) -> None: - """Run black. Format code to uniform standard.""" - session.install(BLACK_VERSION) - python_files = [path for path in os.listdir(".") if path.endswith(".py")] - - session.run("black", *python_files) - - -# -# format = isort + black -# - - -@nox.session -def format(session: nox.sessions.Session) -> None: - """ - Run isort to sort imports. Then run black - to format code to uniform standard. - """ - session.install(BLACK_VERSION, ISORT_VERSION) - python_files = [path for path in os.listdir(".") if path.endswith(".py")] - - # Use the --fss option to sort imports using strict alphabetical order. - # See https://pycqa.github.io/isort/docs/configuration/options.html#force-sort-within-sections - session.run("isort", "--fss", *python_files) - session.run("black", *python_files) - - -# -# Sample Tests -# - - -PYTEST_COMMON_ARGS = ["--junitxml=sponge_log.xml"] - - -def _session_tests( - session: nox.sessions.Session, post_install: Callable = None -) -> None: - # check for presence of tests - test_list = glob.glob("**/*_test.py", recursive=True) + glob.glob( - "**/test_*.py", recursive=True - ) - test_list.extend(glob.glob("**/tests", recursive=True)) - - if len(test_list) == 0: - print("No tests found, skipping directory.") - return - - if TEST_CONFIG["pip_version_override"]: - pip_version = TEST_CONFIG["pip_version_override"] - session.install(f"pip=={pip_version}") - """Runs py.test for a particular project.""" - concurrent_args = [] - if os.path.exists("requirements.txt"): - if os.path.exists("constraints.txt"): - session.install("-r", "requirements.txt", "-c", "constraints.txt") - else: - session.install("-r", "requirements.txt") - with open("requirements.txt") as rfile: - packages = rfile.read() - - if os.path.exists("requirements-test.txt"): - if os.path.exists("constraints-test.txt"): - session.install("-r", "requirements-test.txt", "-c", "constraints-test.txt") - else: - session.install("-r", "requirements-test.txt") - with open("requirements-test.txt") as rtfile: - packages += rtfile.read() - - if INSTALL_LIBRARY_FROM_SOURCE: - session.install("-e", _get_repo_root()) - - if post_install: - post_install(session) - - if "pytest-parallel" in packages: - concurrent_args.extend(["--workers", "auto", "--tests-per-worker", "auto"]) - elif "pytest-xdist" in packages: - concurrent_args.extend(["-n", "auto"]) - - session.run( - "pytest", - *(PYTEST_COMMON_ARGS + session.posargs + concurrent_args), - # Pytest will return 5 when no tests are collected. This can happen - # on travis where slow and flaky tests are excluded. - # See http://doc.pytest.org/en/latest/_modules/_pytest/main.html - success_codes=[0, 5], - env=get_pytest_env_vars(), - ) - - -@nox.session(python=ALL_VERSIONS) -def py(session: nox.sessions.Session) -> None: - """Runs py.test for a sample using the specified version of Python.""" - if session.python in TESTED_VERSIONS: - _session_tests(session) - else: - session.skip( - "SKIPPED: {} tests are disabled for this sample.".format(session.python) - ) - - -# -# Readmegen -# - - -def _get_repo_root() -> Optional[str]: - """Returns the root folder of the project.""" - # Get root of this repository. Assume we don't have directories nested deeper than 10 items. - p = Path(os.getcwd()) - for i in range(10): - if p is None: - break - if Path(p / ".git").exists(): - return str(p) - # .git is not available in repos cloned via Cloud Build - # setup.py is always in the library's root, so use that instead - # https://github.com/googleapis/synthtool/issues/792 - if Path(p / "setup.py").exists(): - return str(p) - p = p.parent - raise Exception("Unable to detect repository root.") - - -GENERATED_READMES = sorted([x for x in Path(".").rglob("*.rst.in")]) - - -@nox.session -@nox.parametrize("path", GENERATED_READMES) -def readmegen(session: nox.sessions.Session, path: str) -> None: - """(Re-)generates the readme for a sample.""" - session.install("jinja2", "pyyaml") - dir_ = os.path.dirname(path) - - if os.path.exists(os.path.join(dir_, "requirements.txt")): - session.install("-r", os.path.join(dir_, "requirements.txt")) - - in_file = os.path.join(dir_, "README.rst.in") - session.run( - "python", _get_repo_root() + "/scripts/readme-gen/readme_gen.py", in_file - ) diff --git a/packages/google-cloud-containeranalysis/samples/snippets/requirements-test.txt b/packages/google-cloud-containeranalysis/samples/snippets/requirements-test.txt deleted file mode 100644 index 49780e035690..000000000000 --- a/packages/google-cloud-containeranalysis/samples/snippets/requirements-test.txt +++ /dev/null @@ -1 +0,0 @@ -pytest==7.2.0 diff --git a/packages/google-cloud-containeranalysis/samples/snippets/requirements.txt b/packages/google-cloud-containeranalysis/samples/snippets/requirements.txt deleted file mode 100644 index d347745af28e..000000000000 --- a/packages/google-cloud-containeranalysis/samples/snippets/requirements.txt +++ /dev/null @@ -1,6 +0,0 @@ -google-cloud-pubsub==2.13.11 -google-cloud-containeranalysis==2.9.3 -grafeas==1.6.1 -pytest==7.2.0 -flaky==3.7.0 -mock==4.0.3 diff --git a/packages/google-cloud-containeranalysis/samples/snippets/samples.py b/packages/google-cloud-containeranalysis/samples/snippets/samples.py deleted file mode 100644 index 3f2211712650..000000000000 --- a/packages/google-cloud-containeranalysis/samples/snippets/samples.py +++ /dev/null @@ -1,402 +0,0 @@ -#!/bin/python -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -# [START containeranalysis_create_note] -def create_note(note_id, project_id): - """Creates and returns a new vulnerability note.""" - # note_id = 'my-note' - # project_id = 'my-gcp-project' - - from google.cloud.devtools import containeranalysis_v1 - from grafeas.grafeas_v1 import Version - - client = containeranalysis_v1.ContainerAnalysisClient() - grafeas_client = client.get_grafeas_client() - project_name = f"projects/{project_id}" - note = { - "vulnerability": { - "details": [ - { - "affected_cpe_uri": "your-uri-here", - "affected_package": "your-package-here", - "affected_version_start": {"kind": Version.VersionKind.MINIMUM}, - "fixed_version": {"kind": Version.VersionKind.MAXIMUM}, - } - ] - } - } - response = grafeas_client.create_note( - parent=project_name, note_id=note_id, note=note - ) - return response - - -# [END containeranalysis_create_note] - - -# [START containeranalysis_delete_note] -def delete_note(note_id, project_id): - """Removes an existing note from the server.""" - # note_id = 'my-note' - # project_id = 'my-gcp-project' - - from google.cloud.devtools import containeranalysis_v1 - - client = containeranalysis_v1.ContainerAnalysisClient() - grafeas_client = client.get_grafeas_client() - note_name = f"projects/{project_id}/notes/{note_id}" - - grafeas_client.delete_note(name=note_name) - - -# [END containeranalysis_delete_note] - - -# [START containeranalysis_create_occurrence] -def create_occurrence(resource_url, note_id, occurrence_project, note_project): - """Creates and returns a new occurrence of a previously - created vulnerability note.""" - # resource_url = 'https://gcr.io/my-project/my-image@sha256:123' - # note_id = 'my-note' - # occurrence_project = 'my-gcp-project' - # note_project = 'my-gcp-project' - - from google.cloud.devtools import containeranalysis_v1 - from grafeas.grafeas_v1 import Version - - client = containeranalysis_v1.ContainerAnalysisClient() - grafeas_client = client.get_grafeas_client() - formatted_note = f"projects/{note_project}/notes/{note_id}" - formatted_project = f"projects/{occurrence_project}" - - occurrence = { - "note_name": formatted_note, - "resource_uri": resource_url, - "vulnerability": { - "package_issue": [ - { - "affected_cpe_uri": "your-uri-here", - "affected_package": "your-package-here", - "affected_version": {"kind": Version.VersionKind.MINIMUM}, - "fixed_version": {"kind": Version.VersionKind.MAXIMUM}, - } - ] - }, - } - - return grafeas_client.create_occurrence( - parent=formatted_project, occurrence=occurrence - ) - - -# [END containeranalysis_create_occurrence] - - -# [START containeranalysis_delete_occurrence] -def delete_occurrence(occurrence_id, project_id): - """Removes an existing occurrence from the server.""" - # occurrence_id = basename(occurrence.name) - # project_id = 'my-gcp-project' - - from google.cloud.devtools import containeranalysis_v1 - - client = containeranalysis_v1.ContainerAnalysisClient() - grafeas_client = client.get_grafeas_client() - parent = f"projects/{project_id}/occurrences/{occurrence_id}" - grafeas_client.delete_occurrence(name=parent) - - -# [END containeranalysis_delete_occurrence] - - -# [START containeranalysis_get_note] -def get_note(note_id, project_id): - """Retrieves and prints a specified note from the server.""" - # note_id = 'my-note' - # project_id = 'my-gcp-project' - - from google.cloud.devtools import containeranalysis_v1 - - client = containeranalysis_v1.ContainerAnalysisClient() - grafeas_client = client.get_grafeas_client() - note_name = f"projects/{project_id}/notes/{note_id}" - response = grafeas_client.get_note(name=note_name) - return response - - -# [END containeranalysis_get_note] - - -# [START containeranalysis_get_occurrence] -def get_occurrence(occurrence_id, project_id): - """retrieves and prints a specified occurrence from the server.""" - # occurrence_id = basename(occurrence.name) - # project_id = 'my-gcp-project' - - from google.cloud.devtools import containeranalysis_v1 - - client = containeranalysis_v1.ContainerAnalysisClient() - grafeas_client = client.get_grafeas_client() - parent = f"projects/{project_id}/occurrences/{occurrence_id}" - return grafeas_client.get_occurrence(name=parent) - - -# [END containeranalysis_get_occurrence] - - -# [START containeranalysis_discovery_info] -def get_discovery_info(resource_url, project_id): - """Retrieves and prints the discovery occurrence created for a specified - image. The discovery occurrence contains information about the initial - scan on the image.""" - # resource_url = 'https://gcr.io/my-project/my-image@sha256:123' - # project_id = 'my-gcp-project' - - from google.cloud.devtools import containeranalysis_v1 - - filter_str = 'kind="DISCOVERY" AND resourceUrl="{}"'.format(resource_url) - client = containeranalysis_v1.ContainerAnalysisClient() - grafeas_client = client.get_grafeas_client() - project_name = f"projects/{project_id}" - response = grafeas_client.list_occurrences(parent=project_name, filter_=filter_str) - for occ in response: - print(occ) - - -# [END containeranalysis_discovery_info] - - -# [START containeranalysis_occurrences_for_note] -def get_occurrences_for_note(note_id, project_id): - """Retrieves all the occurrences associated with a specified Note. - Here, all occurrences are printed and counted.""" - # note_id = 'my-note' - # project_id = 'my-gcp-project' - - from google.cloud.devtools import containeranalysis_v1 - - client = containeranalysis_v1.ContainerAnalysisClient() - grafeas_client = client.get_grafeas_client() - note_name = f"projects/{project_id}/notes/{note_id}" - - response = grafeas_client.list_note_occurrences(name=note_name) - count = 0 - for o in response: - # do something with the retrieved occurrence - # in this sample, we will simply count each one - count += 1 - return count - - -# [END containeranalysis_occurrences_for_note] - - -# [START containeranalysis_occurrences_for_image] -def get_occurrences_for_image(resource_url, project_id): - """Retrieves all the occurrences associated with a specified image. - Here, all occurrences are simply printed and counted.""" - # resource_url = 'https://gcr.io/my-project/my-image@sha256:123' - # project_id = 'my-gcp-project' - - from google.cloud.devtools import containeranalysis_v1 - - filter_str = 'resourceUrl="{}"'.format(resource_url) - client = containeranalysis_v1.ContainerAnalysisClient() - grafeas_client = client.get_grafeas_client() - project_name = f"projects/{project_id}" - - response = grafeas_client.list_occurrences(parent=project_name, filter=filter_str) - count = 0 - for o in response: - # do something with the retrieved occurrence - # in this sample, we will simply count each one - count += 1 - return count - - -# [END containeranalysis_occurrences_for_image] - - -# [START containeranalysis_pubsub] -def pubsub(subscription_id, timeout_seconds, project_id): - """Respond to incoming occurrences using a Cloud Pub/Sub subscription.""" - # subscription_id := 'my-occurrences-subscription' - # timeout_seconds = 20 - # project_id = 'my-gcp-project' - - import time - - from google.cloud.pubsub import SubscriberClient - - client = SubscriberClient() - subscription_name = client.subscription_path(project_id, subscription_id) - receiver = MessageReceiver() - client.subscribe(subscription_name, receiver.pubsub_callback) - - # listen for 'timeout' seconds - for _ in range(timeout_seconds): - time.sleep(1) - # print and return the number of pubsub messages received - print(receiver.msg_count) - return receiver.msg_count - - -class MessageReceiver: - """Custom class to handle incoming Pub/Sub messages.""" - - def __init__(self): - # initialize counter to 0 on initialization - self.msg_count = 0 - - def pubsub_callback(self, message): - # every time a pubsub message comes in, print it and count it - self.msg_count += 1 - print("Message {}: {}".format(self.msg_count, message.data)) - message.ack() - - -def create_occurrence_subscription(subscription_id, project_id): - """Creates a new Pub/Sub subscription object listening to the - Container Analysis Occurrences topic.""" - # subscription_id := 'my-occurrences-subscription' - # project_id = 'my-gcp-project' - - from google.api_core.exceptions import AlreadyExists - from google.cloud.pubsub import SubscriberClient - - topic_id = "container-analysis-occurrences-v1" - client = SubscriberClient() - topic_name = f"projects/{project_id}/topics/{topic_id}" - subscription_name = client.subscription_path(project_id, subscription_id) - success = True - try: - client.create_subscription({"name": subscription_name, "topic": topic_name}) - except AlreadyExists: - # if subscription already exists, do nothing - pass - else: - success = False - return success - - -# [END containeranalysis_pubsub] - - -# [START containeranalysis_poll_discovery_occurrence_finished] -def poll_discovery_finished(resource_url, timeout_seconds, project_id): - """Returns the discovery occurrence for a resource once it reaches a - terminal state.""" - # resource_url = 'https://gcr.io/my-project/my-image@sha256:123' - # timeout_seconds = 20 - # project_id = 'my-gcp-project' - - import time - - from google.cloud.devtools import containeranalysis_v1 - from grafeas.grafeas_v1 import DiscoveryOccurrence - - deadline = time.time() + timeout_seconds - - client = containeranalysis_v1.ContainerAnalysisClient() - grafeas_client = client.get_grafeas_client() - project_name = f"projects/{project_id}" - - discovery_occurrence = None - while discovery_occurrence is None: - time.sleep(1) - filter_str = 'resourceUrl="{}" \ - AND noteProjectId="goog-analysis" \ - AND noteId="PACKAGE_VULNERABILITY"'.format( - resource_url - ) - # [END containeranalysis_poll_discovery_occurrence_finished] - # The above filter isn't testable, since it looks for occurrences in a - # locked down project fall back to a more permissive filter for testing - filter_str = 'kind="DISCOVERY" AND resourceUrl="{}"'.format(resource_url) - # [START containeranalysis_poll_discovery_occurrence_finished] - result = grafeas_client.list_occurrences(parent=project_name, filter=filter_str) - # only one occurrence should ever be returned by ListOccurrences - # and the given filter - for item in result: - discovery_occurrence = item - if time.time() > deadline: - raise RuntimeError("timeout while retrieving discovery occurrence") - - status = DiscoveryOccurrence.AnalysisStatus.PENDING - while ( - status != DiscoveryOccurrence.AnalysisStatus.FINISHED_UNSUPPORTED - and status != DiscoveryOccurrence.AnalysisStatus.FINISHED_FAILED - and status != DiscoveryOccurrence.AnalysisStatus.FINISHED_SUCCESS - ): - time.sleep(1) - updated = grafeas_client.get_occurrence(name=discovery_occurrence.name) - status = updated.discovery.analysis_status - if time.time() > deadline: - raise RuntimeError("timeout while waiting for terminal state") - return discovery_occurrence - - -# [END containeranalysis_poll_discovery_occurrence_finished] - - -# [START containeranalysis_vulnerability_occurrences_for_image] -def find_vulnerabilities_for_image(resource_url, project_id): - """ "Retrieves all vulnerability occurrences associated with a resource.""" - # resource_url = 'https://gcr.io/my-project/my-image@sha256:123' - # project_id = 'my-gcp-project' - - from google.cloud.devtools import containeranalysis_v1 - - client = containeranalysis_v1.ContainerAnalysisClient() - grafeas_client = client.get_grafeas_client() - project_name = f"projects/{project_id}" - - filter_str = 'kind="VULNERABILITY" AND resourceUrl="{}"'.format(resource_url) - return list(grafeas_client.list_occurrences(parent=project_name, filter=filter_str)) - - -# [END containeranalysis_vulnerability_occurrences_for_image] - - -# [START containeranalysis_filter_vulnerability_occurrences] -def find_high_severity_vulnerabilities_for_image(resource_url, project_id): - """Retrieves a list of only high vulnerability occurrences associated - with a resource.""" - # resource_url = 'https://gcr.io/my-project/my-image@sha256:123' - # project_id = 'my-gcp-project' - - from google.cloud.devtools import containeranalysis_v1 - from grafeas.grafeas_v1 import Severity - - client = containeranalysis_v1.ContainerAnalysisClient() - grafeas_client = client.get_grafeas_client() - project_name = f"projects/{project_id}" - - filter_str = 'kind="VULNERABILITY" AND resourceUrl="{}"'.format(resource_url) - vulnerabilities = grafeas_client.list_occurrences( - parent=project_name, filter=filter_str - ) - filtered_list = [] - for v in vulnerabilities: - if ( - v.vulnerability.effective_severity == Severity.HIGH - or v.vulnerability.effective_severity == Severity.CRITICAL - ): - filtered_list.append(v) - return filtered_list - - -# [END containeranalysis_filter_vulnerability_occurrences] diff --git a/packages/google-cloud-containeranalysis/samples/snippets/samples_test.py b/packages/google-cloud-containeranalysis/samples/snippets/samples_test.py deleted file mode 100644 index 8ed97fadad5b..000000000000 --- a/packages/google-cloud-containeranalysis/samples/snippets/samples_test.py +++ /dev/null @@ -1,292 +0,0 @@ -#!/bin/python -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from os import environ -from os.path import basename -import threading -import time -import uuid - -from google.api_core.exceptions import AlreadyExists, InvalidArgument, NotFound -from google.cloud.devtools import containeranalysis_v1 -from google.cloud.pubsub import PublisherClient, SubscriberClient -from grafeas.grafeas_v1 import DiscoveryOccurrence, NoteKind, Severity, Version -import pytest - -import samples - -PROJECT_ID = environ["GOOGLE_CLOUD_PROJECT"] -SLEEP_TIME = 1 -TRY_LIMIT = 20 - - -class MessageReceiver: - """Custom class to handle incoming Pub/Sub messages.""" - - def __init__(self, expected_msg_nums, done_event): - # initialize counter to 0 on initialization - self.msg_count = 0 - self.expected_msg_nums = expected_msg_nums - self.done_event = done_event - - def pubsub_callback(self, message): - # every time a pubsub message comes in, print it and count it - self.msg_count += 1 - print("Message {}: {}".format(self.msg_count, message.data)) - message.ack() - if self.msg_count == self.expected_msg_nums: - self.done_event.set() - - -class TestContainerAnalysisSamples: - def setup_method(self, test_method): - print("SETUP {}".format(test_method.__name__)) - self.note_id = "note-{}".format(uuid.uuid4()) - self.image_url = "{}.{}".format(uuid.uuid4(), test_method.__name__) - self.note_obj = samples.create_note(self.note_id, PROJECT_ID) - - def teardown_method(self, test_method): - print("TEAR DOWN {}".format(test_method.__name__)) - try: - samples.delete_note(self.note_id, PROJECT_ID) - except NotFound: - pass - - def test_create_note(self): - new_note = samples.get_note(self.note_id, PROJECT_ID) - assert new_note.name == self.note_obj.name - - def test_delete_note(self): - samples.delete_note(self.note_id, PROJECT_ID) - try: - samples.get_note(self.note_obj, PROJECT_ID) - except InvalidArgument: - pass - else: - # didn't raise exception we expected - assert False - - def test_create_occurrence(self): - created = samples.create_occurrence( - self.image_url, self.note_id, PROJECT_ID, PROJECT_ID - ) - retrieved = samples.get_occurrence(basename(created.name), PROJECT_ID) - assert created.name == retrieved.name - # clean up - samples.delete_occurrence(basename(created.name), PROJECT_ID) - - def test_delete_occurrence(self): - created = samples.create_occurrence( - self.image_url, self.note_id, PROJECT_ID, PROJECT_ID - ) - samples.delete_occurrence(basename(created.name), PROJECT_ID) - try: - samples.get_occurrence(basename(created.name), PROJECT_ID) - except NotFound: - pass - else: - # didn't raise exception we expected - assert False - - def test_occurrences_for_image(self): - orig_count = samples.get_occurrences_for_image(self.image_url, PROJECT_ID) - occ = samples.create_occurrence( - self.image_url, self.note_id, PROJECT_ID, PROJECT_ID - ) - new_count = 0 - tries = 0 - while new_count != 1 and tries < TRY_LIMIT: - tries += 1 - new_count = samples.get_occurrences_for_image(self.image_url, PROJECT_ID) - time.sleep(SLEEP_TIME) - assert new_count == 1 - assert orig_count == 0 - # clean up - samples.delete_occurrence(basename(occ.name), PROJECT_ID) - - def test_occurrences_for_note(self): - orig_count = samples.get_occurrences_for_note(self.note_id, PROJECT_ID) - occ = samples.create_occurrence( - self.image_url, self.note_id, PROJECT_ID, PROJECT_ID - ) - new_count = 0 - tries = 0 - while new_count != 1 and tries < TRY_LIMIT: - tries += 1 - new_count = samples.get_occurrences_for_note(self.note_id, PROJECT_ID) - time.sleep(SLEEP_TIME) - assert new_count == 1 - assert orig_count == 0 - # clean up - samples.delete_occurrence(basename(occ.name), PROJECT_ID) - - @pytest.mark.flaky(max_runs=3, min_passes=1) - def test_pubsub(self): - # create topic if needed - client = SubscriberClient() - try: - topic_id = "container-analysis-occurrences-v1" - topic_name = {"name": f"projects/{PROJECT_ID}/topics/{topic_id}"} - publisher = PublisherClient() - publisher.create_topic(topic_name) - except AlreadyExists: - pass - - subscription_id = "container-analysis-test-{}".format(uuid.uuid4()) - subscription_name = client.subscription_path(PROJECT_ID, subscription_id) - samples.create_occurrence_subscription(subscription_id, PROJECT_ID) - - # I can not make it pass with multiple messages. My guess is - # the server started to dedup? - message_count = 1 - try: - job_done = threading.Event() - receiver = MessageReceiver(message_count, job_done) - client.subscribe(subscription_name, receiver.pubsub_callback) - - for i in range(message_count): - occ = samples.create_occurrence( - self.image_url, self.note_id, PROJECT_ID, PROJECT_ID - ) - time.sleep(SLEEP_TIME) - samples.delete_occurrence(basename(occ.name), PROJECT_ID) - time.sleep(SLEEP_TIME) - # We saw occational failure with 60 seconds timeout, so we bumped it - # to 180 seconds. - # See also: python-docs-samples/issues/2894 - job_done.wait(timeout=180) - print("done. msg_count = {}".format(receiver.msg_count)) - assert message_count <= receiver.msg_count - finally: - # clean up - client.delete_subscription({"subscription": subscription_name}) - - def test_poll_discovery_occurrence_fails(self): - # try with no discovery occurrence - try: - samples.poll_discovery_finished(self.image_url, 5, PROJECT_ID) - except RuntimeError: - pass - else: - # we expect timeout error - assert False - - @pytest.mark.flaky(max_runs=3, min_passes=1) - def test_poll_discovery_occurrence(self): - # create discovery occurrence - note_id = "discovery-note-{}".format(uuid.uuid4()) - client = containeranalysis_v1.ContainerAnalysisClient() - grafeas_client = client.get_grafeas_client() - note = {"discovery": {"analysis_kind": NoteKind.DISCOVERY}} - grafeas_client.create_note( - parent=f"projects/{PROJECT_ID}", note_id=note_id, note=note - ) - occurrence = { - "note_name": f"projects/{PROJECT_ID}/notes/{note_id}", - "resource_uri": self.image_url, - "discovery": { - "analysis_status": DiscoveryOccurrence.AnalysisStatus.FINISHED_SUCCESS - }, - } - created = grafeas_client.create_occurrence( - parent=f"projects/{PROJECT_ID}", occurrence=occurrence - ) - - disc = samples.poll_discovery_finished(self.image_url, 10, PROJECT_ID) - status = disc.discovery.analysis_status - assert disc is not None - assert status == DiscoveryOccurrence.AnalysisStatus.FINISHED_SUCCESS - - # clean up - samples.delete_occurrence(basename(created.name), PROJECT_ID) - samples.delete_note(note_id, PROJECT_ID) - - def test_find_vulnerabilities_for_image(self): - occ_list = samples.find_vulnerabilities_for_image(self.image_url, PROJECT_ID) - assert len(occ_list) == 0 - - created = samples.create_occurrence( - self.image_url, self.note_id, PROJECT_ID, PROJECT_ID - ) - tries = 0 - count = 0 - while count != 1 and tries < TRY_LIMIT: - tries += 1 - occ_list = samples.find_vulnerabilities_for_image( - self.image_url, PROJECT_ID - ) - count = len(occ_list) - time.sleep(SLEEP_TIME) - assert len(occ_list) == 1 - samples.delete_occurrence(basename(created.name), PROJECT_ID) - - def test_find_high_severity_vulnerabilities(self): - occ_list = samples.find_high_severity_vulnerabilities_for_image( - self.image_url, PROJECT_ID - ) - assert len(occ_list) == 0 - - # create new high severity vulnerability - note_id = "discovery-note-{}".format(uuid.uuid4()) - client = containeranalysis_v1.ContainerAnalysisClient() - grafeas_client = client.get_grafeas_client() - note = { - "vulnerability": { - "severity": Severity.CRITICAL, - "details": [ - { - "affected_cpe_uri": "your-uri-here", - "affected_package": "your-package-here", - "affected_version_start": {"kind": Version.VersionKind.MINIMUM}, - "fixed_version": {"kind": Version.VersionKind.MAXIMUM}, - } - ], - } - } - grafeas_client.create_note( - parent=f"projects/{PROJECT_ID}", note_id=note_id, note=note - ) - occurrence = { - "note_name": f"projects/{PROJECT_ID}/notes/{note_id}", - "resource_uri": self.image_url, - "vulnerability": { - "effective_severity": Severity.CRITICAL, - "package_issue": [ - { - "affected_cpe_uri": "your-uri-here", - "affected_package": "your-package-here", - "affected_version": {"kind": Version.VersionKind.MINIMUM}, - "fixed_version": {"kind": Version.VersionKind.MAXIMUM}, - } - ], - }, - } - created = grafeas_client.create_occurrence( - parent=f"projects/{PROJECT_ID}", occurrence=occurrence - ) - # query again - tries = 0 - count = 0 - while count != 1 and tries < TRY_LIMIT: - tries += 1 - occ_list = samples.find_vulnerabilities_for_image( - self.image_url, PROJECT_ID - ) - count = len(occ_list) - time.sleep(SLEEP_TIME) - assert len(occ_list) == 1 - # clean up - samples.delete_occurrence(basename(created.name), PROJECT_ID) - samples.delete_note(note_id, PROJECT_ID)