diff --git a/lib/redis.rb b/lib/redis.rb index 676ba6940..aaa990a08 100644 --- a/lib/redis.rb +++ b/lib/redis.rb @@ -3280,6 +3280,38 @@ def xclaim(key, group, consumer, min_idle_time, *ids, **opts) synchronize { |client| client.call(args, &blk) } end + # Transfers ownership of pending stream entries that match the specified criteria. + # + # @example Claim next pending message stuck > 5 minutes and mark as retry + # redis.xautoclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-0') + # @example Claim 50 next pending messages stuck > 5 minutes and mark as retry + # redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-0', count: 50) + # @example Claim next pending message stuck > 5 minutes and don't mark as retry + # redis.xclaim('mystream', 'mygroup', 'consumer1', 3600000, '0-0', justid: true) + # @example Claim next pending message after this id stuck > 5 minutes and mark as retry + # redis.xautoclaim('mystream', 'mygroup', 'consumer1', 3600000, '1641321233-0') + # + # @param key [String] the stream key + # @param group [String] the consumer group name + # @param consumer [String] the consumer name + # @param min_idle_time [Integer] the number of milliseconds + # @param start [String] entry id to start scanning from or 0-0 for everything + # @param count [Integer] number of messages to claim (default 1) + # @param justid [Boolean] whether to fetch just an array of entry ids or not. + # Does not increment retry count when true + # + # @return [Hash{String => Hash}] the entries successfully claimed + # @return [Array] the entry ids successfully claimed if justid option is `true` + def xautoclaim(key, group, consumer, min_idle_time, start, count: nil, justid: false) + args = [:xautoclaim, key, group, consumer, min_idle_time, start] + if count + args << 'COUNT' << count.to_s + end + args << 'JUSTID' if justid + blk = justid ? HashifyStreamAutoclaimJustId : HashifyStreamAutoclaim + synchronize { |client| client.call(args, &blk) } + end + # Fetches not acknowledging pending entries # # @example With key and group @@ -3490,6 +3522,20 @@ def method_missing(command, *args) # rubocop:disable Style/MissingRespondToMissi end } + HashifyStreamAutoclaim = lambda { |reply| + { + 'next' => reply[0], + 'entries' => reply[1].map { |entry| [entry[0], entry[1].each_slice(2).to_h] } + } + } + + HashifyStreamAutoclaimJustId = lambda { |reply| + { + 'next' => reply[0], + 'entries' => reply[1] + } + } + HashifyStreamPendings = lambda { |reply| { 'size' => reply[0], diff --git a/makefile b/makefile index 307e75c30..c803066b1 100644 --- a/makefile +++ b/makefile @@ -1,4 +1,4 @@ -REDIS_BRANCH ?= 6.0 +REDIS_BRANCH ?= 6.2 TMP := tmp BUILD_DIR := ${TMP}/cache/redis-${REDIS_BRANCH} TARBALL := ${TMP}/redis-${REDIS_BRANCH}.tar.gz diff --git a/test/lint/streams.rb b/test/lint/streams.rb index 149eab13f..a6651ced6 100644 --- a/test/lint/streams.rb +++ b/test/lint/streams.rb @@ -3,6 +3,7 @@ module Lint module Streams MIN_REDIS_VERSION = '4.9.0' + MIN_REDIS_VERSION_XAUTOCLAIM = '6.2.0' ENTRY_ID_FORMAT = /\d+-\d+/.freeze def setup @@ -633,6 +634,72 @@ def test_xclaim_with_invalid_arguments assert_raises(Redis::CommandError) { redis.xclaim('', '', '', '', '') } end + def test_xautoclaim + omit_version(MIN_REDIS_VERSION_XAUTOCLAIM) + + redis.xadd('s1', { f: 'v1' }, id: '0-1') + redis.xgroup(:create, 's1', 'g1', '$') + redis.xadd('s1', { f: 'v2' }, id: '0-2') + redis.xadd('s1', { f: 'v3' }, id: '0-3') + redis.xreadgroup('g1', 'c1', 's1', '>') + sleep 0.01 + + actual = redis.xautoclaim('s1', 'g1', 'c2', 10, '0-0') + + assert_equal '0-0', actual['next'] + assert_equal %w(0-2 0-3), actual['entries'].map(&:first) + assert_equal(%w(v2 v3), actual['entries'].map { |i| i.last['f'] }) + end + + def test_xautoclaim_with_justid_option + omit_version(MIN_REDIS_VERSION_XAUTOCLAIM) + + redis.xadd('s1', { f: 'v1' }, id: '0-1') + redis.xgroup(:create, 's1', 'g1', '$') + redis.xadd('s1', { f: 'v2' }, id: '0-2') + redis.xadd('s1', { f: 'v3' }, id: '0-3') + redis.xreadgroup('g1', 'c1', 's1', '>') + sleep 0.01 + + actual = redis.xautoclaim('s1', 'g1', 'c2', 10, '0-0', justid: true) + + assert_equal '0-0', actual['next'] + assert_equal %w(0-2 0-3), actual['entries'] + end + + def test_xautoclaim_with_count_option + omit_version(MIN_REDIS_VERSION_XAUTOCLAIM) + + redis.xadd('s1', { f: 'v1' }, id: '0-1') + redis.xgroup(:create, 's1', 'g1', '$') + redis.xadd('s1', { f: 'v2' }, id: '0-2') + redis.xadd('s1', { f: 'v3' }, id: '0-3') + redis.xreadgroup('g1', 'c1', 's1', '>') + sleep 0.01 + + actual = redis.xautoclaim('s1', 'g1', 'c2', 10, '0-0', count: 1) + + assert_equal '0-3', actual['next'] + assert_equal %w(0-2), actual['entries'].map(&:first) + assert_equal(%w(v2), actual['entries'].map { |i| i.last['f'] }) + end + + def test_xautoclaim_with_larger_interval + omit_version(MIN_REDIS_VERSION_XAUTOCLAIM) + + redis.xadd('s1', { f: 'v1' }, id: '0-1') + redis.xgroup(:create, 's1', 'g1', '$') + redis.xadd('s1', { f: 'v2' }, id: '0-2') + redis.xadd('s1', { f: 'v3' }, id: '0-3') + redis.xreadgroup('g1', 'c1', 's1', '>') + sleep 0.01 + + actual = redis.xautoclaim('s1', 'g1', 'c2', 36_000, '0-0') + + assert_equal '0-0', actual['next'] + assert_equal [], actual['entries'] + end + def test_xpending redis.xadd('s1', { f: 'v1' }, id: '0-1') redis.xgroup(:create, 's1', 'g1', '$')