diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index c91e84f27f48..63b56ca69f1f 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -6,9 +6,9 @@ * Added new properties to Message, PeekMessage and ReceivedMessage: `content_type`, `correlation_id`, `label`, `message_id`, `reply_to`, `reply_to_session_id` and `to`. Please refer to the docstring for further information. - -* Add new properties to PeekedMessaged and ReceivedMessage: `enqueued_sequence_number`, `dead_letter_error_description`, +* Added new properties to PeekedMessaged and ReceivedMessage: `enqueued_sequence_number`, `dead_letter_error_description`, `dead_letter_reason`, `dead_letter_source`, `delivery_count` and `expires_at_utc`. Please refer to the docstring for further information. +* Added support for sending received messages via `ServiceBusSender.send_messages`. **Breaking Changes** diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index 18c0e6bc7c18..3ac801f1a606 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -53,7 +53,7 @@ SessionLockExpired, MessageSettleFailed, MessageContentTooLarge) -from .utils import utc_from_timestamp, utc_now +from .utils import utc_from_timestamp, utc_now, copy_messages_to_sendable_if_needed if TYPE_CHECKING: from .._servicebus_receiver import ServiceBusReceiver from .._servicebus_session_receiver import ServiceBusSessionReceiver @@ -540,6 +540,7 @@ def add(self, message): :rtype: None :raises: :class: ~azure.servicebus.exceptions.MessageContentTooLarge, when exceeding the size limit. """ + message = copy_messages_to_sendable_if_needed(message) message_size = message.message.get_message_encoded_size() # For a BatchMessage, if the encoded_message_size of event_data is < 256, then the overhead cost to encode that @@ -575,6 +576,35 @@ class PeekMessage(Message): def __init__(self, message): super(PeekMessage, self).__init__(None, message=message) + def _to_outgoing_message(self): + # type: () -> Message + amqp_message = self.message + amqp_body = amqp_message._body # pylint: disable=protected-access + + if isinstance(amqp_body, uamqp.message.DataBody): + body = b''.join(amqp_body.data) + else: + # amqp_body is type of uamqp.message.ValueBody + body = amqp_body.data + + return Message( + body=body, + content_type=self.content_type, + correlation_id=self.correlation_id, + label=self.label, + message_id=self.message_id, + partition_key=self.partition_key, + properties=self.properties, + reply_to=self.reply_to, + reply_to_session_id=self.reply_to_session_id, + session_id=self.session_id, + scheduled_enqueue_time_utc=self.scheduled_enqueue_time_utc, + time_to_live=self.time_to_live, + to=self.to, + via_partition_key=self.via_partition_key + ) + + @property def dead_letter_error_description(self): # type: () -> Optional[str] diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py index b5ba7f35ef16..a992d841051f 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py @@ -177,3 +177,23 @@ def generate_dead_letter_entity_name( ) return entity_name + + +def copy_messages_to_sendable_if_needed(messages): + """ + This method is to convert single/multiple received messages to sendable messages to enable message resending. + """ + # pylint: disable=protected-access + try: + msgs_to_return = [] + for each in messages: + try: + msgs_to_return.append(each._to_outgoing_message()) + except AttributeError: + msgs_to_return.append(each) + return msgs_to_return + except TypeError: + try: + return messages._to_outgoing_message() + except AttributeError: + return messages diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py index 4a10d18d74db..841b4eec7a4c 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py @@ -18,7 +18,7 @@ OperationTimeoutError, _ServiceBusErrorPolicy, ) -from ._common.utils import create_authentication +from ._common.utils import create_authentication, copy_messages_to_sendable_if_needed from ._common.constants import ( REQUEST_RESPONSE_CANCEL_SCHEDULED_MESSAGE_OPERATION, REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION, @@ -68,6 +68,7 @@ def _build_schedule_request(cls, schedule_time_utc, *messages): if not isinstance(message, Message): raise ValueError("Scheduling batch messages only supports iterables containing Message Objects." " Received instead: {}".format(message.__class__.__name__)) + message = copy_messages_to_sendable_if_needed(message) message.scheduled_enqueue_time_utc = schedule_time_utc message_data = {} message_data[MGMT_REQUEST_MESSAGE_ID] = message.message_id @@ -326,6 +327,7 @@ def send_messages(self, message): :caption: Send message. """ + message = copy_messages_to_sendable_if_needed(message) try: batch = self.create_batch() batch._from_list(message) # pylint: disable=protected-access diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index 0ad3a4f7c558..a5176d23d7f6 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -19,6 +19,7 @@ MGMT_REQUEST_SEQUENCE_NUMBERS ) from .._common import mgmt_handlers +from .._common.utils import copy_messages_to_sendable_if_needed from ._async_utils import create_authentication if TYPE_CHECKING: @@ -267,6 +268,7 @@ async def send_messages(self, message): :caption: Send message. """ + message = copy_messages_to_sendable_if_needed(message) try: batch = await self.create_batch() batch._from_list(message) # pylint: disable=protected-access diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index 9ea1051a1003..00b26c1f7485 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -994,20 +994,27 @@ async def test_async_queue_schedule_multiple_messages(self, servicebus_namespace servicebus_namespace_connection_string, logging_enable=False) as sb_client: enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0) messages = [] - async with sb_client.get_queue_receiver(servicebus_queue.name, prefetch=20) as receiver: - async with sb_client.get_queue_sender(servicebus_queue.name) as sender: - content = str(uuid.uuid4()) - message_id_a = uuid.uuid4() - message_a = Message(content) - message_a.message_id = message_id_a - message_id_b = uuid.uuid4() - message_b = Message(content) - message_b.message_id = message_id_b - tokens = await sender.schedule_messages([message_a, message_b], enqueue_time) - assert len(tokens) == 2 + receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch=20) + sender = sb_client.get_queue_sender(servicebus_queue.name) + async with sender, receiver: + content = str(uuid.uuid4()) + message_id_a = uuid.uuid4() + message_a = Message(content) + message_a.message_id = message_id_a + message_id_b = uuid.uuid4() + message_b = Message(content) + message_b.message_id = message_id_b - recv = await receiver.receive_messages(max_wait_time=120) - messages.extend(recv) + await sender.send_messages([message_a, message_b]) + + received_messages = await receiver.receive_messages(max_batch_size=2, max_wait_time=5) + for message in received_messages: + await message.complete() + + tokens = await sender.schedule_messages(received_messages, enqueue_time) + assert len(tokens) == 2 + + messages = await receiver.receive_messages(max_wait_time=120) recv = await receiver.receive_messages(max_wait_time=5) messages.extend(recv) if messages: @@ -1229,13 +1236,34 @@ def message_content(): for i in range(20): yield Message("Message no. {}".format(i)) - async with sb_client.get_queue_sender(servicebus_queue.name) as sender: + sender = sb_client.get_queue_sender(servicebus_queue.name) + receiver = sb_client.get_queue_receiver(servicebus_queue.name) + + async with sender, receiver: message = BatchMessage() for each in message_content(): message.add(each) await sender.send_messages(message) - async with sb_client.get_queue_receiver(servicebus_queue.name) as receiver: + receive_counter = 0 + message_received_cnt = 0 + while message_received_cnt < 20: + messages = await receiver.receive_messages(max_batch_size=20, max_wait_time=5) + if not messages: + break + receive_counter += 1 + message_received_cnt += len(messages) + for m in messages: + print_message(_logger, m) + await sender.send_messages(message) + await m.complete() + + assert message_received_cnt == 20 + # Network/server might be unstable making flow control ineffective in the leading rounds of connection iteration + assert receive_counter < 10 # Dynamic link credit issuing come info effect + + # received resent messages + receive_counter = 0 message_received_cnt = 0 while message_received_cnt < 20: diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index e90e00f0e313..e19a50d72822 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -745,23 +745,65 @@ def test_queue_by_servicebus_client_browse_messages_with_receiver(self, serviceb with ServiceBusClient.from_connection_string( servicebus_namespace_connection_string, logging_enable=False) as sb_client: - - with sb_client.get_queue_receiver(servicebus_queue.name, - idle_timeout=5, - mode=ReceiveSettleMode.PeekLock) as receiver: - with sb_client.get_queue_sender(servicebus_queue.name) as sender: - for i in range(5): - message = Message("Test message no. {}".format(i)) - sender.send_messages(message) + + receiver = sb_client.get_queue_receiver(servicebus_queue.name, + idle_timeout=5, + mode=ReceiveSettleMode.PeekLock) + sender = sb_client.get_queue_sender(servicebus_queue.name) + with receiver, sender: + for i in range(5): + message = Message( + body="Test message", + properties={'key': 'value'}, + label='label', + content_type='application/text', + correlation_id='cid', + message_id='mid', + partition_key='pk', + via_partition_key='via_pk', + to='to', + reply_to='reply_to', + time_to_live=timedelta(seconds=60) + ) + sender.send_messages(message) messages = receiver.peek_messages(5) assert len(messages) > 0 assert all(isinstance(m, PeekMessage) for m in messages) for message in messages: print_message(_logger, message) + assert b''.join(message.body) == b'Test message' + assert message.properties[b'key'] == b'value' + assert message.label == 'label' + assert message.content_type == 'application/text' + assert message.correlation_id == 'cid' + assert message.message_id == 'mid' + assert message.partition_key == 'pk' + assert message.via_partition_key == 'via_pk' + assert message.to == 'to' + assert message.reply_to == 'reply_to' + assert message.time_to_live == timedelta(seconds=60) with pytest.raises(AttributeError): message.complete() - + + sender.send_messages(message) + + cnt = 0 + for message in receiver: + assert b''.join(message.body) == b'Test message' + assert message.properties[b'key'] == b'value' + assert message.label == 'label' + assert message.content_type == 'application/text' + assert message.correlation_id == 'cid' + assert message.message_id == 'mid' + assert message.partition_key == 'pk' + assert message.via_partition_key == 'via_pk' + assert message.to == 'to' + assert message.reply_to == 'reply_to' + assert message.time_to_live == timedelta(seconds=60) + message.complete() + cnt += 1 + assert cnt == 10 @pytest.mark.liveTest @pytest.mark.live_test_only @@ -1193,28 +1235,36 @@ def test_queue_schedule_multiple_messages(self, servicebus_namespace_connection_ servicebus_namespace_connection_string, logging_enable=False) as sb_client: enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0) - with sb_client.get_queue_receiver(servicebus_queue.name, - prefetch=20) as receiver: - with sb_client.get_queue_sender(servicebus_queue.name) as sender: - content = str(uuid.uuid4()) - message_id_a = uuid.uuid4() - message_a = Message(content) - message_a.message_id = message_id_a - message_id_b = uuid.uuid4() - message_b = Message(content) - message_b.message_id = message_id_b - message_arry = [message_a, message_b] - for message in message_arry: - message.properties = {'key': 'value'} - message.label = 'label' - message.content_type = 'application/text' - message.correlation_id = 'cid' - message.partition_key = 'pk' - message.via_partition_key = 'via_pk' - message.to = 'to' - message.reply_to = 'reply_to' - tokens = sender.schedule_messages(message_arry, enqueue_time) - assert len(tokens) == 2 + sender = sb_client.get_queue_sender(servicebus_queue.name) + receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch=20) + + with sender, receiver: + content = str(uuid.uuid4()) + message_id_a = uuid.uuid4() + message_a = Message(content) + message_a.message_id = message_id_a + message_id_b = uuid.uuid4() + message_b = Message(content) + message_b.message_id = message_id_b + message_arry = [message_a, message_b] + for message in message_arry: + message.properties = {'key': 'value'} + message.label = 'label' + message.content_type = 'application/text' + message.correlation_id = 'cid' + message.partition_key = 'pk' + message.via_partition_key = 'via_pk' + message.to = 'to' + message.reply_to = 'reply_to' + + sender.send_messages(message_arry) + + received_messages = receiver.receive_messages(max_batch_size=2, max_wait_time=5) + for message in received_messages: + message.complete() + + tokens = sender.schedule_messages(received_messages, enqueue_time) + assert len(tokens) == 2 messages = receiver.receive_messages(max_wait_time=120) messages.extend(receiver.receive_messages(max_wait_time=5)) @@ -1540,15 +1590,29 @@ def test_queue_receive_batch_without_setting_prefetch(self, servicebus_namespace def message_content(): for i in range(20): - yield Message("Message no. {}".format(i)) - - with sb_client.get_queue_sender(servicebus_queue.name) as sender: + yield Message( + body="Test message", + properties={'key': 'value'}, + label='label', + content_type='application/text', + correlation_id='cid', + message_id='mid', + partition_key='pk', + via_partition_key='via_pk', + to='to', + reply_to='reply_to', + time_to_live=timedelta(seconds=60) + ) + + sender = sb_client.get_queue_sender(servicebus_queue.name) + receiver = sb_client.get_queue_receiver(servicebus_queue.name) + + with sender, receiver: message = BatchMessage() for each in message_content(): message.add(each) sender.send_messages(message) - with sb_client.get_queue_receiver(servicebus_queue.name) as receiver: receive_counter = 0 message_received_cnt = 0 while message_received_cnt < 20: @@ -1557,9 +1621,48 @@ def message_content(): break receive_counter += 1 message_received_cnt += len(messages) - for m in messages: - print_message(_logger, m) - m.complete() + for message in messages: + print_message(_logger, message) + assert b''.join(message.body) == b'Test message' + assert message.properties[b'key'] == b'value' + assert message.label == 'label' + assert message.content_type == 'application/text' + assert message.correlation_id == 'cid' + assert message.message_id == 'mid' + assert message.partition_key == 'pk' + assert message.via_partition_key == 'via_pk' + assert message.to == 'to' + assert message.reply_to == 'reply_to' + assert message.time_to_live == timedelta(seconds=60) + message.complete() + sender.send_messages(message) # resending received message + + assert message_received_cnt == 20 + # Network/server might be unstable making flow control ineffective in the leading rounds of connection iteration + assert receive_counter < 10 # Dynamic link credit issuing come info effect + + receive_counter = 0 + message_received_cnt = 0 + while message_received_cnt < 20: + messages = receiver.receive_messages(max_batch_size=20, max_wait_time=5) + if not messages: + break + receive_counter += 1 + message_received_cnt += len(messages) + for message in messages: + print_message(_logger, message) + assert b''.join(message.body) == b'Test message' + assert message.properties[b'key'] == b'value' + assert message.label == 'label' + assert message.content_type == 'application/text' + assert message.correlation_id == 'cid' + assert message.message_id == 'mid' + assert message.partition_key == 'pk' + assert message.via_partition_key == 'via_pk' + assert message.to == 'to' + assert message.reply_to == 'reply_to' + assert message.time_to_live == timedelta(seconds=60) + message.complete() assert message_received_cnt == 20 # Network/server might be unstable making flow control ineffective in the leading rounds of connection iteration diff --git a/sdk/servicebus/azure-servicebus/tests/test_sessions.py b/sdk/servicebus/azure-servicebus/tests/test_sessions.py index 56037252ccb9..b8985b991738 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_sessions.py +++ b/sdk/servicebus/azure-servicebus/tests/test_sessions.py @@ -51,7 +51,10 @@ def test_session_by_session_client_conn_str_receive_handler_peeklock(self, servi servicebus_namespace_connection_string, logging_enable=False) as sb_client: session_id = str(uuid.uuid4()) - with sb_client.get_queue_sender(servicebus_queue.name) as sender: + sender = sb_client.get_queue_sender(servicebus_queue.name) + session = sb_client.get_queue_session_receiver(servicebus_queue.name, session_id=session_id, idle_timeout=5) + + with sender, session: for i in range(3): message = Message("Handler message no. {}".format(i)) message.session_id = session_id @@ -67,11 +70,11 @@ def test_session_by_session_client_conn_str_receive_handler_peeklock(self, servi message.reply_to_session_id = 'reply_to_session_id' sender.send_messages(message) - with pytest.raises(ServiceBusConnectionError): - session = sb_client.get_queue_receiver(servicebus_queue.name, idle_timeout=5)._open_with_retry() + with pytest.raises(ServiceBusConnectionError): + session = sb_client.get_queue_receiver(servicebus_queue.name, idle_timeout=5)._open_with_retry() - with sb_client.get_queue_session_receiver(servicebus_queue.name, session_id=session_id, idle_timeout=5) as session: count = 0 + received_cnt_dic = {} for message in session: print_message(_logger, message) assert message.delivery_count == 0 @@ -80,7 +83,6 @@ def test_session_by_session_client_conn_str_receive_handler_peeklock(self, servi assert message.label == 'label' assert message.content_type == 'application/text' assert message.correlation_id == 'cid' - assert message.message_id == str(count) assert message.partition_key == 'pk' assert message.via_partition_key == 'via_pk' assert message.to == 'to' @@ -91,8 +93,14 @@ def test_session_by_session_client_conn_str_receive_handler_peeklock(self, servi assert message.reply_to_session_id == 'reply_to_session_id' count += 1 message.complete() + if message.message_id not in received_cnt_dic: + received_cnt_dic[message.message_id] = 1 + sender.send_messages(message) + else: + received_cnt_dic[message.message_id] += 1 - assert count == 3 + assert received_cnt_dic['0'] == 2 and received_cnt_dic['1'] == 2 and received_cnt_dic['2'] == 2 + assert count == 6 @pytest.mark.liveTest @pytest.mark.live_test_only