Skip to content

Commit

Permalink
pylint/mypy fixes. Adjust line endings and length, use a type alias f…
Browse files Browse the repository at this point in the history
…or the callbacks, differentiate the async and sync autolockrenewer callback docstring.
  • Loading branch information
KieranBrantnerMagee committed Jul 21, 2020
1 parent 570ec22 commit 96b3872
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@
# license information.
# -------------------------------------------------------------------------

import sys
import datetime
import logging
import threading
import time
import functools
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING

from .._servicebus_session import ServiceBusSession
from ..exceptions import AutoLockRenewFailed, AutoLockRenewTimeout, ServiceBusError
from .utils import renewable_start_time, utc_now

if TYPE_CHECKING:
from typing import Callable, Union, Optional, Awaitable
from .message import ReceivedMessage
LockRenewFailureCallback = Callable[[Union[ServiceBusSession, ReceivedMessage],
Optional[Exception]], None]

_log = logging.getLogger(__name__)

class AutoLockRenew(object):
Expand Down Expand Up @@ -63,6 +68,7 @@ def __exit__(self, *args):
self.close()

def _renewable(self, renewable):
# pylint: disable=protected-access
if self._shutdown.is_set():
return False
if hasattr(renewable, '_settled') and renewable._settled:
Expand All @@ -74,6 +80,7 @@ def _renewable(self, renewable):
return True

def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_failure=None):
# pylint: disable=protected-access
_log.debug("Running lock auto-renew thread for %r seconds", timeout)
error = None
clean_shutdown = False # Only trigger the on_lock_renew_failure if halting was not expected (shutdown, etc)
Expand All @@ -87,8 +94,8 @@ def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_failure=
renewable.renew_lock()
time.sleep(self.sleep_time)
clean_shutdown = not renewable._lock_expired
except AutoLockRenewTimeout as e:
renewable.auto_renew_error = e
except AutoLockRenewTimeout as error:
renewable.auto_renew_error = error
clean_shutdown = not renewable._lock_expired
except Exception as e: # pylint: disable=broad-except
_log.debug("Failed to auto-renew lock: %r. Closing thread.", e)
Expand All @@ -108,7 +115,7 @@ def register(self, renewable, timeout=300, on_lock_renew_failure=None):
~azure.servicebus.ServiceBusSession
:param float timeout: A time in seconds that the lock should be maintained for.
Default value is 300 (5 minutes).
:param Optional[Callable[[Union[~azure.servicebus.ServiceBusSession, ReceivedMessage], Optional[Exception]], Awaitable[None]]] on_lock_renew_failure:
:param Optional[LockRenewFailureCallback] on_lock_renew_failure:
A callback may be specified to be called when the lock is lost on the renewable that is being registered.
Default value is None (no callback).
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import sys
import datetime
import logging
import threading
import time
import functools
try:
from urlparse import urlparse
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@
import asyncio
import logging
import datetime
import functools
from typing import Optional, Iterable, Any
from typing import Optional, Iterable, Any, Union, Callable, Awaitable, List

from ._async_message import ReceivedMessage
from ._servicebus_session_async import ServiceBusSession
from .._common.utils import renewable_start_time, utc_now
from ._async_utils import get_running_loop
from ..exceptions import AutoLockRenewTimeout, AutoLockRenewFailed, ServiceBusError

AsyncLockRenewFailureCallback = Callable[[Union[ServiceBusSession, ReceivedMessage],
Optional[Exception]], Awaitable[None]]

_log = logging.getLogger(__name__)


Expand Down Expand Up @@ -47,7 +50,7 @@ class AutoLockRenew:

def __init__(self, loop: Optional[asyncio.BaseEventLoop] = None) -> None:
self._shutdown = asyncio.Event()
self._futures = []
self._futures: List[asyncio.Task] = []
self._loop = loop or get_running_loop()
self.sleep_time = 1
self.renew_period = 10
Expand All @@ -61,7 +64,8 @@ async def __aenter__(self) -> "AutoLockRenew":
async def __aexit__(self, *args: Iterable[Any]) -> None:
await self.close()

