Skip to content

Commit

Permalink
[ServiceBus] Resend received message (Azure#12457)
Browse files Browse the repository at this point in the history
* resend received message

* update implementation and add test

* add schedule resend test and fix pylint

* async test

* update implementation to be more pythonic

* remove circular import

* update according to review

* remove unused import
  • Loading branch information
yunhaoling authored Jul 23, 2020
1 parent c6388a0 commit 92d9f42
Show file tree
Hide file tree
Showing 8 changed files with 256 additions and 63 deletions.
4 changes: 2 additions & 2 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

* Added new properties to Message, PeekMessage and ReceivedMessage: `content_type`, `correlation_id`, `label`,
`message_id`, `reply_to`, `reply_to_session_id` and `to`. Please refer to the docstring for further information.

* Add new properties to PeekedMessaged and ReceivedMessage: `enqueued_sequence_number`, `dead_letter_error_description`,
* Added new properties to PeekedMessaged and ReceivedMessage: `enqueued_sequence_number`, `dead_letter_error_description`,
`dead_letter_reason`, `dead_letter_source`, `delivery_count` and `expires_at_utc`. Please refer to the docstring for further information.
* Added support for sending received messages via `ServiceBusSender.send_messages`.

**Breaking Changes**

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
SessionLockExpired,
MessageSettleFailed,
MessageContentTooLarge)
from .utils import utc_from_timestamp, utc_now
from .utils import utc_from_timestamp, utc_now, copy_messages_to_sendable_if_needed
if TYPE_CHECKING:
from .._servicebus_receiver import ServiceBusReceiver
from .._servicebus_session_receiver import ServiceBusSessionReceiver
Expand Down Expand Up @@ -540,6 +540,7 @@ def add(self, message):
:rtype: None
:raises: :class: ~azure.servicebus.exceptions.MessageContentTooLarge, when exceeding the size limit.
"""
message = copy_messages_to_sendable_if_needed(message)
message_size = message.message.get_message_encoded_size()

# For a BatchMessage, if the encoded_message_size of event_data is < 256, then the overhead cost to encode that
Expand Down Expand Up @@ -575,6 +576,35 @@ class PeekMessage(Message):
def __init__(self, message):
super(PeekMessage, self).__init__(None, message=message)

def _to_outgoing_message(self):
# type: () -> Message
amqp_message = self.message
amqp_body = amqp_message._body # pylint: disable=protected-access

if isinstance(amqp_body, uamqp.message.DataBody):
body = b''.join(amqp_body.data)
else:
# amqp_body is type of uamqp.message.ValueBody
body = amqp_body.data

return Message(
body=body,
content_type=self.content_type,
correlation_id=self.correlation_id,
label=self.label,
message_id=self.message_id,
partition_key=self.partition_key,
properties=self.properties,
reply_to=self.reply_to,
reply_to_session_id=self.reply_to_session_id,
session_id=self.session_id,
scheduled_enqueue_time_utc=self.scheduled_enqueue_time_utc,
time_to_live=self.time_to_live,
to=self.to,
via_partition_key=self.via_partition_key
)


@property
def dead_letter_error_description(self):
# type: () -> Optional[str]
Expand Down
20 changes: 20 additions & 0 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,23 @@ def generate_dead_letter_entity_name(
)

return entity_name


def copy_messages_to_sendable_if_needed(messages):
"""
This method is to convert single/multiple received messages to sendable messages to enable message resending.
"""
# pylint: disable=protected-access
try:
msgs_to_return = []
for each in messages:
try:
msgs_to_return.append(each._to_outgoing_message())
except AttributeError:
msgs_to_return.append(each)
return msgs_to_return
except TypeError:
try:
return messages._to_outgoing_message()
except AttributeError:
return messages
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
OperationTimeoutError,
_ServiceBusErrorPolicy,
)
from ._common.utils import create_authentication
from ._common.utils import create_authentication, copy_messages_to_sendable_if_needed
from ._common.constants import (
REQUEST_RESPONSE_CANCEL_SCHEDULED_MESSAGE_OPERATION,
REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION,
Expand Down Expand Up @@ -68,6 +68,7 @@ def _build_schedule_request(cls, schedule_time_utc, *messages):
if not isinstance(message, Message):
raise ValueError("Scheduling batch messages only supports iterables containing Message Objects."
" Received instead: {}".format(message.__class__.__name__))
message = copy_messages_to_sendable_if_needed(message)
message.scheduled_enqueue_time_utc = schedule_time_utc
message_data = {}
message_data[MGMT_REQUEST_MESSAGE_ID] = message.message_id
Expand Down Expand Up @@ -326,6 +327,7 @@ def send_messages(self, message):
:caption: Send message.
"""
message = copy_messages_to_sendable_if_needed(message)
try:
batch = self.create_batch()
batch._from_list(message) # pylint: disable=protected-access
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
MGMT_REQUEST_SEQUENCE_NUMBERS
)
from .._common import mgmt_handlers
from .._common.utils import copy_messages_to_sendable_if_needed
from ._async_utils import create_authentication

