Skip to content

Commit

Permalink
[ServiceBus] Non-reusable Sender/Receiver (#15172)
Browse files Browse the repository at this point in the history
* make handler non-reusable

* fix pylint
  • Loading branch information
yunhaoling authored Nov 10, 2020
1 parent 18df666 commit e3ce017
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 63 deletions.
2 changes: 2 additions & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## 7.0.0b9 (Unreleased)

**Breaking Changes**
* `ServiceBusSender` and `ServiceBusReceiver` are no more reusable and will raise `ValueError` when trying to operate on a closed handler.

## 7.0.0b8 (2020-11-05)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import uuid
import time
import threading
from datetime import timedelta
from typing import cast, Optional, Tuple, TYPE_CHECKING, Dict, Any, Callable

Expand Down Expand Up @@ -175,6 +176,7 @@ def __init__(
self._handler = None # type: uamqp.AMQPClient
self._auth_uri = None
self._properties = create_properties(self._config.user_agent)
self._shutdown = threading.Event()

@classmethod
def _convert_connection_string_to_kwargs(cls, conn_str, **kwargs):
Expand Down Expand Up @@ -232,6 +234,10 @@ def _handle_exception(self, exception, **kwargs):
def _do_retryable_operation(self, operation, timeout=None, **kwargs):
# type: (Callable, Optional[float], Any) -> Any
# pylint: disable=protected-access
if self._shutdown.is_set():
raise ValueError("The handler has already been shutdown. Please use ServiceBusClient to "
"create a new instance.")

require_last_exception = kwargs.pop("require_last_exception", False)
operation_requires_timeout = kwargs.pop("operation_requires_timeout", False)
retried_times = 0
Expand Down Expand Up @@ -383,3 +389,4 @@ def close(self):
:rtype: None
"""
self._close_handler()
self._shutdown.set()
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ def _build_message(self, received, message_type=ServiceBusReceivedMessage):
def _check_live(self):
"""check whether the receiver is alive"""
# pylint: disable=protected-access
if self._shutdown.is_set():
raise ValueError("The handler has already been shutdown. Please use ServiceBusClient to "
"create a new instance.")
if self._session and self._session._lock_expired: # pylint: disable=protected-access
raise SessionLockExpired(error=self._session.auto_renew_error)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ def schedule_messages(self, messages, schedule_time_utc, **kwargs):
:caption: Schedule a message to be sent in future
"""
# pylint: disable=protected-access
self._open()
timeout = kwargs.pop("timeout", None)
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")
Expand Down Expand Up @@ -290,7 +289,6 @@ def cancel_scheduled_messages(self, sequence_numbers, **kwargs):
:dedent: 4
:caption: Cancelling messages scheduled to be sent in future
"""
self._open()
timeout = kwargs.pop("timeout", None)
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async def get_token(self, *scopes: str, **kwargs: Any) -> AccessToken: # pylint
return _generate_sas_token(scopes[0], self.policy, self.key)


class BaseHandler:
class BaseHandler: # pylint:disable=too-many-instance-attributes
def __init__(
self,
fully_qualified_namespace: str,
Expand All @@ -94,6 +94,7 @@ def __init__(
self._handler = None # type: uamqp.AMQPClientAsync
self._auth_uri = None
self._properties = create_properties(self._config.user_agent)
self._shutdown = asyncio.Event()

@classmethod
def _convert_connection_string_to_kwargs(cls, conn_str, **kwargs):
Expand Down Expand Up @@ -126,6 +127,10 @@ async def _handle_exception(self, exception, **kwargs):
async def _do_retryable_operation(self, operation, timeout=None, **kwargs):
# type: (Callable, Optional[float], Any) -> Any
# pylint: disable=protected-access
if self._shutdown.is_set():
raise ValueError("The handler has already been shutdown. Please use ServiceBusClient to "
"create a new instance.")

require_last_exception = kwargs.pop("require_last_exception", False)
operation_requires_timeout = kwargs.pop("operation_requires_timeout", False)
retried_times = 0
Expand Down Expand Up @@ -273,3 +278,4 @@ async def close(self) -> None:
:rtype: None
"""
await self._close_handler()
self._shutdown.set()
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ async def schedule_messages(
:caption: Schedule a message to be sent in future
"""
# pylint: disable=protected-access
await self._open()
timeout = kwargs.pop("timeout", None)
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")
Expand Down Expand Up @@ -234,7 +233,6 @@ async def cancel_scheduled_messages(self, sequence_numbers: Union[int, List[int]
:dedent: 4
:caption: Cancelling messages scheduled to be sent in future
"""
await self._open()
timeout = kwargs.pop("timeout", None)
if timeout is not None and timeout <= 0:
raise ValueError("The timeout must be greater than 0.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,30 @@ async def test_async_queue_by_queue_client_conn_str_receive_handler_peeklock(sel
async with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, logging_enable=False) as sb_client:

async with sb_client.get_queue_sender(servicebus_queue.name) as sender:
sender = sb_client.get_queue_sender(servicebus_queue.name)
async with sender:
for i in range(10):
message = ServiceBusMessage("Handler message no. {}".format(i))
await sender.send_messages(message, timeout=5)

with pytest.raises(ValueError):
async with sender:
raise AssertionError("Should raise ValueError")
with pytest.raises(ValueError):
await sender.send_messages(ServiceBusMessage('msg'))
with pytest.raises(ValueError):
await sender.schedule_messages(ServiceBusMessage('msg'), utc_now())
with pytest.raises(ValueError):
await sender.cancel_scheduled_messages([1, 2, 3])

with pytest.raises(ServiceBusConnectionError):
await (sb_client.get_queue_receiver(servicebus_queue.name, session_id="test", max_wait_time=5))._open_with_retry()

with pytest.raises(ValueError):
sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=0)

async with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver:

receiver = sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5)
async with receiver:
with pytest.raises(ValueError):
await receiver.receive_messages(max_wait_time=0)

Expand All @@ -88,6 +99,16 @@ async def test_async_queue_by_queue_client_conn_str_receive_handler_peeklock(sel

assert count == 10

with pytest.raises(ValueError):
await receiver.receive_messages()
with pytest.raises(ValueError):
async with receiver:
raise AssertionError("Should raise ValueError")
with pytest.raises(ValueError):
await receiver.receive_deferred_messages([1, 2, 3])
with pytest.raises(ValueError):
await receiver.peek_messages()

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
Expand All @@ -96,22 +117,45 @@ async def test_async_queue_by_queue_client_conn_str_receive_handler_peeklock(sel
async def test_async_queue_by_queue_client_send_multiple_messages(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
async with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, logging_enable=False) as sb_client:
async with sb_client.get_queue_sender(servicebus_queue.name) as sender:
messages = []
for i in range(10):
message = ServiceBusMessage("Handler message no. {}".format(i))
messages.append(message)
await sender.send_messages(messages)
assert sender._handler._msg_timeout == 0
sender = sb_client.get_queue_sender(servicebus_queue.name)
messages = []
for i in range(10):
message = ServiceBusMessage("Handler message no. {}".format(i))
messages.append(message)
await sender.send_messages(messages)
assert sender._handler._msg_timeout == 0
await sender.close()

async with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver:
count = 0
async for message in receiver:
print_message(_logger, message)
count += 1
await receiver.complete_message(message)
with pytest.raises(ValueError):
async with sender:
raise AssertionError("Should raise ValueError")
with pytest.raises(ValueError):
await sender.send_messages(ServiceBusMessage('msg'))
with pytest.raises(ValueError):
await sender.schedule_messages(ServiceBusMessage('msg'), utc_now())
with pytest.raises(ValueError):
await sender.cancel_scheduled_messages([1, 2, 3])

assert count == 10
receiver = sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5)
count = 0
async for message in receiver:
print_message(_logger, message)
count += 1
await receiver.complete_message(message)

assert count == 10

await receiver.close()

with pytest.raises(ValueError):
await receiver.receive_messages()
with pytest.raises(ValueError):
async with receiver:
raise AssertionError("Should raise ValueError")
with pytest.raises(ValueError):
await receiver.receive_deferred_messages([1, 2, 3])
with pytest.raises(ValueError):
await receiver.peek_messages()

@pytest.mark.liveTest
@pytest.mark.live_test_only
Expand Down Expand Up @@ -214,7 +258,6 @@ async def test_async_queue_by_queue_client_conn_str_receive_handler_with_stop(se
assert receiver._running
assert len(messages) == 5

async with receiver:
async for message in receiver:
messages.append(message)
await receiver.complete_message(message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ async def test_async_session_by_session_client_conn_str_receive_handler_with_sto
assert receiver._running
assert len(messages) == 5

async with receiver:
async for message in receiver:
assert session_id == receiver.session.session_id
assert session_id == message.session_id
Expand Down
95 changes: 67 additions & 28 deletions sdk/servicebus/azure-servicebus/tests/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ def test_receive_and_delete_reconnect_interaction(self, servicebus_namespace_con
for i in range(5):
sender.send_messages(ServiceBusMessage("ServiceBusMessage {}".format(i)))

with sb_client.get_queue_receiver(servicebus_queue.name,
receive_mode=ReceiveMode.ReceiveAndDelete,
with sb_client.get_queue_receiver(servicebus_queue.name,
receive_mode=ReceiveMode.ReceiveAndDelete,
max_wait_time=10) as receiver:
batch = receiver.receive_messages()
count = len(batch)
Expand Down Expand Up @@ -107,7 +107,6 @@ def test_github_issue_6178(self, servicebus_namespace_connection_string, service
receiver.complete_message(message)
time.sleep(40)


@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
Expand All @@ -117,18 +116,29 @@ def test_queue_by_queue_client_conn_str_receive_handler_peeklock(self, servicebu
with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, logging_enable=False) as sb_client:

with sb_client.get_queue_sender(servicebus_queue.name) as sender:
for i in range(10):
message = ServiceBusMessage("Handler message no. {}".format(i))
message.application_properties = {'key': 'value'}
message.subject = 'label'
message.content_type = 'application/text'
message.correlation_id = 'cid'
message.message_id = str(i)
message.partition_key = 'pk'
message.to = 'to'
message.reply_to = 'reply_to'
sender.send_messages(message)
sender = sb_client.get_queue_sender(servicebus_queue.name)
for i in range(10):
message = ServiceBusMessage("Handler message no. {}".format(i))
message.application_properties = {'key': 'value'}
message.subject = 'label'
message.content_type = 'application/text'
message.correlation_id = 'cid'
message.message_id = str(i)
message.partition_key = 'pk'
message.to = 'to'
message.reply_to = 'reply_to'
sender.send_messages(message)
sender.close()

with pytest.raises(ValueError):
with sender:
raise AssertionError("Should raise ValueError")
with pytest.raises(ValueError):
sender.send_messages(ServiceBusMessage('msg'))
with pytest.raises(ValueError):
sender.schedule_messages(ServiceBusMessage('msg'), utc_now())
with pytest.raises(ValueError):
sender.cancel_scheduled_messages([1, 2, 3])

with pytest.raises(ValueError):
sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=0)
Expand Down Expand Up @@ -167,6 +177,16 @@ def test_queue_by_queue_client_conn_str_receive_handler_peeklock(self, servicebu
receiver.complete_message(message)
receiver.close()

with pytest.raises(ValueError):
receiver.receive_messages()
with pytest.raises(ValueError):
with receiver:
raise AssertionError("Should raise ValueError")
with pytest.raises(ValueError):
receiver.receive_deferred_messages([1, 2, 3])
with pytest.raises(ValueError):
receiver.peek_messages()

assert count == 10

@pytest.mark.liveTest
Expand All @@ -178,7 +198,8 @@ def test_queue_by_queue_client_send_multiple_messages(self, servicebus_namespace
with ServiceBusClient.from_connection_string(
servicebus_namespace_connection_string, logging_enable=False) as sb_client:

with sb_client.get_queue_sender(servicebus_queue.name) as sender:
sender = sb_client.get_queue_sender(servicebus_queue.name)
with sender:
messages = []
for i in range(10):
message = ServiceBusMessage("Handler message no. {}".format(i))
Expand All @@ -192,7 +213,18 @@ def test_queue_by_queue_client_send_multiple_messages(self, servicebus_namespace
messages.append(message)
sender.send_messages(messages)

with sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5) as receiver:
with pytest.raises(ValueError):
with sender:
raise AssertionError("Should raise ValueError")
with pytest.raises(ValueError):
sender.send_messages(ServiceBusMessage('msg'))
with pytest.raises(ValueError):
sender.schedule_messages(ServiceBusMessage('msg'), utc_now())
with pytest.raises(ValueError):
sender.cancel_scheduled_messages([1, 2, 3])

receiver = sb_client.get_queue_receiver(servicebus_queue.name, max_wait_time=5)
with receiver:
count = 0
for message in receiver:
print_message(_logger, message)
Expand All @@ -210,9 +242,18 @@ def test_queue_by_queue_client_send_multiple_messages(self, servicebus_namespace
assert not message.reply_to_session_id
count += 1
receiver.complete_message(message)

assert count == 10

with pytest.raises(ValueError):
receiver.receive_messages()
with pytest.raises(ValueError):
with receiver:
raise AssertionError("Should raise ValueError")
with pytest.raises(ValueError):
receiver.receive_deferred_messages([1, 2, 3])
with pytest.raises(ValueError):
receiver.peek_messages()

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedResourceGroupPreparer(name_prefix='servicebustest')
Expand Down Expand Up @@ -291,16 +332,14 @@ def test_queue_by_queue_client_conn_str_receive_handler_with_stop(self, serviceb
assert receiver._running
assert len(messages) == 5

with receiver:
for message in receiver:
messages.append(message)
receiver.complete_message(message)
if len(messages) >= 5:
break

assert not receiver._running
assert len(messages) == 6

for message in receiver:
messages.append(message)
receiver.complete_message(message)
if len(messages) >= 5:
break

assert not receiver._running
assert len(messages) == 6

@pytest.mark.liveTest
@pytest.mark.live_test_only
Expand Down
Loading

0 comments on commit e3ce017

Please sign in to comment.