Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] settlement and mgmt error redesign #14178

Merged
merged 13 commits into from
Oct 23, 2020
Merged
9 changes: 9 additions & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@
- `ServiceBusSession`: `get_state`, `set_state` and `renew_lock`
- `ReceivedMessage`: `renew_lock`

**Breaking Changes**

* Message settlement methods (`complete`, `abandon`, `defer` and `dead_letter`)
and methods that use amqp management link for request like `schedule_messages`, `received_deferred_messages`, etc.
now raise more concrete exception other than `MessageSettleFailed` and `ServiceBusError`.
* Exceptions `MessageSendFailed`, `MessageSettleFailed` and `MessageLockExpired`
now inherit from `azure.servicebus.exceptions.MessageError`.
* Removed Exception `ServiceBusResourceNotFound` as `azure.core.exceptions.ResourceNotFoundError` is now raised when a Service Bus resource does not exist.

**BugFixes**

* Updated uAMQP dependency to 1.2.11.
Expand Down
18 changes: 18 additions & 0 deletions sdk/servicebus/azure-servicebus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,24 @@ a generator-style receive will run for before exiting if there are no messages.

### Common Exceptions

The Service Bus APIs generate the following exceptions in azure.servicebus.exceptions:
KieranBrantnerMagee marked this conversation as resolved.
Show resolved Hide resolved

- **ServiceBusConnectionError:** An error occurred in the connection.
- **ServiceBusAuthorizationError:** An error occurred when authorizing the connection.
- **ServiceBusAuthenticationError:** An error occurred when authenticate the connection.
- **NoActiveSession:** No active Sessions are available to receive from.
- **OperationTimeoutError:** Service request operation timed out.
- **MessageContentTooLarge:** Message content is larger than the service bus frame size.
- **MessageAlreadySettled:** Failed to settle the message.
- **MessageSettleFailed:** An attempt to settle a message failed.
- **MessageSendFailed:** A message failed to send to the Service Bus entity.
- **MessageLockExpired:** The lock on the message has expired and it has been released back to the queue. It will need to be received again in order to settle it.
- **SessionLockExpired:** The lock on the session has expired. All unsettled messages that have been received can no longer be settled.
- **AutoLockRenewFailed:** An attempt to renew a lock on a message or session in the background has failed.
- **AutoLockRenewTimeout:** The time allocated to renew the message or session lock has elapsed.
- **MessageError:** Operation on message failed because the message is in a wrong state. It is the root error class of message related errors described above.
- **ServiceBusError:** All other Service Bus related errors. It is the root error class of all the errors described above.

Please view the [exceptions reference docs][exception_reference] for detailed descriptions of our common Exception types.

## Next steps
Expand Down
70 changes: 36 additions & 34 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,45 +220,20 @@ def __enter__(self):
def __exit__(self, *args):
self.close()

def _handle_exception(self, exception):
# type: (BaseException) -> ServiceBusError
error, error_need_close_handler, error_need_raise = _create_servicebus_exception(_LOGGER, exception, self)
def _handle_exception(self, exception, **kwargs):
# type: (BaseException, Any) -> ServiceBusError
error, error_need_close_handler, error_need_raise = \
_create_servicebus_exception(_LOGGER, exception, self, **kwargs)
if error_need_close_handler:
self._close_handler()
if error_need_raise:
raise error

return error

def _backoff(
self,
retried_times,
last_exception,
abs_timeout_time=None,
entity_name=None
):
# type: (int, Exception, Optional[float], str) -> None
entity_name = entity_name or self._container_id
backoff = self._config.retry_backoff_factor * 2 ** retried_times
if backoff <= self._config.retry_backoff_max and (
abs_timeout_time is None or (backoff + time.time()) <= abs_timeout_time
): # pylint:disable=no-else-return
time.sleep(backoff)
_LOGGER.info(
"%r has an exception (%r). Retrying...",
format(entity_name),
last_exception,
)
else:
_LOGGER.info(
"%r operation has timed out. Last exception before timeout is (%r)",
entity_name,
last_exception,
)
raise last_exception

