Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add better autolockrenew on-failure handling capabilities. #12307

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Merge from upstream master and adjust code accordingly to incorporate…
… with OOB changes. (param renames e.g. _lock_expired)
  • Loading branch information
KieranBrantnerMagee committed Jul 20, 2020
commit c659829c439a8b3845f2ddfdba6cfdbd441451a9
21 changes: 21 additions & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,27 @@

**New Features**

* Added new properties to Message, PeekMessage and ReceivedMessage: `content_type`, `correlation_id`, `label`,
`message_id`, `reply_to`, `reply_to_session_id` and `to`. Please refer to the docstring for further information.

* Add new properties to PeekedMessaged and ReceivedMessage: `enqueued_sequence_number`, `dead_letter_error_description`,
`dead_letter_reason`, `dead_letter_source`, `delivery_count` and `expires_at_utc`. Please refer to the docstring for further information.

**Breaking Changes**

* Removed/Renamed several properties and instance variables on Message (the changes applied to the inherited Message type PeekMessage and ReceivedMessage).
- Renamed property `user_properties` to `properties`
- The original instance variable `properties` which represents the AMQP properties now becomes an internal instance variable `_amqp_properties`.
- Removed property `enqueue_sequence_number`.
- Removed property `annotations`.
- Removed instance variable `header`.

* Removed several properties and instance variables on PeekMessage and ReceivedMessage.
- Removed proeprty `partition_id` on both type.
- Removed instance variable `received_timestamp_utc` on both type.
- Removed property `settled` on `PeekMessage`.
- Removed property `expired` on `ReceivedMessage`.

* Add `on_lock_renew_failure` as a parameter to `AutoLockRenew.register`, taking a callback for when the lock is lost non-intentially (e.g. not via settling, shutdown, or autolockrenew duration completion)

**Breaking Changes**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ def __exit__(self, *args):
def _renewable(self, renewable):
if self._shutdown.is_set():
return False
if hasattr(renewable, 'settled') and renewable.settled:
if hasattr(renewable, '_settled') and renewable._settled:
return False
if not renewable._receiver._running:
return False
if renewable.expired:
if renewable._lock_expired:
return False
return True

Expand All @@ -86,10 +86,10 @@ def _auto_lock_renew(self, renewable, starttime, timeout, on_lock_renew_failure=
_log.debug("%r seconds or less until lock expires - auto renewing.", self.renew_period)
renewable.renew_lock()
time.sleep(self.sleep_time)
clean_shutdown = not renewable.expired
clean_shutdown = not renewable._lock_expired
except AutoLockRenewTimeout as e:
renewable.auto_renew_error = e
clean_shutdown = not renewable.expired
clean_shutdown = not renewable._lock_expired
except Exception as e: # pylint: disable=broad-except
_log.debug("Failed to auto-renew lock: %r. Closing thread.", e)
error = AutoLockRenewFailed(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ async def __aexit__(self, *args: Iterable[Any]) -> None:
def _renewable(self, renewable: "Union[ReceivedMessage, ServiceBusSession]") -> bool:
if self._shutdown.is_set():
return False
if hasattr(renewable, 'settled') and renewable.settled:
if hasattr(renewable, '_settled') and renewable._settled:
return False
if renewable.expired:
if renewable._lock_expired:
return False
if not renewable._receiver._running:
return False
Expand All @@ -89,10 +89,10 @@ async def _auto_lock_renew(self,
_log.debug("%r seconds or less until lock expires - auto renewing.", self.renew_period)
await renewable.renew_lock()
await asyncio.sleep(self.sleep_time)
clean_shutdown = not renewable.expired
clean_shutdown = not renewable._lock_expired
except AutoLockRenewTimeout as e:
renewable.auto_renew_error = e
clean_shutdown = not renewable.expired
clean_shutdown = not renewable._lock_expired
except Exception as e: # pylint: disable=broad-except
_log.debug("Failed to auto-renew lock: %r. Closing thread.", e)
error = AutoLockRenewFailed(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ class MockReceivedMessage:
def __init__(self, prevent_renew_lock=False, exception_on_renew_lock=False):
self._lock_duration = 2

self.received_timestamp_utc = utc_now()
self.locked_until_utc = self.received_timestamp_utc + timedelta(seconds=self._lock_duration)
self.settled = False
self._received_timestamp_utc = utc_now()
self.locked_until_utc = self._received_timestamp_utc + timedelta(seconds=self._lock_duration)
self._settled = False
self._receiver = MockReceiver()

self._prevent_renew_lock = prevent_renew_lock
Expand All @@ -26,7 +26,7 @@ async def renew_lock(self):
self.locked_until_utc = self.locked_until_utc + timedelta(seconds=self._lock_duration)

@property
def expired(self):
def _lock_expired(self):
if self.locked_until_utc and self.locked_until_utc <= utc_now():
return True
return False
Original file line number Diff line number Diff line change
Expand Up @@ -1124,7 +1124,7 @@ async def callback_mock(renewable, error):
message = MockReceivedMessage(prevent_renew_lock=True)
auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock)
await asyncio.sleep(3)
assert len(results) and results[-1].expired == True
assert len(results) and results[-1]._lock_expired == True
assert not errors

auto_lock_renew = AutoLockRenew()
Expand All @@ -1140,7 +1140,7 @@ async def callback_mock(renewable, error):
async with auto_lock_renew: # Check that when a message is settled, it will not get called even after expiry
message = MockReceivedMessage(prevent_renew_lock=True)
auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock)
message.settled = True
message._settled = True
await asyncio.sleep(3)
assert len(results) == 1
assert not errors
Expand All @@ -1151,7 +1151,7 @@ async def callback_mock(renewable, error):
message = MockReceivedMessage(exception_on_renew_lock=True)
auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock)
await asyncio.sleep(3)
assert len(results) == 2 and results[-1].expired == True
assert len(results) == 2 and results[-1]._lock_expired == True
assert errors[-1]

auto_lock_renew = AutoLockRenew()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,8 +509,7 @@ async def lock_lost_callback(renewable, error):
assert not results
await asyncio.sleep(45)
print("Second sleep {}".format(session.session.locked_until_utc - utc_now()))
assert not results
assert session.session.expired
assert session.session._lock_expired
assert isinstance(session.session.auto_renew_error, AutoLockRenewTimeout)
try:
await message.complete()
Expand Down
8 changes: 4 additions & 4 deletions sdk/servicebus/azure-servicebus/tests/mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ class MockReceivedMessage:
def __init__(self, prevent_renew_lock=False, exception_on_renew_lock=False):
self._lock_duration = 2

self.received_timestamp_utc = utc_now()
self.locked_until_utc = self.received_timestamp_utc + timedelta(seconds=self._lock_duration)
self.settled = False
self._received_timestamp_utc = utc_now()
self.locked_until_utc = self._received_timestamp_utc + timedelta(seconds=self._lock_duration)
self._settled = False
self._receiver = MockReceiver()

self._prevent_renew_lock = prevent_renew_lock
Expand All @@ -26,7 +26,7 @@ def renew_lock(self):
self.locked_until_utc = self.locked_until_utc + timedelta(seconds=self._lock_duration)

@property
def expired(self):
def _lock_expired(self):
if self.locked_until_utc and self.locked_until_utc <= utc_now():
return True
return False
6 changes: 3 additions & 3 deletions sdk/servicebus/azure-servicebus/tests/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -1353,7 +1353,7 @@ def callback_mock(renewable, error):
message = MockReceivedMessage(prevent_renew_lock=True)
auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock)
time.sleep(3)
assert len(results) and results[-1].expired == True
assert len(results) and results[-1]._lock_expired == True
assert not errors

auto_lock_renew = AutoLockRenew()
Expand All @@ -1369,7 +1369,7 @@ def callback_mock(renewable, error):
with auto_lock_renew: # Check that when a message is settled, it will not get called even after expiry
message = MockReceivedMessage(prevent_renew_lock=True)
auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock)
message.settled = True
message._settled = True
time.sleep(3)
assert len(results) == 1
assert not errors
Expand All @@ -1380,7 +1380,7 @@ def callback_mock(renewable, error):
message = MockReceivedMessage(exception_on_renew_lock=True)
auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock)
time.sleep(3)
assert len(results) == 2 and results[-1].expired == True
assert len(results) == 2 and results[-1]._lock_expired == True
assert errors[-1]

auto_lock_renew = AutoLockRenew()
Expand Down
3 changes: 1 addition & 2 deletions sdk/servicebus/azure-servicebus/tests/test_sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,8 +606,7 @@ def lock_lost_callback(renewable, error):
print("Second sleep {}".format(receiver.session._locked_until_utc - utc_now()))
assert not results
sleep_until_expired(receiver.session) # and then ensure it didn't slip a renew under the wire.
assert receiver.session.expired
assert not results # Should not callback since it timed out as specified.
assert receiver.session._lock_expired
assert isinstance(receiver.session.auto_renew_error, AutoLockRenewTimeout)
try:
message.complete()
Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.