From 96b3872e155e17243f389e98be2c911ae60c8112 Mon Sep 17 00:00:00 2001 From: Kieran Brantner-Magee Date: Tue, 21 Jul 2020 09:40:30 -0700 Subject: [PATCH] pylint/mypy fixes. Adjust line endings and length, use a type alias for the callbacks, differentiate the async and sync autolockrenewer callback docstring. --- .../servicebus/_common/auto_lock_renewer.py | 17 +++++--- .../azure/servicebus/_common/utils.py | 2 - .../aio/_async_auto_lock_renewer.py | 35 +++++++++------ .../azure/servicebus/aio/_async_utils.py | 1 - .../azure-servicebus/samples/README.md | 1 + .../async_samples/auto_lock_renew_async.py | 43 ++++++++++++++++++- .../samples/sync_samples/auto_lock_renew.py | 43 +++++++++++++++++++ 7 files changed, 119 insertions(+), 23 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py index 5885fda7d930..2b7a82e2cef0 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py @@ -4,18 +4,23 @@ # license information. # ------------------------------------------------------------------------- -import sys import datetime import logging import threading import time -import functools from concurrent.futures import ThreadPoolExecutor +from typing import TYPE_CHECKING from .._servicebus_session import ServiceBusSession from ..exceptions import AutoLockRenewFailed, AutoLockRenewTimeout, ServiceBusError from .utils import renewable_start_time, utc_now +if TYPE_CHECKING: + from typing import Callable, Union, Optional, Awaitable + from .message import ReceivedMessage + LockRenewFailureCallback = Callable[[Union[ServiceBusSession, ReceivedMessage], + Optional[Exception]], None] + _log = logging.getLogger(__name__) class AutoLockRenew(object): @@ -63,6 +68,7 @@ def __exit__(self, *args): self.close() def _renewable(self, renewable): + # pylint: disable=protected-access if self._shutdown.is_set(): return False if hasattr(renewable, '_settled') and renewable._settled: @@ -74,6 +80,7 @@ def _renewable(self, renewable): return True def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_failure=None): + # pylint: disable=protected-access _log.debug("Running lock auto-renew thread for %r seconds", timeout) error = None clean_shutdown = False # Only trigger the on_lock_renew_failure if halting was not expected (shutdown, etc) @@ -87,8 +94,8 @@ def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_failure= renewable.renew_lock() time.sleep(self.sleep_time) clean_shutdown = not renewable._lock_expired - except AutoLockRenewTimeout as e: - renewable.auto_renew_error = e + except AutoLockRenewTimeout as error: + renewable.auto_renew_error = error clean_shutdown = not renewable._lock_expired except Exception as e: # pylint: disable=broad-except _log.debug("Failed to auto-renew lock: %r. Closing thread.", e) @@ -108,7 +115,7 @@ def register(self, renewable, timeout=300, on_lock_renew_failure=None): ~azure.servicebus.ServiceBusSession :param float timeout: A time in seconds that the lock should be maintained for. Default value is 300 (5 minutes). - :param Optional[Callable[[Union[~azure.servicebus.ServiceBusSession, ReceivedMessage], Optional[Exception]], Awaitable[None]]] on_lock_renew_failure: + :param Optional[LockRenewFailureCallback] on_lock_renew_failure: A callback may be specified to be called when the lock is lost on the renewable that is being registered. Default value is None (no callback). """ diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py index 019bf8fc37bf..82fbfd5e3707 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/utils.py @@ -7,8 +7,6 @@ import sys import datetime import logging -import threading -import time import functools try: from urlparse import urlparse diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py index d173858e9acf..2c48a76fb969 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_auto_lock_renewer.py @@ -7,14 +7,17 @@ import asyncio import logging import datetime -import functools -from typing import Optional, Iterable, Any +from typing import Optional, Iterable, Any, Union, Callable, Awaitable, List +from ._async_message import ReceivedMessage from ._servicebus_session_async import ServiceBusSession from .._common.utils import renewable_start_time, utc_now from ._async_utils import get_running_loop from ..exceptions import AutoLockRenewTimeout, AutoLockRenewFailed, ServiceBusError +AsyncLockRenewFailureCallback = Callable[[Union[ServiceBusSession, ReceivedMessage], + Optional[Exception]], Awaitable[None]] + _log = logging.getLogger(__name__) @@ -47,7 +50,7 @@ class AutoLockRenew: def __init__(self, loop: Optional[asyncio.BaseEventLoop] = None) -> None: self._shutdown = asyncio.Event() - self._futures = [] + self._futures: List[asyncio.Task] = [] self._loop = loop or get_running_loop() self.sleep_time = 1 self.renew_period = 10 @@ -61,7 +64,8 @@ async def __aenter__(self) -> "AutoLockRenew": async def __aexit__(self, *args: Iterable[Any]) -> None: await self.close() - def _renewable(self, renewable: "Union[ReceivedMessage, ServiceBusSession]") -> bool: + def _renewable(self, renewable: Union[ReceivedMessage, ServiceBusSession]) -> bool: + # pylint: disable=protected-access if self._shutdown.is_set(): return False if hasattr(renewable, '_settled') and renewable._settled: @@ -73,10 +77,11 @@ def _renewable(self, renewable: "Union[ReceivedMessage, ServiceBusSession]") -> return True async def _auto_lock_renew(self, - renewable: "Union[ReceivedMessage, ServiceBusSession]", + renewable: Union[ReceivedMessage, ServiceBusSession], starttime: datetime.datetime, - timeout: int, - on_lock_renew_failure: "Optional[Callable[[Union[ServiceBusSession, ReceivedMessage], Optional[Exception]], Awaitable[None]]]"=None) -> None: + timeout: float, + on_lock_renew_failure: Optional[AsyncLockRenewFailureCallback] = None) -> None: + # pylint: disable=protected-access _log.debug("Running async lock auto-renew for %r seconds", timeout) error = None clean_shutdown = False # Only trigger the on_lock_renew_failure if halting was not expected (shutdown, etc) @@ -90,8 +95,8 @@ async def _auto_lock_renew(self, await renewable.renew_lock() await asyncio.sleep(self.sleep_time) clean_shutdown = not renewable._lock_expired - except AutoLockRenewTimeout as e: - renewable.auto_renew_error = e + except AutoLockRenewTimeout as error: + renewable.auto_renew_error = error clean_shutdown = not renewable._lock_expired except Exception as e: # pylint: disable=broad-except _log.debug("Failed to auto-renew lock: %r. Closing thread.", e) @@ -104,24 +109,26 @@ async def _auto_lock_renew(self, await on_lock_renew_failure(renewable, error) def register(self, - renewable: "Union[ReceivedMessage, ServiceBusSession]", + renewable: Union[ReceivedMessage, ServiceBusSession], timeout: float = 300, - on_lock_renew_failure: "Optional[Callable[[Union[ServiceBusSession, ReceivedMessage]], Awaitable[None]]]" = None) -> None: + on_lock_renew_failure: Optional[AsyncLockRenewFailureCallback] = None) -> None: """Register a renewable entity for automatic lock renewal. :param renewable: A locked entity that needs to be renewed. :type renewable: Union[~azure.servicebus.aio.ReceivedMessage,~azure.servicebus.aio.ServiceBusSession] :param float timeout: A time in seconds that the lock should be maintained for. Default value is 300 (5 minutes). - :param Optional[Callable[[Union[~azure.servicebus.aio.ServiceBusSession, ReceivedMessage]], Awaitable[None]]] on_lock_renew_failure: - A callback may be specified to be called when the lock is lost on the renewable that is being registered. + :param Optional[AsyncLockRenewFailureCallback] on_lock_renew_failure: + An async callback may be specified to be called when the lock is lost on the renewable being registered. Default value is None (no callback). """ if self._shutdown.is_set(): raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for" " auto lock renewing.") starttime = renewable_start_time(renewable) - renew_future = asyncio.ensure_future(self._auto_lock_renew(renewable, starttime, timeout, on_lock_renew_failure), loop=self._loop) + renew_future = asyncio.ensure_future( + self._auto_lock_renew(renewable, starttime, timeout, on_lock_renew_failure), + loop=self._loop) self._futures.append(renew_future) async def close(self) -> None: diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_utils.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_utils.py index 6a126fa91b53..387c7e3afcad 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_utils.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_async_utils.py @@ -11,7 +11,6 @@ from uamqp import authentication -from .._common.utils import renewable_start_time, utc_now from .._common.constants import ( JWT_TOKEN_SCOPE, TOKEN_TYPE_JWT, diff --git a/sdk/servicebus/azure-servicebus/samples/README.md b/sdk/servicebus/azure-servicebus/samples/README.md index 12dc083032cb..9f9b47b4b25d 100644 --- a/sdk/servicebus/azure-servicebus/samples/README.md +++ b/sdk/servicebus/azure-servicebus/samples/README.md @@ -51,6 +51,7 @@ Both [sync version](./sync_samples) and [async version](./async_samples) of samp - [auto_lock_renew.py](./sync_samples/auto_lock_renew.py) ([async_version](./async_samples/auto_lock_renew_async.py)) - Examples to show usage of AutoLockRenew: - Automatically renew lock on message received from non-sessionful entity - Automatically renew lock on the session of sessionful entity + - Configure a callback to be triggered on auto lock renew failures. - [mgmt_queue](./sync_samples/mgmt_queue.py) ([async_version](./async_samples/mgmt_queue_async.py)) - Examples to manage queue entities under a given servicebus namespace - Create a queue - Delete a queue diff --git a/sdk/servicebus/azure-servicebus/samples/async_samples/auto_lock_renew_async.py b/sdk/servicebus/azure-servicebus/samples/async_samples/auto_lock_renew_async.py index bb05ee358f50..a5914116895e 100644 --- a/sdk/servicebus/azure-servicebus/samples/async_samples/auto_lock_renew_async.py +++ b/sdk/servicebus/azure-servicebus/samples/async_samples/auto_lock_renew_async.py @@ -81,9 +81,50 @@ async def renew_lock_on_session_of_the_sessionful_entity(): await msg.complete() print('Complete messages.') - await renewer.close() + +async def renew_lock_with_lock_renewal_failure_callback(): + servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR) + + async with servicebus_client: + async with servicebus_client.get_queue_sender(queue_name=QUEUE_NAME) as sender: + await sender.send_messages(Message("message")) + + async with AutoLockRenew() as renewer: + # For this sample we're going to set the renewal recurrence of the autolockrenewer to greater than the + # service side message lock duration, to demonstrate failure. Normally, this should not be adjusted. + renewer._sleep_time = 40 + async with servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, prefetch=10) as receiver: + + def on_lock_renew_failure_callback(renewable, error): + # If auto-lock-renewal fails, this function will be called. + # If failure is due to an error, the second argument will be populated, otherwise + # it will default to `None`. + # This callback can be an ideal location to log the failure, or take action to safely + # handle any processing on the message or session that was in progress. + print("Intentionally failed to renew lock on {} due to {}".format(renewable, error)) + + received_msgs = await receiver.receive_messages(max_batch_size=1, max_wait_time=5) + + for msg in received_msgs: + # automatically renew the lock on each message for 120 seconds + renewer.register(msg, timeout=90, on_lock_renew_failure=on_lock_renew_failure_callback) + print('Register messages into AutoLockRenew done.') + + # Cause the messages and autorenewal to time out. + # Other reasons for renew failure could include a network or service outage. + await asyncio.sleep(80) + + try: + for msg in received_msgs: + await msg.complete() + except MessageLockExpired as e: + print('Messages cannot be settled if they have timed out. (This is expected)') + + print('Lock renew failure demonstration complete.') + loop = asyncio.get_event_loop() loop.run_until_complete(renew_lock_on_message_received_from_non_sessionful_entity()) loop.run_until_complete(renew_lock_on_session_of_the_sessionful_entity()) +loop.run_until_complete(renew_lock_with_lock_renewal_failure_callback()) \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/samples/sync_samples/auto_lock_renew.py b/sdk/servicebus/azure-servicebus/samples/sync_samples/auto_lock_renew.py index 41a2f9a86ab0..92268bbcf3f3 100644 --- a/sdk/servicebus/azure-servicebus/samples/sync_samples/auto_lock_renew.py +++ b/sdk/servicebus/azure-servicebus/samples/sync_samples/auto_lock_renew.py @@ -17,6 +17,7 @@ import time from azure.servicebus import ServiceBusClient, AutoLockRenew, Message +from azure.servicebus.exceptions import MessageLockExpired CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR'] QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"] @@ -85,5 +86,47 @@ def renew_lock_on_session_of_the_sessionful_entity(): renewer.close() +def renew_lock_with_lock_renewal_failure_callback(): + servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR) + + with servicebus_client: + with servicebus_client.get_queue_sender(queue_name=QUEUE_NAME) as sender: + sender.send_messages(Message("message")) + + with AutoLockRenew() as renewer: + # For this sample we're going to set the renewal recurrence of the autolockrenewer to greater than the + # service side message lock duration, to demonstrate failure. Normally, this should not be adjusted. + renewer._sleep_time = 40 + with servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, prefetch=10) as receiver: + + def on_lock_renew_failure_callback(renewable, error): + # If auto-lock-renewal fails, this function will be called. + # If failure is due to an error, the second argument will be populated, otherwise + # it will default to `None`. + # This callback can be an ideal location to log the failure, or take action to safely + # handle any processing on the message or session that was in progress. + print("Intentionally failed to renew lock on {} due to {}".format(renewable, error)) + + received_msgs = receiver.receive_messages(max_batch_size=1, max_wait_time=5) + + for msg in received_msgs: + # automatically renew the lock on each message for 120 seconds + renewer.register(msg, timeout=90, on_lock_renew_failure=on_lock_renew_failure_callback) + print('Register messages into AutoLockRenew done.') + + # Cause the messages and autorenewal to time out. + # Other reasons for renew failure could include a network or service outage. + time.sleep(80) + + try: + for msg in received_msgs: + msg.complete() + except MessageLockExpired as e: + print('Messages cannot be settled if they have timed out. (This is expected)') + + print('Lock renew failure demonstration complete.') + + renew_lock_on_message_received_from_non_sessionful_entity() renew_lock_on_session_of_the_sessionful_entity() +renew_lock_with_lock_renewal_failure_callback() \ No newline at end of file