Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] Condition based error redesign + client validation alignment #15364

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
## 7.0.0b9 (Unreleased)

**Breaking Changes**

* Setting `ServiceBusMessage.partition_key` to a value different than `session_id` on the message instance now raises `ValueError`.
* `ServiceBusSender` and `ServiceBusReceiver` are no more reusable and will raise `ValueError` when trying to operate on a closed handler.

## 7.0.0b8 (2020-11-05)
Expand Down
46 changes: 35 additions & 11 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_base_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
from .exceptions import (
ServiceBusError,
ServiceBusAuthenticationError,
ServiceBusConnectionError,
OperationTimeoutError,
SessionLockLostError,
_create_servicebus_exception
)
from ._common.utils import create_properties
Expand Down Expand Up @@ -193,9 +195,11 @@ def _convert_connection_string_to_kwargs(cls, conn_str, **kwargs):

entity_in_kwargs = queue_name or topic_name
if entity_in_conn_str and entity_in_kwargs and (entity_in_conn_str != entity_in_kwargs):
raise ServiceBusAuthenticationError(
"Entity names do not match, the entity name in connection string is {};"
" the entity name in parameter is {}.".format(entity_in_conn_str, entity_in_kwargs)
raise ServiceBusAuthenticationError( # TODO: should this be a ValueError?
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
message="The queue or topic name provided: {} which does not match the EntityPath in"
" the connection string passed to the ServiceBusClient constructor: {}.".format(
entity_in_conn_str, entity_in_kwargs
)
)

kwargs["fully_qualified_namespace"] = host
Expand All @@ -220,24 +224,44 @@ def __enter__(self):
def __exit__(self, *args):
self.close()

def _handle_exception(self, exception, **kwargs):
# type: (BaseException, Any) -> ServiceBusError
error, error_need_close_handler, error_need_raise = \
_create_servicebus_exception(_LOGGER, exception, self, **kwargs)
if error_need_close_handler:
def _handle_exception(self, exception):
# type: (BaseException) -> ServiceBusError
# pylint: disable=protected-access
error = _create_servicebus_exception(_LOGGER, exception)

try:
# If SessionLockLostError or ServiceBusConnectionError happen when a session receiver is running,
# the receiver can no longer be used and should create a new session receiver
# instance to receive from session.
if self._session and self._running and isinstance(error, (SessionLockLostError, ServiceBusConnectionError)):
self._session._lock_lost = True
self._close_handler()
raise error
except AttributeError:
pass

if error._shutdown_handler:
self._close_handler()
if error_need_raise:
if not error._retryable:
raise error

return error

def _do_retryable_operation(self, operation, timeout=None, **kwargs):
# type: (Callable, Optional[float], Any) -> Any
def _check_live(self):
"""check whether the handler is alive"""
# pylint: disable=protected-access
if self._shutdown.is_set():
raise ValueError("The handler has already been shutdown. Please use ServiceBusClient to "
"create a new instance.")
try:
if self._session and self._session._lock_lost:
raise SessionLockLostError(error=self._session.auto_renew_error)
except AttributeError:
pass

def _do_retryable_operation(self, operation, timeout=None, **kwargs):
# type: (Callable, Optional[float], Any) -> Any
# pylint: disable=protected-access
require_last_exception = kwargs.pop("require_last_exception", False)
operation_requires_timeout = kwargs.pop("operation_requires_timeout", False)
retried_times = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,21 @@

SESSION_FILTER = VENDOR + b":session-filter"
SESSION_LOCKED_UNTIL = VENDOR + b":locked-until-utc"
SESSION_LOCK_LOST = VENDOR + b":session-lock-lost"
SESSION_LOCK_TIMEOUT = VENDOR + b":timeout"

# Error codes
ERROR_CODE_SESSION_LOCK_LOST = VENDOR + b":session-lock-lost"
ERROR_CODE_MESSAGE_LOCK_LOST = VENDOR + b":message-lock-lost"
ERROR_CODE_MESSAGE_NOT_FOUND = VENDOR + b":message-not-found"
ERROR_CODE_TIMEOUT = VENDOR + b":timeout"
ERROR_CODE_AUTH_FAILED = VENDOR + b":auth-failed"
ERROR_CODE_SESSION_CANNOT_BE_LOCKED = VENDOR + b":session-cannot-be-locked"
ERROR_CODE_SERVER_BUSY = VENDOR + b":server-busy"
ERROR_CODE_ARGUMENT_ERROR = VENDOR + b":argument-error"
ERROR_CODE_OUT_OF_RANGE = VENDOR + b":argument-out-of-range"
ERROR_CODE_ENTITY_DISABLED = VENDOR + b":entity-disabled"
ERROR_CODE_ENTITY_ALREADY_EXISTS = VENDOR + b":entity-already-exists"
ERROR_CODE_PRECONDITION_FAILED = VENDOR + b":precondition-failed",


REQUEST_RESPONSE_OPERATION_NAME = b"operation"
REQUEST_RESPONSE_TIMEOUT = VENDOR + b":server-timeout"
Expand Down Expand Up @@ -68,6 +81,7 @@
RECEIVER_LINK_DEAD_LETTER_REASON = 'DeadLetterReason'
RECEIVER_LINK_DEAD_LETTER_ERROR_DESCRIPTION = 'DeadLetterErrorDescription'
MGMT_REQUEST_OP_TYPE_ENTITY_MGMT = b"entity-mgmt"
MGMT_RESPONSE_MESSAGE_ERROR_CONDITION = b'errorCondition'

MESSAGE_COMPLETE = 'complete'
MESSAGE_DEAD_LETTER = 'dead-letter'
Expand Down
54 changes: 35 additions & 19 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
ANNOTATION_SYMBOL_SCHEDULED_ENQUEUE_TIME,
ANNOTATION_SYMBOL_KEY_MAP
)
from ..exceptions import MessageContentTooLarge
from ..exceptions import MessageSizeExceededError
from .utils import utc_from_timestamp, utc_now, transform_messages_to_sendable_if_needed
if TYPE_CHECKING:
from ..aio._servicebus_receiver_async import ServiceBusReceiver as AsyncServiceBusReceiver
Expand Down Expand Up @@ -161,6 +161,9 @@ def session_id(self):
@session_id.setter
def session_id(self, value):
# type: (str) -> None
if value and len(value) > 128:
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
raise ValueError("session_id cannot be longer than 128 characters.")

