Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 110 additions & 1 deletion bigtable/google/cloud/bigtable/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""User friendly container for Google Cloud Bigtable Table."""
"""User-friendly container for Google Cloud Bigtable Table."""


import six

from google.cloud._helpers import _to_bytes
from google.cloud.bigtable._generated import (
Expand All @@ -29,6 +32,19 @@
from google.cloud.bigtable.row_data import PartialRowsData


# Maximum number of mutations in bulk (MutateRowsRequest message):
# https://cloud.google.com/bigtable/docs/reference/data/rpc/google.bigtable.v2#google.bigtable.v2.MutateRowRequest
_MAX_BULK_MUTATIONS = 100000


class TableMismatchError(ValueError):
"""Row from another table."""


class TooManyMutationsError(ValueError):
"""The number of mutations for bulk request is too big."""


class Table(object):
"""Representation of a Google Cloud Bigtable Table.

Expand Down Expand Up @@ -276,6 +292,35 @@ def read_rows(self, start_key=None, end_key=None, limit=None,
# We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse`
return PartialRowsData(response_iterator)

def mutate_rows(self, rows):

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

"""Mutates multiple rows in bulk.

The method tries to update all specified rows.
If some of the rows weren't updated, it would not remove mutations.
They can be applied to the row separately.
If row mutations finished successfully, they would be cleaned up.

:type rows: list
:param rows: List or other iterable of :class:`.DirectRow` instances.

:rtype: list
:returns: A list of response statuses (`google.rpc.status_pb2.Status`)
corresponding to success or failure of each row mutation
sent. These will be in the same order as the `rows`.
"""
mutate_rows_request = _mutate_rows_request(self.name, rows)
client = self._instance._client
responses = client._data_stub.MutateRows(mutate_rows_request)

responses_statuses = [
None for _ in six.moves.xrange(len(mutate_rows_request.entries))]
for response in responses:
for entry in response.entries:
responses_statuses[entry.index] = entry.status
if entry.status.code == 0:
rows[entry.index].clear()

This comment was marked as spam.

This comment was marked as spam.

return responses_statuses

This comment was marked as spam.

This comment was marked as spam.


def sample_row_keys(self):
"""Read a sample of row keys in the table.

Expand Down Expand Up @@ -373,3 +418,67 @@ def _create_row_request(table_name, row_key=None, start_key=None, end_key=None,
message.rows.row_ranges.add(**range_kwargs)

return message


def _mutate_rows_request(table_name, rows):
"""Creates a request to mutate rows in a table.

:type table_name: str
:param table_name: The name of the table to write to.

:type rows: list
:param rows: List or other iterable of :class:`.DirectRow` instances.

:rtype: :class:`data_messages_v2_pb2.MutateRowsRequest`
:returns: The ``MutateRowsRequest`` protobuf corresponding to the inputs.
:raises: :exc:`~.table.TooManyMutationsError` if the number of mutations is
greater than 100,000
"""
request_pb = data_messages_v2_pb2.MutateRowsRequest(table_name=table_name)
mutations_count = 0
for row in rows:
_check_row_table_name(table_name, row)
_check_row_type(row)
entry = request_pb.entries.add()
entry.row_key = row.row_key
# NOTE: Since `_check_row_type` has verified `row` is a `DirectRow`,
# the mutations have no state.
for mutation in row._get_mutations(None):
mutations_count += 1
entry.mutations.add().CopyFrom(mutation)
if mutations_count > _MAX_BULK_MUTATIONS:
raise TooManyMutationsError('Maximum number of mutations is %s' %
(_MAX_BULK_MUTATIONS,))
return request_pb


def _check_row_table_name(table_name, row):
"""Checks that a row belongs to a table.

:type table_name: str
:param table_name: The name of the table.

:type row: :class:`.Row`
:param row: An instance of :class:`.Row` subclasses.

:raises: :exc:`~.table.TableMismatchError` if the row does not belong to
the table.
"""
if row.table.name != table_name:
raise TableMismatchError(
'Row %s is a part of %s table. Current table: %s' %
(row.row_key, row.table.name, table_name))


def _check_row_type(row):
"""Checks that a row is an instance of :class:`.DirectRow`.

:type row: :class:`.Row`
:param row: An instance of :class:`.Row` subclasses.

:raises: :class:`TypeError <exceptions.TypeError>` if the row is not an
instance of DirectRow.
"""
if not isinstance(row, DirectRow):
raise TypeError('Bulk processing can not be applied for '
'conditional or append mutations.')

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

27 changes: 27 additions & 0 deletions bigtable/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,33 @@ def _write_to_row(self, row1=None, row2=None, row3=None, row4=None):
cell4 = Cell(CELL_VAL4, timestamp4)
return cell1, cell2, cell3, cell4

def test_mutate_rows(self):
row1 = self._table.row(ROW_KEY)
row1.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL1)
row1.commit()
self.rows_to_delete.append(row1)
row2 = self._table.row(ROW_KEY_ALT)
row2.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL2)
row2.commit()
self.rows_to_delete.append(row2)

# Change the contents
row1.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL3)
row2.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, CELL_VAL4)
rows = [row1, row2]
statuses = self._table.mutate_rows(rows)
result = [status.code for status in statuses]
expected_result = [0, 0]
self.assertEqual(result, expected_result)

# Check the contents
row1_data = self._table.read_row(ROW_KEY)
self.assertEqual(
row1_data.cells[COLUMN_FAMILY_ID1][COL_NAME1][0].value, CELL_VAL3)
row2_data = self._table.read_row(ROW_KEY_ALT)
self.assertEqual(
row2_data.cells[COLUMN_FAMILY_ID1][COL_NAME1][0].value, CELL_VAL4)

def test_read_large_cell_limit(self):
row = self._table.row(ROW_KEY)
self.rows_to_delete.append(row)
Expand Down
1 change: 1 addition & 0 deletions bigtable/tests/unit/test_row.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class TestRow(unittest.TestCase):
@staticmethod
def _get_target_class():
from google.cloud.bigtable.row import Row

return Row

def _make_one(self, *args, **kwargs):
Expand Down
148 changes: 148 additions & 0 deletions bigtable/tests/unit/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,109 @@

import unittest

import mock


class Test___mutate_rows_request(unittest.TestCase):

def _call_fut(self, table_name, rows):
from google.cloud.bigtable.table import _mutate_rows_request

return _mutate_rows_request(table_name, rows)

@mock.patch('google.cloud.bigtable.table._MAX_BULK_MUTATIONS', new=3)
def test__mutate_rows_too_many_mutations(self):
from google.cloud.bigtable.row import DirectRow
from google.cloud.bigtable.table import TooManyMutationsError

table = mock.Mock(name='table', spec=['name'])
table.name = 'table'
rows = [DirectRow(row_key=b'row_key', table=table),
DirectRow(row_key=b'row_key_2', table=table)]
rows[0].set_cell('cf1', b'c1', 1)
rows[0].set_cell('cf1', b'c1', 2)
rows[1].set_cell('cf1', b'c1', 3)
rows[1].set_cell('cf1', b'c1', 4)
with self.assertRaises(TooManyMutationsError):
self._call_fut('table', rows)

def test__mutate_rows_request(self):
from google.cloud.bigtable.row import DirectRow

table = mock.Mock(name='table', spec=['name'])
table.name = 'table'
rows = [DirectRow(row_key=b'row_key', table=table),
DirectRow(row_key=b'row_key_2', table=table)]
rows[0].set_cell('cf1', b'c1', b'1')
rows[1].set_cell('cf1', b'c1', b'2')
result = self._call_fut('table', rows)

expected_result = _mutate_rows_request_pb(table_name='table')
entry1 = expected_result.entries.add()
entry1.row_key = b'row_key'
mutations1 = entry1.mutations.add()
mutations1.set_cell.family_name = 'cf1'
mutations1.set_cell.column_qualifier = b'c1'
mutations1.set_cell.timestamp_micros = -1
mutations1.set_cell.value = b'1'
entry2 = expected_result.entries.add()
entry2.row_key = b'row_key_2'
mutations2 = entry2.mutations.add()
mutations2.set_cell.family_name = 'cf1'
mutations2.set_cell.column_qualifier = b'c1'
mutations2.set_cell.timestamp_micros = -1
mutations2.set_cell.value = b'2'

self.assertEqual(result, expected_result)


class Test__check_row_table_name(unittest.TestCase):

def _call_fut(self, table_name, row):
from google.cloud.bigtable.table import _check_row_table_name

return _check_row_table_name(table_name, row)

def test_wrong_table_name(self):
from google.cloud.bigtable.table import TableMismatchError
from google.cloud.bigtable.row import DirectRow

table = mock.Mock(name='table', spec=['name'])
table.name = 'table'
row = DirectRow(row_key=b'row_key', table=table)
with self.assertRaises(TableMismatchError):
self._call_fut('other_table', row)

def test_right_table_name(self):
from google.cloud.bigtable.row import DirectRow

table = mock.Mock(name='table', spec=['name'])
table.name = 'table'
row = DirectRow(row_key=b'row_key', table=table)
result = self._call_fut('table', row)
self.assertFalse(result)


class Test__check_row_type(unittest.TestCase):
def _call_fut(self, row):
from google.cloud.bigtable.table import _check_row_type

return _check_row_type(row)

def test_test_wrong_row_type(self):
from google.cloud.bigtable.row import ConditionalRow

row = ConditionalRow(row_key=b'row_key', table='table', filter_=None)
with self.assertRaises(TypeError):
self._call_fut(row)

def test_right_row_type(self):
from google.cloud.bigtable.row import DirectRow

row = DirectRow(row_key=b'row_key', table='table')
result = self._call_fut(row)
self.assertFalse(result)


class TestTable(unittest.TestCase):

Expand Down Expand Up @@ -348,6 +451,44 @@ def test_read_row_still_partial(self):
with self.assertRaises(ValueError):
self._read_row_helper(chunks, None)

def test_mutate_rows(self):
from google.cloud.bigtable._generated.bigtable_pb2 import (
MutateRowsResponse)
from google.cloud.bigtable.row import DirectRow
from google.rpc.status_pb2 import Status
from tests.unit._testing import _FakeStub

client = _Client()
instance = _Instance(self.INSTANCE_NAME, client=client)
table = self._make_one(self.TABLE_ID, instance)

row_1 = DirectRow(row_key=b'row_key', table=table)
row_1.set_cell('cf', b'col', b'value1')
row_2 = DirectRow(row_key=b'row_key_2', table=table)
row_2.set_cell('cf', b'col', b'value2')

response = MutateRowsResponse(
entries=[
MutateRowsResponse.Entry(
index=0,
status=Status(code=0),
),
MutateRowsResponse.Entry(
index=1,
status=Status(code=1),
),
],
)

# Patch the stub used by the API method.
client._data_stub = _FakeStub([response])
statuses = table.mutate_rows([row_1, row_2])
result = [status.code for status in statuses]
expected_result = [0, 1]

self.assertEqual(result, expected_result)


def test_read_rows(self):
from google.cloud._testing import _Monkey
from tests.unit._testing import _FakeStub
Expand Down Expand Up @@ -570,6 +711,13 @@ def _SampleRowKeysRequestPB(*args, **kw):
return messages_v2_pb2.SampleRowKeysRequest(*args, **kw)


def _mutate_rows_request_pb(*args, **kw):
from google.cloud.bigtable._generated import (
bigtable_pb2 as data_messages_v2_pb2)

return data_messages_v2_pb2.MutateRowsRequest(*args, **kw)

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.


def _TablePB(*args, **kw):
from google.cloud.bigtable._generated import (
table_pb2 as table_v2_pb2)
Expand Down