Skip to content

Commit 166278f

Browse files
aneepcttseaver
authored and committed
BigTable: Adding a row generator on a table. (#4679)
Allows iteration over the rows in a table instead of reading the rows into an internal dictionary first. As soon as a row has been validated, it is available in the iterator.
1 parent 7714d0c commit 166278f

File tree

4 files changed

+289
-178
lines changed

4 files changed

+289
-178
lines changed

bigtable/google/cloud/bigtable/row_data.py

Lines changed: 101 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -191,15 +191,64 @@ class PartialRowsData(object):
191191
:param response_iterator: A streaming iterator returned from a
192192
``ReadRows`` request.
193193
"""
194-
START = "Start" # No responses yet processed.
195-
NEW_ROW = "New row" # No cells yet complete for row
196-
ROW_IN_PROGRESS = "Row in progress" # Some cells complete for row
197-
CELL_IN_PROGRESS = "Cell in progress" # Incomplete cell for row
194+
195+
START = 'Start' # No responses yet processed.
196+
NEW_ROW = 'New row' # No cells yet complete for row
197+
ROW_IN_PROGRESS = 'Row in progress' # Some cells complete for row
198+
CELL_IN_PROGRESS = 'Cell in progress' # Incomplete cell for row
198199

199200
def __init__(self, response_iterator):
200201
self._response_iterator = response_iterator
202+
self._generator = YieldRowsData(response_iterator)
203+
201204
# Fully-processed rows, keyed by `row_key`
202-
self._rows = {}
205+
self.rows = {}
206+
207+
def __eq__(self, other):
208+
if not isinstance(other, self.__class__):
209+
return NotImplemented
210+
return other._response_iterator == self._response_iterator
211+
212+
def __ne__(self, other):
213+
return not self == other
214+
215+
@property
216+
def state(self):
217+
"""State machine state.
218+
219+
:rtype: str
220+
:returns: name of state corresponding to current row / chunk
221+
processing.
222+
"""
223+
return self._generator.state
224+
225+
def consume_all(self, max_loops=None):
226+
"""Consume the streamed responses until there are no more.
227+
228+
:type max_loops: int
229+
:param max_loops: (Optional) Maximum number of times to try to consume
230+
an additional ``ReadRowsResponse``. You can use this
231+
to avoid long wait times.
232+
"""
233+
for row in self._generator.read_rows():
234+
self.rows[row.row_key] = row
235+
236+
237+
class YieldRowsData(object):
238+
"""Convenience wrapper for consuming a ``ReadRows`` streaming response.
239+
240+
:type response_iterator: :class:`~google.cloud.exceptions.GrpcRendezvous`
241+
:param response_iterator: A streaming iterator returned from a
242+
``ReadRows`` request.
243+
"""
244+
245+
START = 'Start' # No responses yet processed.
246+
NEW_ROW = 'New row' # No cells yet complete for row
247+
ROW_IN_PROGRESS = 'Row in progress' # Some cells complete for row
248+
CELL_IN_PROGRESS = 'Cell in progress' # Incomplete cell for row
249+
250+
def __init__(self, response_iterator):
251+
self._response_iterator = response_iterator
203252
# Counter for responses pulled from iterator
204253
self._counter = 0
205254
# Maybe cached from previous response
@@ -213,14 +262,6 @@ def __init__(self, response_iterator):
213262
# Last complete cell, unset until first completion, after new row
214263
self._previous_cell = None
215264

216-
def __eq__(self, other):
217-
if not isinstance(other, self.__class__):
218-
return NotImplemented
219-
return other._response_iterator == self._response_iterator
220-
221-
def __ne__(self, other):
222-
return not self == other
223-
224265
@property
225266
def state(self):
226267
"""State machine state.
@@ -241,96 +282,75 @@ def state(self):
241282
return self.ROW_IN_PROGRESS
242283
return self.NEW_ROW # row added, no chunk yet processed
243284

244-
@property
245-
def rows(self):
246-
"""Property returning all rows accumulated from the stream.
247-
248-
:rtype: dict
249-
:returns: row_key -> :class:`PartialRowData`.
250-
"""
251-
# NOTE: To avoid duplicating large objects, this is just the
252-
# mutable private data.
253-
return self._rows
254-
255285
def cancel(self):
256286
"""Cancels the iterator, closing the stream."""
257287
self._response_iterator.cancel()
258288

259-
def consume_next(self):
260-
"""Consume the next ``ReadRowsResponse`` from the stream.
289+
def read_rows(self):
290+
"""Consume the ``ReadRowsResponse`` messages from the stream.
291+
Read the rows and yield each to the reader
261292
262293
Parse the response and its chunks into a new/existing row in
263294
:attr:`_rows`. Rows are returned in order by row key.
264295
"""
265-
response = six.next(self._response_iterator)
266-
self._counter += 1
296+
while True:
297+
try:
298+
response = six.next(self._response_iterator)
299+
except StopIteration:
300+
break
267301

268-
if self._last_scanned_row_key is None: # first response
269-
if response.last_scanned_row_key:
270-
raise InvalidReadRowsResponse()
302+
self._counter += 1
271303

272-
self._last_scanned_row_key = response.last_scanned_row_key
304+
if self._last_scanned_row_key is None: # first response
305+
if response.last_scanned_row_key:
306+
raise InvalidReadRowsResponse()
273307

274-
row = self._row
275-
cell = self._cell
308+
self._last_scanned_row_key = response.last_scanned_row_key
276309

277-
for chunk in response.chunks:
310+
row = self._row
311+
cell = self._cell
278312

279-
self._validate_chunk(chunk)
313+
for chunk in response.chunks:
280314

281-
if chunk.reset_row:
282-
row = self._row = None
283-
cell = self._cell = self._previous_cell = None
284-
continue
315+
self._validate_chunk(chunk)
285316

286-
if row is None:
287-
row = self._row = PartialRowData(chunk.row_key)
317+
if chunk.reset_row:
318+
row = self._row = None
319+
cell = self._cell = self._previous_cell = None
320+
continue
288321

289-
if cell is None:
290-
qualifier = None
291-
if chunk.HasField('qualifier'):
292-
qualifier = chunk.qualifier.value
322+
if row is None:
323+
row = self._row = PartialRowData(chunk.row_key)
293324

294-
cell = self._cell = PartialCellData(
295-
chunk.row_key,
296-
chunk.family_name.value,
297-
qualifier,
298-
chunk.timestamp_micros,
299-
chunk.labels,
300-
chunk.value)
301-
self._copy_from_previous(cell)
302-
else:
303-
cell.append_value(chunk.value)
325+
if cell is None:
326+
qualifier = None
327+
if chunk.HasField('qualifier'):
328+
qualifier = chunk.qualifier.value
304329

305-
if chunk.commit_row:
306-
self._save_current_row()
307-
row = cell = None
308-
continue
330+
cell = self._cell = PartialCellData(
331+
chunk.row_key,
332+
chunk.family_name.value,
333+
qualifier,
334+
chunk.timestamp_micros,
335+
chunk.labels,
336+
chunk.value)
337+
self._copy_from_previous(cell)
338+
else:
339+
cell.append_value(chunk.value)
309340

310-
if chunk.value_size == 0:
311-
self._save_current_cell()
312-
cell = None
341+
if chunk.commit_row:
342+
self._save_current_cell()
313343

314-
def consume_all(self, max_loops=None):
315-
"""Consume the streamed responses until there are no more.
344+
yield self._row
316345

317-
This simply calls :meth:`consume_next` until there are no
318-
more to consume.
346+
self._row, self._previous_row = None, self._row
347+
self._previous_cell = None
348+
row = cell = None
349+
continue
319350

320-
:type max_loops: int
321-
:param max_loops: (Optional) Maximum number of times to try to consume
322-
an additional ``ReadRowsResponse``. You can use this
323-
to avoid long wait times.
324-
"""
325-
curr_loop = 0
326-
if max_loops is None:
327-
max_loops = float('inf')
328-
while curr_loop < max_loops:
329-
curr_loop += 1
330-
try:
331-
self.consume_next()
332-
except StopIteration:
333-
break
351+
if chunk.value_size == 0:
352+
self._save_current_cell()
353+
cell = None
334354

335355
@staticmethod
336356
def _validate_chunk_status(chunk):
@@ -433,14 +453,6 @@ def _copy_from_previous(self, cell):
433453
if cell.qualifier is None:
434454
cell.qualifier = previous.qualifier
435455

436-
def _save_current_row(self):
437-
"""Helper for :meth:`consume_next`."""
438-
if self._cell:
439-
self._save_current_cell()
440-
self._rows[self._row.row_key] = self._row
441-
self._row, self._previous_row = None, self._row
442-
self._previous_cell = None
443-
444456

445457
def _raise_if(predicate, *args):
446458
"""Helper for validation methods."""

bigtable/google/cloud/bigtable/table.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from google.cloud.bigtable.row import ConditionalRow
3232
from google.cloud.bigtable.row import DirectRow
3333
from google.cloud.bigtable.row_data import PartialRowsData
34+
from google.cloud.bigtable.row_data import YieldRowsData
3435
from grpc import StatusCode
3536

3637

@@ -315,6 +316,43 @@ def read_rows(self, start_key=None, end_key=None, limit=None,
315316
# We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse`
316317
return PartialRowsData(response_iterator)
317318

319+
def yield_rows(self, start_key=None, end_key=None, limit=None,
320+
filter_=None):
321+
"""Read rows from this table.
322+
323+
:type start_key: bytes
324+
:param start_key: (Optional) The beginning of a range of row keys to
325+
read from. The range will include ``start_key``. If
326+
left empty, will be interpreted as the empty string.
327+
328+
:type end_key: bytes
329+
:param end_key: (Optional) The end of a range of row keys to read from.
330+
The range will not include ``end_key``. If left empty,
331+
will be interpreted as an infinite string.
332+
333+
:type limit: int
334+
:param limit: (Optional) The read will terminate after committing to N
335+
rows' worth of results. The default (zero) is to return
336+
all results.
337+
338+
:type filter_: :class:`.RowFilter`
339+
:param filter_: (Optional) The filter to apply to the contents of the
340+
specified row(s). If unset, reads every column in
341+
each row.
342+
343+
:rtype: :class:`.PartialRowData`
344+
:returns: A :class:`.PartialRowData` for each row returned
345+
"""
346+
request_pb = _create_row_request(
347+
self.name, start_key=start_key, end_key=end_key, filter_=filter_,
348+
limit=limit)
349+
client = self._instance._client
350+
response_iterator = client._data_stub.ReadRows(request_pb)
351+
# We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse`
352+
generator = YieldRowsData(response_iterator)
353+
for row in generator.read_rows():
354+
yield row
355+
318356
def mutate_rows(self, rows, retry=DEFAULT_RETRY):
319357
"""Mutates multiple rows in bulk.
320358

0 commit comments

Comments
 (0)