Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] settlement and mgmt error redesign #14178

Merged
merged 13 commits into from
Oct 23, 2020
Merged
6 changes: 6 additions & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@
- `ServiceBusSession`: `get_state`, `set_state` and `renew_lock`
- `ReceivedMessage`: `renew_lock`

**Breaking Changes**

* Message settlement methods (`complete`, `abandon`, `defer` and `dead_letter`)
and methods that use amqp management link for request like `schedule_messages`, `received_deferred_messages`, etc.
now raise more concrete exception other than `MessageSettleFailed` and `ServiceBusError`.

**BugFixes**

* Updated uAMQP dependency to 1.2.11.
Expand Down
90 changes: 47 additions & 43 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,43 @@ def _generate_sas_token(uri, policy, key, expiry=None):
return _AccessToken(token=token, expires_on=abs_expiry)


def _do_retryable_operation(handler, operation, timeout=None, **kwargs):
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
# type: (BaseHandler, Callable, Optional[float], Any) -> Any
# pylint: disable=protected-access
require_last_exception = kwargs.pop("require_last_exception", False)
operation_requires_timeout = kwargs.pop("operation_requires_timeout", False)
retried_times = 0
max_retries = handler._config.retry_total

abs_timeout_time = (time.time() + timeout) if (operation_requires_timeout and timeout) else None

while retried_times <= max_retries:
try:
if operation_requires_timeout and abs_timeout_time:
remaining_timeout = abs_timeout_time - time.time()
kwargs["timeout"] = remaining_timeout
return operation(**kwargs)
except StopIteration:
raise
except Exception as exception: # pylint: disable=broad-except
last_exception = handler._handle_exception(exception, **kwargs)
if require_last_exception:
kwargs["last_exception"] = last_exception
retried_times += 1
if retried_times > max_retries:
_LOGGER.info(
"%r operation has exhausted retry. Last exception: %r.",
handler._container_id,
last_exception,
)
raise last_exception
handler._backoff(
retried_times=retried_times,
last_exception=last_exception,
abs_timeout_time=abs_timeout_time
)


class ServiceBusSASTokenCredential(object):
"""The shared access token credential used for authentication.
:param str token: The shared access token string
Expand Down Expand Up @@ -220,9 +257,10 @@ def __enter__(self):
def __exit__(self, *args):
self.close()

def _handle_exception(self, exception):
# type: (BaseException) -> ServiceBusError
error, error_need_close_handler, error_need_raise = _create_servicebus_exception(_LOGGER, exception, self)
def _handle_exception(self, exception, **kwargs):
# type: (BaseException, Any) -> ServiceBusError
error, error_need_close_handler, error_need_raise = \
_create_servicebus_exception(_LOGGER, exception, self, **kwargs)
if error_need_close_handler:
self._close_handler()
if error_need_raise:
Expand Down Expand Up @@ -257,41 +295,6 @@ def _backoff(
)
raise last_exception

def _do_retryable_operation(self, operation, timeout=None, **kwargs):
# type: (Callable, Optional[float], Any) -> Any
require_last_exception = kwargs.pop("require_last_exception", False)
operation_requires_timeout = kwargs.pop("operation_requires_timeout", False)
retried_times = 0
max_retries = self._config.retry_total

abs_timeout_time = (time.time() + timeout) if (operation_requires_timeout and timeout) else None

while retried_times <= max_retries:
try:
if operation_requires_timeout and abs_timeout_time:
remaining_timeout = abs_timeout_time - time.time()
kwargs["timeout"] = remaining_timeout
return operation(**kwargs)
except StopIteration:
raise
except Exception as exception: # pylint: disable=broad-except
last_exception = self._handle_exception(exception)
if require_last_exception:
kwargs["last_exception"] = last_exception
retried_times += 1
if retried_times > max_retries:
_LOGGER.info(
"%r operation has exhausted retry. Last exception: %r.",
self._container_id,
last_exception,
)
raise last_exception
self._backoff(
retried_times=retried_times,
last_exception=last_exception,
abs_timeout_time=abs_timeout_time
)

def _mgmt_request_response(
self,
mgmt_operation,
Expand All @@ -301,15 +304,15 @@ def _mgmt_request_response(
timeout=None,
**kwargs
):
# type: (bytes, uamqp.Message, Callable, bool, Optional[float], Any) -> uamqp.Message
# type: (bytes, Any, Callable, bool, Optional[float], Any) -> uamqp.Message
"""
Execute an amqp management operation.