self._amqp_properties.group_id = value

@property
Expand Down Expand Up @@ -202,6 +205,13 @@ def partition_key(self):
@partition_key.setter
def partition_key(self, value):
# type: (str) -> None
if value and len(value) > 128:
raise ValueError("partition_key cannot be longer than 128 characters.")

if value and value != self.session_id:
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
raise ValueError(
"partition_key:{} cannot be set to a different value than session_id:{}".format(value, self.session_id)
)
self._set_message_annotations(_X_OPT_PARTITION_KEY, value)

@property
Expand Down Expand Up @@ -290,9 +300,9 @@ def content_type(self):
return self._amqp_properties.content_type

@content_type.setter
def content_type(self, val):
def content_type(self, value):
# type: (str) -> None
self._amqp_properties.content_type = val
self._amqp_properties.content_type = value

@property
def correlation_id(self):
Expand All @@ -314,9 +324,9 @@ def correlation_id(self):
return self._amqp_properties.correlation_id

@correlation_id.setter
def correlation_id(self, val):
def correlation_id(self, value):
# type: (str) -> None
self._amqp_properties.correlation_id = val
self._amqp_properties.correlation_id = value

@property
def subject(self):
Expand All @@ -334,9 +344,9 @@ def subject(self):
return self._amqp_properties.subject

@subject.setter
def subject(self, val):
def subject(self, value):
# type: (str) -> None
self._amqp_properties.subject = val
self._amqp_properties.subject = value

@property
def message_id(self):
Expand All @@ -357,9 +367,12 @@ def message_id(self):
return self._amqp_properties.message_id

@message_id.setter
def message_id(self, val):
def message_id(self, value):
# type: (str) -> None
self._amqp_properties.message_id = val
if value and len(value) > 128:
raise ValueError("message_id cannot be longer than 128 characters.")

self._amqp_properties.message_id = value

@property
def reply_to(self):
Expand All @@ -382,9 +395,9 @@ def reply_to(self):
return self._amqp_properties.reply_to

@reply_to.setter
def reply_to(self, val):
def reply_to(self, value):
# type: (str) -> None
self._amqp_properties.reply_to = val
self._amqp_properties.reply_to = value

@property
def reply_to_session_id(self):
Expand All @@ -406,9 +419,12 @@ def reply_to_session_id(self):
return self._amqp_properties.reply_to_group_id

@reply_to_session_id.setter
def reply_to_session_id(self, val):
def reply_to_session_id(self, value):
# type: (str) -> None
self._amqp_properties.reply_to_group_id = val
if value and len(value) > 128:
raise ValueError("reply_to_session_id cannot be longer than 128 characters.")

self._amqp_properties.reply_to_group_id = value

@property
def to(self):
Expand All @@ -429,9 +445,9 @@ def to(self):
return self._amqp_properties.to

@to.setter
def to(self, val):
def to(self, value):
# type: (str) -> None
self._amqp_properties.to = val
self._amqp_properties.to = value


