Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions bigtable/google/cloud/bigtable/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
bigtable_instance_admin_pb2 as messages_v2_pb2)
from google.cloud.operation import Operation
from google.cloud.operation import _compute_type_url
from google.cloud.operation import _register_type_url
from google.cloud.operation import register_type_url


_CLUSTER_NAME_RE = re.compile(r'^projects/(?P<project>[^/]+)/'
Expand All @@ -36,7 +36,7 @@

_UPDATE_CLUSTER_METADATA_URL = _compute_type_url(
messages_v2_pb2.UpdateClusterMetadata)
_register_type_url(
register_type_url(
_UPDATE_CLUSTER_METADATA_URL, messages_v2_pb2.UpdateClusterMetadata)


Expand Down Expand Up @@ -218,7 +218,7 @@ def create(self):

operation = Operation.from_pb(operation_pb, client)
operation.target = self
operation.metadata['request_type'] = 'CreateCluster'
operation.caller_metadata['request_type'] = 'CreateCluster'
return operation

def update(self):
Expand Down Expand Up @@ -249,7 +249,7 @@ def update(self):

operation = Operation.from_pb(operation_pb, client)
operation.target = self
operation.metadata['request_type'] = 'UpdateCluster'
operation.caller_metadata['request_type'] = 'UpdateCluster'
return operation

def delete(self):
Expand Down
8 changes: 5 additions & 3 deletions bigtable/google/cloud/bigtable/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from google.cloud.bigtable.table import Table
from google.cloud.operation import Operation
from google.cloud.operation import _compute_type_url
from google.cloud.operation import _register_type_url
from google.cloud.operation import register_type_url


_EXISTING_INSTANCE_LOCATION_ID = 'see-existing-cluster'
Expand All @@ -38,8 +38,10 @@

_CREATE_INSTANCE_METADATA_URL = _compute_type_url(
messages_v2_pb2.CreateInstanceMetadata)
_register_type_url(
register_type_url(
_CREATE_INSTANCE_METADATA_URL, messages_v2_pb2.CreateInstanceMetadata)
_INSTANCE_METADATA_URL = _compute_type_url(data_v2_pb2.Instance)
register_type_url(_INSTANCE_METADATA_URL, data_v2_pb2.Instance)


def _prepare_create_request(instance):
Expand Down Expand Up @@ -237,7 +239,7 @@ def create(self):

operation = Operation.from_pb(operation_pb, self._client)
operation.target = self
operation.metadata['request_type'] = 'CreateInstance'
operation.caller_metadata['request_type'] = 'CreateInstance'
return operation

def update(self):
Expand Down
12 changes: 7 additions & 5 deletions bigtable/unit_tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,9 @@ def test_create(self):
self.assertEqual(result.name, OP_NAME)
self.assertIs(result.target, cluster)
self.assertIs(result.client, client)
self.assertIsNone(result.pb_metadata)
self.assertEqual(result.metadata, {'request_type': 'CreateCluster'})
self.assertIsNone(result.metadata)
self.assertEqual(result.caller_metadata,
{'request_type': 'CreateCluster'})

self.assertEqual(len(stub.method_calls), 1)
api_name, args, kwargs = stub.method_calls[0]
Expand Down Expand Up @@ -323,10 +324,11 @@ def test_update(self):
self.assertEqual(result.name, OP_NAME)
self.assertIs(result.target, cluster)
self.assertIs(result.client, client)
self.assertIsInstance(result.pb_metadata,
self.assertIsInstance(result.metadata,
messages_v2_pb2.UpdateClusterMetadata)
self.assertEqual(result.pb_metadata.request_time, NOW_PB)
self.assertEqual(result.metadata, {'request_type': 'UpdateCluster'})
self.assertEqual(result.metadata.request_time, NOW_PB)
self.assertEqual(result.caller_metadata,
{'request_type': 'UpdateCluster'})

self.assertEqual(len(stub.method_calls), 1)
api_name, args, kwargs = stub.method_calls[0]
Expand Down
7 changes: 4 additions & 3 deletions bigtable/unit_tests/test_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,11 @@ def test_create(self):
self.assertEqual(result.name, self.OP_NAME)
self.assertIs(result.target, instance)
self.assertIs(result.client, client)
self.assertIsInstance(result.pb_metadata,
self.assertIsInstance(result.metadata,
messages_v2_pb2.CreateInstanceMetadata)
self.assertEqual(result.pb_metadata.request_time, NOW_PB)
self.assertEqual(result.metadata, {'request_type': 'CreateInstance'})
self.assertEqual(result.metadata.request_time, NOW_PB)
self.assertEqual(result.caller_metadata,
{'request_type': 'CreateInstance'})

self.assertEqual(len(stub.method_calls), 1)
api_name, args, kwargs = stub.method_calls[0]
Expand Down
127 changes: 99 additions & 28 deletions core/google/cloud/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def _compute_type_url(klass, prefix=_GOOGLE_APIS_PREFIX):
return '%s/%s' % (prefix, name)


def _register_type_url(type_url, klass):
def register_type_url(type_url, klass):
"""Register a klass as the factory for a given type URL.

:type type_url: str
Expand All @@ -57,55 +57,102 @@ def _register_type_url(type_url, klass):
_TYPE_URL_MAP[type_url] = klass


def _from_any(any_pb):
"""Convert an ``Any`` protobuf into the actual class.

Uses the type URL to do the conversion.

.. note::

This assumes that the type URL is already registered.

:type any_pb: :class:`google.protobuf.any_pb2.Any`
:param any_pb: An any object to be converted.

:rtype: object
:returns: The instance (of the correct type) stored in the any
instance.
"""
klass = _TYPE_URL_MAP[any_pb.type_url]
return klass.FromString(any_pb.value)


class Operation(object):
"""Representation of a Google API Long-Running Operation.

.. _protobuf: https://github.com/googleapis/googleapis/blob/\
050400df0fdb16f63b63e9dee53819044bffc857/\
google/longrunning/operations.proto#L80
.. _service: https://github.com/googleapis/googleapis/blob/\
050400df0fdb16f63b63e9dee53819044bffc857/\
google/longrunning/operations.proto#L38
.. _JSON: https://cloud.google.com/speech/reference/rest/\
v1beta1/operations#Operation

This wraps an operation `protobuf`_ object and attempts to
interact with the long-running operations `service`_ (specific
to a given API). (Some services also offer a `JSON`_
API that maps the same underlying data type.)

:type name: str
:param name: The fully-qualified path naming the operation.

:type client: object: must provide ``_operations_stub`` accessor.
:param client: The client used to poll for the status of the operation.

:type pb_metadata: object
:param pb_metadata: Instance of protobuf metadata class

:type kw: dict
:param kw: caller-assigned metadata about the operation
:type caller_metadata: dict
:param caller_metadata: caller-assigned metadata about the operation
"""

target = None
"""Instance assocated with the operations: callers may set."""

def __init__(self, name, client, pb_metadata=None, **kw):
response = None
"""Response returned from completed operation.

Only one of this and :attr:`error` can be populated.
"""

error = None
"""Error that resulted from a failed (complete) operation.

Only one of this and :attr:`response` can be populated.
"""

metadata = None
"""Metadata about the current operation (as a protobuf).

Code that uses operations must register the metadata types (via
:func:`register_type_url`) to ensure that the metadata fields can be
converted into the correct types.
"""

def __init__(self, name, client, **caller_metadata):
self.name = name
self.client = client
self.pb_metadata = pb_metadata
self.metadata = kw.copy()
self.caller_metadata = caller_metadata.copy()
self._complete = False

@classmethod
def from_pb(cls, op_pb, client, **kw):
def from_pb(cls, operation_pb, client, **caller_metadata):
"""Factory: construct an instance from a protobuf.

:type op_pb: :class:`google.longrunning.operations_pb2.Operation`
:param op_pb: Protobuf to be parsed.
:type operation_pb:
:class:`~google.longrunning.operations_pb2.Operation`
:param operation_pb: Protobuf to be parsed.

:type client: object: must provide ``_operations_stub`` accessor.
:param client: The client used to poll for the status of the operation.

:type kw: dict
:param kw: caller-assigned metadata about the operation
:type caller_metadata: dict
:param caller_metadata: caller-assigned metadata about the operation

:rtype: :class:`Operation`
:returns: new instance, with attributes based on the protobuf.
"""
pb_metadata = None
if op_pb.metadata.type_url:
type_url = op_pb.metadata.type_url
md_klass = _TYPE_URL_MAP.get(type_url)
if md_klass:
pb_metadata = md_klass.FromString(op_pb.metadata.value)
return cls(op_pb.name, client, pb_metadata, **kw)
result = cls(operation_pb.name, client, **caller_metadata)
result._update_state(operation_pb)
return result

@property
def complete(self):
Expand All @@ -116,22 +163,46 @@ def complete(self):
"""
return self._complete

def _get_operation_rpc(self):
"""Polls the status of the current operation.

:rtype: :class:`~google.longrunning.operations_pb2.Operation`
:returns: The latest status of the current operation.
"""
request_pb = operations_pb2.GetOperationRequest(name=self.name)
return self.client._operations_stub.GetOperation(request_pb)

def _update_state(self, operation_pb):
"""Update the state of the current object based on operation.

:type operation_pb:
:class:`~google.longrunning.operations_pb2.Operation`
:param operation_pb: Protobuf to be parsed.
"""
if operation_pb.done:
self._complete = True

if operation_pb.HasField('metadata'):
self.metadata = _from_any(operation_pb.metadata)

result_type = operation_pb.WhichOneof('result')
if result_type == 'error':
self.error = operation_pb.error
elif result_type == 'response':
self.response = _from_any(operation_pb.response)

def poll(self):
"""Check if the operation has finished.

:rtype: bool
:returns: A boolean indicating if the current operation has completed.
:raises: :class:`ValueError <exceptions.ValueError>` if the operation
:raises: :class:`~exceptions.ValueError` if the operation
has already completed.
"""
if self.complete:
raise ValueError('The operation has completed.')

request_pb = operations_pb2.GetOperationRequest(name=self.name)
# We expect a `google.longrunning.operations_pb2.Operation`.
operation_pb = self.client._operations_stub.GetOperation(request_pb)

if operation_pb.done:
self._complete = True
operation_pb = self._get_operation_rpc()
self._update_state(operation_pb)

return self.complete
Loading