Skip to content

Commit e6e960e

Browse files
swathipilRena Chen
authored andcommitted
[EventHub/ServiceBus] lock pending mgmt op (#39633)
* [SB] Msg IDs on mgmt/cbs ops should be unique * copy over sb pyamqp to eh * [ServiceBus] fix autolockrenew sleep/lock pending mgmt ops * remove alr sleep * add tests * add sync alr 300 msg renew test * remove disable local auth for tests ci run * add alr renew 300 msgs test for sync * revert alr sample * fix stress bicep policy violations * update changelog
1 parent 61b925f commit e6e960e

File tree

9 files changed

+66
-5
lines changed

9 files changed

+66
-5
lines changed

sdk/eventhub/azure-eventhub/CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Release History
22

3-
## 5.14.0 (2025-02-11)
3+
## 5.14.0 (2025-02-13)
44

55
### Features Added
66

@@ -12,6 +12,7 @@
1212
- Fixed a bug where pyAMQP was doubly retrying, causing latency on reconnect. ([#39037](https://github.com/Azure/azure-sdk-for-python/pull/39037))
1313
- Fix to handle large messages being sent twice due to incoming flow frames triggering a resend. ([#38067](https://github.com/Azure/azure-sdk-for-python/pull/38067))
1414
- Missing await in sender async on pyAMQP. ([#39182](https://github.com/Azure/azure-sdk-for-python/pull/39182))
15+
- Fixed a bug where message IDs in management operation requests were not unique.
1516

1617
### Other Changes
1718

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/management_link.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from functools import partial
1111
from collections import namedtuple
1212
from typing import Optional, Union
13+
from threading import Lock
1314

1415
from .sender import SenderLink
1516
from .receiver import ReceiverLink
@@ -40,6 +41,7 @@ class ManagementLink(object): # pylint:disable=too-many-instance-attributes
4041
def __init__(self, session, endpoint, **kwargs):
4142
self.state = ManagementLinkState.IDLE
4243
self._pending_operations = []
44+
self.lock = Lock()
4345
self._session = session
4446
self._network_trace_params = kwargs.get("network_trace_params")
4547
self._request_link: SenderLink = session.create_sender_link(
@@ -157,6 +159,7 @@ def _on_message_received(self, _, message):
157159
to_remove_operation.on_execute_operation_complete(
158160
mgmt_result, status_code, status_description, message, response_detail.get(b"error-condition")
159161
)
162+
with self.lock:
160163
self._pending_operations.remove(to_remove_operation)
161164

162165
def _on_send_complete(self, message_delivery, reason, state): # todo: reason is never used, should check spec

sdk/eventhub/azure-eventhub/stress/stress-test-resources.bicep

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ resource eventHubsNamespace 'Microsoft.EventHub/Namespaces@2017-04-01' = {
2727
name: 'Standard'
2828
tier: 'Standard'
2929
}
30-
properties: {}
30+
properties: {
31+
disableLocalAuth: true
32+
}
3133
}
3234

3335
resource eventHubsNamespace_eventHubName 'Microsoft.EventHub/namespaces/eventhubs@2017-04-01' = {
@@ -36,6 +38,7 @@ resource eventHubsNamespace_eventHubName 'Microsoft.EventHub/namespaces/eventhub
3638
properties: {
3739
messageRetentionInDays: 5
3840
partitionCount: 32
41+
disableLocalAuth: true
3942
}
4043
dependsOn: [
4144
eventHubsNamespace
@@ -66,6 +69,7 @@ resource storageAccount 'Microsoft.Storage/storageAccounts@2019-06-01' = {
6669
kind: 'StorageV2'
6770
properties: {
6871
accessTier: 'Hot'
72+
allowSharedKeyAccess: false
6973
}
7074
}
7175

sdk/servicebus/azure-servicebus/CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Release History
22

3-
## 7.14.0 (2025-02-11)
3+
## 7.14.0 (2025-02-13)
44

55
### Features Added
66

@@ -15,6 +15,8 @@
1515
- Fixed a bug where pyAMQP was doubly retrying, causing latency on reconnect. ([#39037](https://github.com/Azure/azure-sdk-for-python/pull/39037))
1616
- Fix to handle large messages being sent twice due to incoming flow frames triggering a resend. ([#38067](https://github.com/Azure/azure-sdk-for-python/pull/38067))
1717
- Missing await in sender async on pyAMQP. ([#39182](https://github.com/Azure/azure-sdk-for-python/pull/39182))
18+
- Improved AutoLockRenewer to renew locks for more registered messages. ([#37340](https://github.com/Azure/azure-sdk-for-python/issues/37340))
19+
- Fixed a bug where message IDs in management operation requests were not unique.
1820

1921
### Other Changes
2022

sdk/servicebus/azure-servicebus/azure/servicebus/_common/auto_lock_renewer.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,6 @@ def _auto_lock_renew_task(
200200
except AttributeError:
201201
# Renewable is a message
202202
receiver.renew_message_lock(renewable) # type: ignore
203-
time.sleep(self._sleep_time)
204203
# enqueue a new task, keeping renewing the renewable
205204
if self._renewable(renewable):
206205
self._renew_tasks.put(

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/management_link.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from functools import partial
1111
from collections import namedtuple
1212
from typing import Optional, Union
13+
from threading import Lock
1314

1415
from .sender import SenderLink
1516
from .receiver import ReceiverLink
@@ -40,6 +41,7 @@ class ManagementLink(object): # pylint:disable=too-many-instance-attributes
4041
def __init__(self, session, endpoint, **kwargs):
4142
self.state = ManagementLinkState.IDLE
4243
self._pending_operations = []
44+
self.lock = Lock()
4345
self._session = session
4446
self._network_trace_params = kwargs.get("network_trace_params")
4547
self._request_link: SenderLink = session.create_sender_link(
@@ -157,6 +159,7 @@ def _on_message_received(self, _, message):
157159
to_remove_operation.on_execute_operation_complete(
158160
mgmt_result, status_code, status_description, message, response_detail.get(b"error-condition")
159161
)
162+
with self.lock:
160163
self._pending_operations.remove(to_remove_operation)
161164

162165
def _on_send_complete(self, message_delivery, reason, state): # todo: reason is never used, should check spec

sdk/servicebus/azure-servicebus/samples/sync_samples/auto_lock_renew.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,4 +138,4 @@ def on_lock_renew_failure_callback(renewable, error):
138138

139139
renew_lock_on_message_received_from_non_sessionful_entity()
140140
renew_lock_on_session_of_the_sessionful_entity()
141-
renew_lock_with_lock_renewal_failure_callback()
141+
renew_lock_with_lock_renewal_failure_callback()

sdk/servicebus/azure-servicebus/stress/stress-test-resources.bicep

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ resource servicebus 'Microsoft.ServiceBus/namespaces@2018-01-01-preview' = {
1717
}
1818
properties: {
1919
zoneRedundant: false
20+
disableLocalAuth: true
2021
}
2122
}
2223

@@ -27,6 +28,9 @@ resource servicebusPremium 'Microsoft.ServiceBus/namespaces@2018-01-01-preview'
2728
name: 'Premium'
2829
tier: 'Premium'
2930
}
31+
properties: {
32+
disableLocalAuth: true
33+
}
3034
}
3135

3236

sdk/servicebus/azure-servicebus/tests/test_queues.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1678,6 +1678,51 @@ def test_queue_by_queue_client_conn_str_receive_handler_with_auto_autolockrenew(
16781678
renewer.close()
16791679
assert len(messages) == 11
16801680

1681+
@pytest.mark.liveTest
1682+
@pytest.mark.live_test_only
1683+
@CachedServiceBusResourceGroupPreparer(name_prefix="servicebustest")
1684+
@CachedServiceBusNamespacePreparer(name_prefix="servicebustest")
1685+
@ServiceBusQueuePreparer(
1686+
name_prefix="servicebustest", dead_lettering_on_message_expiration=True, lock_duration="PT55S"
1687+
)
1688+
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
1689+
@ArgPasser()
1690+
def test_queue_receive_handler_with_autolockrenew_many_msgs(
1691+
self, uamqp_transport, *, servicebus_namespace=None, servicebus_queue=None, **kwargs
1692+
):
1693+
# number of messages to receive and renew locks for
1694+
max_msg_count = 300
1695+
1696+
# only tested on sync since async can concurrently send renew-lock requests without waiting for the transfer frame from the service
1697+
if sys.platform.startswith("darwin"):
1698+
pytest.skip("Skipping since ALR renewal is slower, so message locks are expiring. Potentially due to lower # of available threads/workers.")
1699+
1700+
fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
1701+
credential = get_credential()
1702+
with ServiceBusClient(
1703+
fully_qualified_namespace, credential, logging_enable=True, uamqp_transport=uamqp_transport
1704+
) as sb_client:
1705+
with sb_client.get_queue_sender(servicebus_queue.name) as sender:
1706+
msgs_to_send = [ServiceBusMessage("message: {}".format(i)) for i in range(400)]
1707+
sender.send_messages(msgs_to_send)
1708+
1709+
# Can also be called via "with AutoLockRenewer() as renewer" to automate shutdown.
1710+
renewer = AutoLockRenewer(max_lock_renewal_duration=60*5)
1711+
with sb_client.get_queue_receiver(queue_name=servicebus_queue.name, auto_lock_renewer=renewer) as receiver:
1712+
received_msgs = receiver.receive_messages(max_message_count=max_msg_count)
1713+
# At least 300 messages should be renewed
1714+
# TODO: This should be bumped up once sync alr perf is improved
1715+
while len(received_msgs) < max_msg_count:
1716+
count = max_msg_count - len(received_msgs)
1717+
received_msgs.extend(receiver.receive_messages(max_message_count=count))
1718+
1719+
assert len(received_msgs) == max_msg_count, "Did not receive all messages"
1720+
time.sleep(100)
1721+
for msg in received_msgs:
1722+
receiver.complete_message(msg) # Settling the message deregisters it from the AutoLockRenewer
1723+
1724+
renewer.close()
1725+
16811726
@pytest.mark.liveTest
16821727
@pytest.mark.live_test_only
16831728
@CachedServiceBusResourceGroupPreparer(name_prefix="servicebustest")

0 commit comments

Comments
 (0)