def _do_retryable_operation(self, operation, timeout=None, **kwargs):
# type: (Callable, Optional[float], Any) -> Any
# pylint: disable=protected-access
require_last_exception = kwargs.pop("require_last_exception", False)
operation_requires_timeout = kwargs.pop("operation_requires_timeout", False)
retried_times = 0
Expand All @@ -275,7 +250,7 @@ def _do_retryable_operation(self, operation, timeout=None, **kwargs):
except StopIteration:
raise
except Exception as exception: # pylint: disable=broad-except
last_exception = self._handle_exception(exception)
last_exception = self._handle_exception(exception, **kwargs)
if require_last_exception:
kwargs["last_exception"] = last_exception
retried_times += 1
Expand All @@ -292,6 +267,33 @@ def _do_retryable_operation(self, operation, timeout=None, **kwargs):
abs_timeout_time=abs_timeout_time
)

def _backoff(
self,
retried_times,
last_exception,
abs_timeout_time=None,
entity_name=None
):
# type: (int, Exception, Optional[float], str) -> None
entity_name = entity_name or self._container_id
backoff = self._config.retry_backoff_factor * 2 ** retried_times
if backoff <= self._config.retry_backoff_max and (
abs_timeout_time is None or (backoff + time.time()) <= abs_timeout_time
): # pylint:disable=no-else-return
time.sleep(backoff)
_LOGGER.info(
"%r has an exception (%r). Retrying...",
format(entity_name),
last_exception,
)
else:
_LOGGER.info(
"%r operation has timed out. Last exception before timeout is (%r)",
entity_name,
last_exception,
)
raise last_exception

