Skip to content

Commit

Permalink
add queue.Drain and redisClient.RPop method
Browse files Browse the repository at this point in the history
  • Loading branch information
psampaz committed Aug 31, 2022
1 parent ba362f0 commit 2e5c35a
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 1 deletion.
21 changes: 21 additions & 0 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Queue interface {
ReturnUnacked(max int64) (int64, error)
ReturnRejected(max int64) (int64, error)
Destroy() (readyCount, rejectedCount int64, err error)
Drain(count int64) ([]string, error)

// internals
// used in cleaner
Expand Down Expand Up @@ -459,6 +460,26 @@ func (queue *redisQueue) move(from, to string, max int64) (n int64, error error)
return n, nil
}

// Drain removes and returns 'count' elements from the queue. In case of an error,
// Drain return all elements removed until the error occurred and the error itself.
func (queue *redisQueue) Drain(count int64) ([]string, error) {
var (
n int64
err error
)
out := make([]string, 0, count)

for n = 0; n < count; n++ {
val, err := queue.redisClient.RPop(queue.readyKey)
if err != nil {
return out, err
}
out = append(out, val)
}

return out, err
}

// Destroy purges and removes the queue from the list of queues
func (queue *redisQueue) Destroy() (readyCount, rejectedCount int64, err error) {
readyCount, err = queue.PurgeReady()
Expand Down
22 changes: 22 additions & 0 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,3 +750,25 @@ func Test_jitteredDuration(t *testing.T) {
assert.GreaterOrEqual(t, int64(110*time.Millisecond), int64(d))
}
}

func TestQueueDrain(t *testing.T) {
connection, err := OpenConnection("drain-connection", "tcp", "localhost:6379", 1, nil)
assert.NoError(t, err)
require.NotNil(t, connection)

queue, err := connection.OpenQueue("drain-queue")
assert.NoError(t, err)

for x := 0; x < 100; x++ {
queue.Publish(fmt.Sprintf("%d", x))
}

eventuallyReady(t, queue, 100)

for x := 1; x <= 10; x++ {
values, err := queue.Drain(10)
assert.NoError(t, err)
assert.Equal(t, 10, len(values))
eventuallyReady(t, queue, int64(100-x*10))
}
}
1 change: 1 addition & 0 deletions redis_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type RedisClient interface {
LRem(key string, count int64, value string) (affected int64, err error)
LTrim(key string, start, stop int64) error
RPopLPush(source, destination string) (value string, err error)
RPop(key string) (value string, err error)

// sets
SAdd(key, value string) (total int64, err error)
Expand Down
4 changes: 4 additions & 0 deletions redis_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func (wrapper RedisWrapper) LTrim(key string, start, stop int64) error {
return wrapper.rawClient.LTrim(unusedContext, key, int64(start), int64(stop)).Err()
}

func (wrapper RedisWrapper) RPop(key string) (value string, err error) {
return wrapper.rawClient.RPop(unusedContext, key).Result()
}

func (wrapper RedisWrapper) RPopLPush(source, destination string) (value string, err error) {
value, err = wrapper.rawClient.RPopLPush(unusedContext, source, destination).Result()
// println("RPopLPush", source, destination, value, err)
Expand Down
1 change: 1 addition & 0 deletions test_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func (*TestQueue) ReturnRejected(int64) (int64, error) { panic(errorNotSupported
func (*TestQueue) PurgeReady() (int64, error) { panic(errorNotSupported) }
func (*TestQueue) PurgeRejected() (int64, error) { panic(errorNotSupported) }
func (*TestQueue) Destroy() (int64, int64, error) { panic(errorNotSupported) }
func (*TestQueue) Drain(count int64) ([]string, error) { panic(errorNotSupported) }
func (*TestQueue) closeInStaleConnection() error { panic(errorNotSupported) }
func (*TestQueue) readyCount() (int64, error) { panic(errorNotSupported) }
func (*TestQueue) unackedCount() (int64, error) { panic(errorNotSupported) }
Expand Down
25 changes: 24 additions & 1 deletion test_redis_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,30 @@ func (client *TestRedisClient) LPush(key string, value ...string) (total int64,
}

client.storeList(key, append(value, list...))
return int64(len(list)) + 1, nil
return int64(len(list) + len(value)), nil
}

// RPop removes and returns one value from the tail of the list stored at key.
// When key holds a value that is not a list, an error is returned.
func (client *TestRedisClient) RPop(key string) (value string, err error) {

lock.Lock()
defer lock.Unlock()

list, err := client.findList(key)
// not a list
if err != nil {
return "", ErrorNotFound
}
// list is empty
if len(list) == 0 {
return "", ErrorNotFound
}

// Remove the last element of source (tail)
client.storeList(key, list[0:len(list)-1])

return list[len(list)-1], nil
}

// LLen returns the length of the list stored at key.
Expand Down
17 changes: 17 additions & 0 deletions test_redis_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,20 @@ func TestTestRedisClient_LPush(t *testing.T) {
})
}
}

func TestTestRedisClient_RPop(t *testing.T) {
client := NewTestRedisClient()
key := "list-key"

total, err := client.LPush(key, "1", "2", "3")
assert.NoError(t, err)
assert.Equal(t, int64(3), total)

value, err := client.RPop(key)
assert.NoError(t, err)
assert.Equal(t, "3", value)

total, err = client.LLen(key)
assert.NoError(t, err)
assert.Equal(t, int64(2), total)
}

0 comments on commit 2e5c35a

Please sign in to comment.