Skip to content

Commit 6618165

Browse files
CuriousDima authored and tseaver committed
Allow bulk update of records via 'MutateRows' API (#3401)
1 parent 7e3fae0 commit 6618165

File tree

4 files changed

+286
-1
lines changed

4 files changed

+286
-1
lines changed

bigtable/google/cloud/bigtable/table.py

Lines changed: 110 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
"""User friendly container for Google Cloud Bigtable Table."""
15+
"""User-friendly container for Google Cloud Bigtable Table."""
16+
17+
18+
import six
1619

1720
from google.cloud._helpers import _to_bytes
1821
from google.cloud.bigtable._generated import (
@@ -29,6 +32,19 @@
2932
from google.cloud.bigtable.row_data import PartialRowsData
3033

3134

35+
# Maximum number of mutations in bulk (MutateRowsRequest message):
36+
# https://cloud.google.com/bigtable/docs/reference/data/rpc/google.bigtable.v2#google.bigtable.v2.MutateRowsRequest
37+
_MAX_BULK_MUTATIONS = 100000
38+
39+
40+
class TableMismatchError(ValueError):
    """Raised when a row belongs to a different table than the one in use.

    Subclasses :class:`ValueError`, so existing ``except ValueError``
    handlers keep catching it.
    """
42+
43+
44+
class TooManyMutationsError(ValueError):
    """Raised when a bulk request carries more mutations than allowed.

    Subclasses :class:`ValueError`, so existing ``except ValueError``
    handlers keep catching it.
    """
46+
47+
3248
class Table(object):
3349
"""Representation of a Google Cloud Bigtable Table.
3450
@@ -276,6 +292,35 @@ def read_rows(self, start_key=None, end_key=None, limit=None,
276292
# We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse`
277293
return PartialRowsData(response_iterator)
278294

295+
def mutate_rows(self, rows):
    """Mutates multiple rows in bulk.

    The method tries to update all specified rows in a single
    ``MutateRows`` call. For every row whose mutation succeeded
    (status code ``0``) the pending mutations are cleared from the row.
    Rows that failed keep their mutations, so they can be applied to
    the row separately.

    :type rows: list
    :param rows: List or other iterable of :class:`.DirectRow` instances.
                 One-shot iterables (e.g. generators) are supported; they
                 are consumed exactly once.

    :rtype: list
    :returns: A list of response statuses (`google.rpc.status_pb2.Status`)
              corresponding to success or failure of each row mutation
              sent. These will be in the same order as the `rows`. An
              element is ``None`` if the responses contained no entry
              for that row.
    """
    # ``rows`` is iterated to build the request and indexed again below
    # to clear the rows that succeeded, so materialize it once. Without
    # this, a generator would be exhausted (and not indexable) by the
    # time the responses come back.
    rows = list(rows)

    mutate_rows_request = _mutate_rows_request(self.name, rows)
    client = self._instance._client
    responses = client._data_stub.MutateRows(mutate_rows_request)

    # One slot per request entry; filled by index as responses arrive.
    responses_statuses = [
        None for _ in six.moves.xrange(len(mutate_rows_request.entries))]
    for response in responses:
        for entry in response.entries:
            responses_statuses[entry.index] = entry.status
            if entry.status.code == 0:
                # Success: drop the mutations that were just applied.
                rows[entry.index].clear()
    return responses_statuses
323+
279324
def sample_row_keys(self):
280325
"""Read a sample of row keys in the table.
281326
@@ -373,3 +418,67 @@ def _create_row_request(table_name, row_key=None, start_key=None, end_key=None,
373418
message.rows.row_ranges.add(**range_kwargs)
374419

375420
return message
421+
422+
423+
def _mutate_rows_request(table_name, rows):
    """Builds the protobuf request used to mutate several rows at once.

    Each row is validated (it must belong to ``table_name`` and be a
    :class:`.DirectRow`) and contributes one entry holding a copy of its
    pending mutations.

    :type table_name: str
    :param table_name: The name of the table to write to.

    :type rows: list
    :param rows: List or other iterable of :class:`.DirectRow` instances.

    :rtype: :class:`data_messages_v2_pb2.MutateRowsRequest`
    :returns: The ``MutateRowsRequest`` protobuf corresponding to the inputs.
    :raises: :exc:`~.table.TooManyMutationsError` if the number of mutations is
             greater than 100,000
    """
    request_pb = data_messages_v2_pb2.MutateRowsRequest(table_name=table_name)
    total_mutations = 0
    for direct_row in rows:
        _check_row_table_name(table_name, direct_row)
        _check_row_type(direct_row)

        new_entry = request_pb.entries.add()
        new_entry.row_key = direct_row.row_key
        # NOTE: `_check_row_type` has verified that the row is a
        #       `DirectRow`, so its mutations carry no state.
        for pending in direct_row._get_mutations(None):
            copied = new_entry.mutations.add()
            copied.CopyFrom(pending)
            total_mutations += 1

    if total_mutations > _MAX_BULK_MUTATIONS:
        message = 'Maximum number of mutations is %s' % (_MAX_BULK_MUTATIONS,)
        raise TooManyMutationsError(message)
    return request_pb
453+
454+
455+
def _check_row_table_name(table_name, row):
456+
"""Checks that a row belongs to a table.
457+
458+
:type table_name: str
459+
:param table_name: The name of the table.
460+
461+
:type row: :class:`.Row`
462+
:param row: An instance of :class:`.Row` subclasses.
463+
464+
:raises: :exc:`~.table.TableMismatchError` if the row does not belong to
465+
the table.
466+
"""
467+
if row.table.name != table_name:
468+
raise TableMismatchError(
469+
'Row %s is a part of %s table. Current table: %s' %
470+
(row.row_key, row.table.name, table_name))
471+
472+
473+
def _check_row_type(row):
    """Verifies that ``row`` is a :class:`.DirectRow`.

    :type row: :class:`.Row`
    :param row: An instance of :class:`.Row` subclasses.

    :raises: :class:`TypeError <exceptions.TypeError>` if the row is not an
             instance of DirectRow.
    """
    if isinstance(row, DirectRow):
        return

    raise TypeError('Bulk processing can not be applied for '
                    'conditional or append mutations.')

bigtable/tests/system.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,33 @@ def _write_to_row(self, row1=None, row2=None, row3=None, row4=None):
356356
cell4 = Cell(CELL_VAL4, timestamp4)
357357
return cell1, cell2, cell3, cell4
358358

359+
def test_mutate_rows(self):
    # Seed two rows with initial values and register them for cleanup.
    first_row = self._table.row(ROW_KEY)
    first_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1)
    first_row.commit()
    self.rows_to_delete.append(first_row)
    second_row = self._table.row(ROW_KEY_ALT)
    second_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL2)
    second_row.commit()
    self.rows_to_delete.append(second_row)

    # Overwrite both cells through a single bulk call.
    first_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL3)
    second_row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL4)
    statuses = self._table.mutate_rows([first_row, second_row])
    status_codes = [status.code for status in statuses]
    self.assertEqual(status_codes, [0, 0])

    # Read the rows back and confirm the newest cell holds the new value.
    expectations = ((ROW_KEY, CELL_VAL3), (ROW_KEY_ALT, CELL_VAL4))
    for row_key, expected_value in expectations:
        row_data = self._table.read_row(row_key)
        newest_cell = row_data.cells[COLUMN_FAMILY_ID1][COL_NAME1][0]
        self.assertEqual(newest_cell.value, expected_value)
