Skip to content

Commit

Permalink
[ServiceBus] settlement and mgmt error redesign (#14178)
Browse files Browse the repository at this point in the history
* settlement and mgmt error redesign init commit

* sync error refactor

* update changelog

* fix mypy and pylint

* add tests and fix bug in async impl

* fix pylint

* move do_retryable_operation back to handler

* remove resource not found error, message related error inherit from message error, update readme and changelog

* update exception handling part in readme

* revert back to catch runtime error in settlement via receiver link

* Update sdk/servicebus/azure-servicebus/azure/servicebus/exceptions.py

Co-authored-by: KieranBrantnerMagee <kibrantn@microsoft.com>

Co-authored-by: KieranBrantnerMagee <kibrantn@microsoft.com>
  • Loading branch information
yunhaoling and KieranBrantnerMagee authored Oct 23, 2020
1 parent a10d9cb commit bb28c10
Show file tree
Hide file tree
Showing 11 changed files with 381 additions and 139 deletions.
9 changes: 9 additions & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@
- `ServiceBusSession`: `get_state`, `set_state` and `renew_lock`
- `ReceivedMessage`: `renew_lock`

**Breaking Changes**

* Message settlement methods (`complete`, `abandon`, `defer` and `dead_letter`)
and methods that use amqp management link for request like `schedule_messages`, `received_deferred_messages`, etc.
now raise more concrete exception other than `MessageSettleFailed` and `ServiceBusError`.
* Exceptions `MessageSendFailed`, `MessageSettleFailed` and `MessageLockExpired`
now inherit from `azure.servicebus.exceptions.MessageError`.
* Removed Exception `ServiceBusResourceNotFound` as `azure.core.exceptions.ResourceNotFoundError` is now raised when a Service Bus resource does not exist.

**BugFixes**

* Updated uAMQP dependency to 1.2.12.
Expand Down
42 changes: 42 additions & 0 deletions sdk/servicebus/azure-servicebus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,48 @@ a generator-style receive will run for before exiting if there are no messages.
### Common Exceptions

The Service Bus APIs generate the following exceptions in azure.servicebus.exceptions:

- **ServiceBusConnectionError:** An error occurred in the connection to the service.
This may have been caused by a transient network issue or service problem. It is recommended to retry.
- **ServiceBusAuthorizationError:** An error occurred when authorizing the connection to the service.
This may have been caused by the credentials not having the right permission to perform the operation.
It is recommended to check the permission of the credentials.
- **ServiceBusAuthenticationError:** An error occurred when authenticate the connection to the service.
This may have been caused by the credentials being incorrect. It is recommended to check the credentials.
- **NoActiveSession:** This indicates that there is no active sessions receive from.
This may have been caused by all sessions in the entity have been occupied by other receiver instances.
It is recommended to retry if necessary.
- **OperationTimeoutError:** This indicates that the service did not respond to an operation within the expected amount of time.
This may have been caused by a transient network issue or service problem. The service may or may not have successfully completed the request; the status is not known.
It is recommended to attempt to verify the current state and retry if necessary.
- **MessageContentTooLarge:** This indicate that the message content is larger than the service bus frame size.
This could happen when too many service bus messages are sent in a batch or the content passed into
the body of a `Message` is too large. It is recommended to reduce the count of messages being sent in a batch or the size of content being passed into a single `Message`.
- **MessageAlreadySettled:** This indicates failure to settle the message.
This could happen when trying to settle an already-settled message.
- **MessageSettleFailed:** The attempt to settle a message failed.
This could happen when the service is unable to process the request.
It is recommended to retry or receive and settle the message again.
- **MessageSendFailed:** A message failed to be sent to the Service Bus entity.
This could happen when the service is unable to process the request.
It is recommended to retry, check the message content or reduce the count of messages in a batch.
- **MessageLockExpired:** The lock on the message has expired and it has been released back to the queue.
It will need to be received again in order to settle it.
You should be aware of the lock duration of a message and keep renewing the lock before expiration in case of long processing time.
`AutoLockRenewer` could help on keeping the lock of the message automatically renewed.
- **SessionLockExpired:** The lock on the session has expired.
All unsettled messages that have been received can no longer be settled.
It is recommended to reconnect to the session if receive messages again if necessary.
You should be aware of the lock duration of a session and keep renewing the lock before expiration in case of long processing time.
`AutoLockRenewer` could help on keeping the lock of the session automatically renewed.
- **AutoLockRenewFailed:** An attempt to renew a lock on a message or session in the background has failed.
This could happen when the receiver used by `AutoLockRenerer` is closed or the lock of the renewable has expired.
It is recommended to re-register the renewable message or session by receiving the message or connect to the sessionful entity again.
- **AutoLockRenewTimeout:** The time allocated to renew the message or session lock has elapsed. You could re-register the object that wants be auto lock renewed or extend the timeout in advance.
- **MessageError:** Operation on message failed because the message is in a wrong state. It is the root error class of message related errors described above.
- **ServiceBusError:** All other Service Bus related errors. It is the root error class of all the errors described above.

Please view the [exceptions reference docs][exception_reference] for detailed descriptions of our common Exception types.

## Next steps
Expand Down
70 changes: 36 additions & 34 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,45 +220,20 @@ def __enter__(self):
def __exit__(self, *args):
self.close()

def _handle_exception(self, exception):
# type: (BaseException) -> ServiceBusError
error, error_need_close_handler, error_need_raise = _create_servicebus_exception(_LOGGER, exception, self)
def _handle_exception(self, exception, **kwargs):
# type: (BaseException, Any) -> ServiceBusError
error, error_need_close_handler, error_need_raise = \
_create_servicebus_exception(_LOGGER, exception, self, **kwargs)
if error_need_close_handler:
self._close_handler()
if error_need_raise:
raise error

return error

def _backoff(
self,
retried_times,
last_exception,
abs_timeout_time=None,
entity_name=None
):
# type: (int, Exception, Optional[float], str) -> None
entity_name = entity_name or self._container_id
backoff = self._config.retry_backoff_factor * 2 ** retried_times
if backoff <= self._config.retry_backoff_max and (
abs_timeout_time is None or (backoff + time.time()) <= abs_timeout_time
): # pylint:disable=no-else-return
time.sleep(backoff)
_LOGGER.info(
"%r has an exception (%r). Retrying...",
format(entity_name),
last_exception,
)
else:
_LOGGER.info(
"%r operation has timed out. Last exception before timeout is (%r)",
entity_name,
last_exception,
)
raise last_exception

def _do_retryable_operation(self, operation, timeout=None, **kwargs):
# type: (Callable, Optional[float], Any) -> Any
# pylint: disable=protected-access
require_last_exception = kwargs.pop("require_last_exception", False)
operation_requires_timeout = kwargs.pop("operation_requires_timeout", False)
retried_times = 0
Expand All @@ -275,7 +250,7 @@ def _do_retryable_operation(self, operation, timeout=None, **kwargs):
except StopIteration:
raise
except Exception as exception: # pylint: disable=broad-except
last_exception = self._handle_exception(exception)
last_exception = self._handle_exception(exception, **kwargs)
if require_last_exception:
kwargs["last_exception"] = last_exception
retried_times += 1
Expand All @@ -292,6 +267,33 @@ def _do_retryable_operation(self, operation, timeout=None, **kwargs):
abs_timeout_time=abs_timeout_time
)

