Skip to content

Commit

Permalink
[SB] fixing link flow issues (#36879)
Browse files Browse the repository at this point in the history
* fixing link flow issues

* add eh

* pylint

* add unittest

* add todo

* changelog
  • Loading branch information
l0lawrence authored Sep 16, 2024
1 parent 0a8dfb5 commit 6e5a7d6
Show file tree
Hide file tree
Showing 12 changed files with 28 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ async def _incoming_attach(self, frame):
async def _incoming_transfer(self, frame):
if self.network_trace:
_LOGGER.debug("<- %r", TransferFrame(payload=b"***", *frame[:-1]), extra=self.network_trace_params)
self.delivery_count += 1
self.received_delivery_id = frame[1] # delivery_id
# If more is false --> this is the last frame of the message
if not frame[5]:
self.current_link_credit -= 1
self.delivery_count += 1
if self.received_delivery_id is not None:
self._first_frame = frame
if not self.received_delivery_id and not self._received_payload:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ async def _outgoing_transfer(self, delivery, network_trace_params):
delivery.transfer_state = SessionTransferState.OKAY

async def _incoming_transfer(self, frame):
# TODO: should this be only if more=False?
self.next_incoming_id += 1
self.remote_outgoing_window -= 1
self.incoming_window -= 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def _incoming_transfer(self, frame):
# If more is false --> this is the last frame of the message
if not frame[5]:
self.current_link_credit -= 1
self.delivery_count += 1
self.delivery_count += 1
self.received_delivery_id = frame[1] # delivery_id
if self.received_delivery_id is not None:
self._first_frame = frame
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ def _outgoing_transfer(self, delivery, network_trace_params):
delivery.transfer_state = SessionTransferState.OKAY

def _incoming_transfer(self, frame):
# TODO: should this be only if more=False?
self.next_incoming_id += 1
self.remote_outgoing_window -= 1
self.incoming_window -= 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ def test_receive_transfer_continuation_frame():

link._incoming_transfer(transfer_frame_one)
assert link.current_link_credit == 2
assert link.delivery_count == 1
link._incoming_transfer(transfer_frame_two)
assert link.current_link_credit == 2
assert link.delivery_count == 1
link._incoming_transfer(transfer_frame_three)
assert link.current_link_credit == 1
assert link.current_link_credit == 1
assert link.delivery_count == 2
1 change: 1 addition & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- Fixed a bug where token refreshes were not happening on long running operations ([35717](https://github.com/Azure/azure-sdk-for-python/issues/35717))
- Fixed a bug where using TokenCredential to create a subscription with forwarding caused a `ResourceNotFoundError` ([36545](https://github.com/Azure/azure-sdk-for-python/pull/36545))
- Fixed a bug where messages received on one receiver could not be settled on another receiver over mgmt link ([35304](https://github.com/Azure/azure-sdk-for-python/issues/35304))
- Addressed a bug where excess Link Credits were being allocated when large messages were being received ([34270](https://github.com/Azure/azure-sdk-for-python/issues/34270))

## 7.12.2 (2024-05-08)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ async def _incoming_attach(self, frame):
async def _incoming_transfer(self, frame):
if self.network_trace:
_LOGGER.debug("<- %r", TransferFrame(payload=b"***", *frame[:-1]), extra=self.network_trace_params)
self.delivery_count += 1
self.received_delivery_id = frame[1] # delivery_id
# If more is false --> this is the last frame of the message
if not frame[5]:
self.delivery_count += 1
self.current_link_credit -= 1
if self.received_delivery_id is not None:
self._first_frame = frame
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ async def _outgoing_transfer(self, delivery, network_trace_params):
delivery.transfer_state = SessionTransferState.OKAY

async def _incoming_transfer(self, frame):
# TODO: should this be only if more=False?
self.next_incoming_id += 1
self.remote_outgoing_window -= 1
self.incoming_window -= 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def _incoming_transfer(self, frame):
# If more is false --> this is the last frame of the message
if not frame[5]:
self.current_link_credit -= 1
self.delivery_count += 1
self.delivery_count += 1
self.received_delivery_id = frame[1] # delivery_id
if self.received_delivery_id is not None:
self._first_frame = frame
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ def _outgoing_transfer(self, delivery, network_trace_params):
delivery.transfer_state = SessionTransferState.OKAY

def _incoming_transfer(self, frame):
# TODO: should this be only if more=False?
self.next_incoming_id += 1
self.remote_outgoing_window -= 1
self.incoming_window -= 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,13 +430,6 @@ def _receive(
else 0
)
batch: Union[List["uamqp_Message"], List["pyamqp_Message"]] = []
while (
not received_messages_queue.empty() and len(batch) < max_message_count
):
batch.append(received_messages_queue.get())
received_messages_queue.task_done()
if len(batch) >= max_message_count:
return [self._build_received_message(message) for message in batch]

# Dynamically issue link credit if max_message_count >= 1 when the prefetch_count is the default value 0
if (
Expand All @@ -447,6 +440,14 @@ def _receive(
link_credit_needed = max_message_count - len(batch)
self._amqp_transport.reset_link_credit(amqp_receive_client, link_credit_needed)

while (
not received_messages_queue.empty() and len(batch) < max_message_count
):
batch.append(received_messages_queue.get())
received_messages_queue.task_done()
if len(batch) >= max_message_count:
return [self._build_received_message(message) for message in batch]

first_message_received = expired = False
receiving = True
while receiving and not expired and len(batch) < max_message_count:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,17 +425,19 @@ async def _receive(
)

batch: Union[List["uamqp_Message"], List["pyamqp_Message"]] = []
while not received_messages_queue.empty() and len(batch) < max_message_count:
batch.append(received_messages_queue.get())
received_messages_queue.task_done()
if len(batch) >= max_message_count:
return [self._build_received_message(message) for message in batch]

# Dynamically issue link credit if max_message_count >= 1 when the prefetch_count is the default value 0
if max_message_count and self._prefetch_count == 0 and max_message_count >= 1:
link_credit_needed = max_message_count - len(batch)
await self._amqp_transport.reset_link_credit_async(amqp_receive_client, link_credit_needed)


while not received_messages_queue.empty() and len(batch) < max_message_count:
batch.append(received_messages_queue.get())
received_messages_queue.task_done()
if len(batch) >= max_message_count:
return [self._build_received_message(message) for message in batch]

first_message_received = expired = False
receiving = True
while receiving and not expired and len(batch) < max_message_count:
Expand Down

0 comments on commit 6e5a7d6

Please sign in to comment.