Replace the existing stale container when adding work container #909
Issue description:
We recently encountered an issue in our usage of the parallel-consumer library. During one period, the size of the consumer group fluctuated repeatedly (12 → 13 → 12 → 13 → 12).
From the logs, the consumer received and completed the record at offset 12650437055. There are no “Received/Completed” entries for 12650437056, yet records at 12650437057, 12650437058, 12650437059, … were all received and completed. During this period, the committed offset for the partition remained at 12650437056 (per metrics), which prevented commits from advancing and created lag on the partition.
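To illustrate why a single missing record pins the committed offset, here is a minimal sketch under the assumption that the commit can only advance past contiguously completed offsets; the method name `nextOffsetToCommit` is illustrative, not the real library API.

```java
import java.util.Set;
import java.util.TreeSet;

public class CommitGapSketch {

    // Returns the next offset to commit: the lowest offset at or above
    // startOffset that has NOT yet completed.
    static long nextOffsetToCommit(long startOffset, Set<Long> completed) {
        long next = startOffset;
        while (completed.contains(next)) {
            next++;
        }
        return next;
    }

    public static void main(String[] args) {
        Set<Long> completed = new TreeSet<>(Set.of(
                12650437055L,                     // received and completed
                // 12650437056L is missing: it was never added as work
                12650437057L, 12650437058L, 12650437059L));

        // The commit stays pinned at 12650437056 no matter how many later
        // offsets complete, which matches the lag observed on the partition.
        System.out.println(nextOffsetToCommit(12650437055L, completed)); // 12650437056
    }
}
```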
After ~14 minutes, we restarted one pod. Immediately afterward, the consumer sought to the committed offset 12650437056, then received and completed that record, and subsequent commits advanced as normal.
This happens rarely in the production environment: it has occurred roughly once a month.
After reviewing the code, we suspect there is a race condition when adding a work container.
Below is a linear timeline of the potential race. Please let me know if it makes sense.
| Time | Thread | Action |
| --- | --- | --- |
| T1 | Broker Poll | `poll()` returns batch `[12650437055 (K_A), 12650437056 (K_B), ...]` |
| T2 | Broker Poll | Creates `EpochAndRecordsMap(epoch=N)`, adds to mailbox |
| T3 | Control | Drains mailbox, starts processing batch (epoch=N) |
| T4 | Control | Epoch check: N == N ✓ (passes!) |
| T5 | Control | └── `addWorkContainer(12650437055, K_A, epoch=N)`: shard K_A CREATED, entry added |
| | | **REBALANCE HAPPENS HERE (on the Broker Poll thread)** |
| T6 | Broker Poll | `onPartitionsRevoked()` |
| T7 | Broker Poll | └── epoch: N → N+1 |
| T8 | Broker Poll | └── `removeStaleContainers()`: shard K_A has a stale entry (epoch N) → REMOVED ✓; shard K_B does not exist yet → nothing to remove |
| T9 | Broker Poll | `onPartitionsAssigned()` |
| T10 | Broker Poll | └── epoch: N+1 → N+2 |
| T11 | Broker Poll | └── `removeStaleContainers()`: shard K_A is now empty; shard K_B still does not exist |
| | | **CONTROL THREAD CONTINUES (still processing the old batch)** |
| T12 | Control | └── `addWorkContainer(12650437056, K_B, epoch=N)`: shard K_B CREATED, stale entry added (this happens AFTER `removeStaleContainers()`!) |
| T13 | Broker Poll | `poll()` returns new batch `[12650437055, 12650437056, 12650437057, ...]` |
| T14 | Broker Poll | Creates `EpochAndRecordsMap(epoch=N+2)`, adds to mailbox |
| T15 | Control | Finishes the old batch |
| T16 | Control | Drains mailbox, processes the new batch (epoch=N+2) |
| T17 | Control | Epoch check: N+2 == N+2 ✓ |
| T18 | Control | └── `addWorkContainer(12650437055, K_A, epoch=N+2)`: shard K_A is EMPTY (stale entry removed at T8) → ADDED ✓ |
| T19 | Control | └── `addWorkContainer(12650437056, K_B, epoch=N+2)`: shard K_B has the stale entry from T12 → DROPPED! ✗ |
| T20 | Control | └── `addWorkContainer(12650437057, K_C, epoch=N+2)`: shard K_C never existed → ADDED ✓ |

@rkolesnev @johnbyrnejb @sangreal @eddyv could you please review this PR?
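For reference, a minimal, self-contained sketch of the suspected race and of the replace-on-stale direction this PR takes. The class and method names (`ShardSketch`, `addWorkContainerCurrent`, `addWorkContainerProposed`, `onRebalance`) are simplified stand-ins and do not match the real parallel-consumer internals one-to-one.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ShardSketch {

    // A work container remembers the epoch of the poll that produced it.
    record WorkContainer(long offset, int epoch) {}

    // One shard per key, with entries keyed by offset (simplified).
    private final Map<String, Map<Long, WorkContainer>> shards = new ConcurrentHashMap<>();
    private final AtomicInteger currentEpoch = new AtomicInteger();

    // Broker-poll thread (T6-T11): bump the epoch and drop entries created
    // under older epochs. A shard whose key has not been seen yet (K_B at
    // T8/T11) simply is not there to be cleaned.
    void onRebalance() {
        int newEpoch = currentEpoch.incrementAndGet();
        shards.values().forEach(shard ->
                shard.values().removeIf(wc -> wc.epoch() < newEpoch));
    }

    // Control thread, current behaviour as we read it: if an entry for the
    // offset already exists, the new container is dropped, even when the
    // existing entry is a stale leftover from T12. This is the T19 case.
    void addWorkContainerCurrent(String key, long offset, int epoch) {
        shards.computeIfAbsent(key, k -> new ConcurrentHashMap<>())
              .putIfAbsent(offset, new WorkContainer(offset, epoch));
    }

    // Proposed behaviour: replace the existing entry when it belongs to an
    // older epoch, so the fresh-epoch record is not silently lost.
    void addWorkContainerProposed(String key, long offset, int epoch) {
        shards.computeIfAbsent(key, k -> new ConcurrentHashMap<>())
              .merge(offset, new WorkContainer(offset, epoch),
                      (existing, fresh) -> existing.epoch() < fresh.epoch() ? fresh : existing);
    }
}
```

With the replace-on-stale behaviour, the add at T19 would overwrite the epoch=N leftover instead of being dropped, so offset 12650437056 would be processed and the committed offset could advance without a restart.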
Checklist