def _renewable(self, renewable: "Union[ReceivedMessage, ServiceBusSession]") -> bool:
def _renewable(self, renewable: Union[ReceivedMessage, ServiceBusSession]) -> bool:
# pylint: disable=protected-access
if self._shutdown.is_set():
return False
if hasattr(renewable, '_settled') and renewable._settled:
Expand All @@ -73,10 +77,11 @@ def _renewable(self, renewable: "Union[ReceivedMessage, ServiceBusSession]") ->
return True

async def _auto_lock_renew(self,
renewable: "Union[ReceivedMessage, ServiceBusSession]",
renewable: Union[ReceivedMessage, ServiceBusSession],
starttime: datetime.datetime,
timeout: int,
on_lock_renew_failure: "Optional[Callable[[Union[ServiceBusSession, ReceivedMessage], Optional[Exception]], Awaitable[None]]]"=None) -> None:
timeout: float,
on_lock_renew_failure: Optional[AsyncLockRenewFailureCallback] = None) -> None:
# pylint: disable=protected-access
_log.debug("Running async lock auto-renew for %r seconds", timeout)
error = None
clean_shutdown = False # Only trigger the on_lock_renew_failure if halting was not expected (shutdown, etc)
Expand All @@ -90,8 +95,8 @@ async def _auto_lock_renew(self,
await renewable.renew_lock()
await asyncio.sleep(self.sleep_time)
clean_shutdown = not renewable._lock_expired
except AutoLockRenewTimeout as e:
renewable.auto_renew_error = e
except AutoLockRenewTimeout as error:
renewable.auto_renew_error = error
clean_shutdown = not renewable._lock_expired
except Exception as e: # pylint: disable=broad-except
_log.debug("Failed to auto-renew lock: %r. Closing thread.", e)
Expand All @@ -104,24 +109,26 @@ async def _auto_lock_renew(self,
await on_lock_renew_failure(renewable, error)

def register(self,
renewable: "Union[ReceivedMessage, ServiceBusSession]",
renewable: Union[ReceivedMessage, ServiceBusSession],
timeout: float = 300,
on_lock_renew_failure: "Optional[Callable[[Union[ServiceBusSession, ReceivedMessage]], Awaitable[None]]]" = None) -> None:
on_lock_renew_failure: Optional[AsyncLockRenewFailureCallback] = None) -> None:
"""Register a renewable entity for automatic lock renewal.
:param renewable: A locked entity that needs to be renewed.
:type renewable: Union[~azure.servicebus.aio.ReceivedMessage,~azure.servicebus.aio.ServiceBusSession]
:param float timeout: A time in seconds that the lock should be maintained for.
Default value is 300 (5 minutes).
:param Optional[Callable[[Union[~azure.servicebus.aio.ServiceBusSession, ReceivedMessage]], Awaitable[None]]] on_lock_renew_failure:
A callback may be specified to be called when the lock is lost on the renewable that is being registered.
:param Optional[AsyncLockRenewFailureCallback] on_lock_renew_failure:
An async callback may be specified to be called when the lock is lost on the renewable being registered.
Default value is None (no callback).
"""
if self._shutdown.is_set():
raise ServiceBusError("The AutoLockRenew has already been shutdown. Please create a new instance for"
" auto lock renewing.")
starttime = renewable_start_time(renewable)
renew_future = asyncio.ensure_future(self._auto_lock_renew(renewable, starttime, timeout, on_lock_renew_failure), loop=self._loop)
renew_future = asyncio.ensure_future(
self._auto_lock_renew(renewable, starttime, timeout, on_lock_renew_failure),
loop=self._loop)
self._futures.append(renew_future)

async def close(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

from uamqp import authentication

from .._common.utils import renewable_start_time, utc_now
from .._common.constants import (
JWT_TOKEN_SCOPE,
TOKEN_TYPE_JWT,
Expand Down
1 change: 1 addition & 0 deletions sdk/servicebus/azure-servicebus/samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ Both [sync version](./sync_samples) and [async version](./async_samples) of samp
- [auto_lock_renew.py](./sync_samples/auto_lock_renew.py) ([async_version](./async_samples/auto_lock_renew_async.py)) - Examples to show usage of AutoLockRenew:
- Automatically renew lock on message received from non-sessionful entity
- Automatically renew lock on the session of sessionful entity
- Configure a callback to be triggered on auto lock renew failures.
- [mgmt_queue](./sync_samples/mgmt_queue.py) ([async_version](./async_samples/mgmt_queue_async.py)) - Examples to manage queue entities under a given servicebus namespace
- Create a queue
- Delete a queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,50 @@ async def renew_lock_on_session_of_the_sessionful_entity():
await msg.complete()
print('Complete messages.')

await renewer.close()

async def renew_lock_with_lock_renewal_failure_callback():
servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)

async with servicebus_client:
async with servicebus_client.get_queue_sender(queue_name=QUEUE_NAME) as sender:
await sender.send_messages(Message("message"))

async with AutoLockRenew() as renewer:
# For this sample we're going to set the renewal recurrence of the autolockrenewer to greater than the
# service side message lock duration, to demonstrate failure. Normally, this should not be adjusted.
renewer._sleep_time = 40
async with servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, prefetch=10) as receiver:

def on_lock_renew_failure_callback(renewable, error):
# If auto-lock-renewal fails, this function will be called.
# If failure is due to an error, the second argument will be populated, otherwise
# it will default to `None`.
# This callback can be an ideal location to log the failure, or take action to safely
# handle any processing on the message or session that was in progress.
print("Intentionally failed to renew lock on {} due to {}".format(renewable, error))

received_msgs = await receiver.receive_messages(max_batch_size=1, max_wait_time=5)

for msg in received_msgs:
# automatically renew the lock on each message for 120 seconds
renewer.register(msg, timeout=90, on_lock_renew_failure=on_lock_renew_failure_callback)
print('Register messages into AutoLockRenew done.')

# Cause the messages and autorenewal to time out.
# Other reasons for renew failure could include a network or service outage.
await asyncio.sleep(80)

try:
for msg in received_msgs:
await msg.complete()
except MessageLockExpired as e:
print('Messages cannot be settled if they have timed out. (This is expected)')

print('Lock renew failure demonstration complete.')



loop = asyncio.get_event_loop()
loop.run_until_complete(renew_lock_on_message_received_from_non_sessionful_entity())
loop.run_until_complete(renew_lock_on_session_of_the_sessionful_entity())
loop.run_until_complete(renew_lock_with_lock_renewal_failure_callback())
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import time

from azure.servicebus import ServiceBusClient, AutoLockRenew, Message
from azure.servicebus.exceptions import MessageLockExpired

CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR']
QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"]
Expand Down Expand Up @@ -85,5 +86,47 @@ def renew_lock_on_session_of_the_sessionful_entity():
renewer.close()


def renew_lock_with_lock_renewal_failure_callback():
servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)

