Skip to content

Commit

Permalink
Adding helpers to parse Bigtable create cluster operation.
Browse files Browse the repository at this point in the history
A CreateCluster request response doesn't actually indicate
success or failure. Rather it returns a cluster object with
the validated request parts inside and a `current_operation`
attached.

We implement `_process_operation` so that we can determine the
ID of that long-running operation (so it can be checked for
completion / success, if desired by the user). In addition
we seek to notify the user when the request began.

From the [service definition][1] we know that the `current_operation`
is a [long-running operation][2] and that:

>  The embedded operation's "metadata" field type is `CreateClusterMetadata`,
>  The embedded operation's "response" field type is `Cluster`, if successful.

The [`Operation` metadata][3] is of type [`Any`][4] (which uses a `type_url`
and raw bytes to provide **any** protobuf message type in a single
field, but still allow it to be parsed into its true type after
the fact). So we expect `CreateCluster` responses to have long-running
operations with a type URL matching [`CreateClusterMetadata`][5].

As a result, we introduce a utility (`_parse_pb_any_to_native`) for
parsing an `Any` field into the native protobuf type specified by the
type URL. Since we know we need to handle `CreateClusterMetadata` values,
we add a default mapping (`_TYPE_URL_MAP`) from the corresponding type url
for that message type to the native Python type.

The `CreateClusterMetadata` type has `request_time` and
`finish_time` fields of type [`Timestamp`][6] so we also
add the `_pb_timestamp_to_datetime` helper for converting protobuf
messages into native Python `datetime.datetime` objects.

[1]: https://github.com/GoogleCloudPlatform/cloud-bigtable-client/blob/8e363d72eb39d921dfdf5daf4a36032aa9d003e2/bigtable-protos/src/main/proto/google/bigtable/admin/cluster/v1/bigtable_cluster_service.proto#L64
[2]: https://github.com/GoogleCloudPlatform/cloud-bigtable-client/blob/8e363d72eb39d921dfdf5daf4a36032aa9d003e2/bigtable-protos/src/main/proto/google/bigtable/admin/cluster/v1/bigtable_cluster_data.proto#L74
[3]: https://github.com/GoogleCloudPlatform/cloud-bigtable-client/blob/8e363d72eb39d921dfdf5daf4a36032aa9d003e2/bigtable-protos/src/main/proto/google/longrunning/operations.proto#L82
[4]: https://github.com/GoogleCloudPlatform/cloud-bigtable-client/blob/8e363d72eb39d921dfdf5daf4a36032aa9d003e2/bigtable-protos/src/main/proto/google/protobuf/any.proto#L58
[5]: https://github.com/GoogleCloudPlatform/cloud-bigtable-client/blob/8e363d72eb39d921dfdf5daf4a36032aa9d003e2/bigtable-protos/src/main/proto/google/bigtable/admin/cluster/v1/bigtable_cluster_service_messages.proto#L83-L92
[6]: https://github.com/GoogleCloudPlatform/cloud-bigtable-client/blob/8e363d72eb39d921dfdf5daf4a36032aa9d003e2/bigtable-protos/src/main/proto/google/protobuf/timestamp.proto#L78
  • Loading branch information
dhermes committed Dec 4, 2015
1 parent 9c95d66 commit 9bcb777
Show file tree
Hide file tree
Showing 2 changed files with 265 additions and 5 deletions.
86 changes: 84 additions & 2 deletions gcloud/bigtable/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
"""User friendly container for Google Cloud Bigtable Cluster."""


import datetime
import re

from gcloud._helpers import _EPOCH
from gcloud.bigtable._generated import bigtable_cluster_data_pb2 as data_pb2
from gcloud.bigtable._generated import (
bigtable_cluster_service_messages_pb2 as messages_pb2)
Expand All @@ -26,7 +28,16 @@
# Full cluster names look like:
#   projects/<project>/zones/<zone>/clusters/<cluster_id>
_CLUSTER_NAME_RE = re.compile(r'^projects/(?P<project>[^/]+)/'
                              r'zones/(?P<zone>[^/]+)/clusters/'
                              r'(?P<cluster_id>[a-z][-a-z0-9]*)$')
# Long-running operation names returned from cluster requests; only the
# trailing numeric operation ID is captured for later polling.
_OPERATION_NAME_RE = re.compile(r'^operations/projects/([^/]+)/zones/([^/]+)/'
                                r'clusters/([a-z][-a-z0-9]*)/operations/'
                                r'(?P<operation_id>\d+)$')
