Skip to content

Commit

Permalink
docs(samples): Adding sample for bucket mounting (#43)
Browse files Browse the repository at this point in the history
* docs(samples): Adding sample for bucket mounting

* Fixing lint stuff.
  • Loading branch information
m-strzelczyk authored Sep 27, 2022
1 parent 5a1b6b3 commit c4f3b69
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 2 deletions.
1 change: 1 addition & 0 deletions compute/batch/requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pytest==7.1.3
pytest-parallel==0.1.1
google-cloud-storage==2.5.0
90 changes: 90 additions & 0 deletions compute/batch/snippets/create/create_with_mounted_bucket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START batch_create_script_job_with_bucket]
from google.cloud import batch_v1


def create_script_job_with_bucket(project_id: str, region: str, job_name: str, bucket_name: str) -> batch_v1.Job:
"""
This method shows how to create a sample Batch Job that will run
a simple command on Cloud Compute instances.
Args:
project_id: project ID or project number of the Cloud project you want to use.
region: name of the region you want to use to run the job. Regions that are
available for Batch are listed on: https://cloud.google.com/batch/docs/get-started#locations
job_name: the name of the job that will be created.
It needs to be unique for each project and region pair.
bucket_name: name of the bucket to be mounted for your Job.
Returns:
A job object representing the job created.
"""
client = batch_v1.BatchServiceClient()

# Define what will be done as part of the job.
task = batch_v1.TaskSpec()
runnable = batch_v1.Runnable()
runnable.script = batch_v1.Runnable.Script()
runnable.script.text = "echo Hello world from task ${BATCH_TASK_INDEX}. >> /mnt/share/output_task_${BATCH_TASK_INDEX}.txt"
task.runnables = [runnable]

gcs_bucket = batch_v1.GCS()
gcs_bucket.remote_path = bucket_name
gcs_volume = batch_v1.Volume()
gcs_volume.gcs = gcs_bucket
gcs_volume.mount_path = '/mnt/share'
task.volumes = [gcs_volume]

# We can specify what resources are requested by each task.
resources = batch_v1.ComputeResource()
resources.cpu_milli = 500 # in milliseconds per cpu-second. This means the task requires 50% of a single CPUs.
resources.memory_mib = 16
task.compute_resource = resources

task.max_retry_count = 2
task.max_run_duration = "3600s"

# Tasks are grouped inside a job using TaskGroups.
group = batch_v1.TaskGroup()
group.task_count = 4
group.task_spec = task

# Policies are used to define on what kind of virtual machines the tasks will run on.
# In this case, we tell the system to use "e2-standard-4" machine type.
# Read more about machine types here: https://cloud.google.com/compute/docs/machine-types
allocation_policy = batch_v1.AllocationPolicy()
policy = batch_v1.AllocationPolicy.InstancePolicy()
policy.machine_type = "e2-standard-4"
instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
instances.policy = policy
allocation_policy.instances = [instances]

job = batch_v1.Job()
job.task_groups = [group]
job.allocation_policy = allocation_policy
job.labels = {"env": "testing", "type": "script", "mount": "bucket"}
# We use Cloud Logging as it's an out of the box available option
job.logs_policy = batch_v1.LogsPolicy()
job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING

create_request = batch_v1.CreateJobRequest()
create_request.job = job
create_request.job_id = job_name
# The job's parent is the region in which the job will run
create_request.parent = f"projects/{project_id}/locations/{region}"

return client.create_job(create_request)
# [END batch_create_script_job_with_bucket]
7 changes: 5 additions & 2 deletions compute/batch/snippets/tests/test_basics.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from typing import Callable
import uuid


import google.auth
from google.cloud import batch_v1
import pytest
Expand Down Expand Up @@ -44,7 +44,7 @@ def job_name():
return f"test-job-{uuid.uuid4().hex[:10]}"


def _test_body(test_job: batch_v1.Job):
def _test_body(test_job: batch_v1.Job, additional_test: Callable = None):
start_time = time.time()
try:
while test_job.status.state in WAIT_STATES:
Expand All @@ -61,6 +61,9 @@ def _test_body(test_job: batch_v1.Job):
break
else:
pytest.fail(f"Couldn't find job {test_job.uid} on the list of jobs.")

if additional_test:
additional_test()
finally:
delete_job(PROJECT, REGION, test_job.name.rsplit('/', maxsplit=1)[1]).result()

Expand Down
70 changes: 70 additions & 0 deletions compute/batch/snippets/tests/test_bucket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid


import google.auth
from google.cloud import batch_v1
from google.cloud import storage
import pytest

from .test_basics import _test_body
from ..create.create_with_mounted_bucket import create_script_job_with_bucket

PROJECT = google.auth.default()[1]
REGION = 'europe-north1'

TIMEOUT = 600 # 10 minutes

WAIT_STATES = {
batch_v1.JobStatus.State.STATE_UNSPECIFIED,
batch_v1.JobStatus.State.QUEUED,
batch_v1.JobStatus.State.RUNNING,
batch_v1.JobStatus.State.SCHEDULED,
}


@pytest.fixture
def job_name():
return f"test-job-{uuid.uuid4().hex[:10]}"


@pytest.fixture()
def test_bucket():
bucket_name = f"test-bucket-{uuid.uuid4().hex[:8]}"
client = storage.Client()
client.create_bucket(bucket_name, location="eu")

yield bucket_name

bucket = client.get_bucket(bucket_name)
bucket.delete(force=True)


def _test_bucket_content(test_bucket):
client = storage.Client()
bucket = client.get_bucket(test_bucket)

file_name_template = "output_task_{task_number}.txt"
file_content_template = "Hello world from task {task_number}.\n"

for i in range(4):
blob = bucket.blob(file_name_template.format(task_number=i))
content = blob.download_as_bytes().decode()
assert content == file_content_template.format(task_number=i)


def test_bucket_job(job_name, test_bucket):
job = create_script_job_with_bucket(PROJECT, REGION, job_name, test_bucket)
_test_body(job, lambda: _test_bucket_content(test_bucket))

0 comments on commit c4f3b69

Please sign in to comment.