Skip to content

Commit

Permalink
Merge pull request #1018 from vavato-be/add-xautoclaim
Browse files Browse the repository at this point in the history
add XAUTOCLAIM command, added to Redis in 6.2
  • Loading branch information
byroot authored Jul 13, 2021
2 parents ae80708 + 73870ec commit baf5706
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 1 deletion.
46 changes: 46 additions & 0 deletions lib/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3280,6 +3280,38 @@ def xclaim(key, group, consumer, min_idle_time, *ids, **opts)
synchronize { |client| client.call(args, &blk) }
end

# Transfers ownership of pending stream entries that match the specified criteria.
#
# @example Claim next pending message stuck > 5 minutes and mark as retry
# redis.xautoclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-0')
# @example Claim 50 next pending messages stuck > 5 minutes and mark as retry
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-0', count: 50)
# @example Claim next pending message stuck > 5 minutes and don't mark as retry
# redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-0', justid: true)
# @example Claim next pending message after this id stuck > 5 minutes and mark as retry
# redis.xautoclaim('mystream', 'mygroup', 'consumer1', 3600000, '1641321233-0')
#
# @param key [String] the stream key
# @param group [String] the consumer group name
# @param consumer [String] the consumer name
# @param min_idle_time [Integer] the number of milliseconds
# @param start [String] entry id to start scanning from or 0-0 for everything
# @param count [Integer] number of messages to claim (default 1)
# @param justid [Boolean] whether to fetch just an array of entry ids or not.
# Does not increment retry count when true
#
# @return [Hash{String => Hash}] the entries successfully claimed
# @return [Array<String>] the entry ids successfully claimed if justid option is `true`
def xautoclaim(key, group, consumer, min_idle_time, start, count: nil, justid: false)
args = [:xautoclaim, key, group, consumer, min_idle_time, start]
if count
args << 'COUNT' << count.to_s
end
args << 'JUSTID' if justid
blk = justid ? HashifyStreamAutoclaimJustId : HashifyStreamAutoclaim
synchronize { |client| client.call(args, &blk) }
end

# Fetches not acknowledging pending entries
#
# @example With key and group
Expand Down Expand Up @@ -3490,6 +3522,20 @@ def method_missing(command, *args) # rubocop:disable Style/MissingRespondToMissi
end
}

HashifyStreamAutoclaim = lambda { |reply|
{
'next' => reply[0],
'entries' => reply[1].map { |entry| [entry[0], entry[1].each_slice(2).to_h] }
}
}

HashifyStreamAutoclaimJustId = lambda { |reply|
{
'next' => reply[0],
'entries' => reply[1]
}
}

HashifyStreamPendings = lambda { |reply|
{
'size' => reply[0],
Expand Down
2 changes: 1 addition & 1 deletion makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
REDIS_BRANCH ?= 6.0
REDIS_BRANCH ?= 6.2
TMP := tmp
BUILD_DIR := ${TMP}/cache/redis-${REDIS_BRANCH}
TARBALL := ${TMP}/redis-${REDIS_BRANCH}.tar.gz
Expand Down
67 changes: 67 additions & 0 deletions test/lint/streams.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
module Lint
module Streams
MIN_REDIS_VERSION = '4.9.0'
MIN_REDIS_VERSION_XAUTOCLAIM = '6.2.0'
ENTRY_ID_FORMAT = /\d+-\d+/.freeze

def setup
Expand Down Expand Up @@ -633,6 +634,72 @@ def test_xclaim_with_invalid_arguments
assert_raises(Redis::CommandError) { redis.xclaim('', '', '', '', '') }
end

def test_xautoclaim
omit_version(MIN_REDIS_VERSION_XAUTOCLAIM)

redis.xadd('s1', { f: 'v1' }, id: '0-1')
redis.xgroup(:create, 's1', 'g1', '$')
redis.xadd('s1', { f: 'v2' }, id: '0-2')
redis.xadd('s1', { f: 'v3' }, id: '0-3')
redis.xreadgroup('g1', 'c1', 's1', '>')
sleep 0.01

actual = redis.xautoclaim('s1', 'g1', 'c2', 10, '0-0')

assert_equal '0-0', actual['next']
assert_equal %w(0-2 0-3), actual['entries'].map(&:first)
assert_equal(%w(v2 v3), actual['entries'].map { |i| i.last['f'] })
end

def test_xautoclaim_with_justid_option
omit_version(MIN_REDIS_VERSION_XAUTOCLAIM)

redis.xadd('s1', { f: 'v1' }, id: '0-1')
redis.xgroup(:create, 's1', 'g1', '$')
redis.xadd('s1', { f: 'v2' }, id: '0-2')
redis.xadd('s1', { f: 'v3' }, id: '0-3')
redis.xreadgroup('g1', 'c1', 's1', '>')
sleep 0.01

actual = redis.xautoclaim('s1', 'g1', 'c2', 10, '0-0', justid: true)

assert_equal '0-0', actual['next']
assert_equal %w(0-2 0-3), actual['entries']
end

def test_xautoclaim_with_count_option
omit_version(MIN_REDIS_VERSION_XAUTOCLAIM)

redis.xadd('s1', { f: 'v1' }, id: '0-1')
redis.xgroup(:create, 's1', 'g1', '$')
redis.xadd('s1', { f: 'v2' }, id: '0-2')
redis.xadd('s1', { f: 'v3' }, id: '0-3')
redis.xreadgroup('g1', 'c1', 's1', '>')
sleep 0.01

actual = redis.xautoclaim('s1', 'g1', 'c2', 10, '0-0', count: 1)

assert_equal '0-3', actual['next']
assert_equal %w(0-2), actual['entries'].map(&:first)
assert_equal(%w(v2), actual['entries'].map { |i| i.last['f'] })
end

def test_xautoclaim_with_larger_interval
omit_version(MIN_REDIS_VERSION_XAUTOCLAIM)

redis.xadd('s1', { f: 'v1' }, id: '0-1')
redis.xgroup(:create, 's1', 'g1', '$')
redis.xadd('s1', { f: 'v2' }, id: '0-2')
redis.xadd('s1', { f: 'v3' }, id: '0-3')
redis.xreadgroup('g1', 'c1', 's1', '>')
sleep 0.01

actual = redis.xautoclaim('s1', 'g1', 'c2', 36_000, '0-0')

assert_equal '0-0', actual['next']
assert_equal [], actual['entries']
end

def test_xpending
redis.xadd('s1', { f: 'v1' }, id: '0-1')
redis.xgroup(:create, 's1', 'g1', '$')
Expand Down

0 comments on commit baf5706

Please sign in to comment.