# Server-side default node count for a newly created cluster.
_DEFAULT_SERVE_NODES = 3
# Type URL prefixes used inside google.protobuf.Any payloads.
_TYPE_URL_BASE = 'type.googleapis.com/google.bigtable.'
_ADMIN_TYPE_URL_BASE = _TYPE_URL_BASE + 'admin.cluster.v1.'
_CLUSTER_CREATE_METADATA = _ADMIN_TYPE_URL_BASE + 'CreateClusterMetadata'
# Maps a type URL to the protobuf class used to deserialize the Any value.
_TYPE_URL_MAP = {
    _CLUSTER_CREATE_METADATA: messages_pb2.CreateClusterMetadata,
}


def _get_pb_property_value(message_pb, property_name):
Expand Down Expand Up @@ -74,6 +85,73 @@ def _prepare_create_request(cluster):
)


def _pb_timestamp_to_datetime(timestamp):
    """Convert a Timestamp protobuf to a datetime object.

    :type timestamp: :class:`._generated.timestamp_pb2.Timestamp`
    :param timestamp: A Google returned timestamp protobuf.

    :rtype: :class:`datetime.datetime`
    :returns: A UTC datetime object converted from a protobuf timestamp.
    """
    # datetime.timedelta resolves no finer than microseconds, so the
    # nanosecond component is converted (and implicitly rounded) here.
    offset = datetime.timedelta(seconds=timestamp.seconds,
                                microseconds=timestamp.nanos / 1000.0)
    return _EPOCH + offset


def _parse_pb_any_to_native(any_val, expected_type=None):
    """Convert a serialized "google.protobuf.Any" value to actual type.

    :type any_val: :class:`gcloud.bigtable._generated.any_pb2.Any`
    :param any_val: A serialized protobuf value container.

    :type expected_type: str
    :param expected_type: (Optional) The type URL we expect ``any_val``
                          to have.

    :rtype: object
    :returns: The de-serialized object.
    :raises: :class:`ValueError <exceptions.ValueError>` if the
             ``expected_type`` does not match the ``type_url`` on the input.
    """
    type_url = any_val.type_url
    if expected_type is not None and expected_type != type_url:
        raise ValueError('Expected type: %s, Received: %s' % (
            expected_type, type_url))
    # Look up the concrete protobuf class registered for this type URL
    # and deserialize the raw bytes into an instance of it.  An
    # unregistered type URL surfaces as a KeyError.
    container_class = _TYPE_URL_MAP[type_url]
    return container_class.FromString(any_val.value)


def _process_operation(operation_pb):
    """Processes a create protobuf response.

    :type operation_pb: :class:`operations_pb2.Operation`
    :param operation_pb: The long-running operation response from a
                         Create/Update/Undelete cluster request.

    :rtype: tuple
    :returns: A pair of an integer and datetime stamp. The integer is the ID
              of the operation (``operation_id``) and the timestamp when
              the create operation began (``operation_begin``).
    :raises: :class:`ValueError <exceptions.ValueError>` if the operation name
             doesn't match the :data:`_OPERATION_NAME_RE` regex.
    """
    # The operation ID only exists embedded in the operation name; pull it
    # out with the module-level regex.
    name_match = _OPERATION_NAME_RE.match(operation_pb.name)
    if name_match is None:
        raise ValueError('Cluster create operation name was not in the '
                         'expected format.', operation_pb.name)

    # The metadata is a google.protobuf.Any holding CreateClusterMetadata,
    # whose request_time records when the operation began.
    request_metadata = _parse_pb_any_to_native(operation_pb.metadata)
    operation_begin = _pb_timestamp_to_datetime(request_metadata.request_time)

    return int(name_match.group('operation_id')), operation_begin


class Cluster(object):
"""Representation of a Google Cloud Bigtable Cluster.
Expand Down Expand Up @@ -105,7 +183,9 @@ def __init__(self, zone, cluster_id, client,
self.display_name = display_name or cluster_id
self.serve_nodes = serve_nodes
self._client = client
self._operation = None
self._operation_type = None
self._operation_id = None
self._operation_begin = None

def table(self, table_id):
"""Factory to create a table associated with this cluster.
Expand Down Expand Up @@ -217,4 +297,6 @@ def create(self):
cluster_pb = self._client._cluster_stub.CreateCluster(
request_pb, self._client.timeout_seconds)

self._operation = cluster_pb.current_operation
self._operation_type = 'create'
self._operation_id, self._operation_begin = _process_operation(
cluster_pb.current_operation)
184 changes: 181 additions & 3 deletions gcloud/bigtable/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ def test_create(self):