def _mgmt_request_response(
self,
mgmt_operation,
Expand All @@ -301,15 +303,15 @@ def _mgmt_request_response(
timeout=None,
**kwargs
):
# type: (bytes, uamqp.Message, Callable, bool, Optional[float], Any) -> uamqp.Message
# type: (bytes, Any, Callable, bool, Optional[float], Any) -> uamqp.Message
"""
Execute an amqp management operation.

:param bytes mgmt_operation: The type of operation to be performed. This value will
be service-specific, but common values include READ, CREATE and UPDATE.
This value will be added as an application property on the message.
:param message: The message to send in the management request.
:paramtype message: ~uamqp.message.Message
:paramtype message: Any
:param callback: The callback which is used to parse the returning message.
:paramtype callback: Callable[int, ~uamqp.message.Message, str]
:param keep_alive_associated_link: A boolean flag for keeping associated amqp sender/receiver link alive when
Expand Down Expand Up @@ -348,7 +350,7 @@ def _mgmt_request_response(
except Exception as exp: # pylint: disable=broad-except
if isinstance(exp, compat.TimeoutException):
raise OperationTimeoutError("Management operation timed out.", inner_exception=exp)
raise ServiceBusError("Management request failed: {}".format(exp), exp)
raise

def _mgmt_request_response_with_retry(self, mgmt_operation, message, callback, timeout=None, **kwargs):
# type: (bytes, Dict[str, Any], Callable, Optional[float], Any) -> Any
Expand Down
48 changes: 35 additions & 13 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import copy
from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Callable, Any

import uamqp.errors
import uamqp.message
from uamqp.constants import MessageState

Expand Down Expand Up @@ -759,7 +760,7 @@ def __init__(self, message, receive_mode=ReceiveMode.PeekLock, **kwargs):
self._settled = (receive_mode == ReceiveMode.ReceiveAndDelete)
self._received_timestamp_utc = utc_now()
self._is_deferred_message = kwargs.get("is_deferred_message", False)
self.auto_renew_error = None # type: Optional[Exception]
self.auto_renew_error = None # type: Optional[Exception]
try:
self._receiver = kwargs.pop("receiver") # type: Union[ServiceBusReceiver, ServiceBusSessionReceiver]
except KeyError:
Expand All @@ -770,7 +771,7 @@ def __init__(self, message, receive_mode=ReceiveMode.PeekLock, **kwargs):
def _check_live(self, action):
# pylint: disable=no-member
if not self._receiver or not self._receiver._running: # pylint: disable=protected-access
raise MessageSettleFailed(action, "Orphan message had no open connection.")
raise MessageSettleFailed(action, ServiceBusError("Orphan message had no open connection."))
if self._settled:
raise MessageAlreadySettled(action)
try:
Expand Down Expand Up @@ -902,10 +903,10 @@ def locked_until_utc(self):

class ReceivedMessage(ReceivedMessageBase):
def _settle_message(
self,
settle_operation,
dead_letter_reason=None,
dead_letter_error_description=None,
self,
settle_operation,
dead_letter_reason=None,
dead_letter_error_description=None
):
# type: (str, Optional[str], Optional[str]) -> None
try:
Expand All @@ -915,7 +916,7 @@ def _settle_message(
dead_letter_reason=dead_letter_reason,
dead_letter_error_description=dead_letter_error_description)()
return
except RuntimeError as exception:
except Exception as exception: # pylint: disable=broad-except
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
_LOGGER.info(
"Message settling: %r has encountered an exception (%r)."
"Trying to settle through management link",
Expand All @@ -925,8 +926,29 @@ def _settle_message(
self._settle_via_mgmt_link(settle_operation,
dead_letter_reason=dead_letter_reason,
dead_letter_error_description=dead_letter_error_description)()
except Exception as e:
raise MessageSettleFailed(settle_operation, e)
except Exception as exception: # pylint: disable=broad-except
_LOGGER.info(
"Message settling: %r has encountered an exception (%r) through management link",
settle_operation,
exception
)
raise

def _settle_message_with_retry(
self,
settle_operation,
dead_letter_reason=None,
dead_letter_error_description=None,
**kwargs
):
# pylint: disable=unused-argument, protected-access
self._receiver._do_retryable_operation(
self._settle_message,
timeout=None,
settle_operation=settle_operation,
dead_letter_reason=dead_letter_reason,
dead_letter_error_description=dead_letter_error_description
)

def complete(self):
# type: () -> None
Expand All @@ -952,7 +974,7 @@ def complete(self):
"""
# pylint: disable=protected-access
self._check_live(MESSAGE_COMPLETE)
self._settle_message(MESSAGE_COMPLETE)
self._settle_message_with_retry(MESSAGE_COMPLETE)
self._settled = True

def dead_letter(self, reason=None, error_description=None):
Expand Down Expand Up @@ -983,7 +1005,7 @@ def dead_letter(self, reason=None, error_description=None):
"""
# pylint: disable=protected-access
self._check_live(MESSAGE_DEAD_LETTER)
self._settle_message(MESSAGE_DEAD_LETTER,
self._settle_message_with_retry(MESSAGE_DEAD_LETTER,
dead_letter_reason=reason,
dead_letter_error_description=error_description)
self._settled = True
Expand Down Expand Up @@ -1012,7 +1034,7 @@ def abandon(self):
"""
# pylint: disable=protected-access
self._check_live(MESSAGE_ABANDON)
self._settle_message(MESSAGE_ABANDON)
self._settle_message_with_retry(MESSAGE_ABANDON)
self._settled = True

def defer(self):
Expand All @@ -1039,7 +1061,7 @@ def defer(self):
by calling receive_deffered_messages with its sequence number
"""
self._check_live(MESSAGE_DEFER)
self._settle_message(MESSAGE_DEFER)
self._settle_message_with_retry(MESSAGE_DEFER)
self._settled = True

def renew_lock(self, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ def _receive(self, max_message_count=None, timeout=None):

def _settle_message(self, settlement, lock_tokens, dead_letter_details=None):
# type: (bytes, List[str], Optional[Dict[str, Any]]) -> Any
# Message settlement through the mgmt link.
message = {
MGMT_REQUEST_DISPOSITION_STATUS: settlement,
MGMT_REQUEST_LOCK_TOKENS: types.AMQPArray(lock_tokens)
Expand All @@ -265,7 +266,8 @@ def _settle_message(self, settlement, lock_tokens, dead_letter_details=None):
if dead_letter_details:
message.update(dead_letter_details)

return self._mgmt_request_response_with_retry(
# We don't do retry here, retry is done in the ReceivedMessage._settle_message
return self._mgmt_request_response(
REQUEST_RESPONSE_UPDATE_DISPOSTION_OPERATION,
message,
mgmt_handlers.default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
)
from .._common.utils import utc_from_timestamp
from ._async_utils import get_running_loop
from ..exceptions import MessageSettleFailed

_LOGGER = logging.getLogger(__name__)

Expand All @@ -27,11 +26,28 @@ class ReceivedMessage(sync_message.ReceivedMessageBase):
"""A Service Bus Message received from service side.

"""

async def _settle_message_with_retry(
self,
settle_operation,
dead_letter_reason=None,
dead_letter_error_description=None,
**kwargs
):
# pylint: disable=unused-argument, protected-access
await self._receiver._do_retryable_operation(
self._settle_message,
timeout=None,
settle_operation=settle_operation,
dead_letter_reason=dead_letter_reason,
dead_letter_error_description=dead_letter_error_description
)

async def _settle_message( # type: ignore
self,
settle_operation,
dead_letter_reason=None,
dead_letter_error_description=None,
self,
settle_operation,
dead_letter_reason=None,
dead_letter_error_description=None,
):
try:
if not self._is_deferred_message:
Expand All @@ -45,7 +61,7 @@ async def _settle_message( # type: ignore
)
)
return
except RuntimeError as exception:
except Exception as exception: # pylint: disable=broad-except
_LOGGER.info(
"Message settling: %r has encountered an exception (%r)."
"Trying to settle through management link",
Expand All @@ -55,8 +71,13 @@ async def _settle_message( # type: ignore
await self._settle_via_mgmt_link(settle_operation,
dead_letter_reason=dead_letter_reason,
dead_letter_error_description=dead_letter_error_description)()
except Exception as e:
raise MessageSettleFailed(settle_operation, e)
except Exception as exception: # pylint: disable=broad-except
_LOGGER.info(
"Message settling: %r has encountered an exception (%r) through management link",
settle_operation,
exception
)
raise

async def complete(self) -> None: # type: ignore
"""Complete the message.
Expand All @@ -71,7 +92,7 @@ async def complete(self) -> None: # type: ignore
"""
# pylint: disable=protected-access
self._check_live(MESSAGE_COMPLETE)
await self._settle_message(MESSAGE_COMPLETE)
await self._settle_message_with_retry(MESSAGE_COMPLETE)
self._settled = True

async def dead_letter( # type: ignore
Expand All @@ -93,7 +114,7 @@ async def dead_letter( # type: ignore
"""
# pylint: disable=protected-access
self._check_live(MESSAGE_DEAD_LETTER)
await self._settle_message(MESSAGE_DEAD_LETTER,
await self._settle_message_with_retry(MESSAGE_DEAD_LETTER,
dead_letter_reason=reason,
dead_letter_error_description=error_description)
self._settled = True
Expand All @@ -110,7 +131,7 @@ async def abandon(self) -> None: # type: ignore
"""
# pylint: disable=protected-access
self._check_live(MESSAGE_ABANDON)
await self._settle_message(MESSAGE_ABANDON)
await self._settle_message_with_retry(MESSAGE_ABANDON)
self._settled = True

async def defer(self) -> None: # type: ignore
Expand All @@ -126,7 +147,7 @@ async def defer(self) -> None: # type: ignore
"""
# pylint: disable=protected-access
self._check_live(MESSAGE_DEFER)
await self._settle_message(MESSAGE_DEFER)
await self._settle_message_with_retry(MESSAGE_DEFER)
self._settled = True

async def renew_lock(self, **kwargs: Any) -> datetime.datetime:
Expand Down
Loading