Add queue clearing logic to the Redis migrations #679
Merged
As a follow-up to #671, this change adds logic, similar to the Redis migration functions, that clears the queue of acknowledged, stale tasks up to the lowest pending ID. It runs every thirty seconds until a trim succeeds.
The following is a brief explanation of how Redis streams work:
If you XREADGROUP an entry, it is added to the PEL (pending entries list) and thus shows up in XPENDING until it has been XACKed. But it will never show up in XREADGROUP for that specific consumer group again.

Once XACKed, the reference in the PEL is removed, and the entry shows up in neither XPENDING nor XREADGROUP.

It will, however, remain in the stream and show up via XREAD regardless of whether it has been ACKed, but we don't do this anywhere. We instead use XAUTOCLAIM on items that have been read but not XACKed for more than 45 seconds and reinsert them into the queue.

While the entry doesn't show up in XREADGROUP, it's still there consuming memory until it has been XDELed from the stream.

The bug was that we would XACK tasks but not XDEL them at any point until the changes in #671. However, #671 only fixes tasks that are acknowledged after updating the server instance. This PR is meant to clear stale queue tasks that were acknowledged before updating.

The approach here is to use XPENDING to get the lowest ID that has been read but not ACKed (knowing the server reads from the stream in order of entry), and to XTRIM up to that point, exclusive. Should there be no items in the XPENDING return, the logic loops with a 30-second delay until there are entries in the PEL at the time of reading. This is an imperfect solution, but given a high-load queue, the PEL will likely have at least one entry within a reasonable amount of time.

If there isn't high load and XPENDING never returns a non-empty array, it is assumed that this bug has not consumed enough memory to make clearing the queue automatically a great necessity.
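
To make the lifecycle and the trimming approach described above easier to follow, here is a minimal sketch in Python. It is a toy in-memory model, not the code in this PR and not a Redis client: `ToyStream`, `lowest_pending`, `xtrim_minid`, `trim_acknowledged`, and the `max_attempts` parameter are hypothetical names introduced only for illustration. It assumes a single consumer group and entries that are delivered in order of insertion.

```python
import time
from typing import Callable, Dict, List, Optional, Set, Tuple

# Hypothetical ID type: (milliseconds, sequence), ordered like Redis stream IDs.
EntryId = Tuple[int, int]


class ToyStream:
    """Toy stand-in for one Redis stream with a single consumer group."""

    def __init__(self) -> None:
        self.entries: Dict[EntryId, str] = {}  # stream contents (what XREAD would see)
        self.pending: Set[EntryId] = set()     # PEL: delivered but not yet acknowledged
        self.last_delivered: EntryId = (0, 0)  # the group's read cursor
        self._clock = 0

    def xadd(self, payload: str) -> EntryId:
        self._clock += 1
        entry_id = (self._clock, 0)
        self.entries[entry_id] = payload
        return entry_id

    def xreadgroup(self, count: int = 1) -> List[EntryId]:
        """Deliver entries the group has never seen and record them in the PEL."""
        fresh = sorted(i for i in self.entries if i > self.last_delivered)[:count]
        for entry_id in fresh:
            self.pending.add(entry_id)
            self.last_delivered = entry_id
        return fresh

    def xack(self, entry_id: EntryId) -> None:
        """Drop only the PEL reference; the entry itself stays in the stream."""
        self.pending.discard(entry_id)

    def lowest_pending(self) -> Optional[EntryId]:
        """Lowest ID that was read but not acknowledged, or None if the PEL is empty."""
        return min(self.pending) if self.pending else None

    def xtrim_minid(self, min_id: EntryId) -> int:
        """Remove every entry with an ID strictly lower than min_id (exclusive trim)."""
        stale = [i for i in self.entries if i < min_id]
        for entry_id in stale:
            del self.entries[entry_id]
        return len(stale)


def trim_acknowledged(
    stream: ToyStream,
    retry_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
    max_attempts: Optional[int] = None,  # hypothetical escape hatch, not from the PR
) -> int:
    """Trim everything below the lowest pending ID, retrying while the PEL is empty."""
    attempts = 0
    while True:
        lowest = stream.lowest_pending()
        if lowest is not None:
            # Entries are read in order, so everything below this ID was already ACKed.
            return stream.xtrim_minid(lowest)
        attempts += 1
        if max_attempts is not None and attempts >= max_attempts:
            return 0
        sleep(retry_delay)
```

In this model, adding four tasks, reading three, and acknowledging the first two leaves all four entries in the stream until `trim_acknowledged` runs. The trim then removes the two acknowledged entries and keeps the pending one plus the unread one. Against a real server, the same idea would correspond to XPENDING followed by XTRIM with a MINID threshold, which is an assumption about how the approach maps onto Redis commands rather than a statement of what this PR's code does.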