Skip to content

Commit

Permalink
RPC retries (second PR) (googleapis#3324)
Browse files Browse the repository at this point in the history
  • Loading branch information
calpeyser authored and lukesneeringer committed Jul 17, 2017
1 parent fe9b6cf commit 67f4ba4
Show file tree
Hide file tree
Showing 7 changed files with 520 additions and 81 deletions.
169 changes: 169 additions & 0 deletions bigtable/google/cloud/bigtable/retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
"""Provides function wrappers that implement retrying."""
import random
import time
import six
import sys

from google.cloud._helpers import _to_bytes
from google.cloud.bigtable._generated import (
bigtable_pb2 as data_messages_v2_pb2)
from google.gax import config, errors
from grpc import RpcError


_MILLIS_PER_SECOND = 1000


class ReadRowsIterator(object):
"""Creates an iterator equivalent to a_iter, but that retries on certain
exceptions.
"""

def __init__(self, client, name, start_key, end_key, filter_, limit,
retry_options, **kwargs):
self.client = client
self.retry_options = retry_options
self.name = name
self.start_key = start_key
self.start_key_closed = True
self.end_key = end_key
self.filter_ = filter_
self.limit = limit
self.delay_mult = retry_options.backoff_settings.retry_delay_multiplier
self.max_delay_millis = \
retry_options.backoff_settings.max_retry_delay_millis
self.timeout_mult = \
retry_options.backoff_settings.rpc_timeout_multiplier
self.max_timeout = \
(retry_options.backoff_settings.max_rpc_timeout_millis /
_MILLIS_PER_SECOND)
self.total_timeout = \
(retry_options.backoff_settings.total_timeout_millis /
_MILLIS_PER_SECOND)
self.set_stream()

def set_start_key(self, start_key):
"""
Sets the row key at which this iterator will begin reading.
"""
self.start_key = start_key
self.start_key_closed = False

def set_stream(self):
"""
Resets the read stream by making an RPC on the 'ReadRows' endpoint.
"""
req_pb = _create_row_request(self.name, start_key=self.start_key,
start_key_closed=self.start_key_closed,
end_key=self.end_key,
filter_=self.filter_, limit=self.limit)
self.stream = self.client._data_stub.ReadRows(req_pb)

def next(self, *args, **kwargs):
"""
Read and return the next row from the stream.
Retry on idempotent failure.
"""
delay = self.retry_options.backoff_settings.initial_retry_delay_millis
exc = errors.RetryError('Retry total timeout exceeded before any'
'response was received')
timeout = (self.retry_options.backoff_settings
.initial_rpc_timeout_millis /
_MILLIS_PER_SECOND)

now = time.time()
deadline = now + self.total_timeout
while deadline is None or now < deadline:
try:
return six.next(self.stream)
except StopIteration as stop:
raise stop
except RpcError as error: # pylint: disable=broad-except
code = config.exc_to_code(error)
if code not in self.retry_options.retry_codes:
six.reraise(type(error), error)

# pylint: disable=redefined-variable-type
exc = errors.RetryError(
'Retry total timeout exceeded with exception', error)

# Sleep a random number which will, on average, equal the
# expected delay.
to_sleep = random.uniform(0, delay * 2)
time.sleep(to_sleep / _MILLIS_PER_SECOND)
delay = min(delay * self.delay_mult, self.max_delay_millis)
now = time.time()
timeout = min(
timeout * self.timeout_mult, self.max_timeout,
deadline - now)
self.set_stream()

six.reraise(errors.RetryError, exc, sys.exc_info()[2])

def __next__(self, *args, **kwargs):
return self.next(*args, **kwargs)


def _create_row_request(table_name, row_key=None, start_key=None,
start_key_closed=True, end_key=None, filter_=None,
limit=None):
"""Creates a request to read rows in a table.
:type table_name: str
:param table_name: The name of the table to read from.
:type row_key: bytes
:param row_key: (Optional) The key of a specific row to read from.
:type start_key: bytes
:param start_key: (Optional) The beginning of a range of row keys to
read from. The range will include ``start_key``. If
left empty, will be interpreted as the empty string.
:type end_key: bytes
:param end_key: (Optional) The end of a range of row keys to read from.
The range will not include ``end_key``. If left empty,
will be interpreted as an infinite string.
:type filter_: :class:`.RowFilter`
:param filter_: (Optional) The filter to apply to the contents of the
specified row(s). If unset, reads the entire table.
:type limit: int
:param limit: (Optional) The read will terminate after committing to N
rows' worth of results. The default (zero) is to return
all results.
:rtype: :class:`data_messages_v2_pb2.ReadRowsRequest`
:returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs.
:raises: :class:`ValueError <exceptions.ValueError>` if both
``row_key`` and one of ``start_key`` and ``end_key`` are set
"""
request_kwargs = {'table_name': table_name}
if (row_key is not None and
(start_key is not None or end_key is not None)):
raise ValueError('Row key and row range cannot be '
'set simultaneously')
range_kwargs = {}
if start_key is not None or end_key is not None:
if start_key is not None:
if start_key_closed:
range_kwargs['start_key_closed'] = _to_bytes(start_key)
else:
range_kwargs['start_key_open'] = _to_bytes(start_key)
if end_key is not None:
range_kwargs['end_key_open'] = _to_bytes(end_key)
if filter_ is not None:
request_kwargs['filter'] = filter_.to_pb()
if limit is not None:
request_kwargs['rows_limit'] = limit

message = data_messages_v2_pb2.ReadRowsRequest(**request_kwargs)

if row_key is not None:
message.rows.row_keys.append(_to_bytes(row_key))

if range_kwargs:
message.rows.row_ranges.add(**range_kwargs)

return message
3 changes: 3 additions & 0 deletions bigtable/google/cloud/bigtable/row_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@ def consume_next(self):

self._validate_chunk(chunk)

if hasattr(self._response_iterator, 'set_start_key'):
self._response_iterator.set_start_key(chunk.row_key)

if chunk.reset_row:
row = self._row = None
cell = self._cell = self._previous_cell = None
Expand Down
101 changes: 32 additions & 69 deletions bigtable/google/cloud/bigtable/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import six

from google.cloud._helpers import _to_bytes
from google.cloud.bigtable._generated import (
bigtable_pb2 as data_messages_v2_pb2)
from google.cloud.bigtable._generated import (
Expand All @@ -30,6 +29,26 @@
from google.cloud.bigtable.row import ConditionalRow
from google.cloud.bigtable.row import DirectRow
from google.cloud.bigtable.row_data import PartialRowsData
from google.gax import RetryOptions, BackoffSettings
from google.cloud.bigtable.retry import ReadRowsIterator, _create_row_request
from grpc import StatusCode

BACKOFF_SETTINGS = BackoffSettings(
initial_retry_delay_millis=10,
retry_delay_multiplier=1.3,
max_retry_delay_millis=30000,
initial_rpc_timeout_millis=25 * 60 * 1000,
rpc_timeout_multiplier=1.0,
max_rpc_timeout_millis=25 * 60 * 1000,
total_timeout_millis=30 * 60 * 1000
)

RETRY_CODES = [
StatusCode.DEADLINE_EXCEEDED,
StatusCode.ABORTED,
StatusCode.INTERNAL,
StatusCode.UNAVAILABLE
]


# Maximum number of mutations in bulk (MutateRowsRequest message):
Expand Down Expand Up @@ -257,7 +276,7 @@ def read_row(self, row_key, filter_=None):
return rows_data.rows[row_key]

def read_rows(self, start_key=None, end_key=None, limit=None,
filter_=None):
filter_=None, backoff_settings=None):
"""Read rows from this table.
:type start_key: bytes
Expand All @@ -284,13 +303,18 @@ def read_rows(self, start_key=None, end_key=None, limit=None,
:returns: A :class:`.PartialRowsData` convenience wrapper for consuming
the streamed results.
"""
request_pb = _create_row_request(
self.name, start_key=start_key, end_key=end_key, filter_=filter_,
limit=limit)
client = self._instance._client
response_iterator = client._data_stub.ReadRows(request_pb)
# We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse`
return PartialRowsData(response_iterator)
if backoff_settings is None:
backoff_settings = BACKOFF_SETTINGS
RETRY_OPTIONS = RetryOptions(
retry_codes=RETRY_CODES,
backoff_settings=backoff_settings
)

retrying_iterator = ReadRowsIterator(client, self.name, start_key,
end_key, filter_, limit,
RETRY_OPTIONS)
return PartialRowsData(retrying_iterator)

def mutate_rows(self, rows):
"""Mutates multiple rows in bulk.
Expand Down Expand Up @@ -359,67 +383,6 @@ def sample_row_keys(self):
return response_iterator


def _create_row_request(table_name, row_key=None, start_key=None, end_key=None,
filter_=None, limit=None):
"""Creates a request to read rows in a table.
:type table_name: str
:param table_name: The name of the table to read from.
:type row_key: bytes
:param row_key: (Optional) The key of a specific row to read from.
:type start_key: bytes
:param start_key: (Optional) The beginning of a range of row keys to
read from. The range will include ``start_key``. If
left empty, will be interpreted as the empty string.
:type end_key: bytes
:param end_key: (Optional) The end of a range of row keys to read from.
The range will not include ``end_key``. If left empty,
will be interpreted as an infinite string.
:type filter_: :class:`.RowFilter`
:param filter_: (Optional) The filter to apply to the contents of the
specified row(s). If unset, reads the entire table.
:type limit: int
:param limit: (Optional) The read will terminate after committing to N
rows' worth of results. The default (zero) is to return
all results.
:rtype: :class:`data_messages_v2_pb2.ReadRowsRequest`
:returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs.
:raises: :class:`ValueError <exceptions.ValueError>` if both
``row_key`` and one of ``start_key`` and ``end_key`` are set
"""
request_kwargs = {'table_name': table_name}
if (row_key is not None and
(start_key is not None or end_key is not None)):
raise ValueError('Row key and row range cannot be '
'set simultaneously')
range_kwargs = {}
if start_key is not None or end_key is not None:
if start_key is not None:
range_kwargs['start_key_closed'] = _to_bytes(start_key)
if end_key is not None:
range_kwargs['end_key_open'] = _to_bytes(end_key)
if filter_ is not None:
request_kwargs['filter'] = filter_.to_pb()
if limit is not None:
request_kwargs['rows_limit'] = limit

message = data_messages_v2_pb2.ReadRowsRequest(**request_kwargs)

if row_key is not None:
message.rows.row_keys.append(_to_bytes(row_key))

if range_kwargs:
message.rows.row_ranges.add(**range_kwargs)

return message


def _mutate_rows_request(table_name, rows):
"""Creates a request to mutate rows in a table.
Expand Down
38 changes: 38 additions & 0 deletions bigtable/tests/retry_test_script.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# This retry script is processed by the retry server and the client under test.
# Client tests should parse any command beginning with "CLIENT:", send the corresponding RPC
# to the retry server and expect a valid response.
# "EXPECT" commands indicate the call the server is expecting the client to send.
#
# The retry server has one table named "table" that should be used for testing.
# There are three types of commands supported:
# READ <comma-separated list of row ids to read>
# Expect the corresponding rows to be returned with arbitrary values.
# SCAN <range>... <comma separated list of row ids to expect>
# Ranges are expressed as an interval with either open or closed start and end,
# such as [1,3) for "1,2" or (1, 3] for "2,3".
# WRITE <comma-separated list of row ids to write>
# All writes should succeed eventually. Value payload is ignored.
# The server writes PASS or FAIL on a line by itself to STDOUT depending on the result of the test.
# All other server output should be ignored.

# Echo same scan back after immediate error
CLIENT: SCAN [r1,r3) r1,r2
EXPECT: SCAN [r1,r3)
SERVER: ERROR Unavailable
EXPECT: SCAN [r1,r3)
SERVER: READ_RESPONSE r1,r2

# Retry scans with open interval starting at the least read row key.
# Instead of using open intervals for retry ranges, '\x00' can be
# appended to the last received row key and sent in a closed interval.
CLIENT: SCAN [r1,r9) r1,r2,r3,r4,r5,r6,r7,r8
EXPECT: SCAN [r1,r9)
SERVER: READ_RESPONSE r1,r2,r3,r4
SERVER: ERROR Unavailable
EXPECT: SCAN (r4,r9)
SERVER: ERROR Unavailable
EXPECT: SCAN (r4,r9)
SERVER: READ_RESPONSE r5,r6,r7
SERVER: ERROR Unavailable
EXPECT: SCAN (r7,r9)
SERVER: READ_RESPONSE r8
Loading

0 comments on commit 67f4ba4

Please sign in to comment.