# Create response_pb
op_id = 5678
op_begin = object()
op_name = ('operations/projects/%s/zones/%s/clusters/%s/'
'operations/%d' % (project, zone, cluster_id, op_id))
current_op = operations_pb2.Operation(name=op_name)
Expand All @@ -244,14 +245,22 @@ def test_create(self):
# Create expected_result.
expected_result = None # create() has no return value.

# Perform the method and check the result.
# Create the mocks.
prep_create_called = []

def mock_prep_create_req(cluster):
prep_create_called.append(cluster)
return request_pb

with _Monkey(MUT, _prepare_create_request=mock_prep_create_req):
process_operation_called = []

def mock_process_operation(operation_pb):
process_operation_called.append(operation_pb)
return (op_id, op_begin)

# Perform the method and check the result.
with _Monkey(MUT, _prepare_create_request=mock_prep_create_req,
_process_operation=mock_process_operation):
result = cluster.create()

self.assertEqual(result, expected_result)
Expand All @@ -260,8 +269,11 @@ def mock_prep_create_req(cluster):
(request_pb, timeout_seconds),
{},
)])
self.assertEqual(cluster._operation, current_op)
self.assertEqual(cluster._operation_type, 'create')
self.assertEqual(cluster._operation_id, op_id)
self.assertTrue(cluster._operation_begin is op_begin)
self.assertEqual(prep_create_called, [cluster])
self.assertEqual(process_operation_called, [current_op])


class Test__get_pb_property_value(unittest2.TestCase):
Expand Down Expand Up @@ -319,6 +331,172 @@ def test_it(self):
self.assertEqual(request_pb.cluster.serve_nodes, serve_nodes)


class Test__pb_timestamp_to_datetime(unittest2.TestCase):

    def _callFUT(self, timestamp):
        from gcloud.bigtable.cluster import _pb_timestamp_to_datetime
        return _pb_timestamp_to_datetime(timestamp)

    def test_it(self):
        import datetime
        from gcloud._helpers import UTC
        from gcloud.bigtable._generated.timestamp_pb2 import Timestamp

        # 61 seconds and 1234000 nanoseconds past the epoch ...
        pb_stamp = Timestamp(seconds=61, nanos=1234000)
        # ... which is midnight January 1, 1970 (UTC) plus 1 minute,
        # 1 second and 1234 microseconds.
        expected = datetime.datetime(1970, month=1, day=1, hour=0,
                                     minute=1, second=1, microsecond=1234,
                                     tzinfo=UTC)
        self.assertEqual(self._callFUT(pb_stamp), expected)


class Test__parse_pb_any_to_native(unittest2.TestCase):
    # Tests for the helper that unpacks a ``google.protobuf.Any`` value
    # into its concrete protobuf message type via ``_TYPE_URL_MAP``.

    def _callFUT(self, any_val, expected_type=None):
        # Import inside the method so monkey-patched module globals
        # (via _Monkey) are seen by the function under test.
        from gcloud.bigtable.cluster import _parse_pb_any_to_native
        return _parse_pb_any_to_native(any_val, expected_type=expected_type)

    def test_with_known_type_url(self):
        # A type URL registered in (a patched) _TYPE_URL_MAP should be
        # deserialized into the mapped class and round-trip equal.
        from gcloud._testing import _Monkey
        from gcloud.bigtable._generated import any_pb2
        from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
        from gcloud.bigtable import cluster as MUT

        type_url = 'type.googleapis.com/' + data_pb2._CELL.full_name
        fake_type_url_map = {type_url: data_pb2.Cell}

        cell = data_pb2.Cell(
            timestamp_micros=0,
            value=b'foobar',
        )
        any_val = any_pb2.Any(
            type_url=type_url,
            value=cell.SerializeToString(),
        )
        with _Monkey(MUT, _TYPE_URL_MAP=fake_type_url_map):
            result = self._callFUT(any_val)

        self.assertEqual(result, cell)

    def test_with_create_cluster_metadata(self):
        # Exercises the real (unpatched) _TYPE_URL_MAP entry for
        # CreateClusterMetadata, the message type attached to
        # CreateCluster long-running operations.
        from gcloud.bigtable._generated import any_pb2
        from gcloud.bigtable._generated import (
            bigtable_cluster_data_pb2 as data_pb2)
        from gcloud.bigtable._generated import (
            bigtable_cluster_service_messages_pb2 as messages_pb2)
        from gcloud.bigtable._generated.timestamp_pb2 import Timestamp

        type_url = ('type.googleapis.com/' +
                    messages_pb2._CREATECLUSTERMETADATA.full_name)
        metadata = messages_pb2.CreateClusterMetadata(
            request_time=Timestamp(seconds=1, nanos=1234),
            finish_time=Timestamp(seconds=10, nanos=891011),
            original_request=messages_pb2.CreateClusterRequest(
                name='foo',
                cluster_id='bar',
                cluster=data_pb2.Cluster(
                    display_name='quux',
                    serve_nodes=1337,
                ),
            ),
        )

        any_val = any_pb2.Any(
            type_url=type_url,
            value=metadata.SerializeToString(),
        )
        result = self._callFUT(any_val)
        self.assertEqual(result, metadata)

    def test_unknown_type_url(self):
        # A type URL missing from _TYPE_URL_MAP propagates as KeyError.
        from gcloud._testing import _Monkey
        from gcloud.bigtable._generated import any_pb2
        from gcloud.bigtable import cluster as MUT

        fake_type_url_map = {}
        any_val = any_pb2.Any()
        with _Monkey(MUT, _TYPE_URL_MAP=fake_type_url_map):
            with self.assertRaises(KeyError):
                self._callFUT(any_val)

    def test_disagreeing_type_url(self):
        # If expected_type is given and differs from the Any's type_url,
        # a ValueError is raised before any map lookup occurs.
        from gcloud._testing import _Monkey
        from gcloud.bigtable._generated import any_pb2
        from gcloud.bigtable import cluster as MUT

        type_url1 = 'foo'
        type_url2 = 'bar'
        fake_type_url_map = {type_url1: None}
        any_val = any_pb2.Any(type_url=type_url2)
        with _Monkey(MUT, _TYPE_URL_MAP=fake_type_url_map):
            with self.assertRaises(ValueError):
                self._callFUT(any_val, expected_type=type_url1)


