Skip to content

Extract check retriable error #99

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions ydb/_errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from dataclasses import dataclass
from typing import Optional

from ydb import issues

_errors_retriable_fast_backoff_types = [
issues.Unavailable,
]
_errors_retriable_slow_backoff_types = [
issues.Aborted,
issues.BadSession,
issues.Overloaded,
issues.SessionPoolEmpty,
issues.ConnectionError,
]
_errors_retriable_slow_backoff_idempotent_types = [
issues.Undetermined,
]


def check_retriable_error(err, retry_settings, attempt):
if isinstance(err, issues.NotFound):
if retry_settings.retry_not_found:
return ErrorRetryInfo(
True, retry_settings.fast_backoff.calc_timeout(attempt)
)
else:
return ErrorRetryInfo(False, None)

if isinstance(err, issues.InternalError):
if retry_settings.retry_internal_error:
return ErrorRetryInfo(
True, retry_settings.slow_backoff.calc_timeout(attempt)
)
else:
return ErrorRetryInfo(False, None)

for t in _errors_retriable_fast_backoff_types:
if isinstance(err, t):
return ErrorRetryInfo(
True, retry_settings.fast_backoff.calc_timeout(attempt)
)

for t in _errors_retriable_slow_backoff_types:
if isinstance(err, t):
return ErrorRetryInfo(
True, retry_settings.slow_backoff.calc_timeout(attempt)
)

if retry_settings.idempotent:
for t in _errors_retriable_slow_backoff_idempotent_types:
return ErrorRetryInfo(
True, retry_settings.slow_backoff.calc_timeout(attempt)
)

return ErrorRetryInfo(False, None)


@dataclass
class ErrorRetryInfo:
is_retriable: bool
sleep_timeout_seconds: Optional[float]
77 changes: 33 additions & 44 deletions ydb/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
_tx_ctx_impl,
tracing,
)
from ._errors import check_retriable_error

try:
from . import interceptor
Expand Down Expand Up @@ -916,12 +917,28 @@ class YdbRetryOperationSleepOpt(object):
def __init__(self, timeout):
self.timeout = timeout

def __eq__(self, other):
return type(self) == type(other) and self.timeout == other.timeout

def __repr__(self):
return "YdbRetryOperationSleepOpt(%s)" % self.timeout


class YdbRetryOperationFinalResult(object):
def __init__(self, result):
self.result = result
self.exc = None

def __eq__(self, other):
return (
type(self) == type(other)
and self.result == other.result
and self.exc == other.exc
)

def __repr__(self):
return "YdbRetryOperationFinalResult(%s, exc=%s)" % (self.result, self.exc)

def set_exception(self, exc):
self.exc = exc

Expand All @@ -938,56 +955,28 @@ def retry_operation_impl(callee, retry_settings=None, *args, **kwargs):
if result.exc is not None:
raise result.exc

except (
issues.Aborted,
issues.BadSession,
issues.NotFound,
issues.InternalError,
) as e:
status = e
retry_settings.on_ydb_error_callback(e)

if isinstance(e, issues.NotFound) and not retry_settings.retry_not_found:
raise e

if (
isinstance(e, issues.InternalError)
and not retry_settings.retry_internal_error
):
raise e

except (
issues.Overloaded,
issues.SessionPoolEmpty,
issues.ConnectionError,
) as e:
status = e
retry_settings.on_ydb_error_callback(e)
yield YdbRetryOperationSleepOpt(
retry_settings.slow_backoff.calc_timeout(attempt)
)

except issues.Unavailable as e:
except issues.Error as e:
status = e
retry_settings.on_ydb_error_callback(e)
yield YdbRetryOperationSleepOpt(
retry_settings.fast_backoff.calc_timeout(attempt)
)

except issues.Undetermined as e:
status = e
retry_settings.on_ydb_error_callback(e)
if not retry_settings.idempotent:
# operation is not idempotent, so we cannot retry.
retriable_info = check_retriable_error(e, retry_settings, attempt)
if not retriable_info.is_retriable:
raise

yield YdbRetryOperationSleepOpt(
retry_settings.fast_backoff.calc_timeout(attempt)
)
skip_yield_error_types = [
issues.Aborted,
issues.BadSession,
issues.NotFound,
issues.InternalError,
]

except issues.Error as e:
retry_settings.on_ydb_error_callback(e)
raise
yield_sleep = True
for t in skip_yield_error_types:
if isinstance(e, t):
yield_sleep = False