385+
359386
def test_read_large_cell_limit(self):
360387
row = self._table.row(ROW_KEY)
361388
self.rows_to_delete.append(row)

bigtable/tests/unit/test_row.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ class TestRow(unittest.TestCase):
2121
@staticmethod
2222
def _get_target_class():
2323
from google.cloud.bigtable.row import Row
24+
2425
return Row
2526

2627
def _make_one(self, *args, **kwargs):

bigtable/tests/unit/test_table.py

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,109 @@
1515

1616
import unittest
1717

18+
import mock
19+
20+
21+
class Test___mutate_rows_request(unittest.TestCase):
    """Unit tests for the module-level ``_mutate_rows_request`` helper."""

    def _call_fut(self, table_name, rows):
        from google.cloud.bigtable.table import _mutate_rows_request

        return _mutate_rows_request(table_name, rows)

    @mock.patch('google.cloud.bigtable.table._MAX_BULK_MUTATIONS', new=3)
    def test__mutate_rows_too_many_mutations(self):
        from google.cloud.bigtable.row import DirectRow
        from google.cloud.bigtable.table import TooManyMutationsError

        table = mock.Mock(name='table', spec=['name'])
        table.name = 'table'
        first_row = DirectRow(row_key=b'row_key', table=table)
        second_row = DirectRow(row_key=b'row_key_2', table=table)
        # Four mutations in total, one more than the patched limit of 3.
        first_row.set_cell('cf1', b'c1', 1)
        first_row.set_cell('cf1', b'c1', 2)
        second_row.set_cell('cf1', b'c1', 3)
        second_row.set_cell('cf1', b'c1', 4)

        with self.assertRaises(TooManyMutationsError):
            self._call_fut('table', [first_row, second_row])

    def test__mutate_rows_request(self):
        from google.cloud.bigtable.row import DirectRow

        table = mock.Mock(name='table', spec=['name'])
        table.name = 'table'
        first_row = DirectRow(row_key=b'row_key', table=table)
        second_row = DirectRow(row_key=b'row_key_2', table=table)
        first_row.set_cell('cf1', b'c1', b'1')
        second_row.set_cell('cf1', b'c1', b'2')
        result = self._call_fut('table', [first_row, second_row])

        # Expect one entry per row, each holding a single SetCell mutation.
        expected_result = _mutate_rows_request_pb(table_name='table')
        for row_key, value in ((b'row_key', b'1'), (b'row_key_2', b'2')):
            entry = expected_result.entries.add()
            entry.row_key = row_key
            mutation = entry.mutations.add()
            mutation.set_cell.family_name = 'cf1'
            mutation.set_cell.column_qualifier = b'c1'
            mutation.set_cell.timestamp_micros = -1
            mutation.set_cell.value = value

        self.assertEqual(result, expected_result)