class Test__process_operation(unittest2.TestCase):
    # Tests for the helper that extracts (operation_id, operation_begin)
    # from a long-running Operation protobuf.

    def _callFUT(self, operation_pb):
        # Import inside the method so monkey-patched module globals
        # (via _Monkey) are seen by the function under test.
        from gcloud.bigtable.cluster import _process_operation
        return _process_operation(operation_pb)

    def test_it(self):
        from gcloud._testing import _Monkey
        from gcloud.bigtable._generated import (
            bigtable_cluster_service_messages_pb2 as messages_pb2)
        from gcloud.bigtable._generated import operations_pb2
        from gcloud.bigtable import cluster as MUT

        project = 'PROJECT'
        zone = 'zone'
        cluster_id = 'cluster-id'
        expected_operation_id = 234
        operation_name = ('operations/projects/%s/zones/%s/clusters/%s/'
                          'operations/%d' % (project, zone, cluster_id,
                                             expected_operation_id))

        current_op = operations_pb2.Operation(name=operation_name)

        # Create mocks.
        request_metadata = messages_pb2.CreateClusterMetadata()
        parse_pb_any_called = []

        def mock_parse_pb_any_to_native(any_val, expected_type=None):
            parse_pb_any_called.append((any_val, expected_type))
            return request_metadata

        expected_operation_begin = object()
        ts_to_dt_called = []

        def mock_pb_timestamp_to_datetime(timestamp):
            ts_to_dt_called.append(timestamp)
            return expected_operation_begin

        # Execute method with mocks in place.
        with _Monkey(MUT, _parse_pb_any_to_native=mock_parse_pb_any_to_native,
                     _pb_timestamp_to_datetime=mock_pb_timestamp_to_datetime):
            operation_id, operation_begin = self._callFUT(current_op)

        # Check outputs.
        self.assertEqual(operation_id, expected_operation_id)
        self.assertTrue(operation_begin is expected_operation_begin)

        # Check mocks were used correctly.
        self.assertEqual(parse_pb_any_called, [(current_op.metadata, None)])
        self.assertEqual(ts_to_dt_called, [request_metadata.request_time])

    def test_op_name_parsing_failure(self):
        from gcloud.bigtable._generated import operations_pb2

        # Pass the Operation pb itself (previously a wrapping Cluster pb
        # was passed by mistake, so the 'invalid' name was never actually
        # exercised -- the failure came from the Cluster's empty name).
        current_op = operations_pb2.Operation(name='invalid')
        with self.assertRaises(ValueError):
            self._callFUT(current_op)


class _Client(object):

def __init__(self, project, timeout_seconds=None):
Expand Down

0 comments on commit 9bcb777

Please sign in to comment.