From bb28c10ba4b67f22cbd653a6b219dfeb5fe03002 Mon Sep 17 00:00:00 2001 From: "Adam Ling (MSFT)" Date: Fri, 23 Oct 2020 10:36:05 -0700 Subject: [PATCH] [ServiceBus] settlement and mgmt error redesign (#14178) * settlement and mgmt error redesign init commit * sync error refactor * update changelog * fix mypy and pylint * add tests and fix bug in async impl * fix pylint * move do_retryable_operation back to handler * remove resource not found error, message related error inherit from message error, update readme and changelog * update exception handling part in readme * revert back to catch runtime error in settlement via receiver link * Update sdk/servicebus/azure-servicebus/azure/servicebus/exceptions.py Co-authored-by: KieranBrantnerMagee Co-authored-by: KieranBrantnerMagee --- sdk/servicebus/azure-servicebus/CHANGELOG.md | 9 ++ sdk/servicebus/azure-servicebus/README.md | 42 +++++ .../azure/servicebus/_base_handler.py | 70 ++++---- .../azure/servicebus/_common/message.py | 46 ++++-- .../azure/servicebus/_servicebus_receiver.py | 4 +- .../azure/servicebus/aio/_async_message.py | 43 +++-- .../servicebus/aio/_base_handler_async.py | 62 ++++---- .../azure/servicebus/exceptions.py | 149 ++++++++++++------ .../tests/async_tests/test_queues_async.py | 46 ++++++ .../azure-servicebus/tests/test_queues.py | 46 ++++++ .../azure-servicebus/tests/test_sb_client.py | 3 +- 11 files changed, 381 insertions(+), 139 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index 108bd6ee4e39..4ec77e9b128f 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -13,6 +13,15 @@ - `ServiceBusSession`: `get_state`, `set_state` and `renew_lock` - `ReceivedMessage`: `renew_lock` +**Breaking Changes** + +* Message settlement methods (`complete`, `abandon`, `defer` and `dead_letter`) +and methods that use amqp management link for request like `schedule_messages`, `received_deferred_messages`, etc. +now raise more concrete exception other than `MessageSettleFailed` and `ServiceBusError`. +* Exceptions `MessageSendFailed`, `MessageSettleFailed` and `MessageLockExpired` + now inherit from `azure.servicebus.exceptions.MessageError`. +* Removed Exception `ServiceBusResourceNotFound` as `azure.core.exceptions.ResourceNotFoundError` is now raised when a Service Bus resource does not exist. + **BugFixes** * Updated uAMQP dependency to 1.2.12. diff --git a/sdk/servicebus/azure-servicebus/README.md b/sdk/servicebus/azure-servicebus/README.md index f089f1eb81c6..a5141c4ad182 100644 --- a/sdk/servicebus/azure-servicebus/README.md +++ b/sdk/servicebus/azure-servicebus/README.md @@ -369,6 +369,48 @@ a generator-style receive will run for before exiting if there are no messages. ### Common Exceptions +The Service Bus APIs generate the following exceptions in azure.servicebus.exceptions: + +- **ServiceBusConnectionError:** An error occurred in the connection to the service. +This may have been caused by a transient network issue or service problem. It is recommended to retry. +- **ServiceBusAuthorizationError:** An error occurred when authorizing the connection to the service. +This may have been caused by the credentials not having the right permission to perform the operation. +It is recommended to check the permission of the credentials. +- **ServiceBusAuthenticationError:** An error occurred when authenticate the connection to the service. +This may have been caused by the credentials being incorrect. It is recommended to check the credentials. +- **NoActiveSession:** This indicates that there is no active sessions receive from. +This may have been caused by all sessions in the entity have been occupied by other receiver instances. +It is recommended to retry if necessary. +- **OperationTimeoutError:** This indicates that the service did not respond to an operation within the expected amount of time. +This may have been caused by a transient network issue or service problem. The service may or may not have successfully completed the request; the status is not known. +It is recommended to attempt to verify the current state and retry if necessary. +- **MessageContentTooLarge:** This indicate that the message content is larger than the service bus frame size. +This could happen when too many service bus messages are sent in a batch or the content passed into +the body of a `Message` is too large. It is recommended to reduce the count of messages being sent in a batch or the size of content being passed into a single `Message`. +- **MessageAlreadySettled:** This indicates failure to settle the message. +This could happen when trying to settle an already-settled message. +- **MessageSettleFailed:** The attempt to settle a message failed. +This could happen when the service is unable to process the request. +It is recommended to retry or receive and settle the message again. +- **MessageSendFailed:** A message failed to be sent to the Service Bus entity. +This could happen when the service is unable to process the request. +It is recommended to retry, check the message content or reduce the count of messages in a batch. +- **MessageLockExpired:** The lock on the message has expired and it has been released back to the queue. +It will need to be received again in order to settle it. +You should be aware of the lock duration of a message and keep renewing the lock before expiration in case of long processing time. +`AutoLockRenewer` could help on keeping the lock of the message automatically renewed. +- **SessionLockExpired:** The lock on the session has expired. +All unsettled messages that have been received can no longer be settled. +It is recommended to reconnect to the session if receive messages again if necessary. +You should be aware of the lock duration of a session and keep renewing the lock before expiration in case of long processing time. +`AutoLockRenewer` could help on keeping the lock of the session automatically renewed. +- **AutoLockRenewFailed:** An attempt to renew a lock on a message or session in the background has failed. +This could happen when the receiver used by `AutoLockRenerer` is closed or the lock of the renewable has expired. +It is recommended to re-register the renewable message or session by receiving the message or connect to the sessionful entity again. +- **AutoLockRenewTimeout:** The time allocated to renew the message or session lock has elapsed. You could re-register the object that wants be auto lock renewed or extend the timeout in advance. +- **MessageError:** Operation on message failed because the message is in a wrong state. It is the root error class of message related errors described above. +- **ServiceBusError:** All other Service Bus related errors. It is the root error class of all the errors described above. + Please view the [exceptions reference docs][exception_reference] for detailed descriptions of our common Exception types. ## Next steps diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py index 7bf24816b02f..69b6aaccb6b3 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py @@ -220,9 +220,10 @@ def __enter__(self): def __exit__(self, *args): self.close() - def _handle_exception(self, exception): - # type: (BaseException) -> ServiceBusError - error, error_need_close_handler, error_need_raise = _create_servicebus_exception(_LOGGER, exception, self) + def _handle_exception(self, exception, **kwargs): + # type: (BaseException, Any) -> ServiceBusError + error, error_need_close_handler, error_need_raise = \ + _create_servicebus_exception(_LOGGER, exception, self, **kwargs) if error_need_close_handler: self._close_handler() if error_need_raise: @@ -230,35 +231,9 @@ def _handle_exception(self, exception): return error - def _backoff( - self, - retried_times, - last_exception, - abs_timeout_time=None, - entity_name=None - ): - # type: (int, Exception, Optional[float], str) -> None - entity_name = entity_name or self._container_id - backoff = self._config.retry_backoff_factor * 2 ** retried_times - if backoff <= self._config.retry_backoff_max and ( - abs_timeout_time is None or (backoff + time.time()) <= abs_timeout_time - ): # pylint:disable=no-else-return - time.sleep(backoff) - _LOGGER.info( - "%r has an exception (%r). Retrying...", - format(entity_name), - last_exception, - ) - else: - _LOGGER.info( - "%r operation has timed out. Last exception before timeout is (%r)", - entity_name, - last_exception, - ) - raise last_exception - def _do_retryable_operation(self, operation, timeout=None, **kwargs): # type: (Callable, Optional[float], Any) -> Any + # pylint: disable=protected-access require_last_exception = kwargs.pop("require_last_exception", False) operation_requires_timeout = kwargs.pop("operation_requires_timeout", False) retried_times = 0 @@ -275,7 +250,7 @@ def _do_retryable_operation(self, operation, timeout=None, **kwargs): except StopIteration: raise except Exception as exception: # pylint: disable=broad-except - last_exception = self._handle_exception(exception) + last_exception = self._handle_exception(exception, **kwargs) if require_last_exception: kwargs["last_exception"] = last_exception retried_times += 1 @@ -292,6 +267,33 @@ def _do_retryable_operation(self, operation, timeout=None, **kwargs): abs_timeout_time=abs_timeout_time ) + def _backoff( + self, + retried_times, + last_exception, + abs_timeout_time=None, + entity_name=None + ): + # type: (int, Exception, Optional[float], str) -> None + entity_name = entity_name or self._container_id + backoff = self._config.retry_backoff_factor * 2 ** retried_times + if backoff <= self._config.retry_backoff_max and ( + abs_timeout_time is None or (backoff + time.time()) <= abs_timeout_time + ): # pylint:disable=no-else-return + time.sleep(backoff) + _LOGGER.info( + "%r has an exception (%r). Retrying...", + format(entity_name), + last_exception, + ) + else: + _LOGGER.info( + "%r operation has timed out. Last exception before timeout is (%r)", + entity_name, + last_exception, + ) + raise last_exception + def _mgmt_request_response( self, mgmt_operation, @@ -301,7 +303,7 @@ def _mgmt_request_response( timeout=None, **kwargs ): - # type: (bytes, uamqp.Message, Callable, bool, Optional[float], Any) -> uamqp.Message + # type: (bytes, Any, Callable, bool, Optional[float], Any) -> uamqp.Message """ Execute an amqp management operation. @@ -309,7 +311,7 @@ def _mgmt_request_response( be service-specific, but common values include READ, CREATE and UPDATE. This value will be added as an application property on the message. :param message: The message to send in the management request. - :paramtype message: ~uamqp.message.Message + :paramtype message: Any :param callback: The callback which is used to parse the returning message. :paramtype callback: Callable[int, ~uamqp.message.Message, str] :param keep_alive_associated_link: A boolean flag for keeping associated amqp sender/receiver link alive when @@ -348,7 +350,7 @@ def _mgmt_request_response( except Exception as exp: # pylint: disable=broad-except if isinstance(exp, compat.TimeoutException): raise OperationTimeoutError("Management operation timed out.", inner_exception=exp) - raise ServiceBusError("Management request failed: {}".format(exp), exp) + raise def _mgmt_request_response_with_retry(self, mgmt_operation, message, callback, timeout=None, **kwargs): # type: (bytes, Dict[str, Any], Callable, Optional[float], Any) -> Any diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index 67e79f9bb575..fbefe82ce51a 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -12,6 +12,7 @@ import copy from typing import Optional, List, Union, Iterable, TYPE_CHECKING, Callable, Any +import uamqp.errors import uamqp.message from uamqp.constants import MessageState @@ -759,7 +760,7 @@ def __init__(self, message, receive_mode=ReceiveMode.PeekLock, **kwargs): self._settled = (receive_mode == ReceiveMode.ReceiveAndDelete) self._received_timestamp_utc = utc_now() self._is_deferred_message = kwargs.get("is_deferred_message", False) - self.auto_renew_error = None # type: Optional[Exception] + self.auto_renew_error = None # type: Optional[Exception] try: self._receiver = kwargs.pop("receiver") # type: Union[ServiceBusReceiver, ServiceBusSessionReceiver] except KeyError: @@ -770,7 +771,7 @@ def __init__(self, message, receive_mode=ReceiveMode.PeekLock, **kwargs): def _check_live(self, action): # pylint: disable=no-member if not self._receiver or not self._receiver._running: # pylint: disable=protected-access - raise MessageSettleFailed(action, "Orphan message had no open connection.") + raise MessageSettleFailed(action, ServiceBusError("Orphan message had no open connection.")) if self._settled: raise MessageAlreadySettled(action) try: @@ -902,10 +903,10 @@ def locked_until_utc(self): class ReceivedMessage(ReceivedMessageBase): def _settle_message( - self, - settle_operation, - dead_letter_reason=None, - dead_letter_error_description=None, + self, + settle_operation, + dead_letter_reason=None, + dead_letter_error_description=None ): # type: (str, Optional[str], Optional[str]) -> None try: @@ -925,8 +926,29 @@ def _settle_message( self._settle_via_mgmt_link(settle_operation, dead_letter_reason=dead_letter_reason, dead_letter_error_description=dead_letter_error_description)() - except Exception as e: - raise MessageSettleFailed(settle_operation, e) + except Exception as exception: # pylint: disable=broad-except + _LOGGER.info( + "Message settling: %r has encountered an exception (%r) through management link", + settle_operation, + exception + ) + raise + + def _settle_message_with_retry( + self, + settle_operation, + dead_letter_reason=None, + dead_letter_error_description=None, + **kwargs + ): + # pylint: disable=unused-argument, protected-access + self._receiver._do_retryable_operation( + self._settle_message, + timeout=None, + settle_operation=settle_operation, + dead_letter_reason=dead_letter_reason, + dead_letter_error_description=dead_letter_error_description + ) def complete(self): # type: () -> None @@ -952,7 +974,7 @@ def complete(self): """ # pylint: disable=protected-access self._check_live(MESSAGE_COMPLETE) - self._settle_message(MESSAGE_COMPLETE) + self._settle_message_with_retry(MESSAGE_COMPLETE) self._settled = True def dead_letter(self, reason=None, error_description=None): @@ -983,7 +1005,7 @@ def dead_letter(self, reason=None, error_description=None): """ # pylint: disable=protected-access self._check_live(MESSAGE_DEAD_LETTER) - self._settle_message(MESSAGE_DEAD_LETTER, + self._settle_message_with_retry(MESSAGE_DEAD_LETTER, dead_letter_reason=reason, dead_letter_error_description=error_description) self._settled = True @@ -1012,7 +1034,7 @@ def abandon(self): """ # pylint: disable=protected-access self._check_live(MESSAGE_ABANDON) - self._settle_message(MESSAGE_ABANDON) + self._settle_message_with_retry(MESSAGE_ABANDON) self._settled = True def defer(self): @@ -1039,7 +1061,7 @@ def defer(self): by calling receive_deffered_messages with its sequence number """ self._check_live(MESSAGE_DEFER) - self._settle_message(MESSAGE_DEFER) + self._settle_message_with_retry(MESSAGE_DEFER) self._settled = True def renew_lock(self, **kwargs): diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py index 5d7d92cd1478..1dab66de0850 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py @@ -256,6 +256,7 @@ def _receive(self, max_message_count=None, timeout=None): def _settle_message(self, settlement, lock_tokens, dead_letter_details=None): # type: (bytes, List[str], Optional[Dict[str, Any]]) -> Any + # Message settlement through the mgmt link. message = { MGMT_REQUEST_DISPOSITION_STATUS: settlement, MGMT_REQUEST_LOCK_TOKENS: types.AMQPArray(lock_tokens) @@ -265,7 +266,8 @@ def _settle_message(self, settlement, lock_tokens, dead_letter_details=None): if dead_letter_details: message.update(dead_letter_details) - return self._mgmt_request_response_with_retry( + # We don't do retry here, retry is done in the ReceivedMessage._settle_message + return self._mgmt_request_response( REQUEST_RESPONSE_UPDATE_DISPOSTION_OPERATION, message, mgmt_handlers.default diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py index 5591de57f1c8..e4507083a788 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_message.py @@ -18,7 +18,6 @@ ) from .._common.utils import utc_from_timestamp from ._async_utils import get_running_loop -from ..exceptions import MessageSettleFailed _LOGGER = logging.getLogger(__name__) @@ -27,11 +26,28 @@ class ReceivedMessage(sync_message.ReceivedMessageBase): """A Service Bus Message received from service side. """ + + async def _settle_message_with_retry( + self, + settle_operation, + dead_letter_reason=None, + dead_letter_error_description=None, + **kwargs + ): + # pylint: disable=unused-argument, protected-access + await self._receiver._do_retryable_operation( + self._settle_message, + timeout=None, + settle_operation=settle_operation, + dead_letter_reason=dead_letter_reason, + dead_letter_error_description=dead_letter_error_description + ) + async def _settle_message( # type: ignore - self, - settle_operation, - dead_letter_reason=None, - dead_letter_error_description=None, + self, + settle_operation, + dead_letter_reason=None, + dead_letter_error_description=None, ): try: if not self._is_deferred_message: @@ -55,8 +71,13 @@ async def _settle_message( # type: ignore await self._settle_via_mgmt_link(settle_operation, dead_letter_reason=dead_letter_reason, dead_letter_error_description=dead_letter_error_description)() - except Exception as e: - raise MessageSettleFailed(settle_operation, e) + except Exception as exception: # pylint: disable=broad-except + _LOGGER.info( + "Message settling: %r has encountered an exception (%r) through management link", + settle_operation, + exception + ) + raise async def complete(self) -> None: # type: ignore """Complete the message. @@ -71,7 +92,7 @@ async def complete(self) -> None: # type: ignore """ # pylint: disable=protected-access self._check_live(MESSAGE_COMPLETE) - await self._settle_message(MESSAGE_COMPLETE) + await self._settle_message_with_retry(MESSAGE_COMPLETE) self._settled = True async def dead_letter( # type: ignore @@ -93,7 +114,7 @@ async def dead_letter( # type: ignore """ # pylint: disable=protected-access self._check_live(MESSAGE_DEAD_LETTER) - await self._settle_message(MESSAGE_DEAD_LETTER, + await self._settle_message_with_retry(MESSAGE_DEAD_LETTER, dead_letter_reason=reason, dead_letter_error_description=error_description) self._settled = True @@ -110,7 +131,7 @@ async def abandon(self) -> None: # type: ignore """ # pylint: disable=protected-access self._check_live(MESSAGE_ABANDON) - await self._settle_message(MESSAGE_ABANDON) + await self._settle_message_with_retry(MESSAGE_ABANDON) self._settled = True async def defer(self) -> None: # type: ignore @@ -126,7 +147,7 @@ async def defer(self) -> None: # type: ignore """ # pylint: disable=protected-access self._check_live(MESSAGE_DEFER) - await self._settle_message(MESSAGE_DEFER) + await self._settle_message_with_retry(MESSAGE_DEFER) self._settled = True async def renew_lock(self, **kwargs: Any) -> datetime.datetime: diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py index 1cda1ec66922..040b63b18969 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_base_handler_async.py @@ -113,8 +113,9 @@ async def __aenter__(self): async def __aexit__(self, *args): await self.close() - async def _handle_exception(self, exception): - error, error_need_close_handler, error_need_raise = _create_servicebus_exception(_LOGGER, exception, self) + async def _handle_exception(self, exception, **kwargs): + error, error_need_close_handler, error_need_raise = \ + _create_servicebus_exception(_LOGGER, exception, self, **kwargs) if error_need_close_handler: await self._close_handler() if error_need_raise: @@ -122,34 +123,9 @@ async def _handle_exception(self, exception): return error - async def _backoff( - self, - retried_times, - last_exception, - abs_timeout_time=None, - entity_name=None - ): - entity_name = entity_name or self._container_id - backoff = self._config.retry_backoff_factor * 2 ** retried_times - if backoff <= self._config.retry_backoff_max and ( - abs_timeout_time is None or (backoff + time.time()) <= abs_timeout_time - ): - await asyncio.sleep(backoff) - _LOGGER.info( - "%r has an exception (%r). Retrying...", - entity_name, - last_exception, - ) - else: - _LOGGER.info( - "%r operation has timed out. Last exception before timeout is (%r)", - entity_name, - last_exception, - ) - raise last_exception - async def _do_retryable_operation(self, operation, timeout=None, **kwargs): # type: (Callable, Optional[float], Any) -> Any + # pylint: disable=protected-access require_last_exception = kwargs.pop("require_last_exception", False) operation_requires_timeout = kwargs.pop("operation_requires_timeout", False) retried_times = 0 @@ -166,7 +142,7 @@ async def _do_retryable_operation(self, operation, timeout=None, **kwargs): except StopAsyncIteration: raise except Exception as exception: # pylint: disable=broad-except - last_exception = await self._handle_exception(exception) + last_exception = await self._handle_exception(exception, **kwargs) if require_last_exception: kwargs["last_exception"] = last_exception retried_times += 1 @@ -183,6 +159,32 @@ async def _do_retryable_operation(self, operation, timeout=None, **kwargs): abs_timeout_time=abs_timeout_time ) + async def _backoff( + self, + retried_times, + last_exception, + abs_timeout_time=None, + entity_name=None + ): + entity_name = entity_name or self._container_id + backoff = self._config.retry_backoff_factor * 2 ** retried_times + if backoff <= self._config.retry_backoff_max and ( + abs_timeout_time is None or (backoff + time.time()) <= abs_timeout_time + ): + await asyncio.sleep(backoff) + _LOGGER.info( + "%r has an exception (%r). Retrying...", + entity_name, + last_exception, + ) + else: + _LOGGER.info( + "%r operation has timed out. Last exception before timeout is (%r)", + entity_name, + last_exception, + ) + raise last_exception + async def _mgmt_request_response( self, mgmt_operation, @@ -236,7 +238,7 @@ async def _mgmt_request_response( except Exception as exp: # pylint: disable=broad-except if isinstance(exp, compat.TimeoutException): raise OperationTimeoutError("Management operation timed out.", inner_exception=exp) - raise ServiceBusError("Management request failed: {}".format(exp), exp) + raise async def _mgmt_request_response_with_retry(self, mgmt_operation, message, callback, timeout=None, **kwargs): # type: (bytes, Dict[str, Any], Callable, Optional[float], Any) -> Any diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/exceptions.py b/sdk/servicebus/azure-servicebus/azure/servicebus/exceptions.py index 202730ddcfa1..d376bc1c76b5 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/exceptions.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/exceptions.py @@ -39,6 +39,27 @@ b"com.microsoft:argument-error") +_AMQP_SESSION_ERROR_CONDITIONS = ( + SESSION_LOCK_LOST, + SESSION_LOCK_TIMEOUT +) + + +_AMQP_CONNECTION_ERRORS = ( + errors.LinkDetach, + errors.ConnectionClose, + errors.MessageHandlerError, + errors.AMQPConnectionError +) + + +_AMQP_MESSAGE_ERRORS = ( + errors.MessageAlreadySettled, + errors.MessageContentTooLarge, + errors.MessageException +) + + def _error_handler(error): """Handle connection and service errors. @@ -64,63 +85,97 @@ def _error_handler(error): return errors.ErrorAction(retry=True) -def _create_servicebus_exception(logger, exception, handler): # pylint: disable=too-many-statements +def _handle_amqp_connection_error(logger, exception, handler): + # Handle all exception inherited from uamqp.errors.AMQPConnectionError + error_need_close_handler = True + error_need_raise = False + error = None + if isinstance(exception, errors.LinkDetach) and exception.condition in _AMQP_SESSION_ERROR_CONDITIONS: + # In session lock lost or no active session case, we don't retry, close the handler and raise the error + error_need_raise = True + if exception.condition == SESSION_LOCK_LOST: + try: + session_id = handler._session_id # pylint: disable=protected-access + except AttributeError: + session_id = None + error = SessionLockExpired("Connection detached - lock on Session {} lost.".format(session_id)) + elif exception.condition == SESSION_LOCK_TIMEOUT: + error = NoActiveSession("Queue has no active session to receive from.") + elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)): + # In other link detach and connection case, should retry + logger.info("Handler detached due to exception: (%r).", exception) + if exception.condition == constants.ErrorCodes.UnauthorizedAccess: + error = ServiceBusAuthorizationError(str(exception), exception) + elif exception.condition == constants.ErrorCodes.NotAllowed and 'requires sessions' in str(exception): + message = str(exception) + '\n\nDid you want ServiceBusClient.get__session_receiver()?' + error = ServiceBusConnectionError(message, exception) + else: + error = ServiceBusConnectionError(str(exception), exception) + elif isinstance(exception, errors.MessageHandlerError): + logger.info("Handler error: (%r).", exception) + error = ServiceBusConnectionError(str(exception), exception) + else: + # handling general uamqp.errors.AMQPConnectionError + logger.info("Failed to open handler: (%r).", exception) + message = "Failed to open handler: {}.".format(exception) + error = ServiceBusConnectionError(message, exception) + error_need_raise, error_need_close_handler = True, False + + return error, error_need_close_handler, error_need_raise + + +def _handle_amqp_message_error(logger, exception, **kwargs): + # Handle amqp message related errors error_need_close_handler = True error_need_raise = False + error = None if isinstance(exception, errors.MessageAlreadySettled): + # This one doesn't need retry, should raise the error logger.info("Message already settled (%r)", exception) - error = MessageAlreadySettled(exception) + error = MessageAlreadySettled(kwargs.get("settle_operation", "Unknown operation")) error_need_close_handler = False error_need_raise = True elif isinstance(exception, errors.MessageContentTooLarge) or \ (isinstance(exception, errors.MessageException) and exception.condition == constants.ErrorCodes.LinkMessageSizeExceeded): - logger.info("Message content is too large (%r)", exception) - error = MessageContentTooLarge(exception) + # This one doesn't need retry, should raise the error + logger.info("Message content is too large (%r).", exception) + error = MessageContentTooLarge("Message content is too large.", exception) error_need_close_handler = False error_need_raise = True - elif isinstance(exception, errors.MessageException): + else: + # handling general uamqp.errors.MessageException logger.info("Message send failed (%r)", exception) if exception.condition == constants.ErrorCodes.ClientError and 'timed out' in str(exception): error = OperationTimeoutError("Send operation timed out", inner_exception=exception) else: error = MessageSendFailed(exception) error_need_raise = False - elif isinstance(exception, errors.LinkDetach) and exception.condition == SESSION_LOCK_LOST: - try: - session_id = handler._session_id # pylint: disable=protected-access - except AttributeError: - session_id = None - error = SessionLockExpired("Connection detached - lock on Session {} lost.".format(session_id)) - error_need_raise = True - elif isinstance(exception, errors.LinkDetach) and exception.condition == SESSION_LOCK_TIMEOUT: - error = NoActiveSession("Queue has no active session to receive from.") - error_need_raise = True + + return error, error_need_close_handler, error_need_raise + + +def _create_servicebus_exception(logger, exception, handler, **kwargs): # pylint: disable=too-many-statements + # transform amqp exceptions into servicebus exceptions + error_need_close_handler = True + error_need_raise = False + if isinstance(exception, _AMQP_CONNECTION_ERRORS): + error, error_need_close_handler, error_need_raise = \ + _handle_amqp_connection_error(logger, exception, handler) + elif isinstance(exception, _AMQP_MESSAGE_ERRORS): + error, error_need_close_handler, error_need_raise = \ + _handle_amqp_message_error(logger, exception, **kwargs) elif isinstance(exception, errors.AuthenticationException): logger.info("Authentication failed due to exception: (%r).", exception) error = ServiceBusAuthenticationError(str(exception), exception) - elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)): - logger.info("Handler detached due to exception: (%r).", exception) - if exception.condition == constants.ErrorCodes.UnauthorizedAccess: - error = ServiceBusAuthorizationError(str(exception), exception) - elif exception.condition == constants.ErrorCodes.NotAllowed and 'requires sessions' in str(exception): - message = str(exception) + '\n\nDid you want ServiceBusClient.get__session_receiver()?' - error = ServiceBusConnectionError(message, exception) - else: - error = ServiceBusConnectionError(str(exception), exception) - elif isinstance(exception, errors.MessageHandlerError): - logger.info("Handler error: (%r).", exception) - error = ServiceBusConnectionError(str(exception), exception) - elif isinstance(exception, errors.AMQPConnectionError): - logger.info("Failed to open handler: (%r).", exception) - message = "Failed to open handler: {}.".format(exception) - error = ServiceBusConnectionError(message, exception) - error_need_raise, error_need_close_handler = True, False else: logger.info("Unexpected error occurred (%r). Shutting down.", exception) - error = exception - if not isinstance(exception, ServiceBusError): + if kwargs.get("settle_operation"): + error = MessageSettleFailed(kwargs.get("settle_operation"), exception) + elif not isinstance(exception, ServiceBusError): error = ServiceBusError("Handler failed: {}.".format(exception), exception) + else: + error = exception try: err_condition = exception.condition @@ -155,7 +210,7 @@ def on_connection_error(self, error): class ServiceBusError(Exception): - """An error occured. + """An error occurred. This is the parent of all Service Bus errors and can be used for default error handling. @@ -168,20 +223,16 @@ def __init__(self, message, inner_exception=None): super(ServiceBusError, self).__init__(message) -class ServiceBusResourceNotFound(ServiceBusError): - """The Service Bus entity could not be reached.""" - - class ServiceBusConnectionError(ServiceBusError): - """An error occured in the connection.""" + """An error occurred in the connection.""" class ServiceBusAuthorizationError(ServiceBusError): - """An error occured when authorizing the connection.""" + """An error occurred when authorizing the connection.""" class ServiceBusAuthenticationError(ServiceBusError): - """An error occured when authenticate the connection.""" + """An error occurred when authenticate the connection.""" class NoActiveSession(ServiceBusError): @@ -193,11 +244,11 @@ class OperationTimeoutError(ServiceBusError): class MessageError(ServiceBusError): - """A message failed to send because the message is in a wrong state""" + """An error occurred when an operation on a message failed because the message is in an incorrect state.""" class MessageContentTooLarge(MessageError, ValueError): - """Message content is larger than the service bus frame size""" + """Message content is larger than the service bus frame size.""" class MessageAlreadySettled(MessageError): @@ -216,8 +267,8 @@ def __init__(self, action): super(MessageAlreadySettled, self).__init__(message) -class MessageSettleFailed(ServiceBusError): - """Attempt to settle a message failed.""" +class MessageSettleFailed(MessageError): + """An attempt to settle a message failed.""" def __init__(self, action, inner_exception): # type: (str, Exception) -> None @@ -226,7 +277,7 @@ def __init__(self, action, inner_exception): super(MessageSettleFailed, self).__init__(message, inner_exception) -class MessageSendFailed(ServiceBusError): +class MessageSendFailed(MessageError): """A message failed to send to the Service Bus entity.""" def __init__(self, inner_exception): @@ -235,13 +286,13 @@ def __init__(self, inner_exception): self.condition = None self.description = None if hasattr(inner_exception, 'condition'): - self.condition = inner_exception.condition # type: ignore + self.condition = inner_exception.condition # type: ignore self.description = inner_exception.description # type: ignore self.inner_exception = inner_exception super(MessageSendFailed, self).__init__(message, inner_exception) -class MessageLockExpired(ServiceBusError): +class MessageLockExpired(MessageError): """The lock on the message has expired and it has been released back to the queue. It will need to be received again in order to settle it. diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index ed2f4c4b6245..df0de76e9a31 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -15,6 +15,7 @@ from datetime import datetime, timedelta import uamqp +import uamqp.errors from uamqp import compat from azure.servicebus.aio import ( ServiceBusClient, @@ -1512,3 +1513,48 @@ async def hack_mgmt_execute_async(self, operation, op_type, message, timeout=0): finally: # must reset the mgmt execute method, otherwise other test cases would use the hacked execute method, leading to timeout error uamqp.async_ops.mgmt_operation_async.MgmtOperationAsync.execute_async = original_execute_method + + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @CachedServiceBusQueuePreparer(name_prefix='servicebustest', lock_duration='PT5S') + async def test_async_queue_operation_negative(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + def _hack_amqp_message_complete(cls): + raise RuntimeError() + + async def _hack_amqp_mgmt_request(cls, message, operation, op_type=None, node=None, callback=None, **kwargs): + raise uamqp.errors.AMQPConnectionError() + + async def _hack_sb_message_settle_message(self, settle_operation, dead_letter_reason=None, dead_letter_error_description=None): + raise uamqp.errors.AMQPError() + + async with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, logging_enable=False) as sb_client: + sender = sb_client.get_queue_sender(servicebus_queue.name) + receiver = sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) + async with sender, receiver: + # negative settlement via receiver link + await sender.send_messages(Message("body"), timeout=5) + message = (await receiver.receive_messages(max_wait_time=5))[0] + message.message.accept = types.MethodType(_hack_amqp_message_complete, message.message) + await message.complete() # settle via mgmt link + + try: + origin_amqp_mgmt_request_method = receiver._handler.mgmt_request_async + with pytest.raises(ServiceBusConnectionError): + receiver._handler.mgmt_request_async = types.MethodType(_hack_amqp_mgmt_request, receiver._handler) + await receiver.peek_messages() + finally: + receiver._handler.mgmt_request = types.MethodType(origin_amqp_mgmt_request_method, receiver._handler) + + await sender.send_messages(Message("body"), timeout=5) + + message = (await receiver.receive_messages(max_wait_time=5))[0] + message._settle_message = types.MethodType(_hack_sb_message_settle_message, message) + with pytest.raises(MessageSettleFailed): + await message.complete() + + message = (await receiver.receive_messages(max_wait_time=6))[0] + await message.complete() diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index fd09ec4ab5eb..3b7adee6267f 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -15,6 +15,7 @@ import calendar import uamqp +import uamqp.errors from uamqp import compat from azure.servicebus import ServiceBusClient, AutoLockRenewer, TransportType from azure.servicebus._common.message import Message, PeekedMessage, ReceivedMessage, BatchMessage @@ -1979,3 +1980,48 @@ def hack_mgmt_execute(self, operation, op_type, message, timeout=0): finally: # must reset the mgmt execute method, otherwise other test cases would use the hacked execute method, leading to timeout error uamqp.mgmt_operation.MgmtOperation.execute = original_execute_method + + + @pytest.mark.liveTest + @pytest.mark.live_test_only + @CachedResourceGroupPreparer(name_prefix='servicebustest') + @CachedServiceBusNamespacePreparer(name_prefix='servicebustest') + @CachedServiceBusQueuePreparer(name_prefix='servicebustest', lock_duration='PT5S') + def test_queue_operation_negative(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs): + def _hack_amqp_message_complete(cls): + raise RuntimeError() + + def _hack_amqp_mgmt_request(cls, message, operation, op_type=None, node=None, callback=None, **kwargs): + raise uamqp.errors.AMQPConnectionError() + + def _hack_sb_message_settle_message(self, settle_operation, dead_letter_reason=None, dead_letter_error_description=None): + raise uamqp.errors.AMQPError() + + with ServiceBusClient.from_connection_string( + servicebus_namespace_connection_string, logging_enable=False) as sb_client: + sender = sb_client.get_queue_sender(servicebus_queue.name) + receiver = sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) + with sender, receiver: + # negative settlement via receiver link + sender.send_messages(Message("body"), timeout=5) + message = receiver.receive_messages()[0] + message.message.accept = types.MethodType(_hack_amqp_message_complete, message.message) + message.complete() # settle via mgmt link + + try: + origin_amqp_mgmt_request_method = receiver._handler.mgmt_request + with pytest.raises(ServiceBusConnectionError): + receiver._handler.mgmt_request = types.MethodType(_hack_amqp_mgmt_request, receiver._handler) + receiver.peek_messages() + finally: + receiver._handler.mgmt_request = types.MethodType(origin_amqp_mgmt_request_method, receiver._handler) + + sender.send_messages(Message("body"), timeout=5) + + message = receiver.receive_messages()[0] + message._settle_message = types.MethodType(_hack_sb_message_settle_message, message) + with pytest.raises(MessageSettleFailed): + message.complete() + + message = receiver.receive_messages(max_wait_time=6)[0] + message.complete() diff --git a/sdk/servicebus/azure-servicebus/tests/test_sb_client.py b/sdk/servicebus/azure-servicebus/tests/test_sb_client.py index 539c5c45840c..96d0fc0b189a 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_sb_client.py +++ b/sdk/servicebus/azure-servicebus/tests/test_sb_client.py @@ -20,8 +20,7 @@ ServiceBusError, ServiceBusConnectionError, ServiceBusAuthenticationError, - ServiceBusAuthorizationError, - ServiceBusResourceNotFound + ServiceBusAuthorizationError ) from devtools_testutils import AzureMgmtTestCase, CachedResourceGroupPreparer from servicebus_preparer import (