def _backoff(
self,
retried_times,
last_exception,
abs_timeout_time=None,
entity_name=None
):
# type: (int, Exception, Optional[float], str) -> None
entity_name = entity_name or self._container_id
backoff = self._config.retry_backoff_factor * 2 ** retried_times
if backoff <= self._config.retry_backoff_max and (
abs_timeout_time is None or (backoff + time.time()) <= abs_timeout_time
): # pylint:disable=no-else-return
time.sleep(backoff)
_LOGGER.info(
"%r has an exception (%r). Retrying...",
format(entity_name),
last_exception,
)
else:
_LOGGER.info(
"%r operation has timed out. Last exception before timeout is (%r)",
entity_name,
last_exception,
)
raise last_exception

def _mgmt_request_response(
self,
mgmt_operation,
Expand All @@ -301,15 +303,15 @@ def _mgmt_request_response(
timeout=None,
**kwargs
):
# type: (bytes, uamqp.Message, Callable, bool, Optional[float], Any) -> uamqp.Message
# type: (bytes, Any, Callable, bool, Optional[float], Any) -> uamqp.Message
"""
Execute an amqp management operation.
:param bytes mgmt_operation: The type of operation to be performed. This value will
be service-specific, but common values include READ, CREATE and UPDATE.
This value will be added as an application property on the message.
:param message: The message to send in the management request.
:paramtype message: ~uamqp.message.Message
:paramtype message: Any
:param callback: The callback which is used to parse the returning message.
:paramtype callback: Callable[int, ~uamqp.message.Message, str]
:param keep_alive_associated_link: A boolean flag for keeping associated amqp sender/receiver link alive when
Expand Down Expand Up @@ -348,7 +350,7 @@ def _mgmt_request_response(
except Exception as exp: # pylint: disable=broad-except
if isinstance(exp, compat.TimeoutException):
raise OperationTimeoutError("Management operation timed out.", inner_exception=exp)
raise ServiceBusError("Management request failed: {}".format(exp), exp)
raise

def _mgmt_request_response_with_retry(self, mgmt_operation, message, callback, timeout=None, **kwargs):
# type: (bytes, Dict[str, Any], Callable, Optional[float], Any) -> Any
Expand Down
46 changes: 34 additions & 12 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import copy
from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Callable, Any

import uamqp.errors
import uamqp.message
from uamqp.constants import MessageState

Expand Down Expand Up @@ -759,7 +760,7 @@ def __init__(self, message, receive_mode=ReceiveMode.PeekLock, **kwargs):
self._settled = (receive_mode == ReceiveMode.ReceiveAndDelete)
self._received_timestamp_utc = utc_now()
self._is_deferred_message = kwargs.get("is_deferred_message", False)
self.auto_renew_error = None # type: Optional[Exception]
self.auto_renew_error = None # type: Optional[Exception]
try:
self._receiver = kwargs.pop("receiver") # type: Union[ServiceBusReceiver, ServiceBusSessionReceiver]
except KeyError:
Expand All @@ -770,7 +771,7 @@ def __init__(self, message, receive_mode=ReceiveMode.PeekLock, **kwargs):
def _check_live(self, action):
# pylint: disable=no-member
if not self._receiver or not self._receiver._running: # pylint: disable=protected-access
raise MessageSettleFailed(action, "Orphan message had no open connection.")
raise MessageSettleFailed(action, ServiceBusError("Orphan message had no open connection."))
if self._settled:
raise MessageAlreadySettled(action)
try:
Expand Down Expand Up @@ -902,10 +903,10 @@ def locked_until_utc(self):

class ReceivedMessage(ReceivedMessageBase):
def _settle_message(
self,
settle_operation,
dead_letter_reason=None,
dead_letter_error_description=None,
self,
settle_operation,
dead_letter_reason=None,
dead_letter_error_description=None
):
# type: (str, Optional[str], Optional[str]) -> None
try:
Expand All @@ -925,8 +926,29 @@ def _settle_message(
self._settle_via_mgmt_link(settle_operation,
dead_letter_reason=dead_letter_reason,
dead_letter_error_description=dead_letter_error_description)()
except Exception as e:
raise MessageSettleFailed(settle_operation, e)
except Exception as exception: # pylint: disable=broad-except
_LOGGER.info(
"Message settling: %r has encountered an exception (%r) through management link",
settle_operation,
exception
)
raise

def _settle_message_with_retry(
self,
settle_operation,
dead_letter_reason=None,
dead_letter_error_description=None,
**kwargs
):
# pylint: disable=unused-argument, protected-access
self._receiver._do_retryable_operation(
self._settle_message,
timeout=None,
settle_operation=settle_operation,
dead_letter_reason=dead_letter_reason,
dead_letter_error_description=dead_letter_error_description
)

def complete(self):
# type: () -> None
Expand All @@ -952,7 +974,7 @@ def complete(self):
"""
# pylint: disable=protected-access
self._check_live(MESSAGE_COMPLETE)
self._settle_message(MESSAGE_COMPLETE)
self._settle_message_with_retry(MESSAGE_COMPLETE)
self._settled = True

def dead_letter(self, reason=None, error_description=None):
Expand Down Expand Up @@ -983,7 +1005,7 @@ def dead_letter(self, reason=None, error_description=None):
"""
# pylint: disable=protected-access
self._check_live(MESSAGE_DEAD_LETTER)
self._settle_message(MESSAGE_DEAD_LETTER,
self._settle_message_with_retry(MESSAGE_DEAD_LETTER,
dead_letter_reason=reason,
dead_letter_error_description=error_description)
self._settled = True
Expand Down Expand Up @@ -1012,7 +1034,7 @@ def abandon(self):
"""
# pylint: disable=protected-access
self._check_live(MESSAGE_ABANDON)
self._settle_message(MESSAGE_ABANDON)
self._settle_message_with_retry(MESSAGE_ABANDON)
self._settled = True

def defer(self):
Expand All @@ -1039,7 +1061,7 @@ def defer(self):
by calling receive_deffered_messages with its sequence number
"""
self._check_live(MESSAGE_DEFER)
self._settle_message(MESSAGE_DEFER)
self._settle_message_with_retry(MESSAGE_DEFER)
self._settled = True

def renew_lock(self, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ def _receive(self, max_message_count=None, timeout=None):

def _settle_message(self, settlement, lock_tokens, dead_letter_details=None):
# type: (bytes, List[str], Optional[Dict[str, Any]]) -> Any
# Message settlement through the mgmt link.
message = {
MGMT_REQUEST_DISPOSITION_STATUS: settlement,
MGMT_REQUEST_LOCK_TOKENS: types.AMQPArray(lock_tokens)
Expand All @@ -265,7 +266,8 @@ def _settle_message(self, settlement, lock_tokens, dead_letter_details=None):
if dead_letter_details:
message.update(dead_letter_details)

return self._mgmt_request_response_with_retry(
# We don't do retry here, retry is done in the ReceivedMessage._settle_message
return self._mgmt_request_response(
REQUEST_RESPONSE_UPDATE_DISPOSTION_OPERATION,
message,
mgmt_handlers.default
Expand Down
Loading

0 comments on commit bb28c10

Please sign in to comment.