-
Notifications
You must be signed in to change notification settings - Fork 101
fix: update retry strategy for mutation calls to handle aborted transactions #1279
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
01a0196
2a9b805
c634bdb
a6e25a3
198c7df
1032b8b
d4e7d9c
fa8ae71
d5c4975
7f3088c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,11 +27,15 @@ | |
| from google.protobuf.internal.enum_type_wrapper import EnumTypeWrapper | ||
|
|
||
| from google.api_core import datetime_helpers | ||
| from google.api_core.exceptions import Aborted | ||
| from google.cloud._helpers import _date_from_iso8601_date | ||
| from google.cloud.spanner_v1 import TypeCode | ||
| from google.cloud.spanner_v1 import ExecuteSqlRequest | ||
| from google.cloud.spanner_v1 import JsonObject | ||
| from google.cloud.spanner_v1.request_id_header import with_request_id | ||
| from google.rpc.error_details_pb2 import RetryInfo | ||
|
|
||
| import random | ||
|
|
||
| # Validation error messages | ||
| NUMERIC_MAX_SCALE_ERR_MSG = ( | ||
|
|
@@ -460,6 +464,34 @@ def _metadata_with_prefix(prefix, **kw): | |
| return [("google-cloud-resource-prefix", prefix)] | ||
|
|
||
|
|
||
def _retry_on_aborted_exception(
    func,
    deadline,
    allowed_exceptions=None,
):
    """Call ``func`` until it succeeds, retrying ``Aborted`` errors until ``deadline``.

    Every ``Aborted`` raised by ``func`` -- whether on the direct call or while
    the fallback ``_retry`` helper is re-invoking it -- is handed to
    :func:`_delay_until_retry`, which either sleeps before the next attempt or
    re-raises once ``deadline`` can no longer be met.

    Any other exception triggers a single pass through ``_retry`` with
    ``allowed_exceptions``.

    :type func: callable
    :param func: zero-argument callable to invoke.

    :type deadline: float
    :param deadline: timestamp (compared against ``time.time()``) after which
                     ``Aborted`` errors are no longer retried.

    :param allowed_exceptions: passed through unchanged to ``_retry``.

    :returns: the value returned by ``func`` (directly or via ``_retry``).
    :raises: the ``Aborted`` error once the deadline is exhausted, or the
             original non-``Aborted`` error when ``_retry`` yields ``None``;
             anything ``_retry`` itself raises propagates unchanged.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return func()
        except Aborted as exc:
            _delay_until_retry(exc, deadline=deadline, attempts=attempts)
            continue
        except Exception as exc:
            try:
                retry_result = _retry(func=func, allowed_exceptions=allowed_exceptions)
            except Aborted as aborted:
                # An abort surfacing from the fallback retry must obey the same
                # back-off and deadline as a direct abort. Looping straight back
                # would let a call that alternates between the two error kinds
                # spin past the deadline.
                _delay_until_retry(aborted, deadline=deadline, attempts=attempts)
                continue
            if retry_result is not None:
                return retry_result
            # NOTE(review): a ``None`` result from ``_retry`` is treated as
            # "no usable response" and the original error is surfaced; confirm
            # that ``func`` never legitimately returns ``None``.
            raise exc
|
||
|
|
||
|
|
||
| def _retry( | ||
| func, | ||
| retry_count=5, | ||
|
|
@@ -473,6 +505,7 @@ def _retry( | |
| Args: | ||
| func: The function to be retried. | ||
| retry_count: The maximum number of times to retry the function. | ||
| deadline: This will be used in case of Aborted transactions. | ||
|
||
| delay: The delay in seconds between retries. | ||
| allowed_exceptions: A tuple of exceptions that are allowed to occur without triggering a retry. | ||
| Passing allowed_exceptions as None will lead to retrying for all exceptions. | ||
|
|
@@ -529,6 +562,60 @@ def _metadata_with_leader_aware_routing(value, **kw): | |
| return ("x-goog-spanner-route-to-leader", str(value).lower()) | ||
|
|
||
|
|
||
| def _delay_until_retry(exc, deadline, attempts): | ||
| """Helper for :meth:`Session.run_in_transaction`. | ||
|
|
||
| Detect retryable abort, and impose server-supplied delay. | ||
|
|
||
| :type exc: :class:`google.api_core.exceptions.Aborted` | ||
| :param exc: exception for aborted transaction | ||
|
|
||
| :type deadline: float | ||
| :param deadline: maximum timestamp to continue retrying the transaction. | ||
|
|
||
| :type attempts: int | ||
| :param attempts: number of call retries | ||
| """ | ||
|
|
||
| cause = exc.errors[0] | ||
| now = time.time() | ||
| if now >= deadline: | ||
| raise | ||
|
|
||
| delay = _get_retry_delay(cause, attempts) | ||
| if delay is not None: | ||
| if now + delay > deadline: | ||
| raise | ||
|
|
||
| time.sleep(delay) | ||
|
|
||
|
|
||
| def _get_retry_delay(cause, attempts): | ||
| """Helper for :func:`_delay_until_retry`. | ||
|
|
||
| :type exc: :class:`grpc.Call` | ||
| :param exc: exception for aborted transaction | ||
|
|
||
| :rtype: float | ||
| :returns: seconds to wait before retrying the transaction. | ||
|
|
||
| :type attempts: int | ||
| :param attempts: number of call retries | ||
| """ | ||
| if hasattr(cause, "trailing_metadata"): | ||
| metadata = dict(cause.trailing_metadata()) | ||
| else: | ||
| metadata = {} | ||
| retry_info_pb = metadata.get("google.rpc.retryinfo-bin") | ||
| if retry_info_pb is not None: | ||
| retry_info = RetryInfo() | ||
| retry_info.ParseFromString(retry_info_pb) | ||
| nanos = retry_info.retry_delay.nanos | ||
| return retry_info.retry_delay.seconds + nanos / 1.0e9 | ||
|
|
||
| return 2**attempts + random.random() | ||
|
|
||
|
|
||
| class AtomicCounter: | ||
| def __init__(self, start_value=0): | ||
| self.__lock = threading.Lock() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,8 +29,12 @@ | |
| from google.cloud.spanner_v1._opentelemetry_tracing import trace_call | ||
| from google.cloud.spanner_v1 import RequestOptions | ||
| from google.cloud.spanner_v1._helpers import _retry | ||
| from google.cloud.spanner_v1._helpers import _retry_on_aborted_exception | ||
| from google.cloud.spanner_v1._helpers import _check_rst_stream_error | ||
| from google.api_core.exceptions import InternalServerError | ||
| import time | ||
|
|
||
| DEFAULT_RETRY_TIMEOUT_SECS = 30 | ||
|
|
||
|
|
||
| class _BatchBase(_SessionWrapper): | ||
|
|
@@ -162,6 +166,7 @@ def commit( | |
| request_options=None, | ||
| max_commit_delay=None, | ||
| exclude_txn_from_change_streams=False, | ||
| **kwargs, | ||
| ): | ||
| """Commit mutations to the database. | ||
|
|
||
|
|
@@ -227,9 +232,15 @@ def commit( | |
| request=request, | ||
| metadata=metadata, | ||
| ) | ||
| response = _retry( | ||
| deadline = time.time() + kwargs.get( | ||
| "timeout_secs", DEFAULT_RETRY_TIMEOUT_SECS | ||
| ) | ||
| response = _retry_on_aborted_exception( | ||
| method, | ||
| allowed_exceptions={InternalServerError: _check_rst_stream_error}, | ||
| allowed_exceptions={ | ||
| InternalServerError: _check_rst_stream_error, | ||
| }, | ||
| deadline=deadline, | ||
| ) | ||
| self.committed = response.commit_timestamp | ||
| self.commit_stats = response.commit_stats | ||
|
|
@@ -293,7 +304,9 @@ def group(self): | |
| self._mutation_groups.append(mutation_group) | ||
| return MutationGroup(self._session, mutation_group.mutations) | ||
|
|
||
| def batch_write(self, request_options=None, exclude_txn_from_change_streams=False): | ||
| def batch_write( | ||
|
||
| self, request_options=None, exclude_txn_from_change_streams=False, **kwargs | ||
| ): | ||
| """Executes batch_write. | ||
|
|
||
| :type request_options: | ||
|
|
@@ -348,7 +361,9 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals | |
| ) | ||
| response = _retry( | ||
| method, | ||
| allowed_exceptions={InternalServerError: _check_rst_stream_error}, | ||
| allowed_exceptions={ | ||
| InternalServerError: _check_rst_stream_error, | ||
| }, | ||
| ) | ||
| self.committed = True | ||
| return response | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that we can simplify this further and just remove `allowed_exceptions` from this function. It should only retry aborted exceptions.