diff --git a/compute/batch/requirements-test.txt b/compute/batch/requirements-test.txt index 75ceb43cfe42..a30f83328b79 100644 --- a/compute/batch/requirements-test.txt +++ b/compute/batch/requirements-test.txt @@ -1,2 +1,3 @@ pytest==7.1.3 pytest-parallel==0.1.1 +google-cloud-storage==2.5.0 diff --git a/compute/batch/snippets/create/create_with_mounted_bucket.py b/compute/batch/snippets/create/create_with_mounted_bucket.py new file mode 100644 index 000000000000..469df9c8994c --- /dev/null +++ b/compute/batch/snippets/create/create_with_mounted_bucket.py @@ -0,0 +1,90 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START batch_create_script_job_with_bucket] +from google.cloud import batch_v1 + + +def create_script_job_with_bucket(project_id: str, region: str, job_name: str, bucket_name: str) -> batch_v1.Job: +    """ +    This method shows how to create a sample Batch Job that will run +    a simple command on Cloud Compute instances, with a Cloud Storage bucket mounted for the tasks. + +    Args: +        project_id: project ID or project number of the Cloud project you want to use. +        region: name of the region you want to use to run the job. Regions that are +            available for Batch are listed on: https://cloud.google.com/batch/docs/get-started#locations +        job_name: the name of the job that will be created. +            It needs to be unique for each project and region pair. +        bucket_name: name of the bucket to be mounted for your Job. + +    Returns: +        A job object representing the job created.
+ """ + client = batch_v1.BatchServiceClient() + + # Define what will be done as part of the job. + task = batch_v1.TaskSpec() + runnable = batch_v1.Runnable() + runnable.script = batch_v1.Runnable.Script() + runnable.script.text = "echo Hello world from task ${BATCH_TASK_INDEX}. >> /mnt/share/output_task_${BATCH_TASK_INDEX}.txt" + task.runnables = [runnable] + + gcs_bucket = batch_v1.GCS() + gcs_bucket.remote_path = bucket_name + gcs_volume = batch_v1.Volume() + gcs_volume.gcs = gcs_bucket + gcs_volume.mount_path = '/mnt/share' + task.volumes = [gcs_volume] + + # We can specify what resources are requested by each task. + resources = batch_v1.ComputeResource() + resources.cpu_milli = 500 # in milliseconds per cpu-second. This means the task requires 50% of a single CPUs. + resources.memory_mib = 16 + task.compute_resource = resources + + task.max_retry_count = 2 + task.max_run_duration = "3600s" + + # Tasks are grouped inside a job using TaskGroups. + group = batch_v1.TaskGroup() + group.task_count = 4 + group.task_spec = task + + # Policies are used to define on what kind of virtual machines the tasks will run on. + # In this case, we tell the system to use "e2-standard-4" machine type. 
+ # Read more about machine types here: https://cloud.google.com/compute/docs/machine-types + allocation_policy = batch_v1.AllocationPolicy() + policy = batch_v1.AllocationPolicy.InstancePolicy() + policy.machine_type = "e2-standard-4" + instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate() + instances.policy = policy + allocation_policy.instances = [instances] + + job = batch_v1.Job() + job.task_groups = [group] + job.allocation_policy = allocation_policy + job.labels = {"env": "testing", "type": "script", "mount": "bucket"} + # We use Cloud Logging as it's an out of the box available option + job.logs_policy = batch_v1.LogsPolicy() + job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING + + create_request = batch_v1.CreateJobRequest() + create_request.job = job + create_request.job_id = job_name + # The job's parent is the region in which the job will run + create_request.parent = f"projects/{project_id}/locations/{region}" + + return client.create_job(create_request) +# [END batch_create_script_job_with_bucket] diff --git a/compute/batch/snippets/tests/test_basics.py b/compute/batch/snippets/tests/test_basics.py index 88dfa75c0ef8..8897d8425ebc 100644 --- a/compute/batch/snippets/tests/test_basics.py +++ b/compute/batch/snippets/tests/test_basics.py @@ -12,9 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import time +from typing import Callable import uuid - import google.auth from google.cloud import batch_v1 import pytest @@ -44,7 +44,7 @@ def job_name(): return f"test-job-{uuid.uuid4().hex[:10]}" -def _test_body(test_job: batch_v1.Job): +def _test_body(test_job: batch_v1.Job, additional_test: Callable = None): start_time = time.time() try: while test_job.status.state in WAIT_STATES: @@ -61,6 +61,9 @@ def _test_body(test_job: batch_v1.Job): break else: pytest.fail(f"Couldn't find job {test_job.uid} on the list of jobs.") + + if additional_test: + additional_test() finally: delete_job(PROJECT, REGION, test_job.name.rsplit('/', maxsplit=1)[1]).result() diff --git a/compute/batch/snippets/tests/test_bucket.py b/compute/batch/snippets/tests/test_bucket.py new file mode 100644 index 000000000000..ad8a347fbba0 --- /dev/null +++ b/compute/batch/snippets/tests/test_bucket.py @@ -0,0 +1,70 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import uuid + + +import google.auth +from google.cloud import batch_v1 +from google.cloud import storage +import pytest + +from .test_basics import _test_body +from ..create.create_with_mounted_bucket import create_script_job_with_bucket + +PROJECT = google.auth.default()[1] +REGION = 'europe-north1' + +TIMEOUT = 600 # 10 minutes + +WAIT_STATES = { + batch_v1.JobStatus.State.STATE_UNSPECIFIED, + batch_v1.JobStatus.State.QUEUED, + batch_v1.JobStatus.State.RUNNING, + batch_v1.JobStatus.State.SCHEDULED, +} + + +@pytest.fixture +def job_name(): + return f"test-job-{uuid.uuid4().hex[:10]}" + + +@pytest.fixture() +def test_bucket(): + bucket_name = f"test-bucket-{uuid.uuid4().hex[:8]}" + client = storage.Client() + client.create_bucket(bucket_name, location="eu") + + yield bucket_name + + bucket = client.get_bucket(bucket_name) + bucket.delete(force=True) + + +def _test_bucket_content(test_bucket): + client = storage.Client() + bucket = client.get_bucket(test_bucket) + + file_name_template = "output_task_{task_number}.txt" + file_content_template = "Hello world from task {task_number}.\n" + + for i in range(4): + blob = bucket.blob(file_name_template.format(task_number=i)) + content = blob.download_as_bytes().decode() + assert content == file_content_template.format(task_number=i) + + +def test_bucket_job(job_name, test_bucket): + job = create_script_job_with_bucket(PROJECT, REGION, job_name, test_bucket) + _test_body(job, lambda: _test_bucket_content(test_bucket))