Skip to content

Commit 5e26112

Browse files
committed
Re-factoring Operation base class.
This is in preparation to support JSON/HTTP operations as well and also to ensure that **all** of the operation PB is parsed when polling.
1 parent e1fd690 commit 5e26112

File tree

6 files changed

+241
-79
lines changed

6 files changed

+241
-79
lines changed

bigtable/google/cloud/bigtable/cluster.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
bigtable_instance_admin_pb2 as messages_v2_pb2)
2424
from google.cloud.operation import Operation
2525
from google.cloud.operation import _compute_type_url
26-
from google.cloud.operation import _register_type_url
26+
from google.cloud.operation import register_type_url
2727

2828

2929
_CLUSTER_NAME_RE = re.compile(r'^projects/(?P<project>[^/]+)/'
@@ -36,7 +36,7 @@
3636

3737
_UPDATE_CLUSTER_METADATA_URL = _compute_type_url(
3838
messages_v2_pb2.UpdateClusterMetadata)
39-
_register_type_url(
39+
register_type_url(
4040
_UPDATE_CLUSTER_METADATA_URL, messages_v2_pb2.UpdateClusterMetadata)
4141

4242

@@ -218,7 +218,7 @@ def create(self):
218218

219219
operation = Operation.from_pb(operation_pb, client)
220220
operation.target = self
221-
operation.metadata['request_type'] = 'CreateCluster'
221+
operation.caller_metadata['request_type'] = 'CreateCluster'
222222
return operation
223223

224224
def update(self):
@@ -249,7 +249,7 @@ def update(self):
249249

250250
operation = Operation.from_pb(operation_pb, client)
251251
operation.target = self
252-
operation.metadata['request_type'] = 'UpdateCluster'
252+
operation.caller_metadata['request_type'] = 'UpdateCluster'
253253
return operation
254254

255255
def delete(self):

bigtable/google/cloud/bigtable/instance.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from google.cloud.bigtable.table import Table
2929
from google.cloud.operation import Operation
3030
from google.cloud.operation import _compute_type_url
31-
from google.cloud.operation import _register_type_url
31+
from google.cloud.operation import register_type_url
3232

3333

3434
_EXISTING_INSTANCE_LOCATION_ID = 'see-existing-cluster'
@@ -38,8 +38,10 @@
3838

3939
_CREATE_INSTANCE_METADATA_URL = _compute_type_url(
4040
messages_v2_pb2.CreateInstanceMetadata)
41-
_register_type_url(
41+
register_type_url(
4242
_CREATE_INSTANCE_METADATA_URL, messages_v2_pb2.CreateInstanceMetadata)
43+
_INSTANCE_METADATA_URL = _compute_type_url(data_v2_pb2.Instance)
44+
register_type_url(_INSTANCE_METADATA_URL, data_v2_pb2.Instance)
4345

4446

4547
def _prepare_create_request(instance):
@@ -237,7 +239,7 @@ def create(self):
237239

238240
operation = Operation.from_pb(operation_pb, self._client)
239241
operation.target = self
240-
operation.metadata['request_type'] = 'CreateInstance'
242+
operation.caller_metadata['request_type'] = 'CreateInstance'
241243
return operation
242244

243245
def update(self):

bigtable/unit_tests/test_cluster.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -257,8 +257,9 @@ def test_create(self):
257257
self.assertEqual(result.name, OP_NAME)
258258
self.assertIs(result.target, cluster)
259259
self.assertIs(result.client, client)
260-
self.assertIsNone(result.pb_metadata)
261-
self.assertEqual(result.metadata, {'request_type': 'CreateCluster'})
260+
self.assertIsNone(result.metadata)
261+
self.assertEqual(result.caller_metadata,
262+
{'request_type': 'CreateCluster'})
262263

263264
self.assertEqual(len(stub.method_calls), 1)
264265
api_name, args, kwargs = stub.method_calls[0]
@@ -323,10 +324,11 @@ def test_update(self):
323324
self.assertEqual(result.name, OP_NAME)
324325
self.assertIs(result.target, cluster)
325326
self.assertIs(result.client, client)
326-
self.assertIsInstance(result.pb_metadata,
327+
self.assertIsInstance(result.metadata,
327328
messages_v2_pb2.UpdateClusterMetadata)
328-
self.assertEqual(result.pb_metadata.request_time, NOW_PB)
329-
self.assertEqual(result.metadata, {'request_type': 'UpdateCluster'})
329+
self.assertEqual(result.metadata.request_time, NOW_PB)
330+
self.assertEqual(result.caller_metadata,
331+
{'request_type': 'UpdateCluster'})
330332

331333
self.assertEqual(len(stub.method_calls), 1)
332334
api_name, args, kwargs = stub.method_calls[0]

bigtable/unit_tests/test_instance.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -265,10 +265,11 @@ def test_create(self):
265265
self.assertEqual(result.name, self.OP_NAME)
266266
self.assertIs(result.target, instance)
267267
self.assertIs(result.client, client)
268-
self.assertIsInstance(result.pb_metadata,
268+
self.assertIsInstance(result.metadata,
269269
messages_v2_pb2.CreateInstanceMetadata)
270-
self.assertEqual(result.pb_metadata.request_time, NOW_PB)
271-
self.assertEqual(result.metadata, {'request_type': 'CreateInstance'})
270+
self.assertEqual(result.metadata.request_time, NOW_PB)
271+
self.assertEqual(result.caller_metadata,
272+
{'request_type': 'CreateInstance'})
272273

273274
self.assertEqual(len(stub.method_calls), 1)
274275
api_name, args, kwargs = stub.method_calls[0]

core/google/cloud/operation.py

Lines changed: 99 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ def _compute_type_url(klass, prefix=_GOOGLE_APIS_PREFIX):
3939
return '%s/%s' % (prefix, name)
4040

4141

42-
def _register_type_url(type_url, klass):
42+
def register_type_url(type_url, klass):
4343
"""Register a klass as the factory for a given type URL.
4444
4545
:type type_url: str
@@ -57,55 +57,102 @@ def _register_type_url(type_url, klass):
5757
_TYPE_URL_MAP[type_url] = klass
5858

5959

60+
def _from_any(any_pb):
61+
"""Convert an ``Any`` protobuf into the actual class.
62+
63+
Uses the type URL to do the conversion.
64+
65+
.. note::
66+
67+
This assumes that the type URL is already registered.
68+
69+
:type any_pb: :class:`google.protobuf.any_pb2.Any`
70+
:param any_pb: An any object to be converted.
71+
72+
:rtype: object
73+
:returns: The instance (of the correct type) stored in the any
74+
instance.
75+
"""
76+
klass = _TYPE_URL_MAP[any_pb.type_url]
77+
return klass.FromString(any_pb.value)
78+
79+
6080
class Operation(object):
6181
"""Representation of a Google API Long-Running Operation.
6282
83+
.. _protobuf: https://github.com/googleapis/googleapis/blob/\
84+
050400df0fdb16f63b63e9dee53819044bffc857/\
85+
google/longrunning/operations.proto#L80
86+
.. _service: https://github.com/googleapis/googleapis/blob/\
87+
050400df0fdb16f63b63e9dee53819044bffc857/\
88+
google/longrunning/operations.proto#L38
89+
.. _JSON: https://cloud.google.com/speech/reference/rest/\
90+
v1beta1/operations#Operation
91+
92+
This wraps an operation `protobuf`_ object and attempts to
93+
interact with the long-running operations `service`_ (specific
94+
to a given API). (Some services also offer a `JSON`_
95+
API that maps the same underlying data type.)
96+
6397
:type name: str
6498
:param name: The fully-qualified path naming the operation.
6599
66100
:type client: object: must provide ``_operations_stub`` accessor.
67101
:param client: The client used to poll for the status of the operation.
68102
69-
:type pb_metadata: object
70-
:param pb_metadata: Instance of protobuf metadata class
71-
72-
:type kw: dict
73-
:param kw: caller-assigned metadata about the operation
103+
:type caller_metadata: dict
104+
:param caller_metadata: caller-assigned metadata about the operation
74105
"""
75106

76107
target = None
77108
"""Instance assocated with the operations: callers may set."""
78109

79-
def __init__(self, name, client, pb_metadata=None, **kw):
110+
response = None
111+
"""Response returned from completed operation.
112+
113+
Only one of this and :attr:`error` can be populated.
114+
"""
115+
116+
error = None
117+
"""Error that resulted from a failed (complete) operation.
118+
119+
Only one of this and :attr:`response` can be populated.
120+
"""
121+
122+
metadata = None
123+
"""Metadata about the current operation (as a protobuf).
124+
125+
Code that uses operations must register the metadata types (via
126+
:func:`register_type_url`) to ensure that the metadata fields can be
127+
converted into the correct types.
128+
"""
129+
130+
def __init__(self, name, client, **caller_metadata):
80131
self.name = name
81132
self.client = client
82-
self.pb_metadata = pb_metadata
83-
self.metadata = kw.copy()
133+
self.caller_metadata = caller_metadata.copy()
84134
self._complete = False
85135

86136
@classmethod
87-
def from_pb(cls, op_pb, client, **kw):
137+
def from_pb(cls, operation_pb, client, **caller_metadata):
88138
"""Factory: construct an instance from a protobuf.
89139
90-
:type op_pb: :class:`google.longrunning.operations_pb2.Operation`
91-
:param op_pb: Protobuf to be parsed.
140+
:type operation_pb:
141+
:class:`~google.longrunning.operations_pb2.Operation`
142+
:param operation_pb: Protobuf to be parsed.
92143
93144
:type client: object: must provide ``_operations_stub`` accessor.
94145
:param client: The client used to poll for the status of the operation.
95146
96-
:type kw: dict
97-
:param kw: caller-assigned metadata about the operation
147+
:type caller_metadata: dict
148+
:param caller_metadata: caller-assigned metadata about the operation
98149
99150
:rtype: :class:`Operation`
100151
:returns: new instance, with attributes based on the protobuf.
101152
"""
102-
pb_metadata = None
103-
if op_pb.metadata.type_url:
104-
type_url = op_pb.metadata.type_url
105-
md_klass = _TYPE_URL_MAP.get(type_url)
106-
if md_klass:
107-
pb_metadata = md_klass.FromString(op_pb.metadata.value)
108-
return cls(op_pb.name, client, pb_metadata, **kw)
153+
result = cls(operation_pb.name, client, **caller_metadata)
154+
result._update_state(operation_pb)
155+
return result
109156

110157
@property
111158
def complete(self):
@@ -116,22 +163,46 @@ def complete(self):
116163
"""
117164
return self._complete
118165

166+
def _get_operation_rpc(self):
167+
"""Polls the status of the current operation.
168+
169+
:rtype: :class:`~google.longrunning.operations_pb2.Operation`
170+
:returns: The latest status of the current operation.
171+
"""
172+
request_pb = operations_pb2.GetOperationRequest(name=self.name)
173+
return self.client._operations_stub.GetOperation(request_pb)
174+
175+
def _update_state(self, operation_pb):
176+
"""Update the state of the current object based on operation.
177+
178+
:type operation_pb:
179+
:class:`~google.longrunning.operations_pb2.Operation`
180+
:param operation_pb: Protobuf to be parsed.
181+
"""
182+
if operation_pb.done:
183+
self._complete = True
184+
185+
if operation_pb.HasField('metadata'):
186+
self.metadata = _from_any(operation_pb.metadata)
187+
188+
result_type = operation_pb.WhichOneof('result')
189+
if result_type == 'error':
190+
self.error = operation_pb.error
191+
elif result_type == 'response':
192+
self.response = _from_any(operation_pb.response)
193+
119194
def poll(self):
120195
"""Check if the operation has finished.
121196
122197
:rtype: bool
123198
:returns: A boolean indicating if the current operation has completed.
124-
:raises: :class:`ValueError <exceptions.ValueError>` if the operation
199+
:raises: :class:`~exceptions.ValueError` if the operation
125200
has already completed.
126201
"""
127202
if self.complete:
128203
raise ValueError('The operation has completed.')
129204

130-
request_pb = operations_pb2.GetOperationRequest(name=self.name)
131-
# We expect a `google.longrunning.operations_pb2.Operation`.
132-
operation_pb = self.client._operations_stub.GetOperation(request_pb)
133-
134-
if operation_pb.done:
135-
self._complete = True
205+
operation_pb = self._get_operation_rpc()
206+
self._update_state(operation_pb)
136207

137208
return self.complete

0 commit comments

Comments
 (0)