Skip to content

Commit

Permalink
[fix] Key_Shared mode consumption latency when low traffic (apache#23340
Browse files Browse the repository at this point in the history
)

Co-authored-by: Lari Hotari <lhotari@apache.org>
  • Loading branch information
poorbarcode and lhotari authored Sep 23, 2024
1 parent 501dfde commit 4ce0c75
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 8 deletions.
4 changes: 2 additions & 2 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -489,12 +489,12 @@ dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
# delay. This parameter sets the initial backoff delay in milliseconds.
dispatcherRetryBackoffInitialTimeInMs=100
dispatcherRetryBackoffInitialTimeInMs=1

# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
# delay. This parameter sets the maximum backoff delay in milliseconds.
dispatcherRetryBackoffMaxTimeInMs=1000
dispatcherRetryBackoffMaxTimeInMs=10

# Precise dispatcher flow control according to history message number of each entry
preciseDispatcherFlowControl=false
Expand Down
4 changes: 2 additions & 2 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,12 @@ dispatcherReadFailureBackoffMandatoryStopTimeInMs=0
# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
# delay. This parameter sets the initial backoff delay in milliseconds.
dispatcherRetryBackoffInitialTimeInMs=100
dispatcherRetryBackoffInitialTimeInMs=1

# On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered
# out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff
# delay. This parameter sets the maximum backoff delay in milliseconds.
dispatcherRetryBackoffMaxTimeInMs=1000
dispatcherRetryBackoffMaxTimeInMs=10

# Precise dispatcher flow control according to history message number of each entry
preciseDispatcherFlowControl=false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1231,14 +1231,14 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
doc = "On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered "
+ "out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff "
+ "delay. This parameter sets the initial backoff delay in milliseconds.")
private int dispatcherRetryBackoffInitialTimeInMs = 100;
private int dispatcherRetryBackoffInitialTimeInMs = 1;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered "
+ "out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff "
+ "delay. This parameter sets the maximum backoff delay in milliseconds.")
private int dispatcherRetryBackoffMaxTimeInMs = 1000;
private int dispatcherRetryBackoffMaxTimeInMs = 10;

@FieldContext(
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -729,11 +729,13 @@ private synchronized void handleSendingMessagesAndReadingMore(ReadType readType,
boolean triggerReadingMore = sendMessagesToConsumers(readType, entries, needAcquireSendInProgress);
int entriesDispatched = lastNumberOfEntriesDispatched;
updatePendingBytesToDispatch(-totalBytesSize);
if (entriesDispatched > 0) {
// Reset the backoff when we successfully dispatched messages
retryBackoff.reset();
}
if (triggerReadingMore) {
if (entriesDispatched > 0 || skipNextBackoff) {
skipNextBackoff = false;
// Reset the backoff when we successfully dispatched messages
retryBackoff.reset();
// Call readMoreEntries in the same thread to trigger the next read
readMoreEntries();
} else if (entriesDispatched == 0) {
Expand Down

0 comments on commit 4ce0c75

Please sign in to comment.