diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_receiver_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_receiver_async.py index 62439e49b6c0..5249323853b8 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_receiver_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_receiver_async.py @@ -59,11 +59,11 @@ async def _incoming_attach(self, frame): async def _incoming_transfer(self, frame): if self.network_trace: _LOGGER.debug("<- %r", TransferFrame(payload=b"***", *frame[:-1]), extra=self.network_trace_params) - self.delivery_count += 1 self.received_delivery_id = frame[1] # delivery_id # If more is false --> this is the last frame of the message if not frame[5]: self.current_link_credit -= 1 + self.delivery_count += 1 if self.received_delivery_id is not None: self._first_frame = frame if not self.received_delivery_id and not self._received_payload: diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_session_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_session_async.py index 537ae79b8e6d..7c246969729d 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_session_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_session_async.py @@ -334,6 +334,7 @@ async def _outgoing_transfer(self, delivery, network_trace_params): delivery.transfer_state = SessionTransferState.OKAY async def _incoming_transfer(self, frame): + # TODO: should this be only if more=False? self.next_incoming_id += 1 self.remote_outgoing_window -= 1 self.incoming_window -= 1 diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/receiver.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/receiver.py index 4a8ab9214b98..5b763a206186 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/receiver.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/receiver.py @@ -60,7 +60,7 @@ def _incoming_transfer(self, frame): # If more is false --> this is the last frame of the message if not frame[5]: self.current_link_credit -= 1 - self.delivery_count += 1 + self.delivery_count += 1 self.received_delivery_id = frame[1] # delivery_id if self.received_delivery_id is not None: self._first_frame = frame diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/session.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/session.py index 1e3df67752c6..afd898f0a316 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/session.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/session.py @@ -370,6 +370,7 @@ def _outgoing_transfer(self, delivery, network_trace_params): delivery.transfer_state = SessionTransferState.OKAY def _incoming_transfer(self, frame): + # TODO: should this be only if more=False? self.next_incoming_id += 1 self.remote_outgoing_window -= 1 self.incoming_window -= 1 diff --git a/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/unittest/test_link.py b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/unittest/test_link.py index 0f99c17a085e..c487153e1c1b 100644 --- a/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/unittest/test_link.py +++ b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/unittest/test_link.py @@ -105,7 +105,10 @@ def test_receive_transfer_continuation_frame(): link._incoming_transfer(transfer_frame_one) assert link.current_link_credit == 2 + assert link.delivery_count == 1 link._incoming_transfer(transfer_frame_two) assert link.current_link_credit == 2 + assert link.delivery_count == 1 link._incoming_transfer(transfer_frame_three) - assert link.current_link_credit == 1 \ No newline at end of file + assert link.current_link_credit == 1 + assert link.delivery_count == 2 \ No newline at end of file diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index c8fa3e75948d..e24d99c97667 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -7,6 +7,7 @@ - Fixed a bug where token refreshes were not happening on long running operations ([35717](https://github.com/Azure/azure-sdk-for-python/issues/35717)) - Fixed a bug where using TokenCredential to create a subscription with forwarding caused a `ResourceNotFoundError` ([36545](https://github.com/Azure/azure-sdk-for-python/pull/36545)) - Fixed a bug where messages received on one receiver could not be settled on another receiver over mgmt link ([35304](https://github.com/Azure/azure-sdk-for-python/issues/35304)) + - Addressed a bug where excess Link Credits were being allocated when large messages were being received ([34270](https://github.com/Azure/azure-sdk-for-python/issues/34270)) ## 7.12.2 (2024-05-08) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_receiver_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_receiver_async.py index 62439e49b6c0..211511581f6d 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_receiver_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_receiver_async.py @@ -59,10 +59,10 @@ async def _incoming_attach(self, frame): async def _incoming_transfer(self, frame): if self.network_trace: _LOGGER.debug("<- %r", TransferFrame(payload=b"***", *frame[:-1]), extra=self.network_trace_params) - self.delivery_count += 1 self.received_delivery_id = frame[1] # delivery_id # If more is false --> this is the last frame of the message if not frame[5]: + self.delivery_count += 1 self.current_link_credit -= 1 if self.received_delivery_id is not None: self._first_frame = frame diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_session_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_session_async.py index 537ae79b8e6d..7c246969729d 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_session_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_session_async.py @@ -334,6 +334,7 @@ async def _outgoing_transfer(self, delivery, network_trace_params): delivery.transfer_state = SessionTransferState.OKAY async def _incoming_transfer(self, frame): + # TODO: should this be only if more=False? self.next_incoming_id += 1 self.remote_outgoing_window -= 1 self.incoming_window -= 1 diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/receiver.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/receiver.py index 4a8ab9214b98..5b763a206186 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/receiver.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/receiver.py @@ -60,7 +60,7 @@ def _incoming_transfer(self, frame): # If more is false --> this is the last frame of the message if not frame[5]: self.current_link_credit -= 1 - self.delivery_count += 1 + self.delivery_count += 1 self.received_delivery_id = frame[1] # delivery_id if self.received_delivery_id is not None: self._first_frame = frame diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/session.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/session.py index 1e3df67752c6..afd898f0a316 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/session.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/session.py @@ -370,6 +370,7 @@ def _outgoing_transfer(self, delivery, network_trace_params): delivery.transfer_state = SessionTransferState.OKAY def _incoming_transfer(self, frame): + # TODO: should this be only if more=False? self.next_incoming_id += 1 self.remote_outgoing_window -= 1 self.incoming_window -= 1 diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py index 9610a11600f0..fc8c472bbe95 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_receiver.py @@ -430,13 +430,6 @@ def _receive( else 0 ) batch: Union[List["uamqp_Message"], List["pyamqp_Message"]] = [] - while ( - not received_messages_queue.empty() and len(batch) < max_message_count - ): - batch.append(received_messages_queue.get()) - received_messages_queue.task_done() - if len(batch) >= max_message_count: - return [self._build_received_message(message) for message in batch] # Dynamically issue link credit if max_message_count >= 1 when the prefetch_count is the default value 0 if ( @@ -447,6 +440,14 @@ def _receive( link_credit_needed = max_message_count - len(batch) self._amqp_transport.reset_link_credit(amqp_receive_client, link_credit_needed) + while ( + not received_messages_queue.empty() and len(batch) < max_message_count + ): + batch.append(received_messages_queue.get()) + received_messages_queue.task_done() + if len(batch) >= max_message_count: + return [self._build_received_message(message) for message in batch] + first_message_received = expired = False receiving = True while receiving and not expired and len(batch) < max_message_count: diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py index 76f7b7d79084..3654a4740d1a 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py @@ -425,17 +425,19 @@ async def _receive( ) batch: Union[List["uamqp_Message"], List["pyamqp_Message"]] = [] - while not received_messages_queue.empty() and len(batch) < max_message_count: - batch.append(received_messages_queue.get()) - received_messages_queue.task_done() - if len(batch) >= max_message_count: - return [self._build_received_message(message) for message in batch] # Dynamically issue link credit if max_message_count >= 1 when the prefetch_count is the default value 0 if max_message_count and self._prefetch_count == 0 and max_message_count >= 1: link_credit_needed = max_message_count - len(batch) await self._amqp_transport.reset_link_credit_async(amqp_receive_client, link_credit_needed) + + while not received_messages_queue.empty() and len(batch) < max_message_count: + batch.append(received_messages_queue.get()) + received_messages_queue.task_done() + if len(batch) >= max_message_count: + return [self._build_received_message(message) for message in batch] + first_message_received = expired = False receiving = True while receiving and not expired and len(batch) < max_message_count: