Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add better autolockrenew on-failure handling capabilities. #12307

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Rename autolockrenew shutdown to close to normalize method name with …
…other comparable instances. Adjust tests/docs/guides/etc.

Add changelog entry for the on lock renew callback.
  • Loading branch information
KieranBrantnerMagee committed Jul 13, 2020
commit 21fd0ab2ef62e81e598b41f8ddf938a49afbdf43
10 changes: 8 additions & 2 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
# Release History

## 7.0.0b4 (Unreleased)
## 7.0.0b5 (Unreleased)

**New Features**

* Add an on_lock_renew_failure as a parameter to `AutoLockRenew`, taking a callback for when the lock is lost non-intentially (e.g. not via settling, shutdown, or autolockrenew duration completion)
* Add `on_lock_renew_failure` as a parameter to `AutoLockRenew.register`, taking a callback for when the lock is lost non-intentially (e.g. not via settling, shutdown, or autolockrenew duration completion)

**Breaking Changes**

* `AutoLockRenew.shutdown` is now `AutoLockRenew.close` to normalize with other equivelent behaviors.

## 7.0.0b4 (Unreleased)

**BugFixes**

Expand Down
4 changes: 2 additions & 2 deletions sdk/servicebus/azure-servicebus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ connstr = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']
session_id = os.environ['SERVICE_BUS_SESSION_ID']

# Can also be called via "with AutoLockRenew() as renewer" to automate shutdown.
# Can also be called via "with AutoLockRenew() as renewer" to automate closing.
renewer = AutoLockRenew()
with ServiceBusClient.from_connection_string(connstr) as client:
with client.get_queue_session_receiver(queue_name, session_id=session_id) as receiver:
Expand All @@ -390,7 +390,7 @@ with ServiceBusClient.from_connection_string(connstr) as client:
renewer.register(msg, timeout=60)
# Do your application logic here
msg.complete()
renewer.shutdown()
renewer.close()
```

If for any reason auto-renewal has been interrupted or failed, this can be observed via the `auto_renew_error` property on the object being renewed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def __enter__(self):
return self

def __exit__(self, *args):
self.shutdown()
self.close()

def _renewable(self, renewable):
if self._shutdown.is_set():
Expand Down Expand Up @@ -118,8 +118,8 @@ def register(self, renewable, timeout=300, on_lock_renew_failure=None):
starttime = renewable_start_time(renewable)
self.executor.submit(self._auto_lock_renew, renewable, starttime, timeout, on_lock_renew_failure)

def shutdown(self, wait=True):
"""Shutdown the thread pool to clean up any remaining lock renewal threads.
def close(self, wait=True):
"""Cease autorenewal by shutting down the thread pool to clean up any remaining lock renewal threads.

:param wait: Whether to block until thread pool has shutdown. Default is `True`.
:type wait: bool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async def __aenter__(self) -> "AutoLockRenew":
return self

async def __aexit__(self, *args: Iterable[Any]) -> None:
await self.shutdown()
await self.close()

def _renewable(self, renewable: "Union[ReceivedMessage, ServiceBusSession]") -> bool:
if self._shutdown.is_set():
Expand Down Expand Up @@ -124,7 +124,7 @@ def register(self,
renew_future = asyncio.ensure_future(self._auto_lock_renew(renewable, starttime, timeout, on_lock_renew_failure), loop=self.loop)
self._futures.append(renew_future)

async def shutdown(self) -> None:
"""Cancel remaining open lock renewal futures."""
async def close(self) -> None:
"""Cease autorenewal by cancelling any remaining open lock renewal futures."""
self._shutdown.set()
await asyncio.wait(self._futures)
5 changes: 5 additions & 0 deletions sdk/servicebus/azure-servicebus/migration_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ semantics with the sender or receiver lifetime.
| `azure.servicebus.control_client.ServiceBusService().create_queue(queue_name)` | `azure.servicebus.management.ServiceBusManagementClient().create_queue(queue_name)` | [Create a queue](./samples/sync_samples/mgmt_queue.py) |
| `azure.servicebus.ServiceBusClient().list_queues()` | `azure.servicebus.management.ServiceBusManagementClient().list_queues()` | [List queues](./samples/sync_samples/mgmt_queue.py ) |

### Working with AutoLockRenew
| In v0.50 | Equivalent in v7 | Sample |
|---|---|---|
| `azure.servicebus.AutoLockRenew().shutdown()` | `azure.servicebus.AutoLockRenew().close()` | [Close an auto-lock-renewer](./samples/sync_samples/auto_lock_renew.py) |


## Migration samples

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async def renew_lock_on_message_received_from_non_sessionful_entity():
await msg.complete()
print('Complete messages.')

await renewer.shutdown()
await renewer.close()
KieranBrantnerMagee marked this conversation as resolved.
Show resolved Hide resolved


async def renew_lock_on_session_of_the_sessionful_entity():
Expand Down Expand Up @@ -81,7 +81,7 @@ async def renew_lock_on_session_of_the_sessionful_entity():
await msg.complete()
print('Complete messages.')

await renewer.shutdown()
await renewer.close()


loop = asyncio.get_event_loop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def renew_lock_on_message_received_from_non_sessionful_entity():
msg.complete() # Settling the message deregisters it from the AutoLockRenewer
print('Complete messages.')

renewer.shutdown()
renewer.close()


def renew_lock_on_session_of_the_sessionful_entity():
Expand Down Expand Up @@ -82,7 +82,7 @@ def renew_lock_on_session_of_the_sessionful_entity():

print('Complete messages.')

renewer.shutdown()
renewer.close()


renew_lock_on_message_received_from_non_sessionful_entity()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ async def test_async_queue_by_queue_client_conn_str_receive_handler_with_autoloc
print("Remaining messages", message.locked_until_utc, utc_now())
messages.append(message)
await message.complete()
await renewer.shutdown()
await renewer.close()
assert len(messages) == 11

@pytest.mark.liveTest
Expand Down Expand Up @@ -1148,7 +1148,7 @@ async def callback_mock(renewable):
async with auto_lock_renew: # Check that it is not called when the renewer is shutdown
message = MockReceivedMessage(prevent_renew_lock=True)
auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock)
await auto_lock_renew.shutdown()
await auto_lock_renew.close()
await asyncio.sleep(3)
assert len(results) == 2

Expand Down Expand Up @@ -1184,7 +1184,7 @@ async def test_async_queue_mock_no_reusing_auto_lock_renew(self):
auto_lock_renew.register(renewable=MockReceivedMessage())
time.sleep(3)

await auto_lock_renew.shutdown()
await auto_lock_renew.close()

with pytest.raises(ServiceBusError):
async with auto_lock_renew:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ async def lock_lost_callback(renewable):
await asyncio.sleep(max(0,(session.locked_until_utc - utc_now()).total_seconds()+1)) # If this pattern repeats make sleep_until_expired_async
assert not results

await renewer.shutdown()
await renewer.close()
assert len(messages) == 2


Expand Down Expand Up @@ -626,7 +626,7 @@ async def test_async_session_schedule_message(self, servicebus_namespace_connect
assert len(messages) == 1
else:
raise Exception("Failed to receive schdeduled message.")
await renewer.shutdown()
await renewer.close()


@pytest.mark.liveTest
Expand Down Expand Up @@ -666,7 +666,7 @@ async def test_async_session_schedule_multiple_messages(self, servicebus_namespa
assert len(messages) == 2
else:
raise Exception("Failed to receive schdeduled message.")
await renewer.shutdown()
await renewer.close()


@pytest.mark.liveTest
Expand Down Expand Up @@ -700,7 +700,7 @@ async def test_async_session_cancel_scheduled_messages(self, servicebus_namespac
print(str(m))
await m.complete()
raise
await renewer.shutdown()
await renewer.close()


@pytest.mark.liveTest
Expand Down
6 changes: 3 additions & 3 deletions sdk/servicebus/azure-servicebus/tests/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,7 @@ def test_queue_by_queue_client_conn_str_receive_handler_with_autolockrenew(self,
print("Remaining messages", message.locked_until_utc, utc_now())
messages.append(message)
message.complete()
renewer.shutdown()
renewer.close()
assert len(messages) == 11

@pytest.mark.liveTest
Expand Down Expand Up @@ -1259,7 +1259,7 @@ def callback_mock(renewable):
with auto_lock_renew: # Check that it is not called when the renewer is shutdown
message = MockReceivedMessage(prevent_renew_lock=True)
auto_lock_renew.register(renewable=message, on_lock_renew_failure=callback_mock)
auto_lock_renew.shutdown()
auto_lock_renew.close()
time.sleep(3)
assert len(results) == 2

Expand Down Expand Up @@ -1294,7 +1294,7 @@ def test_queue_mock_no_reusing_auto_lock_renew(self):
auto_lock_renew.register(renewable=MockReceivedMessage())
time.sleep(3)

auto_lock_renew.shutdown()
auto_lock_renew.close()

with pytest.raises(ServiceBusError):
with auto_lock_renew:
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/azure-servicebus/tests/test_sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ def lock_lost_callback(renewable):
sleep_until_expired(receiver.session)
assert not results

renewer.shutdown()
renewer.close()
assert len(messages) == 2


Expand Down