with servicebus_client:
with servicebus_client.get_queue_sender(queue_name=QUEUE_NAME) as sender:
sender.send_messages(Message("message"))

with AutoLockRenew() as renewer:
# For this sample we're going to set the renewal recurrence of the autolockrenewer to greater than the
# service side message lock duration, to demonstrate failure. Normally, this should not be adjusted.
renewer._sleep_time = 40
with servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, prefetch=10) as receiver:

def on_lock_renew_failure_callback(renewable, error):
# If auto-lock-renewal fails, this function will be called.
# If failure is due to an error, the second argument will be populated, otherwise
# it will default to `None`.
# This callback can be an ideal location to log the failure, or take action to safely
# handle any processing on the message or session that was in progress.
print("Intentionally failed to renew lock on {} due to {}".format(renewable, error))

received_msgs = receiver.receive_messages(max_batch_size=1, max_wait_time=5)

for msg in received_msgs:
# automatically renew the lock on each message for 120 seconds
renewer.register(msg, timeout=90, on_lock_renew_failure=on_lock_renew_failure_callback)
print('Register messages into AutoLockRenew done.')

# Cause the messages and autorenewal to time out.
# Other reasons for renew failure could include a network or service outage.
time.sleep(80)

try:
for msg in received_msgs:
msg.complete()
except MessageLockExpired as e:
print('Messages cannot be settled if they have timed out. (This is expected)')

print('Lock renew failure demonstration complete.')


renew_lock_on_message_received_from_non_sessionful_entity()
renew_lock_on_session_of_the_sessionful_entity()
renew_lock_with_lock_renewal_failure_callback()

0 comments on commit 96b3872

Please sign in to comment.