if yield_sleep:
yield YdbRetryOperationSleepOpt(retriable_info.sleep_timeout_seconds)

except Exception as e:
# you should provide your own handler you want
Expand Down
135 changes: 135 additions & 0 deletions ydb/table_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
from unittest import mock
from ydb import (
retry_operation_impl,
YdbRetryOperationFinalResult,
issues,
YdbRetryOperationSleepOpt,
RetrySettings,
)


def test_retry_operation_impl(monkeypatch):
monkeypatch.setattr("random.random", lambda: 0.5)
monkeypatch.setattr(
issues.Error,
"__eq__",
lambda self, other: type(self) == type(other) and self.message == other.message,
)

retry_once_settings = RetrySettings(
max_retries=1,
on_ydb_error_callback=mock.Mock(),
)
retry_once_settings.unknown_error_handler = mock.Mock()

def get_results(callee):
res_generator = retry_operation_impl(callee, retry_settings=retry_once_settings)
results = []
exc = None
try:
for res in res_generator:
results.append(res)
if isinstance(res, YdbRetryOperationFinalResult):
break
except Exception as e:
exc = e

return results, exc

class TestException(Exception):
def __init__(self, message):
super(TestException, self).__init__(message)
self.message = message

def __eq__(self, other):
return type(self) == type(other) and self.message == other.message

def check_unretriable_error(err_type, call_ydb_handler):
retry_once_settings.on_ydb_error_callback.reset_mock()
retry_once_settings.unknown_error_handler.reset_mock()

results = get_results(
mock.Mock(side_effect=[err_type("test1"), err_type("test2")])
)
yields = results[0]
exc = results[1]

assert yields == []
assert exc == err_type("test1")

if call_ydb_handler:
assert retry_once_settings.on_ydb_error_callback.call_count == 1
retry_once_settings.on_ydb_error_callback.assert_called_with(
err_type("test1")
)

assert retry_once_settings.unknown_error_handler.call_count == 0
else:
assert retry_once_settings.on_ydb_error_callback.call_count == 0

assert retry_once_settings.unknown_error_handler.call_count == 1
retry_once_settings.unknown_error_handler.assert_called_with(
err_type("test1")
)

def check_retriable_error(err_type, backoff):
retry_once_settings.on_ydb_error_callback.reset_mock()

results = get_results(
mock.Mock(side_effect=[err_type("test1"), err_type("test2")])
)
yields = results[0]
exc = results[1]

if backoff:
assert [
YdbRetryOperationSleepOpt(backoff.calc_timeout(0)),
YdbRetryOperationSleepOpt(backoff.calc_timeout(1)),
] == yields
else:
assert [] == yields

assert exc == err_type("test2")

assert retry_once_settings.on_ydb_error_callback.call_count == 2
retry_once_settings.on_ydb_error_callback.assert_any_call(err_type("test1"))
retry_once_settings.on_ydb_error_callback.assert_called_with(err_type("test2"))

assert retry_once_settings.unknown_error_handler.call_count == 0

# check ok
assert get_results(lambda: True) == ([YdbRetryOperationFinalResult(True)], None)

# check retry error and return result
assert get_results(mock.Mock(side_effect=[issues.Overloaded("test"), True])) == (
[
YdbRetryOperationSleepOpt(retry_once_settings.slow_backoff.calc_timeout(0)),
YdbRetryOperationFinalResult(True),
],
None,
)

# check errors
check_retriable_error(issues.Aborted, None)
check_retriable_error(issues.BadSession, None)

check_retriable_error(issues.NotFound, None)
with mock.patch.object(retry_once_settings, "retry_not_found", False):
check_unretriable_error(issues.NotFound, True)

check_retriable_error(issues.InternalError, None)
with mock.patch.object(retry_once_settings, "retry_internal_error", False):
check_unretriable_error(issues.InternalError, True)

check_retriable_error(issues.Overloaded, retry_once_settings.slow_backoff)
check_retriable_error(issues.SessionPoolEmpty, retry_once_settings.slow_backoff)
check_retriable_error(issues.ConnectionError, retry_once_settings.slow_backoff)

check_retriable_error(issues.Unavailable, retry_once_settings.fast_backoff)

check_unretriable_error(issues.Undetermined, True)
with mock.patch.object(retry_once_settings, "idempotent", True):
check_retriable_error(issues.Unavailable, retry_once_settings.fast_backoff)

check_unretriable_error(issues.Error, True)
check_unretriable_error(TestException, False)