Merge pull request #1495 from dhermes/bigtable-read-rows
Adding Bigtable Table.read_row().
dhermes committed Feb 19, 2016
2 parents bc718d3 + 123eca0 commit 8749101
Showing 2 changed files with 315 additions and 0 deletions.
114 changes: 114 additions & 0 deletions gcloud/bigtable/table.py
@@ -15,13 +15,16 @@
"""User friendly container for Google Cloud Bigtable Table."""


from gcloud._helpers import _to_bytes
from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
from gcloud.bigtable._generated import (
    bigtable_table_service_messages_pb2 as messages_pb2)
from gcloud.bigtable._generated import (
    bigtable_service_messages_pb2 as data_messages_pb2)
from gcloud.bigtable.column_family import _gc_rule_from_pb
from gcloud.bigtable.column_family import ColumnFamily
from gcloud.bigtable.row import Row
from gcloud.bigtable.row_data import PartialRowData


class Table(object):
@@ -218,6 +221,40 @@ def list_column_families(self):
            result[column_family_id] = column_family
        return result

    def read_row(self, row_key, filter_=None):
        """Read a single row from this table.

        :type row_key: bytes
        :param row_key: The key of the row to read from.

        :type filter_: :class:`.row.RowFilter`
        :param filter_: (Optional) The filter to apply to the contents of the
                        row. If unset, returns the entire row.

        :rtype: :class:`.PartialRowData`, :data:`NoneType <types.NoneType>`
        :returns: The contents of the row if any chunks were returned in
                  the response, otherwise :data:`None`.
        :raises: :class:`ValueError <exceptions.ValueError>` if a commit row
                 chunk is never encountered.
        """
        request_pb = _create_row_request(self.name, row_key=row_key,
                                         filter_=filter_)
        client = self._cluster._client
        response_iterator = client._data_stub.ReadRows(request_pb,
                                                       client.timeout_seconds)
        # We expect an iterator of `data_messages_pb2.ReadRowsResponse`
        result = PartialRowData(row_key)
        for read_rows_response in response_iterator:
            result.update_from_read_rows(read_rows_response)

        # Make sure the result actually contains data.
        if not result._chunks_encountered:
            return None
        # Make sure the result was committed by the back-end.
        if not result.committed:
            raise ValueError('The row remains partial / is not committed.')
        return result
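
For orientation (not part of this diff), here is a rough usage sketch of the new method, assuming a ``Table`` obtained through the package's usual Client/Cluster plumbing; the row key and sample fraction are placeholders:

    from gcloud.bigtable.row import RowSampleFilter

    # ``table`` is assumed to be a gcloud.bigtable.table.Table bound to an
    # already-started client/cluster (construction omitted here).
    row = table.read_row(b'some-row-key', filter_=RowSampleFilter(0.33))
    if row is None:
        # No chunks were returned for this key.
        print('Row is empty or was entirely filtered out.')
    else:
        # PartialRowData: cells are keyed by column family, then qualifier.
        print(row.cells)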

    def sample_row_keys(self):
        """Read a sample of row keys in the table.
@@ -255,3 +292,80 @@ def sample_row_keys(self):
        response_iterator = client._data_stub.SampleRowKeys(
            request_pb, client.timeout_seconds)
        return response_iterator


def _create_row_request(table_name, row_key=None, start_key=None, end_key=None,
                        filter_=None, allow_row_interleaving=None, limit=None):
    """Creates a request to read rows in a table.

    :type table_name: str
    :param table_name: The name of the table to read from.

    :type row_key: bytes
    :param row_key: (Optional) The key of a specific row to read from.

    :type start_key: bytes
    :param start_key: (Optional) The beginning of a range of row keys to
                      read from. The range will include ``start_key``. If
                      left empty, will be interpreted as the empty string.

    :type end_key: bytes
    :param end_key: (Optional) The end of a range of row keys to read from.
                    The range will not include ``end_key``. If left empty,
                    will be interpreted as an infinite string.

    :type filter_: :class:`.row.RowFilter`, :class:`.row.RowFilterChain`,
                   :class:`.row.RowFilterUnion` or
                   :class:`.row.ConditionalRowFilter`
    :param filter_: (Optional) The filter to apply to the contents of the
                    specified row(s). If unset, reads the entire table.

    :type allow_row_interleaving: bool
    :param allow_row_interleaving: (Optional) By default, rows are read
                                   sequentially, producing results which are
                                   guaranteed to arrive in increasing row
                                   order. Setting
                                   ``allow_row_interleaving`` to
                                   :data:`True` allows multiple rows to be
                                   interleaved in the response stream,
                                   which increases throughput but breaks
                                   this guarantee, and may force the
                                   client to use more memory to buffer
                                   partially-received rows.

    :type limit: int
    :param limit: (Optional) The read will terminate after committing to N
                  rows' worth of results. The default (zero) is to return
                  all results. Note that if ``allow_row_interleaving`` is
                  set to :data:`True`, partial results may be returned for
                  more than N rows. However, only N ``commit_row`` chunks
                  will be sent.

    :rtype: :class:`data_messages_pb2.ReadRowsRequest`
    :returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs.
    :raises: :class:`ValueError <exceptions.ValueError>` if both
             ``row_key`` and one of ``start_key`` and ``end_key`` are set
    """
    request_kwargs = {'table_name': table_name}
    if (row_key is not None and
            (start_key is not None or end_key is not None)):
        raise ValueError('Row key and row range cannot be '
                         'set simultaneously')
    if row_key is not None:
        request_kwargs['row_key'] = _to_bytes(row_key)
    if start_key is not None or end_key is not None:
        range_kwargs = {}
        if start_key is not None:
            range_kwargs['start_key'] = _to_bytes(start_key)
        if end_key is not None:
            range_kwargs['end_key'] = _to_bytes(end_key)
        row_range = data_pb2.RowRange(**range_kwargs)
        request_kwargs['row_range'] = row_range
    if filter_ is not None:
        request_kwargs['filter'] = filter_.to_pb()
    if allow_row_interleaving is not None:
        request_kwargs['allow_row_interleaving'] = allow_row_interleaving
    if limit is not None:
        request_kwargs['num_rows_limit'] = limit

    return data_messages_pb2.ReadRowsRequest(**request_kwargs)
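
To make the helper's behavior concrete, a small sketch (again not part of the diff) exercising ``_create_row_request`` directly; the table name and keys are placeholders:

    from gcloud.bigtable.table import _create_row_request

    # Range read: ``start_key`` is inclusive, ``end_key`` is exclusive.
    request_pb = _create_row_request(
        'projects/p/zones/z/clusters/c/tables/t',
        start_key=b'user-0000', end_key=b'user-9999', limit=100)

    # A single row key and a row range cannot be combined:
    try:
        _create_row_request('projects/p/zones/z/clusters/c/tables/t',
                            row_key=b'user-0001', start_key=b'user-0000')
    except ValueError:
        pass  # 'Row key and row range cannot be set simultaneously'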
201 changes: 201 additions & 0 deletions gcloud/bigtable/test_table.py
@@ -291,6 +291,85 @@ def test_delete(self):
            {},
        )])

    def _read_row_helper(self, chunks):
        from gcloud._testing import _Monkey
        from gcloud.bigtable._generated import (
            bigtable_service_messages_pb2 as messages_pb2)
        from gcloud.bigtable._testing import _FakeStub
        from gcloud.bigtable.row_data import PartialRowData
        from gcloud.bigtable import table as MUT

        project_id = 'project-id'
        zone = 'zone'
        cluster_id = 'cluster-id'
        table_id = 'table-id'
        timeout_seconds = 596
        client = _Client(timeout_seconds=timeout_seconds)
        cluster_name = ('projects/' + project_id + '/zones/' + zone +
                        '/clusters/' + cluster_id)
        cluster = _Cluster(cluster_name, client=client)
        table = self._makeOne(table_id, cluster)

        # Create request_pb
        request_pb = object()  # Returned by our mock.
        mock_created = []

        def mock_create_row_request(table_name, row_key, filter_):
            mock_created.append((table_name, row_key, filter_))
            return request_pb

        # Create response_iterator
        row_key = b'row-key'
        response_pb = messages_pb2.ReadRowsResponse(row_key=row_key,
                                                    chunks=chunks)
        response_iterator = [response_pb]

        # Patch the stub used by the API method.
        client._data_stub = stub = _FakeStub(response_iterator)

        # Create expected_result.
        if chunks:
            expected_result = PartialRowData(row_key)
            expected_result._committed = True
            expected_result._chunks_encountered = True
        else:
            expected_result = None

        # Perform the method and check the result.
        filter_obj = object()
        with _Monkey(MUT, _create_row_request=mock_create_row_request):
            result = table.read_row(row_key, filter_=filter_obj)

        self.assertEqual(result, expected_result)
        self.assertEqual(stub.method_calls, [(
            'ReadRows',
            (request_pb, timeout_seconds),
            {},
        )])
        self.assertEqual(mock_created, [(table.name, row_key, filter_obj)])

    def test_read_row(self):
        from gcloud.bigtable._generated import (
            bigtable_service_messages_pb2 as messages_pb2)

        chunk = messages_pb2.ReadRowsResponse.Chunk(commit_row=True)
        chunks = [chunk]
        self._read_row_helper(chunks)

    def test_read_empty_row(self):
        chunks = []
        self._read_row_helper(chunks)

    def test_read_row_still_partial(self):
        from gcloud.bigtable._generated import (
            bigtable_service_messages_pb2 as messages_pb2)

        # There is never a "commit row".
        chunk = messages_pb2.ReadRowsResponse.Chunk(reset_row=True)
        chunks = [chunk]
        with self.assertRaises(ValueError):
            self._read_row_helper(chunks)

    def test_sample_row_keys(self):
        from gcloud.bigtable._generated import (
            bigtable_service_messages_pb2 as messages_pb2)
@@ -331,6 +410,128 @@ def test_sample_row_keys(self):
        )])


class Test__create_row_request(unittest2.TestCase):

    def _callFUT(self, table_name, row_key=None, start_key=None, end_key=None,
                 filter_=None, allow_row_interleaving=None, limit=None):
        from gcloud.bigtable.table import _create_row_request
        return _create_row_request(
            table_name, row_key=row_key, start_key=start_key, end_key=end_key,
            filter_=filter_, allow_row_interleaving=allow_row_interleaving,
            limit=limit)

    def test_table_name_only(self):
        from gcloud.bigtable._generated import (
            bigtable_service_messages_pb2 as messages_pb2)

        table_name = 'table_name'
        result = self._callFUT(table_name)
        expected_result = messages_pb2.ReadRowsRequest(table_name=table_name)
        self.assertEqual(result, expected_result)

    def test_row_key_row_range_conflict(self):
        with self.assertRaises(ValueError):
            self._callFUT(None, row_key=object(), end_key=object())

    def test_row_key(self):
        from gcloud.bigtable._generated import (
            bigtable_service_messages_pb2 as messages_pb2)

        table_name = 'table_name'
        row_key = b'row_key'
        result = self._callFUT(table_name, row_key=row_key)
        expected_result = messages_pb2.ReadRowsRequest(
            table_name=table_name,
            row_key=row_key,
        )
        self.assertEqual(result, expected_result)

    def test_row_range_start_key(self):
        from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
        from gcloud.bigtable._generated import (
            bigtable_service_messages_pb2 as messages_pb2)

        table_name = 'table_name'
        start_key = b'start_key'
        result = self._callFUT(table_name, start_key=start_key)
        expected_result = messages_pb2.ReadRowsRequest(
            table_name=table_name,
            row_range=data_pb2.RowRange(start_key=start_key),
        )
        self.assertEqual(result, expected_result)

    def test_row_range_end_key(self):
        from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
        from gcloud.bigtable._generated import (
            bigtable_service_messages_pb2 as messages_pb2)

        table_name = 'table_name'
        end_key = b'end_key'
        result = self._callFUT(table_name, end_key=end_key)
        expected_result = messages_pb2.ReadRowsRequest(
            table_name=table_name,
            row_range=data_pb2.RowRange(end_key=end_key),
        )
        self.assertEqual(result, expected_result)

    def test_row_range_both_keys(self):
        from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
        from gcloud.bigtable._generated import (
            bigtable_service_messages_pb2 as messages_pb2)

        table_name = 'table_name'
        start_key = b'start_key'
        end_key = b'end_key'
        result = self._callFUT(table_name, start_key=start_key,
                               end_key=end_key)
        expected_result = messages_pb2.ReadRowsRequest(
            table_name=table_name,
            row_range=data_pb2.RowRange(start_key=start_key, end_key=end_key),
        )
        self.assertEqual(result, expected_result)

    def test_with_filter(self):
        from gcloud.bigtable._generated import (
            bigtable_service_messages_pb2 as messages_pb2)
        from gcloud.bigtable.row import RowSampleFilter

        table_name = 'table_name'
        row_filter = RowSampleFilter(0.33)
        result = self._callFUT(table_name, filter_=row_filter)
        expected_result = messages_pb2.ReadRowsRequest(
            table_name=table_name,
            filter=row_filter.to_pb(),
        )
        self.assertEqual(result, expected_result)

    def test_with_allow_row_interleaving(self):
        from gcloud.bigtable._generated import (
            bigtable_service_messages_pb2 as messages_pb2)

        table_name = 'table_name'
        allow_row_interleaving = True
        result = self._callFUT(table_name,
                               allow_row_interleaving=allow_row_interleaving)
        expected_result = messages_pb2.ReadRowsRequest(
            table_name=table_name,
            allow_row_interleaving=allow_row_interleaving,
        )
        self.assertEqual(result, expected_result)

    def test_with_limit(self):
        from gcloud.bigtable._generated import (
            bigtable_service_messages_pb2 as messages_pb2)

        table_name = 'table_name'
        limit = 1337
        result = self._callFUT(table_name, limit=limit)
        expected_result = messages_pb2.ReadRowsRequest(
            table_name=table_name,
            num_rows_limit=limit,
        )
        self.assertEqual(result, expected_result)


class _Client(object):

    data_stub = None
