diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index f424295345a0..c91e84f27f48 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -25,6 +25,12 @@ - Removed property `settled` on `PeekMessage`. - Removed property `expired` on `ReceivedMessage`. +* Add `on_lock_renew_failure` as a parameter to `AutoLockRenew.register`, taking a callback for when the lock is lost non-intentially (e.g. not via settling, shutdown, or autolockrenew duration completion) + +**Breaking Changes** + +* `AutoLockRenew.sleep_time` and `AutoLockRenew.renew_period` have been made internal as `_sleep_time` and `_renew_period` respectively, as it is not expected a user will have to interact with them. +* `AutoLockRenew.shutdown` is now `AutoLockRenew.close` to normalize with other equivelent behaviors. ## 7.0.0b4 (2020-07-06) @@ -35,8 +41,8 @@ **BugFixes** -* Fixed bug where sync AutoLockRenew does not shutdown itself timely. -* Fixed bug where async AutoLockRenew does not support context manager. +* Fixed bug where sync `AutoLockRenew` does not shutdown itself timely. +* Fixed bug where async `AutoLockRenew` does not support context manager. **Breaking Changes** diff --git a/sdk/servicebus/azure-servicebus/README.md b/sdk/servicebus/azure-servicebus/README.md index 21ba5350bf5a..4e61997e8ef8 100644 --- a/sdk/servicebus/azure-servicebus/README.md +++ b/sdk/servicebus/azure-servicebus/README.md @@ -381,7 +381,7 @@ connstr = os.environ['SERVICE_BUS_CONN_STR'] queue_name = os.environ['SERVICE_BUS_QUEUE_NAME'] session_id = os.environ['SERVICE_BUS_SESSION_ID'] -# Can also be called via "with AutoLockRenew() as renewer" to automate shutdown. +# Can also be called via "with AutoLockRenew() as renewer" to automate closing. renewer = AutoLockRenew() with ServiceBusClient.from_connection_string(connstr) as client: with client.get_queue_session_receiver(queue_name, session_id=session_id) as receiver: @@ -390,7 +390,7 @@ with ServiceBusClient.from_connection_string(connstr) as client: renewer.register(msg, timeout=60) # Do your application logic here msg.complete() -renewer.shutdown() +renewer.close() ``` If for any reason auto-renewal has been interrupted or failed, this can be observed via the `auto_renew_error` property on the object being renewed. diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/__init__.py b/sdk/servicebus/azure-servicebus/azure/servicebus/__init__.py index 10e5205fd439..826e54f3e087 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/__init__.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/__init__.py @@ -16,7 +16,7 @@ from ._base_handler import ServiceBusSharedKeyCredential from ._common.message import Message, BatchMessage, PeekMessage, ReceivedMessage from ._common.constants import ReceiveSettleMode, NEXT_AVAILABLE -from ._common.utils import AutoLockRenew +from ._common.auto_lock_renewer import AutoLockRenew TransportType = constants.TransportType diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py new file mode 100644 index 000000000000..cb9a939f6c01 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py @@ -0,0 +1,136 @@ +# ------------------------------------------------------------------------ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# ------------------------------------------------------------------------- + +import datetime +import logging +import threading +import time +from concurrent.futures import ThreadPoolExecutor +from typing import TYPE_CHECKING + +from .._servicebus_session import ServiceBusSession +from ..exceptions import AutoLockRenewFailed, AutoLockRenewTimeout, ServiceBusError +from .utils import renewable_start_time, utc_now + +if TYPE_CHECKING: + from typing import Callable, Union, Optional, Awaitable + from .message import ReceivedMessage + LockRenewFailureCallback = Callable[[Union[ServiceBusSession, ReceivedMessage], + Optional[Exception]], None] + +_log = logging.getLogger(__name__) + +class AutoLockRenew(object): + """Auto renew locks for messages and sessions using a background thread pool. + + :param executor: A user-specified thread pool. This cannot be combined with + setting `max_workers`. + :type executor: ~concurrent.futures.ThreadPoolExecutor + :param max_workers: Specify the maximum workers in the thread pool. If not + specified the number used will be derived from the core count of the environment. + This cannot be combined with `executor`. + :type max_workers: int + + .. admonition:: Example: + + .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py + :start-after: [START auto_lock_renew_message_sync] + :end-before: [END auto_lock_renew_message_sync] + :language: python + :dedent: 4 + :caption: Automatically renew a message lock + + .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py + :start-after: [START auto_lock_renew_session_sync] + :end-before: [END auto_lock_renew_session_sync] + :language: python + :dedent: 4 + :caption: Automatically renew a session lock + + """ + + def __init__(self, executor=None, max_workers=None): + self._executor = executor or ThreadPoolExecutor(max_workers=max_workers) + self._shutdown = threading.Event() + self._sleep_time = 1 + self._renew_period = 10 + + def __enter__(self): + if self._shutdown.is_set(): + raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for" + " auto lock renewing.") + return self + + def __exit__(self, *args): + self.close() + + def _renewable(self, renewable): + # pylint: disable=protected-access + if self._shutdown.is_set(): + return False + if hasattr(renewable, '_settled') and renewable._settled: + return False + if not renewable._receiver._running: + return False + if renewable._lock_expired: + return False + return True + + def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_failure=None): + # pylint: disable=protected-access + _log.debug("Running lock auto-renew thread for %r seconds", timeout) + error = None + clean_shutdown = False # Only trigger the on_lock_renew_failure if halting was not expected (shutdown, etc) + try: + while self._renewable(renewable): + if (utc_now() - starttime) >= datetime.timedelta(seconds=timeout): + _log.debug("Reached auto lock renew timeout - letting lock expire.") + raise AutoLockRenewTimeout("Auto-renew period ({} seconds) elapsed.".format(timeout)) + if (renewable.locked_until_utc - utc_now()) <= datetime.timedelta(seconds=self._renew_period): + _log.debug("%r seconds or less until lock expires - auto renewing.", self._renew_period) + renewable.renew_lock() + time.sleep(self._sleep_time) + clean_shutdown = not renewable._lock_expired + except AutoLockRenewTimeout as e: + error = e + renewable.auto_renew_error = e + clean_shutdown = not renewable._lock_expired + except Exception as e: # pylint: disable=broad-except + _log.debug("Failed to auto-renew lock: %r. Closing thread.", e) + error = AutoLockRenewFailed( + "Failed to auto-renew lock", + inner_exception=e) + renewable.auto_renew_error = error + finally: + if on_lock_renew_failure and not clean_shutdown: + on_lock_renew_failure(renewable, error) + + def register(self, renewable, timeout=300, on_lock_renew_failure=None): + """Register a renewable entity for automatic lock renewal. + + :param renewable: A locked entity that needs to be renewed. + :type renewable: ~azure.servicebus.ReceivedMessage or + ~azure.servicebus.ServiceBusSession + :param float timeout: A time in seconds that the lock should be maintained for. + Default value is 300 (5 minutes). + :param Optional[LockRenewFailureCallback] on_lock_renew_failure: + A callback may be specified to be called when the lock is lost on the renewable that is being registered. + Default value is None (no callback). + """ + if self._shutdown.is_set(): + raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for" + " auto lock renewing.") + starttime = renewable_start_time(renewable) + self._executor.submit(self._auto_lock_renew, renewable, starttime, timeout, on_lock_renew_failure) + + def close(self, wait=True): + """Cease autorenewal by shutting down the thread pool to clean up any remaining lock renewal threads. + + :param wait: Whether to block until thread pool has shutdown. Default is `True`. + :type wait: bool + """ + self._shutdown.set() + self._executor.shutdown(wait=wait) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py index 38f6db331483..b5ba7f35ef16 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py @@ -7,8 +7,6 @@ import sys import datetime import logging -import threading -import time import functools import platform from typing import Optional, Dict @@ -16,11 +14,10 @@ from urlparse import urlparse except ImportError: from urllib.parse import urlparse -from concurrent.futures import ThreadPoolExecutor from uamqp import authentication, types -from ..exceptions import AutoLockRenewFailed, AutoLockRenewTimeout, ServiceBusError +from ..exceptions import ServiceBusError from .._version import VERSION from .constants import ( JWT_TOKEN_SCOPE, @@ -180,101 +177,3 @@ def generate_dead_letter_entity_name( ) return entity_name - - -class AutoLockRenew(object): - """Auto renew locks for messages and sessions using a background thread pool. - - :param executor: A user-specified thread pool. This cannot be combined with - setting `max_workers`. - :type executor: ~concurrent.futures.ThreadPoolExecutor - :param max_workers: Specify the maximum workers in the thread pool. If not - specified the number used will be derived from the core count of the environment. - This cannot be combined with `executor`. - :type max_workers: int - - .. admonition:: Example: - - .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py - :start-after: [START auto_lock_renew_message_sync] - :end-before: [END auto_lock_renew_message_sync] - :language: python - :dedent: 4 - :caption: Automatically renew a message lock - - .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py - :start-after: [START auto_lock_renew_session_sync] - :end-before: [END auto_lock_renew_session_sync] - :language: python - :dedent: 4 - :caption: Automatically renew a session lock - - """ - - def __init__(self, executor=None, max_workers=None): - self.executor = executor or ThreadPoolExecutor(max_workers=max_workers) - self._shutdown = threading.Event() - self.sleep_time = 1 - self.renew_period = 10 - - def __enter__(self): - if self._shutdown.is_set(): - raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for" - " auto lock renewing.") - return self - - def __exit__(self, *args): - self.shutdown() - - def _renewable(self, renewable): - if self._shutdown.is_set(): - return False - if hasattr(renewable, '_settled') and renewable._settled: # pylint: disable=protected-access - return False - if renewable._lock_expired: # pylint: disable=protected-access - return False - return True - - def _auto_lock_renew(self, renewable, starttime, timeout): - _log.debug("Running lock auto-renew thread for %r seconds", timeout) - try: - while self._renewable(renewable): - if (utc_now() - starttime) >= datetime.timedelta(seconds=timeout): - _log.debug("Reached auto lock renew timeout - letting lock expire.") - raise AutoLockRenewTimeout("Auto-renew period ({} seconds) elapsed.".format(timeout)) - if (renewable.locked_until_utc - utc_now()) <= datetime.timedelta(seconds=self.renew_period): - _log.debug("%r seconds or less until lock expires - auto renewing.", self.renew_period) - renewable.renew_lock() - time.sleep(self.sleep_time) - except AutoLockRenewTimeout as e: - renewable.auto_renew_error = e - except Exception as e: # pylint: disable=broad-except - _log.debug("Failed to auto-renew lock: %r. Closing thread.", e) - error = AutoLockRenewFailed( - "Failed to auto-renew lock", - inner_exception=e) - renewable.auto_renew_error = error - - def register(self, renewable, timeout=300): - """Register a renewable entity for automatic lock renewal. - - :param renewable: A locked entity that needs to be renewed. - :type renewable: ~azure.servicebus.ReceivedMessage or - ~azure.servicebus.Session - :param float timeout: A time in seconds that the lock should be maintained for. - Default value is 300 (5 minutes). - """ - if self._shutdown.is_set(): - raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for" - " auto lock renewing.") - starttime = renewable_start_time(renewable) - self.executor.submit(self._auto_lock_renew, renewable, starttime, timeout) - - def shutdown(self, wait=True): - """Shutdown the thread pool to clean up any remaining lock renewal threads. - - :param wait: Whether to block until thread pool has shutdown. Default is `True`. - :type wait: bool - """ - self._shutdown.set() - self.executor.shutdown(wait=wait) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/__init__.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/__init__.py index 7b24904344b2..8db9f20d7b1c 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/__init__.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/__init__.py @@ -10,7 +10,7 @@ from ._servicebus_session_receiver_async import ServiceBusSessionReceiver from ._servicebus_session_async import ServiceBusSession from ._servicebus_client_async import ServiceBusClient -from ._async_utils import AutoLockRenew +from ._async_auto_lock_renewer import AutoLockRenew __all__ = [ 'ReceivedMessage', diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py new file mode 100644 index 000000000000..be006a36f1c8 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py @@ -0,0 +1,138 @@ +# ------------------------------------------------------------------------ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# ------------------------------------------------------------------------- + +import asyncio +import logging +import datetime +from typing import Optional, Iterable, Any, Union, Callable, Awaitable, List + +from ._async_message import ReceivedMessage +from ._servicebus_session_async import ServiceBusSession +from .._common.utils import renewable_start_time, utc_now +from ._async_utils import get_running_loop +from ..exceptions import AutoLockRenewTimeout, AutoLockRenewFailed, ServiceBusError + +AsyncLockRenewFailureCallback = Callable[[Union[ServiceBusSession, ReceivedMessage], + Optional[Exception]], Awaitable[None]] + +_log = logging.getLogger(__name__) + + +class AutoLockRenew: + """Auto lock renew. + + An asynchronous AutoLockRenew handler for renewing the lock + tokens of messages and/or sessions in the background. + + :param loop: An async event loop. + :type loop: ~asyncio.BaseEventLoop + + .. admonition:: Example: + + .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py + :start-after: [START auto_lock_renew_message_async] + :end-before: [END auto_lock_renew_message_async] + :language: python + :dedent: 4 + :caption: Automatically renew a message lock + + .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py + :start-after: [START auto_lock_renew_session_async] + :end-before: [END auto_lock_renew_session_async] + :language: python + :dedent: 4 + :caption: Automatically renew a session lock + + """ + + def __init__(self, loop: Optional[asyncio.BaseEventLoop] = None) -> None: + self._shutdown = asyncio.Event() + self._futures = [] # type: List[asyncio.Future] + self._loop = loop or get_running_loop() + self._sleep_time = 1 + self._renew_period = 10 + + async def __aenter__(self) -> "AutoLockRenew": + if self._shutdown.is_set(): + raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for" + " auto lock renewing.") + return self + + async def __aexit__(self, *args: Iterable[Any]) -> None: + await self.close() + + def _renewable(self, renewable: Union[ReceivedMessage, ServiceBusSession]) -> bool: + # pylint: disable=protected-access + if self._shutdown.is_set(): + return False + if hasattr(renewable, '_settled') and renewable._settled: # type: ignore + return False + if renewable._lock_expired: + return False + if not renewable._receiver._running: + return False + return True + + async def _auto_lock_renew(self, + renewable: Union[ReceivedMessage, ServiceBusSession], + starttime: datetime.datetime, + timeout: float, + on_lock_renew_failure: Optional[AsyncLockRenewFailureCallback] = None) -> None: + # pylint: disable=protected-access + _log.debug("Running async lock auto-renew for %r seconds", timeout) + error = None # type: Optional[Exception] + clean_shutdown = False # Only trigger the on_lock_renew_failure if halting was not expected (shutdown, etc) + try: + while self._renewable(renewable): + if (utc_now() - starttime) >= datetime.timedelta(seconds=timeout): + _log.debug("Reached auto lock renew timeout - letting lock expire.") + raise AutoLockRenewTimeout("Auto-renew period ({} seconds) elapsed.".format(timeout)) + if (renewable.locked_until_utc - utc_now()) <= datetime.timedelta(seconds=self._renew_period): + _log.debug("%r seconds or less until lock expires - auto renewing.", self._renew_period) + await renewable.renew_lock() + await asyncio.sleep(self._sleep_time) + clean_shutdown = not renewable._lock_expired + except AutoLockRenewTimeout as e: + error = e + renewable.auto_renew_error = e + clean_shutdown = not renewable._lock_expired + except Exception as e: # pylint: disable=broad-except + _log.debug("Failed to auto-renew lock: %r. Closing thread.", e) + error = AutoLockRenewFailed( + "Failed to auto-renew lock", + inner_exception=e) + renewable.auto_renew_error = error + finally: + if on_lock_renew_failure and not clean_shutdown: + await on_lock_renew_failure(renewable, error) + + def register(self, + renewable: Union[ReceivedMessage, ServiceBusSession], + timeout: float = 300, + on_lock_renew_failure: Optional[AsyncLockRenewFailureCallback] = None) -> None: + """Register a renewable entity for automatic lock renewal. + + :param renewable: A locked entity that needs to be renewed. + :type renewable: Union[~azure.servicebus.aio.ReceivedMessage,~azure.servicebus.aio.ServiceBusSession] + :param float timeout: A time in seconds that the lock should be maintained for. + Default value is 300 (5 minutes). + :param Optional[AsyncLockRenewFailureCallback] on_lock_renew_failure: + An async callback may be specified to be called when the lock is lost on the renewable being registered. + Default value is None (no callback). + """ + if self._shutdown.is_set(): + raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for" + " auto lock renewing.") + starttime = renewable_start_time(renewable) + renew_future = asyncio.ensure_future( + self._auto_lock_renew(renewable, starttime, timeout, on_lock_renew_failure), + loop=self._loop) + self._futures.append(renew_future) + + async def close(self) -> None: + """Cease autorenewal by cancelling any remaining open lock renewal futures.""" + self._shutdown.set() + await asyncio.wait(self._futures) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_utils.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_utils.py index b963ad2394db..387c7e3afcad 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_utils.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_utils.py @@ -11,8 +11,6 @@ from uamqp import authentication -from .._common.utils import renewable_start_time, utc_now -from ..exceptions import AutoLockRenewTimeout, AutoLockRenewFailed, ServiceBusError from .._common.constants import ( JWT_TOKEN_SCOPE, TOKEN_TYPE_JWT, @@ -70,97 +68,3 @@ async def create_authentication(client): http_proxy=client._config.http_proxy, transport_type=client._config.transport_type, ) - - -class AutoLockRenew: - """Auto lock renew. - - An asynchronous AutoLockRenew handler for renewing the lock - tokens of messages and/or sessions in the background. - - :param loop: An async event loop. - :type loop: ~asyncio.EventLoop - - .. admonition:: Example: - - .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py - :start-after: [START auto_lock_renew_message_async] - :end-before: [END auto_lock_renew_message_async] - :language: python - :dedent: 4 - :caption: Automatically renew a message lock - - .. literalinclude:: ../samples/async_samples/sample_code_servicebus_async.py - :start-after: [START auto_lock_renew_session_async] - :end-before: [END auto_lock_renew_session_async] - :language: python - :dedent: 4 - :caption: Automatically renew a session lock - - """ - - def __init__(self, loop=None): - self._shutdown = asyncio.Event() - self._futures = [] - self.loop = loop or get_running_loop() - self.sleep_time = 1 - self.renew_period = 10 - - async def __aenter__(self): - if self._shutdown.is_set(): - raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for" - " auto lock renewing.") - return self - - async def __aexit__(self, *args): - await self.shutdown() - - def _renewable(self, renewable): - if self._shutdown.is_set(): - return False - if hasattr(renewable, '_settled') and renewable._settled: # pylint: disable=protected-access - return False - if renewable._lock_expired: # pylint: disable=protected-access - return False - return True - - async def _auto_lock_renew(self, renewable, starttime, timeout): - _log.debug("Running async lock auto-renew for %r seconds", timeout) - try: - while self._renewable(renewable): - if (utc_now() - starttime) >= datetime.timedelta(seconds=timeout): - _log.debug("Reached auto lock renew timeout - letting lock expire.") - raise AutoLockRenewTimeout("Auto-renew period ({} seconds) elapsed.".format(timeout)) - if (renewable.locked_until_utc - utc_now()) <= datetime.timedelta(seconds=self.renew_period): - _log.debug("%r seconds or less until lock expires - auto renewing.", self.renew_period) - await renewable.renew_lock() - await asyncio.sleep(self.sleep_time) - except AutoLockRenewTimeout as e: - renewable.auto_renew_error = e - except Exception as e: # pylint: disable=broad-except - _log.debug("Failed to auto-renew lock: %r. Closing thread.", e) - error = AutoLockRenewFailed( - "Failed to auto-renew lock", - inner_exception=e) - renewable.auto_renew_error = error - - def register(self, renewable, timeout=300): - """Register a renewable entity for automatic lock renewal. - - :param renewable: A locked entity that needs to be renewed. - :type renewable: ~azure.servicebus.aio.ReceivedMessage or - ~azure.servicebus.aio.Session - :param float timeout: A time in seconds that the lock should be maintained for. - Default value is 300 (5 minutes). - """ - if self._shutdown.is_set(): - raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for" - " auto lock renewing.") - starttime = renewable_start_time(renewable) - renew_future = asyncio.ensure_future(self._auto_lock_renew(renewable, starttime, timeout), loop=self.loop) - self._futures.append(renew_future) - - async def shutdown(self): - """Cancel remaining open lock renewal futures.""" - self._shutdown.set() - await asyncio.wait(self._futures) diff --git a/sdk/servicebus/azure-servicebus/migration_guide.md b/sdk/servicebus/azure-servicebus/migration_guide.md index 7a5c71610366..98281a340add 100644 --- a/sdk/servicebus/azure-servicebus/migration_guide.md +++ b/sdk/servicebus/azure-servicebus/migration_guide.md @@ -77,6 +77,11 @@ semantics with the sender or receiver lifetime. | `azure.servicebus.control_client.ServiceBusService().create_queue(queue_name)` | `azure.servicebus.management.ServiceBusManagementClient().create_queue(queue_name)` | [Create a queue](./samples/sync_samples/mgmt_queue.py) | | `azure.servicebus.ServiceBusClient().list_queues()` | `azure.servicebus.management.ServiceBusManagementClient().list_queues()` | [List queues](./samples/sync_samples/mgmt_queue.py ) | +### Working with AutoLockRenew +| In v0.50 | Equivalent in v7 | Sample | +|---|---|---| +| `azure.servicebus.AutoLockRenew().shutdown()` | `azure.servicebus.AutoLockRenew().close()` | [Close an auto-lock-renewer](./samples/sync_samples/auto_lock_renew.py) | + ## Migration samples diff --git a/sdk/servicebus/azure-servicebus/samples/README.md b/sdk/servicebus/azure-servicebus/samples/README.md index 12dc083032cb..9f9b47b4b25d 100644 --- a/sdk/servicebus/azure-servicebus/samples/README.md +++ b/sdk/servicebus/azure-servicebus/samples/README.md @@ -51,6 +51,7 @@ Both [sync version](./sync_samples) and [async version](./async_samples) of samp - [auto_lock_renew.py](./sync_samples/auto_lock_renew.py) ([async_version](./async_samples/auto_lock_renew_async.py)) - Examples to show usage of AutoLockRenew: - Automatically renew lock on message received from non-sessionful entity - Automatically renew lock on the session of sessionful entity + - Configure a callback to be triggered on auto lock renew failures. - [mgmt_queue](./sync_samples/mgmt_queue.py) ([async_version](./async_samples/mgmt_queue_async.py)) - Examples to manage queue entities under a given servicebus namespace - Create a queue - Delete a queue diff --git a/sdk/servicebus/azure-servicebus/samples/async_samples/auto_lock_renew_async.py b/sdk/servicebus/azure-servicebus/samples/async_samples/auto_lock_renew_async.py index 05991ee0c4aa..917eec08f0b0 100644 --- a/sdk/servicebus/azure-servicebus/samples/async_samples/auto_lock_renew_async.py +++ b/sdk/servicebus/azure-servicebus/samples/async_samples/auto_lock_renew_async.py @@ -18,6 +18,7 @@ from azure.servicebus import Message from azure.servicebus.aio import ServiceBusClient, AutoLockRenew +from azure.servicebus.exceptions import MessageLockExpired CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR'] QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"] @@ -50,7 +51,7 @@ async def renew_lock_on_message_received_from_non_sessionful_entity(): await msg.complete() print('Complete messages.') - await renewer.shutdown() + await renewer.close() async def renew_lock_on_session_of_the_sessionful_entity(): @@ -81,9 +82,50 @@ async def renew_lock_on_session_of_the_sessionful_entity(): await msg.complete() print('Complete messages.') - await renewer.shutdown() + +async def renew_lock_with_lock_renewal_failure_callback(): + servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR) + + async with servicebus_client: + async with servicebus_client.get_queue_sender(queue_name=QUEUE_NAME) as sender: + await sender.send_messages(Message("message")) + + async with AutoLockRenew() as renewer: + # For this sample we're going to set the renewal recurrence of the autolockrenewer to greater than the + # service side message lock duration, to demonstrate failure. Normally, this should not be adjusted. + renewer._sleep_time = 40 + async with servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, prefetch=10) as receiver: + + async def on_lock_renew_failure_callback(renewable, error): + # If auto-lock-renewal fails, this function will be called. + # If failure is due to an error, the second argument will be populated, otherwise + # it will default to `None`. + # This callback can be an ideal location to log the failure, or take action to safely + # handle any processing on the message or session that was in progress. + print("Intentionally failed to renew lock on {} due to {}".format(renewable, error)) + + received_msgs = await receiver.receive_messages(max_batch_size=1, max_wait_time=5) + + for msg in received_msgs: + # automatically renew the lock on each message for 120 seconds + renewer.register(msg, timeout=90, on_lock_renew_failure=on_lock_renew_failure_callback) + print('Register messages into AutoLockRenew done.') + + # Cause the messages and autorenewal to time out. + # Other reasons for renew failure could include a network or service outage. + await asyncio.sleep(80) + + try: + for msg in received_msgs: + await msg.complete() + except MessageLockExpired as e: + print('Messages cannot be settled if they have timed out. (This is expected)') + + print('Lock renew failure demonstration complete.') + loop = asyncio.get_event_loop() loop.run_until_complete(renew_lock_on_message_received_from_non_sessionful_entity()) loop.run_until_complete(renew_lock_on_session_of_the_sessionful_entity()) +loop.run_until_complete(renew_lock_with_lock_renewal_failure_callback()) \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/samples/sync_samples/auto_lock_renew.py b/sdk/servicebus/azure-servicebus/samples/sync_samples/auto_lock_renew.py index 32f977a5e935..92268bbcf3f3 100644 --- a/sdk/servicebus/azure-servicebus/samples/sync_samples/auto_lock_renew.py +++ b/sdk/servicebus/azure-servicebus/samples/sync_samples/auto_lock_renew.py @@ -17,6 +17,7 @@ import time from azure.servicebus import ServiceBusClient, AutoLockRenew, Message +from azure.servicebus.exceptions import MessageLockExpired CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR'] QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"] @@ -49,7 +50,7 @@ def renew_lock_on_message_received_from_non_sessionful_entity(): msg.complete() # Settling the message deregisters it from the AutoLockRenewer print('Complete messages.') - renewer.shutdown() + renewer.close() def renew_lock_on_session_of_the_sessionful_entity(): @@ -82,8 +83,50 @@ def renew_lock_on_session_of_the_sessionful_entity(): print('Complete messages.') - renewer.shutdown() + renewer.close() + + +def renew_lock_with_lock_renewal_failure_callback(): + servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR) + + with servicebus_client: + with servicebus_client.get_queue_sender(queue_name=QUEUE_NAME) as sender: + sender.send_messages(Message("message")) + + with AutoLockRenew() as renewer: + # For this sample we're going to set the renewal recurrence of the autolockrenewer to greater than the + # service side message lock duration, to demonstrate failure. Normally, this should not be adjusted. + renewer._sleep_time = 40 + with servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, prefetch=10) as receiver: + + def on_lock_renew_failure_callback(renewable, error): + # If auto-lock-renewal fails, this function will be called. + # If failure is due to an error, the second argument will be populated, otherwise + # it will default to `None`. + # This callback can be an ideal location to log the failure, or take action to safely + # handle any processing on the message or session that was in progress. + print("Intentionally failed to renew lock on {} due to {}".format(renewable, error)) + + received_msgs = receiver.receive_messages(max_batch_size=1, max_wait_time=5) + + for msg in received_msgs: + # automatically renew the lock on each message for 120 seconds + renewer.register(msg, timeout=90, on_lock_renew_failure=on_lock_renew_failure_callback) + print('Register messages into AutoLockRenew done.') + + # Cause the messages and autorenewal to time out. + # Other reasons for renew failure could include a network or service outage. + time.sleep(80) + + try: + for msg in received_msgs: + msg.complete() + except MessageLockExpired as e: + print('Messages cannot be settled if they have timed out. (This is expected)') + + print('Lock renew failure demonstration complete.') renew_lock_on_message_received_from_non_sessionful_entity() renew_lock_on_session_of_the_sessionful_entity() +renew_lock_with_lock_renewal_failure_callback() \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/mocks_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/mocks_async.py new file mode 100644 index 000000000000..fd3f70b06c9b --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/mocks_async.py @@ -0,0 +1,32 @@ +from datetime import timedelta + +from azure.servicebus._common.utils import utc_now + +class MockReceiver: + def __init__(self): + self._running = True + +class MockReceivedMessage: + def __init__(self, prevent_renew_lock=False, exception_on_renew_lock=False): + self._lock_duration = 2 + + self._received_timestamp_utc = utc_now() + self.locked_until_utc = self._received_timestamp_utc + timedelta(seconds=self._lock_duration) + self._settled = False + self._receiver = MockReceiver() + + self._prevent_renew_lock = prevent_renew_lock + self._exception_on_renew_lock = exception_on_renew_lock + + + async def renew_lock(self): + if self._exception_on_renew_lock: + raise Exception("Generated exception via MockReceivedMessage exception_on_renew_lock") + if not self._prevent_renew_lock: + self.locked_until_utc = self.locked_until_utc + timedelta(seconds=self._lock_duration) + + @property + def _lock_expired(self): + if self.locked_until_utc and self.locked_until_utc <= utc_now(): + return True + return False \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index 3a0968ab5368..9ea1051a1003 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -33,6 +33,7 @@ from devtools_testutils import AzureMgmtTestCase, CachedResourceGroupPreparer from servicebus_preparer import CachedServiceBusNamespacePreparer, CachedServiceBusQueuePreparer, ServiceBusQueuePreparer from utilities import get_logger, print_message, sleep_until_expired +from mocks_async import MockReceivedMessage _logger = get_logger(logging.DEBUG) @@ -148,7 +149,7 @@ async def test_async_queue_by_queue_client_conn_str_receive_handler_receiveandde await sender.send_messages(message) messages = [] - async with sb_client.get_queue_receiver(servicebus_queue.name, mode=ReceiveSettleMode.ReceiveAndDelete, idle_timeout=5) as receiver: + async with sb_client.get_queue_receiver(servicebus_queue.name, mode=ReceiveSettleMode.ReceiveAndDelete, idle_timeout=8) as receiver: async for message in receiver: messages.append(message) with pytest.raises(MessageAlreadySettled): @@ -728,7 +729,7 @@ async def test_async_queue_by_queue_client_conn_str_receive_handler_with_autoloc print("Remaining messages", message.locked_until_utc, utc_now()) messages.append(message) await message.complete() - await renewer.shutdown() + await renewer.close() assert len(messages) == 11 @pytest.mark.liveTest @@ -1107,20 +1108,91 @@ async def test_queue_message_settle_through_mgmt_link_due_to_broken_receiver_lin assert len(messages) == 1 await messages[0].complete() + @pytest.mark.asyncio - async def test_async_queue_mock_no_reusing_auto_lock_renew(self): - class MockReceivedMessage: - def __init__(self): - self._received_timestamp_utc = utc_now() - self.locked_until_utc = self._received_timestamp_utc + timedelta(seconds=10) + async def test_async_queue_mock_auto_lock_renew_callback(self): + results = [] + errors = [] + async def callback_mock(renewable, error): + results.append(renewable) + if error: + errors.append(error) + + auto_lock_renew = AutoLockRenew() + auto_lock_renew._renew_period = 1 # So we can run the test fast. + async with auto_lock_renew: # Check that it is called when the object expires for any reason (silent renew failure) + message = MockReceivedMessage(prevent_renew_lock=True) + auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) + await asyncio.sleep(3) + assert len(results) == 1 and results[-1]._lock_expired == True + assert not errors + + del results[:] + del errors[:] + auto_lock_renew = AutoLockRenew() + auto_lock_renew._renew_period = 1 + async with auto_lock_renew: # Check that in normal operation it does not get called + auto_lock_renew.register(renewable=MockReceivedMessage(), on_lock_renew_failure=callback_mock) + await asyncio.sleep(3) + assert not results + assert not errors + + del results[:] + del errors[:] + auto_lock_renew = AutoLockRenew() + auto_lock_renew._renew_period = 1 + async with auto_lock_renew: # Check that when a message is settled, it will not get called even after expiry + message = MockReceivedMessage(prevent_renew_lock=True) + auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) + message._settled = True + await asyncio.sleep(3) + assert not results + assert not errors + + del results[:] + del errors[:] + auto_lock_renew = AutoLockRenew() + auto_lock_renew._renew_period = 1 + async with auto_lock_renew: # Check that it is called when there is an overt renew failure + message = MockReceivedMessage(exception_on_renew_lock=True) + auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) + await asyncio.sleep(3) + assert len(results) == 1 and results[-1]._lock_expired == True + assert errors[-1] + + del results[:] + del errors[:] + auto_lock_renew = AutoLockRenew() + auto_lock_renew._renew_period = 1 + async with auto_lock_renew: # Check that it is not called when the renewer is shutdown + message = MockReceivedMessage(prevent_renew_lock=True) + auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) + await auto_lock_renew.close() + await asyncio.sleep(3) + assert not results + assert not errors + + del results[:] + del errors[:] + auto_lock_renew = AutoLockRenew() + auto_lock_renew._renew_period = 1 + async with auto_lock_renew: # Check that it is not called when the receiver is shutdown + message = MockReceivedMessage(prevent_renew_lock=True) + auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) + message._receiver._running = False + await asyncio.sleep(3) + assert not results + assert not errors - async def renew_lock(self): - self.locked_until_utc = self.locked_until_utc + timedelta(seconds=10) + @pytest.mark.asyncio + async def test_async_queue_mock_no_reusing_auto_lock_renew(self): auto_lock_renew = AutoLockRenew() + auto_lock_renew._renew_period = 1 + async with auto_lock_renew: auto_lock_renew.register(renewable=MockReceivedMessage()) - await asyncio.sleep(12) + await asyncio.sleep(3) with pytest.raises(ServiceBusError): async with auto_lock_renew: @@ -1130,11 +1202,12 @@ async def renew_lock(self): auto_lock_renew.register(renewable=MockReceivedMessage()) auto_lock_renew = AutoLockRenew() + auto_lock_renew._renew_period = 1 auto_lock_renew.register(renewable=MockReceivedMessage()) - time.sleep(12) + time.sleep(3) - await auto_lock_renew.shutdown() + await auto_lock_renew.close() with pytest.raises(ServiceBusError): async with auto_lock_renew: diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py index 450ec473f338..99b05b1d2e2a 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py @@ -481,6 +481,10 @@ async def test_async_session_by_conn_str_receive_handler_with_autolockrenew(self message = Message("{}".format(i), session_id=session_id) await sender.send_messages(message) + results = [] + async def lock_lost_callback(renewable, error): + results.append(renewable) + renewer = AutoLockRenew() messages = [] async with sb_client.get_queue_session_receiver(servicebus_queue.name, session_id=session_id, idle_timeout=5, mode=ReceiveSettleMode.PeekLock, prefetch=20) as session: @@ -502,6 +506,7 @@ async def test_async_session_by_conn_str_receive_handler_with_autolockrenew(self messages.append(message) elif len(messages) == 1: + assert not results await asyncio.sleep(45) print("Second sleep {}".format(session.session.locked_until_utc - utc_now())) assert session.session._lock_expired @@ -513,7 +518,17 @@ async def test_async_session_by_conn_str_receive_handler_with_autolockrenew(self assert isinstance(e.inner_exception, AutoLockRenewTimeout) messages.append(message) - await renewer.shutdown() + # While we're testing autolockrenew and sessions, let's make sure we don't call the lock-lost callback when a session exits. + renewer._renew_period = 1 + session = None + + async with sb_client.get_queue_session_receiver(servicebus_queue.name, session_id=session_id, idle_timeout=5, mode=ReceiveSettleMode.PeekLock, prefetch=10) as receiver: + session = receiver.session + renewer.register(session, timeout=5, on_lock_renew_failure=lock_lost_callback) + await asyncio.sleep(max(0,(session.locked_until_utc - utc_now()).total_seconds()+1)) # If this pattern repeats make sleep_until_expired_async + assert not results + + await renewer.close() assert len(messages) == 2 @@ -614,7 +629,7 @@ async def test_async_session_schedule_message(self, servicebus_namespace_connect assert len(messages) == 1 else: raise Exception("Failed to receive schdeduled message.") - await renewer.shutdown() + await renewer.close() @pytest.mark.liveTest @@ -654,7 +669,7 @@ async def test_async_session_schedule_multiple_messages(self, servicebus_namespa assert len(messages) == 2 else: raise Exception("Failed to receive schdeduled message.") - await renewer.shutdown() + await renewer.close() @pytest.mark.liveTest @@ -688,7 +703,7 @@ async def test_async_session_cancel_scheduled_messages(self, servicebus_namespac print(str(m)) await m.complete() raise - await renewer.shutdown() + await renewer.close() @pytest.mark.liveTest diff --git a/sdk/servicebus/azure-servicebus/tests/mocks.py b/sdk/servicebus/azure-servicebus/tests/mocks.py new file mode 100644 index 000000000000..4bfea86dbfb6 --- /dev/null +++ b/sdk/servicebus/azure-servicebus/tests/mocks.py @@ -0,0 +1,32 @@ +from datetime import timedelta + +from azure.servicebus._common.utils import utc_now + +class MockReceiver: + def __init__(self): + self._running = True + +class MockReceivedMessage: + def __init__(self, prevent_renew_lock=False, exception_on_renew_lock=False): + self._lock_duration = 2 + + self._received_timestamp_utc = utc_now() + self.locked_until_utc = self._received_timestamp_utc + timedelta(seconds=self._lock_duration) + self._settled = False + self._receiver = MockReceiver() + + self._prevent_renew_lock = prevent_renew_lock + self._exception_on_renew_lock = exception_on_renew_lock + + + def renew_lock(self): + if self._exception_on_renew_lock: + raise Exception("Generated exception via MockReceivedMessage exception_on_renew_lock") + if not self._prevent_renew_lock: + self.locked_until_utc = self.locked_until_utc + timedelta(seconds=self._lock_duration) + + @property + def _lock_expired(self): + if self.locked_until_utc and self.locked_until_utc <= utc_now(): + return True + return False \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index 1185bfb5136c..e90e00f0e313 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -37,6 +37,7 @@ from devtools_testutils import AzureMgmtTestCase, CachedResourceGroupPreparer from servicebus_preparer import CachedServiceBusNamespacePreparer, ServiceBusQueuePreparer, CachedServiceBusQueuePreparer from utilities import get_logger, print_message, sleep_until_expired +from mocks import MockReceivedMessage _logger = get_logger(logging.DEBUG) @@ -212,7 +213,7 @@ def test_queue_by_queue_client_conn_str_receive_handler_receiveanddelete(self, s messages = [] with sb_client.get_queue_receiver(servicebus_queue.name, mode=ReceiveSettleMode.ReceiveAndDelete, - idle_timeout=5) as receiver: + idle_timeout=8) as receiver: for message in receiver: assert not message.properties assert not message.label @@ -892,7 +893,7 @@ def test_queue_by_queue_client_conn_str_receive_handler_with_autolockrenew(self, print("Remaining messages", message.locked_until_utc, utc_now()) messages.append(message) message.complete() - renewer.shutdown() + renewer.close() assert len(messages) == 11 @pytest.mark.liveTest @@ -1337,19 +1338,88 @@ def test_queue_message_settle_through_mgmt_link_due_to_broken_receiver_link(self assert len(messages) == 1 messages[0].complete() - def test_queue_mock_no_reusing_auto_lock_renew(self): - class MockReceivedMessage: - def __init__(self): - self._received_timestamp_utc = utc_now() - self.locked_until_utc = self._received_timestamp_utc + timedelta(seconds=10) - def renew_lock(self): - self.locked_until_utc = self.locked_until_utc + timedelta(seconds=10) + def test_queue_mock_auto_lock_renew_callback(self): + results = [] + errors = [] + def callback_mock(renewable, error): + results.append(renewable) + if error: + errors.append(error) auto_lock_renew = AutoLockRenew() + auto_lock_renew._renew_period = 1 # So we can run the test fast. + with auto_lock_renew: # Check that it is called when the object expires for any reason (silent renew failure) + message = MockReceivedMessage(prevent_renew_lock=True) + auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) + time.sleep(3) + assert len(results) == 1 and results[-1]._lock_expired == True + assert not errors + + del results[:] + del errors[:] + auto_lock_renew = AutoLockRenew() + auto_lock_renew._renew_period = 1 + with auto_lock_renew: # Check that in normal operation it does not get called + auto_lock_renew.register(renewable=MockReceivedMessage(), on_lock_renew_failure=callback_mock) + time.sleep(3) + assert not results + assert not errors + + del results[:] + del errors[:] + auto_lock_renew = AutoLockRenew() + auto_lock_renew._renew_period = 1 + with auto_lock_renew: # Check that when a message is settled, it will not get called even after expiry + message = MockReceivedMessage(prevent_renew_lock=True) + auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) + message._settled = True + time.sleep(3) + assert not results + assert not errors + + del results[:] + del errors[:] + auto_lock_renew = AutoLockRenew() + auto_lock_renew._renew_period = 1 + with auto_lock_renew: # Check that it is called when there is an overt renew failure + message = MockReceivedMessage(exception_on_renew_lock=True) + auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) + time.sleep(3) + assert len(results) == 1 and results[-1]._lock_expired == True + assert errors[-1] + + del results[:] + del errors[:] + auto_lock_renew = AutoLockRenew() + auto_lock_renew._renew_period = 1 + with auto_lock_renew: # Check that it is not called when the renewer is shutdown + message = MockReceivedMessage(prevent_renew_lock=True) + auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) + auto_lock_renew.close() + time.sleep(3) + assert not results + assert not errors + + del results[:] + del errors[:] + auto_lock_renew = AutoLockRenew() + auto_lock_renew._renew_period = 1 + with auto_lock_renew: # Check that it is not called when the receiver is shutdown + message = MockReceivedMessage(prevent_renew_lock=True) + auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock) + message._receiver._running = False + time.sleep(3) + assert not results + assert not errors + + + def test_queue_mock_no_reusing_auto_lock_renew(self): + auto_lock_renew = AutoLockRenew() + auto_lock_renew._renew_period = 1 # So we can run the test fast. with auto_lock_renew: auto_lock_renew.register(renewable=MockReceivedMessage()) - time.sleep(12) + time.sleep(3) with pytest.raises(ServiceBusError): with auto_lock_renew: @@ -1359,12 +1429,13 @@ def renew_lock(self): auto_lock_renew.register(renewable=MockReceivedMessage()) auto_lock_renew = AutoLockRenew() + auto_lock_renew._renew_period = 1 with auto_lock_renew: auto_lock_renew.register(renewable=MockReceivedMessage()) - time.sleep(12) + time.sleep(3) - auto_lock_renew.shutdown() + auto_lock_renew.close() with pytest.raises(ServiceBusError): with auto_lock_renew: diff --git a/sdk/servicebus/azure-servicebus/tests/test_sessions.py b/sdk/servicebus/azure-servicebus/tests/test_sessions.py index d72721d8c182..56037252ccb9 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_sessions.py +++ b/sdk/servicebus/azure-servicebus/tests/test_sessions.py @@ -575,10 +575,14 @@ def test_session_by_conn_str_receive_handler_with_autolockrenew(self, servicebus message = Message("{}".format(i), session_id=session_id) sender.send_messages(message) + results = [] + def lock_lost_callback(renewable, error): + results.append(renewable) + renewer = AutoLockRenew() messages = [] with sb_client.get_queue_session_receiver(servicebus_queue.name, session_id=session_id, idle_timeout=5, mode=ReceiveSettleMode.PeekLock, prefetch=10) as receiver: - renewer.register(receiver.session, timeout=60) + renewer.register(receiver.session, timeout=60, on_lock_renew_failure = lock_lost_callback) print("Registered lock renew thread", receiver.session._locked_until_utc, utc_now()) with pytest.raises(SessionLockExpired): for message in receiver: @@ -600,6 +604,7 @@ def test_session_by_conn_str_receive_handler_with_autolockrenew(self, servicebus print("Starting second sleep") time.sleep(40) # ensure renewer expires print("Second sleep {}".format(receiver.session._locked_until_utc - utc_now())) + assert not results sleep_until_expired(receiver.session) # and then ensure it didn't slip a renew under the wire. assert receiver.session._lock_expired assert isinstance(receiver.session.auto_renew_error, AutoLockRenewTimeout) @@ -610,7 +615,17 @@ def test_session_by_conn_str_receive_handler_with_autolockrenew(self, servicebus assert isinstance(e.inner_exception, AutoLockRenewTimeout) messages.append(message) - renewer.shutdown() + # While we're testing autolockrenew and sessions, let's make sure we don't call the lock-lost callback when a session exits. + renewer._renew_period = 1 + session = None + + with sb_client.get_queue_session_receiver(servicebus_queue.name, session_id=session_id, idle_timeout=5, mode=ReceiveSettleMode.PeekLock, prefetch=10) as receiver: + session = receiver.session + renewer.register(session, timeout=5, on_lock_renew_failure=lock_lost_callback) + sleep_until_expired(receiver.session) + assert not results + + renewer.close() assert len(messages) == 2