class ServiceBusMessageBatch(object):
Expand Down Expand Up @@ -502,7 +518,7 @@ def add_message(self, message):
:param message: The Message to be added to the batch.
:type message: ~azure.servicebus.ServiceBusMessage
:rtype: None
:raises: :class: ~azure.servicebus.exceptions.MessageContentTooLarge, when exceeding the size limit.
:raises: :class: ~azure.servicebus.exceptions.MessageSizeExceededError, when exceeding the size limit.
"""
message = transform_messages_to_sendable_if_needed(message)
message_size = message.message.get_message_encoded_size()
Expand All @@ -516,8 +532,8 @@ def add_message(self, message):
)

if size_after_add > self.max_size_in_bytes:
raise MessageContentTooLarge(
"ServiceBusMessageBatch has reached its size limit: {}".format(
raise MessageSizeExceededError(
message="ServiceBusMessageBatch has reached its size limit: {}".format(
self.max_size_in_bytes
)
)
Expand Down Expand Up @@ -573,7 +589,7 @@ def _lock_expired(self):
try:
if self._receiver.session: # type: ignore
raise TypeError("Session messages do not expire. Please use the Session expiry instead.")
except AttributeError: # Is not a session receiver
except AttributeError: # Is not a session receiver
pass
if self.locked_until_utc and self.locked_until_utc <= utc_now():
return True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,44 @@
# license information.
# -------------------------------------------------------------------------

import logging
import uamqp
from .message import ServiceBusReceivedMessage
from ..exceptions import ServiceBusError, MessageLockExpired
from .constants import ReceiveMode
from ..exceptions import _handle_amqp_mgmt_error
from .constants import (
ReceiveMode,
MGMT_RESPONSE_MESSAGE_ERROR_CONDITION
)

_LOGGER = logging.getLogger(__name__)


def default(status_code, message, description):
condition = message.application_properties.get(MGMT_RESPONSE_MESSAGE_ERROR_CONDITION)
if status_code == 200:
return message.get_data()

_handle_amqp_mgmt_error(_LOGGER, "Service request failed.", condition, description, status_code)


def session_lock_renew_op(status_code, message, description):
condition = message.application_properties.get(MGMT_RESPONSE_MESSAGE_ERROR_CONDITION)
if status_code == 200:
return message.get_data()
raise ServiceBusError(
"Management request returned status code: {}. Description: {}, Data: {}".format(
status_code, description, message.get_data()))

_handle_amqp_mgmt_error(_LOGGER, "Session lock renew failed.", condition, description, status_code)

def lock_renew_op(status_code, message, description):

def message_lock_renew_op(status_code, message, description):
condition = message.application_properties.get(MGMT_RESPONSE_MESSAGE_ERROR_CONDITION)
if status_code == 200:
return message.get_data()
if status_code == 410:
raise MessageLockExpired(message=description)
raise ServiceBusError(
"Management request returned status code: {}. Description: {}, Data: {}".format(
status_code, description, message.get_data()))

_handle_amqp_mgmt_error(_LOGGER, "Message lock renew failed.", condition, description, status_code)


def peek_op(status_code, message, description, receiver):
condition = message.application_properties.get(MGMT_RESPONSE_MESSAGE_ERROR_CONDITION)
if status_code == 200:
parsed = []
for m in message.get_data()[b'messages']:
Expand All @@ -37,24 +50,21 @@ def peek_op(status_code, message, description, receiver):
return parsed
if status_code in [202, 204]:
return []
error_msg = "Message peek failed with status code: {}.\n".format(status_code)
if description:
error_msg += "{}.".format(description)
raise ServiceBusError(error_msg)

_handle_amqp_mgmt_error(_LOGGER, "Message peek failed.", condition, description, status_code)


def list_sessions_op(status_code, message, description):
condition = message.application_properties.get(MGMT_RESPONSE_MESSAGE_ERROR_CONDITION)
if status_code == 200:
parsed = []
for m in message.get_data()[b'sessions-ids']:
parsed.append(m.decode('UTF-8'))
return parsed
if status_code in [202, 204]:
return []
error_msg = "List sessions failed with status code: {}.\n".format(status_code)
if description:
error_msg += "{}.".format(description)
raise ServiceBusError(error_msg)

_handle_amqp_mgmt_error(_LOGGER, "List sessions failed.", condition, description, status_code)


def deferred_message_op(
Expand All @@ -65,6 +75,7 @@ def deferred_message_op(
receive_mode=ReceiveMode.PeekLock,
message_type=ServiceBusReceivedMessage
):
condition = message.application_properties.get(MGMT_RESPONSE_MESSAGE_ERROR_CONDITION)
if status_code == 200:
parsed = []
for m in message.get_data()[b'messages']:
Expand All @@ -73,16 +84,13 @@ def deferred_message_op(
return parsed
if status_code in [202, 204]:
return []
error_msg = "Retrieving deferred messages failed with status code: {}.\n".format(status_code)
if description:
error_msg += "{}.".format(description)
raise ServiceBusError(error_msg)

_handle_amqp_mgmt_error(_LOGGER, "Retrieving deferred messages failed.", condition, description, status_code)


def schedule_op(status_code, message, description):
condition = message.application_properties.get(MGMT_RESPONSE_MESSAGE_ERROR_CONDITION)
if status_code == 200:
return message.get_data()[b'sequence-numbers']
error_msg = "Scheduling messages failed with status code: {}.\n".format(status_code)
if description:
error_msg += "{}.".format(description)
raise ServiceBusError(error_msg)

_handle_amqp_mgmt_error(_LOGGER, "Scheduling messages failed.", condition, description, status_code)
Loading