if TYPE_CHECKING:
Expand Down Expand Up @@ -267,6 +268,7 @@ async def send_messages(self, message):
:caption: Send message.
"""
message = copy_messages_to_sendable_if_needed(message)
try:
batch = await self.create_batch()
batch._from_list(message) # pylint: disable=protected-access
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -994,20 +994,27 @@ async def test_async_queue_schedule_multiple_messages(self, servicebus_namespace
servicebus_namespace_connection_string, logging_enable=False) as sb_client:
enqueue_time = (utc_now() + timedelta(minutes=2)).replace(microsecond=0)
messages = []
async with sb_client.get_queue_receiver(servicebus_queue.name, prefetch=20) as receiver:
async with sb_client.get_queue_sender(servicebus_queue.name) as sender:
content = str(uuid.uuid4())
message_id_a = uuid.uuid4()
message_a = Message(content)
message_a.message_id = message_id_a
message_id_b = uuid.uuid4()
message_b = Message(content)
message_b.message_id = message_id_b
tokens = await sender.schedule_messages([message_a, message_b], enqueue_time)
assert len(tokens) == 2
receiver = sb_client.get_queue_receiver(servicebus_queue.name, prefetch=20)
sender = sb_client.get_queue_sender(servicebus_queue.name)
async with sender, receiver:
content = str(uuid.uuid4())
message_id_a = uuid.uuid4()
message_a = Message(content)
message_a.message_id = message_id_a
message_id_b = uuid.uuid4()
message_b = Message(content)
message_b.message_id = message_id_b

recv = await receiver.receive_messages(max_wait_time=120)
messages.extend(recv)
await sender.send_messages([message_a, message_b])

received_messages = await receiver.receive_messages(max_batch_size=2, max_wait_time=5)
for message in received_messages:
await message.complete()

tokens = await sender.schedule_messages(received_messages, enqueue_time)
assert len(tokens) == 2

messages = await receiver.receive_messages(max_wait_time=120)
recv = await receiver.receive_messages(max_wait_time=5)
messages.extend(recv)
if messages:
Expand Down Expand Up @@ -1229,13 +1236,34 @@ def message_content():
for i in range(20):
yield Message("Message no. {}".format(i))

async with sb_client.get_queue_sender(servicebus_queue.name) as sender:
sender = sb_client.get_queue_sender(servicebus_queue.name)
receiver = sb_client.get_queue_receiver(servicebus_queue.name)

async with sender, receiver:
message = BatchMessage()
for each in message_content():
message.add(each)
await sender.send_messages(message)

async with sb_client.get_queue_receiver(servicebus_queue.name) as receiver:
receive_counter = 0
message_received_cnt = 0
while message_received_cnt < 20:
messages = await receiver.receive_messages(max_batch_size=20, max_wait_time=5)
if not messages:
break
receive_counter += 1
message_received_cnt += len(messages)
for m in messages:
print_message(_logger, m)
await sender.send_messages(message)
await m.complete()

assert message_received_cnt == 20
# Network/server might be unstable making flow control ineffective in the leading rounds of connection iteration
assert receive_counter < 10 # Dynamic link credit issuing come info effect

# received resent messages

receive_counter = 0
message_received_cnt = 0
while message_received_cnt < 20:
Expand Down
Loading

0 comments on commit 92d9f42

Please sign in to comment.