72+
73+
74+
class Test__check_row_table_name(unittest.TestCase):
    """Unit tests for the module-level ``_check_row_table_name`` helper."""

    def _call_fut(self, table_name, row):
        from google.cloud.bigtable.table import _check_row_table_name

        return _check_row_table_name(table_name, row)

    def test_wrong_table_name(self):
        from google.cloud.bigtable.table import TableMismatchError
        from google.cloud.bigtable.row import DirectRow

        table = mock.Mock(name='table', spec=['name'])
        table.name = 'table'
        row = DirectRow(row_key=b'row_key', table=table)
        with self.assertRaises(TableMismatchError):
            self._call_fut('other_table', row)

    def test_right_table_name(self):
        from google.cloud.bigtable.row import DirectRow

        table = mock.Mock(name='table', spec=['name'])
        table.name = 'table'
        row = DirectRow(row_key=b'row_key', table=table)
        result = self._call_fut('table', row)
        # The check has no return value on success; ``assertIsNone`` pins
        # that, whereas ``assertFalse`` would accept any falsy value.
        self.assertIsNone(result)
99+
100+
101+
class Test__check_row_type(unittest.TestCase):
    """Unit tests for the module-level ``_check_row_type`` helper."""

    def _call_fut(self, row):
        from google.cloud.bigtable.table import _check_row_type

        return _check_row_type(row)

    def test_wrong_row_type(self):
        from google.cloud.bigtable.row import ConditionalRow

        row = ConditionalRow(row_key=b'row_key', table='table', filter_=None)
        with self.assertRaises(TypeError):
            self._call_fut(row)

    def test_right_row_type(self):
        from google.cloud.bigtable.row import DirectRow

        row = DirectRow(row_key=b'row_key', table='table')
        result = self._call_fut(row)
        # The check has no return value on success; ``assertIsNone`` pins
        # that, whereas ``assertFalse`` would accept any falsy value.
        self.assertIsNone(result)
120+
18121

19122
class TestTable(unittest.TestCase):
20123

@@ -348,6 +451,44 @@ def test_read_row_still_partial(self):
348451
with self.assertRaises(ValueError):
349452
self._read_row_helper(chunks, None)
350453

454+
def test_mutate_rows(self):
    from google.cloud.bigtable._generated.bigtable_pb2 import (
        MutateRowsResponse)
    from google.cloud.bigtable.row import DirectRow
    from google.rpc.status_pb2 import Status
    from tests.unit._testing import _FakeStub

    client = _Client()
    instance = _Instance(self.INSTANCE_NAME, client=client)
    table = self._make_one(self.TABLE_ID, instance)

    first_row = DirectRow(row_key=b'row_key', table=table)
    first_row.set_cell('cf', b'col', b'value1')
    second_row = DirectRow(row_key=b'row_key_2', table=table)
    second_row.set_cell('cf', b'col', b'value2')

    # Fake a response with status code 0 for the first row and 1 for
    # the second.
    first_entry = MutateRowsResponse.Entry(index=0, status=Status(code=0))
    second_entry = MutateRowsResponse.Entry(index=1, status=Status(code=1))
    response = MutateRowsResponse(entries=[first_entry, second_entry])

    # Patch the stub used by the API method.
    client._data_stub = _FakeStub([response])
    statuses = table.mutate_rows([first_row, second_row])

    status_codes = [status.code for status in statuses]
    self.assertEqual(status_codes, [0, 1])
490+
491+
351492
def test_read_rows(self):
352493
from google.cloud._testing import _Monkey
353494
from tests.unit._testing import _FakeStub
@@ -570,6 +711,13 @@ def _SampleRowKeysRequestPB(*args, **kw):
570711
return messages_v2_pb2.SampleRowKeysRequest(*args, **kw)
571712

572713

714+
def _mutate_rows_request_pb(*args, **kw):
    """Builds a ``MutateRowsRequest`` protobuf from the given arguments."""
    from google.cloud.bigtable._generated import bigtable_pb2

    request_class = bigtable_pb2.MutateRowsRequest
    return request_class(*args, **kw)
719+
720+
573721
def _TablePB(*args, **kw):
574722
from google.cloud.bigtable._generated import (
575723
table_pb2 as table_v2_pb2)

0 commit comments

Comments
 (0)