:param bytes mgmt_operation: The type of operation to be performed. This value will
be service-specific, but common values include READ, CREATE and UPDATE.
This value will be added as an application property on the message.
:param message: The message to send in the management request.
:paramtype message: ~uamqp.message.Message
:paramtype message: Any
:param callback: The callback which is used to parse the returning message.
:paramtype callback: Callable[int, ~uamqp.message.Message, str]
:param keep_alive_associated_link: A boolean flag for keeping associated amqp sender/receiver link alive when
Expand Down Expand Up @@ -348,11 +351,12 @@ def _mgmt_request_response(
except Exception as exp: # pylint: disable=broad-except
if isinstance(exp, compat.TimeoutException):
raise OperationTimeoutError("Management operation timed out.", inner_exception=exp)
raise ServiceBusError("Management request failed: {}".format(exp), exp)
raise

def _mgmt_request_response_with_retry(self, mgmt_operation, message, callback, timeout=None, **kwargs):
# type: (bytes, Dict[str, Any], Callable, Optional[float], Any) -> Any
return self._do_retryable_operation(
return _do_retryable_operation(
self,
self._mgmt_request_response,
mgmt_operation=mgmt_operation,
message=message,
Expand All @@ -366,7 +370,7 @@ def _open(self): # pylint: disable=no-self-use
raise ValueError("Subclass should override the method.")

def _open_with_retry(self):
return self._do_retryable_operation(self._open)
return _do_retryable_operation(self, self._open)

def _close_handler(self):
if self._handler:
Expand Down
48 changes: 35 additions & 13 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import copy
from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Callable, Any

import uamqp.errors
import uamqp.message
from uamqp.constants import MessageState

Expand Down Expand Up @@ -56,6 +57,7 @@
MessageSettleFailed,
MessageContentTooLarge,
ServiceBusError)
from .._base_handler import _do_retryable_operation
from .utils import utc_from_timestamp, utc_now, transform_messages_to_sendable_if_needed
if TYPE_CHECKING:
from .._servicebus_receiver import ServiceBusReceiver
Expand Down Expand Up @@ -759,7 +761,7 @@ def __init__(self, message, receive_mode=ReceiveMode.PeekLock, **kwargs):
self._settled = (receive_mode == ReceiveMode.ReceiveAndDelete)
self._received_timestamp_utc = utc_now()
self._is_deferred_message = kwargs.get("is_deferred_message", False)
self.auto_renew_error = None # type: Optional[Exception]
self.auto_renew_error = None # type: Optional[Exception]
try:
self._receiver = kwargs.pop("receiver") # type: Union[ServiceBusReceiver, ServiceBusSessionReceiver]
except KeyError:
Expand All @@ -770,7 +772,7 @@ def __init__(self, message, receive_mode=ReceiveMode.PeekLock, **kwargs):
def _check_live(self, action):
# pylint: disable=no-member
if not self._receiver or not self._receiver._running: # pylint: disable=protected-access
raise MessageSettleFailed(action, "Orphan message had no open connection.")
raise MessageSettleFailed(action, ServiceBusError("Orphan message had no open connection."))
if self._settled:
raise MessageAlreadySettled(action)
try:
Expand Down Expand Up @@ -902,10 +904,10 @@ def locked_until_utc(self):

class ReceivedMessage(ReceivedMessageBase):
def _settle_message(
self,
settle_operation,
dead_letter_reason=None,
dead_letter_error_description=None,
self,
settle_operation,
dead_letter_reason=None,
dead_letter_error_description=None
):
# type: (str, Optional[str], Optional[str]) -> None
try:
Expand All @@ -915,7 +917,7 @@ def _settle_message(
dead_letter_reason=dead_letter_reason,
dead_letter_error_description=dead_letter_error_description)()
return
except RuntimeError as exception:
except Exception as exception: # pylint: disable=broad-except
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
_LOGGER.info(
"Message settling: %r has encountered an exception (%r)."
"Trying to settle through management link",
Expand All @@ -925,8 +927,28 @@ def _settle_message(
self._settle_via_mgmt_link(settle_operation,
dead_letter_reason=dead_letter_reason,
dead_letter_error_description=dead_letter_error_description)()
except Exception as e:
raise MessageSettleFailed(settle_operation, e)
except Exception as exception: # pylint: disable=broad-except
_LOGGER.info(
"Message settling: %r has encountered an exception (%r) through management link",
settle_operation,
exception
)
raise

def _settle_message_with_retry(
self,
settle_operation,
dead_letter_reason=None,
dead_letter_error_description=None
):
_do_retryable_operation(
self._receiver,
self._settle_message,
timeout=None,
settle_operation=settle_operation,
dead_letter_reason=dead_letter_reason,
dead_letter_error_description=dead_letter_error_description
)

def complete(self):
# type: () -> None
Expand All @@ -952,7 +974,7 @@ def complete(self):
"""
# pylint: disable=protected-access
self._check_live(MESSAGE_COMPLETE)
self._settle_message(MESSAGE_COMPLETE)
self._settle_message_with_retry(MESSAGE_COMPLETE)
self._settled = True

def dead_letter(self, reason=None, error_description=None):
Expand Down Expand Up @@ -983,7 +1005,7 @@ def dead_letter(self, reason=None, error_description=None):
"""
# pylint: disable=protected-access
self._check_live(MESSAGE_DEAD_LETTER)
self._settle_message(MESSAGE_DEAD_LETTER,
self._settle_message_with_retry(MESSAGE_DEAD_LETTER,
dead_letter_reason=reason,
dead_letter_error_description=error_description)
self._settled = True
Expand Down Expand Up @@ -1012,7 +1034,7 @@ def abandon(self):
"""
# pylint: disable=protected-access
self._check_live(MESSAGE_ABANDON)
self._settle_message(MESSAGE_ABANDON)
self._settle_message_with_retry(MESSAGE_ABANDON)
self._settled = True

def defer(self):
Expand All @@ -1039,7 +1061,7 @@ def defer(self):
by calling receive_deffered_messages with its sequence number
"""
self._check_live(MESSAGE_DEFER)
self._settle_message(MESSAGE_DEFER)
self._settle_message_with_retry(MESSAGE_DEFER)
self._settled = True

def renew_lock(self, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from uamqp.constants import SenderSettleMode
from uamqp.authentication.common import AMQPAuth

from ._base_handler import BaseHandler
from ._base_handler import BaseHandler, _do_retryable_operation
from ._common.utils import create_authentication
from ._common.message import PeekedMessage, ReceivedMessage
from ._common.constants import (
Expand Down Expand Up @@ -157,7 +157,7 @@ def __next__(self):
self._check_live()
while True:
try:
return self._do_retryable_operation(self._iter_next)
return _do_retryable_operation(self, self._iter_next)
except StopIteration:
self._message_iter = None
raise
Expand Down Expand Up @@ -256,6 +256,7 @@ def _receive(self, max_message_count=None, timeout=None):

def _settle_message(self, settlement, lock_tokens, dead_letter_details=None):
# type: (bytes, List[str], Optional[Dict[str, Any]]) -> Any
# Message settlement through the mgmt link.
message = {
MGMT_REQUEST_DISPOSITION_STATUS: settlement,
MGMT_REQUEST_LOCK_TOKENS: types.AMQPArray(lock_tokens)
Expand All @@ -265,7 +266,8 @@ def _settle_message(self, settlement, lock_tokens, dead_letter_details=None):
if dead_letter_details:
message.update(dead_letter_details)

return self._mgmt_request_response_with_retry(
# We don't do retry here, retry is done in the ReceivedMessage._settle_message
return self._mgmt_request_response(
REQUEST_RESPONSE_UPDATE_DISPOSTION_OPERATION,
message,
mgmt_handlers.default
Expand Down Expand Up @@ -411,7 +413,8 @@ def receive_messages(self, max_message_count=None, max_wait_time=None):

"""
self._check_live()
return self._do_retryable_operation(
return _do_retryable_operation(
self,
self._receive,
max_message_count=max_message_count,
timeout=max_wait_time,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from uamqp import SendClient, types
from uamqp.authentication.common import AMQPAuth

from ._base_handler import BaseHandler
from ._base_handler import BaseHandler, _do_retryable_operation
from ._common import mgmt_handlers
from ._common.message import Message, BatchMessage
from .exceptions import (
Expand Down Expand Up @@ -361,7 +361,8 @@ def send_messages(self, message, **kwargs):
if not isinstance(message, BatchMessage) and not isinstance(message, Message):
raise TypeError("Can only send azure.servicebus.<BatchMessage,Message> or lists of Messages.")

self._do_retryable_operation(
_do_retryable_operation(
self,
self._send,
message=message,
timeout=timeout,
Expand Down
Loading