From 93f17171da879a8e7b84ff959d2684562fab7e47 Mon Sep 17 00:00:00 2001 From: Christian Wellenbrock Date: Thu, 20 Jul 2023 11:50:11 +0200 Subject: [PATCH] 113 Add support for Redis clusters (#148) * fix using another wrapper to fit redis cluster and redis single node; fix using {} to replace [] - untested * update test files with new openConn; add queue_cluster_test.go welle: Remove hardcoded connection options. * Bring back the original OpenConnection() for compatibility Add new function OpenConnectionWithOptions() for the new approach. * Allow cluster tests to run against local cluster * Remove RedisClusterWrapper again Rename RedisSingleWrapper back to RedisWrapper * Bring back original OpenConnectionWithRedisClient() * Add OpenClusterConnection() To allow opening RMQ connections which use the Redis hash tags {} instead of []. This is required to make rmq work with Redis clusters. This commit also reverts the behavior of all other OpenConnection[...] functions to behave as before by still using [] instead of {}. This switch is done by using different Redis key templates. For example instead of rmq::connection::{connection}::queue::[{queue}]::consumers we would use rmq::connection::{connection}::queue::{{queue}}::consumers when using OpenClusterConnection() * Document OpenClusterConnection() in README * Use safe accessors in tests * Update deps * Create Redis cluster in CI (#150) --------- Co-authored-by: zhanglei Co-authored-by: Viacheslav Poturaev --- .github/workflows/test.yml | 33 +- README.md | 41 +- cleaner_test.go | 25 +- connection.go | 66 ++- go.mod | 6 +- go.sum | 12 +- queue.go | 19 +- queue_cluster_test.go | 897 +++++++++++++++++++++++++++++++++++++ queue_test.go | 92 ++-- redis_keys.go | 26 +- redis_wrapper.go | 28 +- stats_test.go | 8 +- test_util.go | 54 ++- testdata/create-cluster.sh | 125 ++++++ 14 files changed, 1267 insertions(+), 165 deletions(-) create mode 100644 queue_cluster_test.go create mode 100755 testdata/create-cluster.sh diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6a196b6..b534286 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,6 +17,7 @@ env: COV_GO_VERSION: 1.19.x # Version of Go to collect coverage TARGET_DELTA_COV: 90 # Target coverage of changed lines, in percents REDIS_ADDR: "localhost:6379" + REDIS_CLUSTER_ADDR: "localhost:30001,localhost:30002,localhost:30003,localhost:30004,localhost:30005,localhost:30006" jobs: test: strategy: @@ -24,33 +25,13 @@ jobs: go-version: [ 1.14.x, 1.15.x, 1.16.x, 1.17.x, 1.18.x, 1.19.x ] runs-on: ubuntu-latest - services: - redis: - image: redis - options: >- - --health-cmd "redis-cli ping" - --health-interval 10s - --health-timeout 5s - --health-retries 5 - ports: - - 6379:6379 steps: - - name: Install Go stable + - name: Install Go if: matrix.go-version != 'tip' uses: actions/setup-go@v3 with: go-version: ${{ matrix.go-version }} - - name: Install Go tip - if: matrix.go-version == 'tip' - run: | - curl -sL https://storage.googleapis.com/go-build-snap/go/linux-amd64/$(git ls-remote https://github.com/golang/go.git HEAD | awk '{print $1;}').tar.gz -o gotip.tar.gz - ls -lah gotip.tar.gz - mkdir -p ~/sdk/gotip - tar -C ~/sdk/gotip -xzf gotip.tar.gz - ~/sdk/gotip/bin/go version - echo "PATH=$HOME/go/bin:$HOME/sdk/gotip/bin/:$PATH" >> $GITHUB_ENV - - name: Checkout code uses: actions/checkout@v2 @@ -77,6 +58,16 @@ jobs: # Use base sha for PR or new commit hash for master/main push in test result key. key: ${{ runner.os }}-unit-test-coverage-${{ (github.event.pull_request.base.sha != github.event.after) && github.event.pull_request.base.sha || github.event.after }} + - name: Prepare Redis + run: | + sudo apt-get update && sudo apt-get install -y lsb-release curl gpg + curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg + echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/redis.list + sudo apt-get update && sudo apt-get install -y redis + ./testdata/create-cluster.sh start + yes yes | ./testdata/create-cluster.sh create + sleep 5 + - name: Run test for base code if: matrix.go-version == env.COV_GO_VERSION && env.RUN_BASE_COVERAGE == 'on' && steps.base-coverage.outputs.cache-hit != 'true' && github.event.pull_request.base.sha != '' run: | diff --git a/README.md b/README.md index d8fe8a8..1fd029e 100644 --- a/README.md +++ b/README.md @@ -1,24 +1,10 @@ [![Build Status](https://github.com/adjust/rmq/workflows/test/badge.svg)](https://github.com/adjust/rmq/actions?query=branch%3Amaster+workflow%3Atest) [![GoDoc](https://pkg.go.dev/badge/github.com/adjust/rmq)](https://pkg.go.dev/github.com/adjust/rmq) ---- - -**Note**: We recently updated rmq to expose Redis errors instead of panicking. -This is a major change as almost all functions now return errors. It's -recommended to switch to the latest version `rmq/v4` so rmq won't crash your -services anymore on Redis errors. - -If you don't want to upgrade yet, you can continue using `rmq/v2`. - ---- - ## Overview rmq is short for Redis message queue. It's a message queue system written in Go -and backed by Redis. It's similar to [redismq][redismq], but implemented -independently with a different interface in mind. - -[redismq]: https://github.com/adjust/redismq +and backed by Redis. ## Basic Usage @@ -49,7 +35,11 @@ It's also possible to access a Redis listening on a Unix socket: connection, err := rmq.OpenConnection("my service", "unix", "/tmp/redis.sock", 1, errChan) ``` -For more flexible setup you can also create your own Redis client: +For more flexible setup you can pass Redis options or create your own Redis client: + +```go +connection, err := OpenConnectionWithRedisOptions("my service", redisOptions, errChan) +``` ```go connection, err := OpenConnectionWithRedisClient("my service", redisClient, errChan) @@ -63,6 +53,25 @@ the `OpenConnection()` functions rmq will send those background errors to this channel so you can handle them asynchronously. For more details about this and handling suggestions see the section about handling background errors below. +#### Connecting to a Redis cluster + +In order to connect to a Redis cluster please use `OpenClusterConnection()`: + +```go +redisClusterOptions := &redis.ClusterOptions{ /* ... */ } +redisClusterClient := redis.NewClusterClient(redisClusterOptions) +connection, err := OpenClusterConnection("my service", redisClusterClient, errChan) +``` + +Note that such an rmq cluster connection uses different Redis than rmq connections +opened by `OpenConnection()` or similar. If you have used a Redis instance +with `OpenConnection()` then it is NOT SAFE to reuse that rmq system by connecting +to it via `OpenClusterConnection()`. The cluster state won't be compatible and +this will likely lead to data loss. + +If you've previously used `OpenConnection()` or similar you should only consider +using `OpenClusterConnection()` with a fresh Redis cluster. + ### Queues Once we have a connection we can use it to finally access queues. Each queue diff --git a/cleaner_test.go b/cleaner_test.go index d0cc818..dcd64f9 100644 --- a/cleaner_test.go +++ b/cleaner_test.go @@ -1,36 +1,23 @@ package rmq import ( - "os" "testing" "time" - "github.com/alicebob/miniredis/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func testRedis(t testing.TB) (addr string, closer func()) { - t.Helper() - - if redisAddr, ok := os.LookupEnv("REDIS_ADDR"); ok { - return redisAddr, func() {} - } - - mr := miniredis.RunT(t) - return mr.Addr(), mr.Close -} - func TestCleaner(t *testing.T) { - redisAddr, closer := testRedis(t) + redisOptions, closer := testRedis(t) defer closer() - flushConn, err := OpenConnection("cleaner-flush", "tcp", redisAddr, 1, nil) + flushConn, err := OpenConnectionWithRedisOptions("cleaner-flush", redisOptions, nil) assert.NoError(t, err) assert.NoError(t, flushConn.stopHeartbeat()) assert.NoError(t, flushConn.flushDb()) - conn, err := OpenConnection("cleaner-conn1", "tcp", redisAddr, 1, nil) + conn, err := OpenConnectionWithRedisOptions("cleaner-conn1", redisOptions, nil) assert.NoError(t, err) queues, err := conn.GetOpenQueues() assert.NoError(t, err) @@ -91,7 +78,7 @@ func TestCleaner(t *testing.T) { assert.NoError(t, conn.stopHeartbeat()) time.Sleep(time.Millisecond) - conn, err = OpenConnection("cleaner-conn1", "tcp", redisAddr, 1, nil) + conn, err = OpenConnectionWithRedisOptions("cleaner-conn1", redisOptions, nil) assert.NoError(t, err) queue, err = conn.OpenQueue("q1") assert.NoError(t, err) @@ -138,7 +125,7 @@ func TestCleaner(t *testing.T) { assert.NoError(t, conn.stopHeartbeat()) time.Sleep(time.Millisecond) - cleanerConn, err := OpenConnection("cleaner-conn", "tcp", redisAddr, 1, nil) + cleanerConn, err := OpenConnectionWithRedisOptions("cleaner-conn", redisOptions, nil) assert.NoError(t, err) cleaner := NewCleaner(cleanerConn) returned, err := cleaner.Clean() @@ -149,7 +136,7 @@ func TestCleaner(t *testing.T) { assert.NoError(t, err) assert.Len(t, queues, 2) - conn, err = OpenConnection("cleaner-conn1", "tcp", redisAddr, 1, nil) + conn, err = OpenConnectionWithRedisOptions("cleaner-conn1", redisOptions, nil) assert.NoError(t, err) queue, err = conn.OpenQueue("q1") assert.NoError(t, err) diff --git a/connection.go b/connection.go index 34a571e..055be0f 100644 --- a/connection.go +++ b/connection.go @@ -46,9 +46,15 @@ type Connection interface { // Connection is the entry point. Use a connection to access queues, consumers and deliveries // Each connection has a single heartbeat shared among all consumers type redisConnection struct { - Name string - heartbeatKey string // key to keep alive - queuesKey string // key to list of queues consumed by this connection + Name string + heartbeatKey string // key to keep alive + queuesKey string // key to list of queues consumed by this connection + + consumersTemplate string + unackedTemplate string + readyTemplate string + rejectedTemplate string + redisClient RedisClient errChan chan<- error heartbeatStop chan chan struct{} @@ -62,11 +68,16 @@ type redisConnection struct { // OpenConnection opens and returns a new connection func OpenConnection(tag string, network string, address string, db int, errChan chan<- error) (Connection, error) { - redisClient := redis.NewClient(&redis.Options{Network: network, Addr: address, DB: db}) - return OpenConnectionWithRedisClient(tag, redisClient, errChan) + return OpenConnectionWithRedisOptions(tag, &redis.Options{Network: network, Addr: address, DB: db}, errChan) +} + +// OpenConnectionWithRedisOptions allows you to pass more flexible options +func OpenConnectionWithRedisOptions(tag string, redisOption *redis.Options, errChan chan<- error) (Connection, error) { + return OpenConnectionWithRedisClient(tag, redis.NewClient(redisOption), errChan) } // OpenConnectionWithRedisClient opens and returns a new connection +// This can be used to passa redis.ClusterClient. func OpenConnectionWithRedisClient(tag string, redisClient redis.Cmdable, errChan chan<- error) (Connection, error) { return OpenConnectionWithRmqRedisClient(tag, RedisWrapper{redisClient}, errChan) } @@ -77,18 +88,31 @@ func OpenConnectionWithTestRedisClient(tag string, errChan chan<- error) (Connec return OpenConnectionWithRmqRedisClient(tag, NewTestRedisClient(), errChan) } -// If you would like to use a redis client other than the ones supported in the constructors above, you can implement -// the RedisClient interface yourself +// OpenConnectionWithRmqRedisClient: If you would like to use a redis client other than the ones +// supported in the constructors above, you can implement the RedisClient interface yourself func OpenConnectionWithRmqRedisClient(tag string, redisClient RedisClient, errChan chan<- error) (Connection, error) { + return openConnection(tag, redisClient, false, errChan) +} + +// OpenClusterConnection: Same as OpenConnectionWithRedisClient, but using Redis hash tags {} instead of []. +func OpenClusterConnection(tag string, redisClient redis.Cmdable, errChan chan<- error) (Connection, error) { + return openConnection(tag, RedisWrapper{redisClient}, true, errChan) +} + +func openConnection(tag string, redisClient RedisClient, useRedisHashTags bool, errChan chan<- error) (Connection, error) { name := fmt.Sprintf("%s-%s", tag, RandomString(6)) connection := &redisConnection{ - Name: name, - heartbeatKey: strings.Replace(connectionHeartbeatTemplate, phConnection, name, 1), - queuesKey: strings.Replace(connectionQueuesTemplate, phConnection, name, 1), - redisClient: redisClient, - errChan: errChan, - heartbeatStop: make(chan chan struct{}, 1), + Name: name, + heartbeatKey: strings.Replace(connectionHeartbeatTemplate, phConnection, name, 1), + queuesKey: strings.Replace(connectionQueuesTemplate, phConnection, name, 1), + consumersTemplate: getTemplate(connectionQueueConsumersBaseTemplate, useRedisHashTags), + unackedTemplate: getTemplate(connectionQueueUnackedBaseTemplate, useRedisHashTags), + readyTemplate: getTemplate(queueReadyBaseTemplate, useRedisHashTags), + rejectedTemplate: getTemplate(queueRejectedBaseTemplate, useRedisHashTags), + redisClient: redisClient, + errChan: errChan, + heartbeatStop: make(chan chan struct{}, 1), } if err := connection.updateHeartbeat(); err != nil { // checks the connection @@ -243,10 +267,14 @@ func (connection *redisConnection) getConnections() ([]string, error) { // hijackConnection reopens an existing connection for inspection purposes without starting a heartbeat func (connection *redisConnection) hijackConnection(name string) Connection { return &redisConnection{ - Name: name, - heartbeatKey: strings.Replace(connectionHeartbeatTemplate, phConnection, name, 1), - queuesKey: strings.Replace(connectionQueuesTemplate, phConnection, name, 1), - redisClient: connection.redisClient, + Name: name, + heartbeatKey: strings.Replace(connectionHeartbeatTemplate, phConnection, name, 1), + queuesKey: strings.Replace(connectionQueuesTemplate, phConnection, name, 1), + consumersTemplate: connection.consumersTemplate, + unackedTemplate: connection.unackedTemplate, + readyTemplate: connection.readyTemplate, + rejectedTemplate: connection.rejectedTemplate, + redisClient: connection.redisClient, } } @@ -280,6 +308,10 @@ func (connection *redisConnection) openQueue(name string) Queue { name, connection.Name, connection.queuesKey, + connection.consumersTemplate, + connection.unackedTemplate, + connection.readyTemplate, + connection.rejectedTemplate, connection.redisClient, connection.errChan, ) diff --git a/go.mod b/go.mod index 139aaca..0517eb7 100644 --- a/go.mod +++ b/go.mod @@ -3,19 +3,19 @@ module github.com/adjust/rmq/v5 go 1.17 require ( - github.com/alicebob/miniredis/v2 v2.30.0 + github.com/alicebob/miniredis/v2 v2.30.4 github.com/redis/go-redis/v9 v9.0.3 github.com/stretchr/testify v1.7.0 ) require ( - github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect + github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/kr/pretty v0.1.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/yuin/gopher-lua v1.0.0 // indirect + github.com/yuin/gopher-lua v1.1.0 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect ) diff --git a/go.sum b/go.sum index 072402d..904701d 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,8 @@ -github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk= github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= -github.com/alicebob/miniredis/v2 v2.30.0 h1:uA3uhDbCxfO9+DI/DuGeAMr9qI+noVWwGPNTFuKID5M= -github.com/alicebob/miniredis/v2 v2.30.0/go.mod h1:84TWKZlxYkfgMucPBf5SOQBYJceZeQRFIaQgNMiCX6Q= +github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 h1:uvdUDbHQHO85qeSydJtItA4T55Pw6BtAejd0APRJOCE= +github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= +github.com/alicebob/miniredis/v2 v2.30.4 h1:8S4/o1/KoUArAGbGwPxcwf0krlzceva2XVOSchFS7Eo= +github.com/alicebob/miniredis/v2 v2.30.4/go.mod h1:b25qWj4fCEsBeAAR2mlb0ufImGC6uH3VlUfb/HS5zKg= github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao= github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w= github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y= @@ -27,9 +28,8 @@ github.com/redis/go-redis/v9 v9.0.3/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDO github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= -github.com/yuin/gopher-lua v1.0.0 h1:pQCf0LN67Kf7M5u7vRd40A8M1I8IMLrxlqngUJgZ0Ow= -github.com/yuin/gopher-lua v1.0.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= +github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE= +github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= diff --git a/queue.go b/queue.go index 57f8518..c246ead 100644 --- a/queue.go +++ b/queue.go @@ -46,9 +46,9 @@ type redisQueue struct { connectionName string queuesKey string // key to list of queues consumed by this connection consumersKey string // key to set of consumers using this connection + unackedKey string // key to list of currently consuming deliveries readyKey string // key to list of ready deliveries rejectedKey string // key to list of rejected deliveries - unackedKey string // key to list of currently consuming deliveries pushKey string // key to list of pushed deliveries redisClient RedisClient errChan chan<- error @@ -64,22 +64,21 @@ type redisQueue struct { } func newQueue( - name string, - connectionName string, - queuesKey string, + name, connectionName, queuesKey string, + consumersTemplate, unackedTemplate, readyTemplate, rejectedTemplate string, redisClient RedisClient, errChan chan<- error, ) *redisQueue { - consumersKey := strings.Replace(connectionQueueConsumersTemplate, phConnection, connectionName, 1) + consumersKey := strings.Replace(consumersTemplate, phConnection, connectionName, 1) consumersKey = strings.Replace(consumersKey, phQueue, name, 1) - readyKey := strings.Replace(queueReadyTemplate, phQueue, name, 1) - rejectedKey := strings.Replace(queueRejectedTemplate, phQueue, name, 1) - - unackedKey := strings.Replace(connectionQueueUnackedTemplate, phConnection, connectionName, 1) + unackedKey := strings.Replace(unackedTemplate, phConnection, connectionName, 1) unackedKey = strings.Replace(unackedKey, phQueue, name, 1) + readyKey := strings.Replace(readyTemplate, phQueue, name, 1) + rejectedKey := strings.Replace(rejectedTemplate, phQueue, name, 1) + consumingStopped := make(chan struct{}) ackCtx, ackCancel := context.WithCancel(context.Background()) @@ -88,9 +87,9 @@ func newQueue( connectionName: connectionName, queuesKey: queuesKey, consumersKey: consumersKey, + unackedKey: unackedKey, readyKey: readyKey, rejectedKey: rejectedKey, - unackedKey: unackedKey, redisClient: redisClient, errChan: errChan, consumingStopped: consumingStopped, diff --git a/queue_cluster_test.go b/queue_cluster_test.go new file mode 100644 index 0000000..1ce3de2 --- /dev/null +++ b/queue_cluster_test.go @@ -0,0 +1,897 @@ +package rmq + +import ( + "fmt" + "math" + "strconv" + "testing" + "time" + + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestClusterConnections(t *testing.T) { + redisOptions, closer := testClusterRedis(t) + defer closer() + + connFlush, err := OpenClusterConnection("flush", redis.NewClusterClient(redisOptions), nil) + assert.NoError(t, err) + assert.NoError(t, connFlush.stopHeartbeat()) + assert.Equal(t, ErrorNotFound, connFlush.stopHeartbeat()) + assert.NoError(t, connFlush.flushDb()) + + connClean, err := OpenClusterConnection("clean", redis.NewClusterClient(redisOptions), nil) + assert.NoError(t, err) + require.NotNil(t, connClean) + _, err = NewCleaner(connClean).Clean() + require.NoError(t, err) + connections, err := connClean.getConnections() + connCount := len(connections) + assert.NoError(t, err) + + conn1, err := OpenClusterConnection("conn1", redis.NewClusterClient(redisOptions), nil) + assert.NoError(t, err) + connections, err = connClean.getConnections() + assert.NoError(t, err) + assert.Len(t, connections, connCount+1) // connection q1 was added + assert.Equal(t, ErrorNotFound, connClean.hijackConnection("nope").checkHeartbeat()) + assert.NoError(t, conn1.checkHeartbeat()) + + conn2, err := OpenClusterConnection("conn2", redis.NewClusterClient(redisOptions), nil) + assert.NoError(t, err) + connections, err = connClean.getConnections() + assert.NoError(t, err) + assert.Len(t, connections, connCount+2) // connection q2 was added + assert.NoError(t, conn1.checkHeartbeat()) + assert.NoError(t, conn2.checkHeartbeat()) + + assert.Equal(t, ErrorNotFound, connClean.hijackConnection("nope").stopHeartbeat()) + assert.NoError(t, conn1.stopHeartbeat()) + assert.Equal(t, ErrorNotFound, conn1.checkHeartbeat()) + assert.NoError(t, conn2.checkHeartbeat()) + connections, err = connClean.getConnections() + assert.NoError(t, err) + assert.Len(t, connections, connCount+2) + + assert.NoError(t, conn2.stopHeartbeat()) + assert.Equal(t, ErrorNotFound, conn1.checkHeartbeat()) + assert.Equal(t, ErrorNotFound, conn2.checkHeartbeat()) + connections, err = connClean.getConnections() + assert.NoError(t, err) + assert.Len(t, connections, connCount+2) + + assert.NoError(t, connClean.stopHeartbeat()) +} + +func TestClusterConnectionQueues(t *testing.T) { + redisOptions, closer := testClusterRedis(t) + defer closer() + + connection, err := OpenClusterConnection("conn1", redis.NewClusterClient(redisOptions), nil) + assert.NoError(t, err) + require.NotNil(t, connection) + + assert.NoError(t, connection.unlistAllQueues()) + queues, err := connection.GetOpenQueues() + assert.NoError(t, err) + assert.Len(t, queues, 0) + + queue1, err := connection.OpenQueue("conn1-q1") + assert.NoError(t, err) + require.NotNil(t, queue1) + queues, err = connection.GetOpenQueues() + assert.NoError(t, err) + assert.Equal(t, []string{"conn1-q1"}, queues) + queues, err = connection.getConsumingQueues() + assert.NoError(t, err) + assert.Len(t, queues, 0) + assert.NoError(t, queue1.StartConsuming(1, time.Millisecond)) + queues, err = connection.getConsumingQueues() + assert.NoError(t, err) + assert.Equal(t, []string{"conn1-q1"}, queues) + + queue2, err := connection.OpenQueue("conn1-q2") + assert.NoError(t, err) + require.NotNil(t, queue2) + queues, err = connection.GetOpenQueues() + assert.NoError(t, err) + assert.Len(t, queues, 2) + queues, err = connection.getConsumingQueues() + assert.NoError(t, err) + assert.Len(t, queues, 1) + assert.NoError(t, queue2.StartConsuming(1, time.Millisecond)) + queues, err = connection.getConsumingQueues() + assert.NoError(t, err) + assert.Len(t, queues, 2) + + <-queue2.StopConsuming() + assert.NoError(t, queue2.closeInStaleConnection()) + queues, err = connection.GetOpenQueues() + assert.NoError(t, err) + assert.Len(t, queues, 2) + queues, err = connection.getConsumingQueues() + assert.NoError(t, err) + assert.Equal(t, []string{"conn1-q1"}, queues) + + <-queue1.StopConsuming() + assert.NoError(t, queue1.closeInStaleConnection()) + queues, err = connection.GetOpenQueues() + assert.NoError(t, err) + assert.Len(t, queues, 2) + queues, err = connection.getConsumingQueues() + assert.NoError(t, err) + assert.Len(t, queues, 0) + + readyCount, rejectedCount, err := queue1.Destroy() + assert.NoError(t, err) + assert.Equal(t, int64(0), readyCount) + assert.Equal(t, int64(0), rejectedCount) + queues, err = connection.GetOpenQueues() + assert.NoError(t, err) + assert.Equal(t, []string{"conn1-q2"}, queues) + queues, err = connection.getConsumingQueues() + assert.NoError(t, err) + assert.Len(t, queues, 0) + + assert.NoError(t, connection.stopHeartbeat()) +} + +func TestClusterQueueCommon(t *testing.T) { + redisOptions, closer := testClusterRedis(t) + defer closer() + + connection, err := OpenClusterConnection("conn1", redis.NewClusterClient(redisOptions), nil) + assert.NoError(t, err) + require.NotNil(t, connection) + + queue, err := connection.OpenQueue("queue-q") + assert.NoError(t, err) + require.NotNil(t, queue) + _, err = queue.PurgeReady() + assert.NoError(t, err) + eventuallyReady(t, queue, 0) + assert.NoError(t, queue.Publish("queue-d1")) + eventuallyReady(t, queue, 1) + assert.NoError(t, queue.Publish("queue-d2")) + eventuallyReady(t, queue, 2) + count, err := queue.PurgeReady() + assert.Equal(t, int64(2), count) + eventuallyReady(t, queue, 0) + count, err = queue.PurgeReady() + assert.Equal(t, int64(0), count) + + queues, err := connection.getConsumingQueues() + assert.NoError(t, err) + assert.Len(t, queues, 0) + assert.NoError(t, queue.StartConsuming(10, time.Millisecond)) + assert.Equal(t, ErrorAlreadyConsuming, queue.StartConsuming(10, time.Millisecond)) + cons1name, err := queue.AddConsumer("queue-cons1", NewTestConsumer("queue-A")) + assert.NoError(t, err) + time.Sleep(time.Millisecond) + queues, err = connection.getConsumingQueues() + assert.NoError(t, err) + assert.Len(t, queues, 1) + consumers, err := queue.getConsumers() + assert.NoError(t, err) + assert.Equal(t, []string{cons1name}, consumers) + _, err = queue.AddConsumer("queue-cons2", NewTestConsumer("queue-B")) + assert.NoError(t, err) + consumers, err = queue.getConsumers() + assert.NoError(t, err) + assert.Len(t, consumers, 2) + + <-queue.StopConsuming() + assert.NoError(t, connection.stopHeartbeat()) +} + +func TestClusterConsumerCommon(t *testing.T) { + redisOptions, closer := testClusterRedis(t) + defer closer() + + connection, err := OpenClusterConnection("conn1", redis.NewClusterClient(redisOptions), nil) + assert.NoError(t, err) + require.NotNil(t, connection) + + queue1, err := connection.OpenQueue("cons-q") + assert.NoError(t, err) + require.NotNil(t, queue1) + _, err = queue1.PurgeReady() + assert.NoError(t, err) + + consumer := NewTestConsumer("cons-A") + consumer.AutoAck = false + assert.NoError(t, queue1.StartConsuming(10, time.Millisecond)) + _, err = queue1.AddConsumer("cons-cons", consumer) + assert.NoError(t, err) + assert.Nil(t, consumer.Last()) + + assert.NoError(t, queue1.Publish("cons-d1")) + eventuallyReady(t, queue1, 0) + eventuallyUnacked(t, queue1, 1) + require.NotNil(t, consumer.Last()) + assert.Equal(t, "cons-d1", consumer.Last().Payload()) + + assert.NoError(t, queue1.Publish("cons-d2")) + eventuallyReady(t, queue1, 0) + eventuallyUnacked(t, queue1, 2) + assert.Equal(t, "cons-d2", consumer.Last().Payload()) + + assert.NoError(t, consumer.Deliveries()[0].Ack()) + eventuallyReady(t, queue1, 0) + eventuallyUnacked(t, queue1, 1) + + assert.NoError(t, consumer.Deliveries()[1].Ack()) + eventuallyReady(t, queue1, 0) + eventuallyUnacked(t, queue1, 0) + + assert.Equal(t, ErrorNotFound, consumer.Deliveries()[0].Ack()) + + assert.NoError(t, queue1.Publish("cons-d3")) + eventuallyReady(t, queue1, 0) + eventuallyUnacked(t, queue1, 1) + eventuallyRejected(t, queue1, 0) + assert.Equal(t, "cons-d3", consumer.Last().Payload()) + assert.NoError(t, consumer.Last().Reject()) + eventuallyReady(t, queue1, 0) + eventuallyUnacked(t, queue1, 0) + eventuallyRejected(t, queue1, 1) + + assert.NoError(t, queue1.Publish("cons-d4")) + eventuallyReady(t, queue1, 0) + eventuallyUnacked(t, queue1, 1) + eventuallyRejected(t, queue1, 1) + assert.Equal(t, "cons-d4", consumer.Last().Payload()) + assert.NoError(t, consumer.Last().Reject()) + eventuallyReady(t, queue1, 0) + eventuallyUnacked(t, queue1, 0) + eventuallyRejected(t, queue1, 2) + count, err := queue1.PurgeRejected() + assert.NoError(t, err) + assert.Equal(t, int64(2), count) + eventuallyRejected(t, queue1, 0) + count, err = queue1.PurgeRejected() + assert.NoError(t, err) + assert.Equal(t, int64(0), count) + + queue2, err := connection.OpenQueue("cons-func-q") + assert.NoError(t, err) + assert.NoError(t, queue2.StartConsuming(10, time.Millisecond)) + + payloadChan := make(chan string, 1) + payload := "cons-func-payload" + + _, err = queue2.AddConsumerFunc("cons-func", func(delivery Delivery) { + err = delivery.Ack() + assert.NoError(t, err) + payloadChan <- delivery.Payload() + }) + assert.NoError(t, err) + + assert.NoError(t, queue2.Publish(payload)) + eventuallyReady(t, queue2, 0) + eventuallyUnacked(t, queue2, 0) + assert.Equal(t, payload, <-payloadChan) + + <-queue1.StopConsuming() + <-queue2.StopConsuming() + assert.NoError(t, connection.stopHeartbeat()) +} + +func TestClusterMulti(t *testing.T) { + redisOptions, closer := testClusterRedis(t) + defer closer() + + connection, err := OpenClusterConnection("conn1", redis.NewClusterClient(redisOptions), nil) + assert.NoError(t, err) + queue, err := connection.OpenQueue("multi-q") + assert.NoError(t, err) + _, err = queue.PurgeReady() + assert.NoError(t, err) + + for i := 0; i < 20; i++ { + err := queue.Publish(fmt.Sprintf("multi-d%d", i)) + assert.NoError(t, err) + } + eventuallyReady(t, queue, 20) + eventuallyUnacked(t, queue, 0) + + assert.NoError(t, queue.StartConsuming(10, time.Millisecond)) + + // Assert that eventually the ready count drops to 10 and unacked rises to 10 + // TODO use the util funcs instead + assert.Eventually(t, func() bool { + readyCount, err := queue.readyCount() + if err != nil { + return false + } + unackedCount, err := queue.unackedCount() + if err != nil { + return false + } + return readyCount == 10 && unackedCount == 10 + }, 10*time.Second, 2*time.Millisecond) + + consumer := NewTestConsumer("multi-cons") + consumer.AutoAck = false + consumer.AutoFinish = false + + _, err = queue.AddConsumer("multi-cons", consumer) + assert.NoError(t, err) + + // After we add the consumer - ready and unacked do not change + eventuallyReady(t, queue, 10) + eventuallyUnacked(t, queue, 10) + + assert.NoError(t, consumer.Last().Ack()) + // Assert that after the consumer acks a message the ready count drops to 9 and unacked remains at 10 + // TODO use util funcs instead + assert.Eventually(t, func() bool { + readyCount, err := queue.readyCount() + if err != nil { + return false + } + unackedCount, err := queue.unackedCount() + if err != nil { + return false + } + return readyCount == 9 && unackedCount == 10 + }, 10*time.Second, 2*time.Millisecond) + + consumer.Finish() + // Assert that after the consumer finishes processing the first message ready and unacked do not change + eventuallyReady(t, queue, 9) + eventuallyUnacked(t, queue, 10) + + assert.NoError(t, consumer.Last().Ack()) + // Assert that after the consumer acks a message the ready count drops to 8 and unacked remains at 10 + // TODO use the util funcs instead + assert.Eventually(t, func() bool { + readyCount, err := queue.readyCount() + if err != nil { + return false + } + unackedCount, err := queue.unackedCount() + if err != nil { + return false + } + return readyCount == 8 && unackedCount == 10 + }, 10*time.Second, 2*time.Millisecond) + + consumer.Finish() + // Assert that after the consumer finishes processing the second message ready and unacked do not change + eventuallyReady(t, queue, 8) + eventuallyUnacked(t, queue, 10) + + // This prevents the consumer from blocking internally inside a call to Consume, which allows the queue to complete + // the call to StopConsuming + consumer.FinishAll() + + <-queue.StopConsuming() + assert.NoError(t, connection.stopHeartbeat()) +} + +func TestClusterBatch(t *testing.T) { + redisOptions, closer := testClusterRedis(t) + defer closer() + + connection, err := OpenClusterConnection("conn1", redis.NewClusterClient(redisOptions), nil) + assert.NoError(t, err) + queue, err := connection.OpenQueue("batch-q") + assert.NoError(t, err) + _, err = queue.PurgeRejected() + assert.NoError(t, err) + _, err = queue.PurgeReady() + assert.NoError(t, err) + + for i := 0; i < 5; i++ { + err := queue.Publish(fmt.Sprintf("batch-d%d", i)) + assert.NoError(t, err) + } + + assert.NoError(t, queue.StartConsuming(10, time.Millisecond)) + eventuallyUnacked(t, queue, 5) + + consumer := NewTestBatchConsumer() + _, err = queue.AddBatchConsumer("batch-cons", 2, 50*time.Millisecond, consumer) + assert.NoError(t, err) + assert.Eventually(t, func() bool { + return len(consumer.Last()) == 2 + }, 10*time.Second, 2*time.Millisecond) + assert.Equal(t, "batch-d0", consumer.Last()[0].Payload()) + assert.Equal(t, "batch-d1", consumer.Last()[1].Payload()) + assert.NoError(t, consumer.Last()[0].Reject()) + assert.NoError(t, consumer.Last()[1].Ack()) + eventuallyUnacked(t, queue, 3) + eventuallyRejected(t, queue, 1) + + consumer.Finish() + assert.Eventually(t, func() bool { + return len(consumer.Last()) == 2 + }, 10*time.Second, 2*time.Millisecond) + assert.Equal(t, "batch-d2", consumer.Last()[0].Payload()) + assert.Equal(t, "batch-d3", consumer.Last()[1].Payload()) + assert.NoError(t, consumer.Last()[0].Reject()) + assert.NoError(t, consumer.Last()[1].Ack()) + eventuallyUnacked(t, queue, 1) + eventuallyRejected(t, queue, 2) + + consumer.Finish() + // Last Batch is cleared out + assert.Len(t, consumer.Last(), 0) + eventuallyUnacked(t, queue, 1) + eventuallyRejected(t, queue, 2) + + // After a pause the batch consumer will pull down another batch + assert.Eventually(t, func() bool { + return len(consumer.Last()) == 1 + }, 10*time.Second, 2*time.Millisecond) + assert.Equal(t, "batch-d4", consumer.Last()[0].Payload()) + assert.NoError(t, consumer.Last()[0].Reject()) + eventuallyUnacked(t, queue, 0) + eventuallyRejected(t, queue, 3) +} + +func TestClusterReturnRejected(t *testing.T) { + redisOptions, closer := testClusterRedis(t) + defer closer() + + connection, err := OpenClusterConnection("conn1", redis.NewClusterClient(redisOptions), nil) + assert.NoError(t, err) + queue, err := connection.OpenQueue("return-q") + assert.NoError(t, err) + _, err = queue.PurgeReady() + assert.NoError(t, err) + + for i := 0; i < 6; i++ { + err := queue.Publish(fmt.Sprintf("return-d%d", i)) + assert.NoError(t, err) + } + + eventuallyReady(t, queue, 6) + eventuallyUnacked(t, queue, 0) + eventuallyRejected(t, queue, 0) + + assert.NoError(t, queue.StartConsuming(10, time.Millisecond)) + eventuallyReady(t, queue, 0) + eventuallyUnacked(t, queue, 6) + eventuallyRejected(t, queue, 0) + + consumer := NewTestConsumer("return-cons") + consumer.AutoAck = false + _, err = queue.AddConsumer("cons", consumer) + assert.NoError(t, err) + eventuallyReady(t, queue, 0) + eventuallyUnacked(t, queue, 6) + eventuallyRejected(t, queue, 0) + + assert.Len(t, consumer.Deliveries(), 6) + assert.NoError(t, consumer.Deliveries()[0].Reject()) + assert.NoError(t, consumer.Deliveries()[1].Ack()) + assert.NoError(t, consumer.Deliveries()[2].Reject()) + assert.NoError(t, consumer.Deliveries()[3].Reject()) + // delivery 4 still open + assert.NoError(t, consumer.Deliveries()[5].Reject()) + + eventuallyReady(t, queue, 0) + eventuallyUnacked(t, queue, 1) // delivery 4 + eventuallyRejected(t, queue, 4) // delivery 0, 2, 3, 5 + + <-queue.StopConsuming() + + n, err := queue.ReturnRejected(2) + assert.NoError(t, err) + assert.Equal(t, int64(2), n) + eventuallyReady(t, queue, 2) + eventuallyUnacked(t, queue, 1) // delivery 4 + eventuallyRejected(t, queue, 2) // delivery 3, 5 + + n, err = queue.ReturnRejected(math.MaxInt64) + assert.NoError(t, err) + assert.Equal(t, int64(2), n) + eventuallyReady(t, queue, 4) + eventuallyUnacked(t, queue, 1) // delivery 4 + eventuallyRejected(t, queue, 0) +} + +func TestClusterPushQueue(t *testing.T) { + redisOptions, closer := testClusterRedis(t) + defer closer() + + // using random queue names here to make test more robust + queueName1 := RandomString(6) + queueName2 := RandomString(6) + + connection, err := OpenClusterConnection("conn1", redis.NewClusterClient(redisOptions), nil) + assert.NoError(t, err) + queue1, err := connection.OpenQueue(queueName1) + assert.NoError(t, err) + queue2, err := connection.OpenQueue(queueName2) + assert.NoError(t, err) + queue1.SetPushQueue(queue2) + assert.Equal(t, queue2.(*redisQueue).readyKey, queue1.(*redisQueue).pushKey) + + consumer1 := NewTestConsumer("push-cons") + consumer1.AutoAck = false + consumer1.AutoFinish = false + assert.NoError(t, queue1.StartConsuming(10, time.Millisecond)) + _, err = queue1.AddConsumer("push-cons", consumer1) + assert.NoError(t, err) + + consumer2 := NewTestConsumer("push-cons") + consumer2.AutoAck = false + consumer2.AutoFinish = false + assert.NoError(t, queue2.StartConsuming(10, time.Millisecond)) + _, err = queue2.AddConsumer("push-cons", consumer2) + assert.NoError(t, err) + + assert.NoError(t, queue1.Publish("d1")) + eventuallyUnacked(t, queue1, 1) + require.Len(t, consumer1.Deliveries(), 1) + + assert.NoError(t, consumer1.Last().Push()) + eventuallyUnacked(t, queue1, 0) + eventuallyUnacked(t, queue2, 1) + require.Len(t, consumer2.Deliveries(), 1) + + assert.NoError(t, consumer2.Last().Push()) + eventuallyRejected(t, queue2, 1) +} + +func TestClusterStopConsuming_Consumer(t *testing.T) { + redisOptions, closer := testClusterRedis(t) + defer closer() + + connection, err := OpenClusterConnection("conn1", redis.NewClusterClient(redisOptions), nil) + assert.NoError(t, err) + queue, err := connection.OpenQueue("consume-q") + assert.NoError(t, err) + _, err = queue.PurgeReady() + assert.NoError(t, err) + + deliveryCount := int64(30) + + for i := int64(0); i < deliveryCount; i++ { + err := queue.Publish("d" + strconv.FormatInt(i, 10)) + assert.NoError(t, err) + } + + assert.NoError(t, queue.StartConsuming(20, time.Millisecond)) + + var consumers []*TestConsumer + for i := 0; i < 10; i++ { + consumer := NewTestConsumer("c" + strconv.Itoa(i)) + consumers = append(consumers, consumer) + _, err = queue.AddConsumer("consume", consumer) + assert.NoError(t, err) + } + + finishedChan := queue.StopConsuming() + require.NotNil(t, finishedChan) + <-finishedChan // wait for stopping to finish + + var consumedCount int64 + for i := 0; i < 10; i++ { + consumedCount += int64(len(consumers[i].Deliveries())) + } + + // make sure all deliveries are either ready, unacked or consumed (acked) + assert.Eventually(t, func() bool { + readyCount, err := queue.readyCount() + if err != nil { + return false + } + unackedCount, err := queue.unackedCount() + if err != nil { + return false + } + return readyCount+unackedCount+consumedCount == deliveryCount + }, 10*time.Second, 2*time.Millisecond) + + assert.NoError(t, connection.stopHeartbeat()) +} + +func TestClusterStopConsuming_BatchConsumer(t *testing.T) { + redisOptions, closer := testClusterRedis(t) + defer closer() + + connection, err := OpenClusterConnection("conn1", redis.NewClusterClient(redisOptions), nil) + assert.NoError(t, err) + queue, err := connection.OpenQueue("batchConsume-q") + assert.NoError(t, err) + _, err = queue.PurgeReady() + assert.NoError(t, err) + + deliveryCount := int64(50) + + for i := int64(0); i < deliveryCount; i++ { + err := queue.Publish("d" + strconv.FormatInt(i, 10)) + assert.NoError(t, err) + } + + assert.NoError(t, queue.StartConsuming(20, time.Millisecond)) + + var consumers []*TestBatchConsumer + for i := 0; i < 10; i++ { + consumer := NewTestBatchConsumer() + consumer.AutoFinish = true + consumers = append(consumers, consumer) + _, err = queue.AddBatchConsumer("consume", 5, time.Second, consumer) + assert.NoError(t, err) + } + + finishedChan := queue.StopConsuming() + require.NotNil(t, finishedChan) + <-finishedChan // wait for stopping to finish + + var consumedCount int64 + for i := 0; i < 10; i++ { + consumedCount += consumers[i].Consumed() + } + + // make sure all deliveries are either ready, unacked or consumed (acked) + assert.Eventually(t, func() bool { + readyCount, err := queue.readyCount() + if err != nil { + return false + } + unackedCount, err := queue.unackedCount() + if err != nil { + return false + } + return readyCount+unackedCount+consumedCount == deliveryCount + }, 10*time.Second, 2*time.Millisecond) + + assert.NoError(t, connection.stopHeartbeat()) +} + +func TestClusterConnection_StopAllConsuming_CantOpenQueue(t *testing.T) { + redisOptions, closer := testClusterRedis(t) + defer closer() + + connection, err := OpenClusterConnection("conn1", redis.NewClusterClient(redisOptions), nil) + assert.NoError(t, err) + + finishedChan := connection.StopAllConsuming() + require.NotNil(t, finishedChan) + <-finishedChan // wait for stopping to finish + + queue, err := connection.OpenQueue("consume-q") + require.Nil(t, queue) + require.Equal(t, ErrorConsumingStopped, err) +} + +func TestClusterConnection_StopAllConsuming_CantStartConsuming(t *testing.T) { + redisOptions, closer := testClusterRedis(t) + defer closer() + + connection, err := OpenClusterConnection("conn1", redis.NewClusterClient(redisOptions), nil) + assert.NoError(t, err) + queue, err := connection.OpenQueue("consume-q") + assert.NoError(t, err) + _, err = queue.PurgeReady() + assert.NoError(t, err) + + finishedChan := connection.StopAllConsuming() + require.NotNil(t, finishedChan) + <-finishedChan // wait for stopping to finish + + err = queue.StartConsuming(20, time.Millisecond) + require.Equal(t, ErrorConsumingStopped, err) +} + +func TestClusterQueue_StopConsuming_CantStartConsuming(t *testing.T) { + redisOptions, closer := testClusterRedis(t) + defer closer() + + connection, err := OpenClusterConnection("conn1", redis.NewClusterClient(redisOptions), nil) + assert.NoError(t, err) + queue, err := connection.OpenQueue("consume-q") + assert.NoError(t, err) + _, err = queue.PurgeReady() + assert.NoError(t, err) + + finishedChan := queue.StopConsuming() + require.NotNil(t, finishedChan) + <-finishedChan // wait for stopping to finish + + err = queue.StartConsuming(20, time.Millisecond) + require.Equal(t, ErrorConsumingStopped, err) +} + +func TestClusterConnection_StopAllConsuming_CantAddConsumer(t *testing.T) { + redisOptions, closer := testClusterRedis(t) + defer closer() + + connection, err := OpenClusterConnection("conn1", redis.NewClusterClient(redisOptions), nil) + assert.NoError(t, err) + queue, err := connection.OpenQueue("consume-q") + assert.NoError(t, err) + _, err = queue.PurgeReady() + assert.NoError(t, err) + + assert.NoError(t, queue.StartConsuming(20, time.Millisecond)) + + finishedChan := connection.StopAllConsuming() + require.NotNil(t, finishedChan) + <-finishedChan // wait for stopping to finish + + _, err = queue.AddConsumer("late-consume", NewTestConsumer("late-consumer")) + require.Equal(t, ErrorConsumingStopped, err) +} + +func TestClusterQueue_StopConsuming_CantAddConsumer(t *testing.T) { + redisOptions, closer := testClusterRedis(t) + defer closer() + + connection, err := OpenClusterConnection("conn1", redis.NewClusterClient(redisOptions), nil) + assert.NoError(t, err) + queue, err := connection.OpenQueue("consume-q") + assert.NoError(t, err) + _, err = queue.PurgeReady() + assert.NoError(t, err) + + assert.NoError(t, queue.StartConsuming(20, time.Millisecond)) + + finishedChan := queue.StopConsuming() + require.NotNil(t, finishedChan) + <-finishedChan // wait for stopping to finish + + _, err = queue.AddConsumer("late-consume", NewTestConsumer("late-consumer")) + require.Equal(t, ErrorConsumingStopped, err) +} + +func BenchmarkClusterQueue(b *testing.B) { + // open queue + redisOptions, closer := testClusterRedis(b) + defer closer() + + connection, err := OpenClusterConnection("conn1", redis.NewClusterClient(redisOptions), nil) + assert.NoError(b, err) + queueName := fmt.Sprintf("bench-q%d", b.N) + queue, err := connection.OpenQueue(queueName) + assert.NoError(b, err) + assert.NoError(b, queue.StartConsuming(10, time.Millisecond)) + + // add some consumers + numConsumers := 10 + var consumers []*TestConsumer + for i := 0; i < numConsumers; i++ { + consumer := NewTestConsumer("bench-A") + // consumer.SleepDuration = time.Microsecond + consumers = append(consumers, consumer) + _, err = queue.AddConsumer("bench-cons", consumer) + assert.NoError(b, err) + } + + // publish deliveries + for i := 0; i < b.N; i++ { + err := queue.Publish("bench-d") + assert.NoError(b, err) + } + + // wait until all are consumed + for { + ready, err := queue.readyCount() + assert.NoError(b, err) + unacked, err := queue.unackedCount() + assert.NoError(b, err) + fmt.Printf("%d unacked %d %d\n", b.N, ready, unacked) + if ready == 0 && unacked == 0 { + break + } + time.Sleep(time.Millisecond) + } + + time.Sleep(time.Millisecond) + + sum := 0 + for _, consumer := range consumers { + sum += len(consumer.Deliveries()) + } + fmt.Printf("consumed %d\n", sum) + + assert.NoError(b, connection.stopHeartbeat()) +} + +func TestCluster_jitteredDuration(t *testing.T) { + dur := 100 * time.Millisecond + for i := 0; i < 5000; i++ { + d := jitteredDuration(dur) + assert.LessOrEqual(t, int64(90*time.Millisecond), int64(d)) + assert.GreaterOrEqual(t, int64(110*time.Millisecond), int64(d)) + } +} + +func TestClusterQueueDrain(t *testing.T) { + redisOptions, closer := testClusterRedis(t) + defer closer() + + connection, err := OpenClusterConnection("conn1", redis.NewClusterClient(redisOptions), nil) + assert.NoError(t, err) + require.NotNil(t, connection) + + queue, err := connection.OpenQueue("drain-queue") + assert.NoError(t, err) + + for x := 0; x < 100; x++ { + queue.Publish(fmt.Sprintf("%d", x)) + } + + eventuallyReady(t, queue, 100) + + for x := 1; x <= 10; x++ { + values, err := queue.Drain(10) + assert.NoError(t, err) + assert.Equal(t, 10, len(values)) + eventuallyReady(t, queue, int64(100-x*10)) + } +} + +// TestClusterMix tests that you could use both OpenConnection() functions and +// OpenClusterConnection() functions at the same time. Note that these connections should use +// independent redis instances: A single redis and a redis cluster respectively. Don't try to use +// a normal rmq connection and an rmq cluster connection on the same Redis database. This will lead +// to a messed up rmq state. +func TestClusterMix(t *testing.T) { + // a normal rmq connection to a single redis instance + redisOptions1, closer1 := testRedis(t) + defer closer1() + connection1, err := OpenConnectionWithRedisOptions("conn1", redisOptions1, nil) + assert.NoError(t, err) + require.NotNil(t, connection1) + + // an rmq cluster connection to a redis cluster + redisOptions2, closer2 := testClusterRedis(t) + defer closer2() + connection2, err := OpenClusterConnection("conn2", redis.NewClusterClient(redisOptions2), nil) + assert.NoError(t, err) + require.NotNil(t, connection2) + + // publish and consume on the single instance rmq connection + queue1, err := connection1.OpenQueue("cons-q") + assert.NoError(t, err) + require.NotNil(t, queue1) + _, err = queue1.PurgeReady() + assert.NoError(t, err) + + consumer1 := NewTestConsumer("cons-A") + consumer1.AutoAck = false + assert.NoError(t, queue1.StartConsuming(10, time.Millisecond)) + _, err = queue1.AddConsumer("cons-cons", consumer1) + assert.NoError(t, err) + assert.Nil(t, consumer1.Last()) + + assert.NoError(t, queue1.Publish("cons-d1")) + eventuallyReady(t, queue1, 0) + eventuallyUnacked(t, queue1, 1) + require.NotNil(t, consumer1.Last()) + assert.Regexp(t, // using [queue] + `\[cons-d1 rmq::connection::conn1-\w{6}::queue::\[cons-q\]::unacked\]`, + fmt.Sprintf("%s", consumer1.Last()), + ) + + // produce and consume on the rmq cluster connection + queue2, err := connection2.OpenQueue("cons-q") + assert.NoError(t, err) + require.NotNil(t, queue2) + _, err = queue2.PurgeReady() + assert.NoError(t, err) + + consumer2 := NewTestConsumer("cons-A") + consumer2.AutoAck = false + assert.NoError(t, queue2.StartConsuming(20, time.Millisecond)) + _, err = queue2.AddConsumer("cons-cons", consumer2) + assert.NoError(t, err) + assert.Nil(t, consumer2.Last()) + + assert.NoError(t, queue2.Publish("cons-d2")) + eventuallyReady(t, queue2, 0) + eventuallyUnacked(t, queue2, 1) + require.NotNil(t, consumer2.Last()) + assert.Regexp(t, // using {queue} + `\[cons-d2 rmq::connection::conn2-\w{6}::queue::\{cons-q\}::unacked\]`, + fmt.Sprintf("%s", consumer2.Last()), + ) +} diff --git a/queue_test.go b/queue_test.go index 8abf993..72f5507 100644 --- a/queue_test.go +++ b/queue_test.go @@ -9,21 +9,22 @@ import ( "testing" "time" + "github.com/redis/go-redis/v9" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestConnections(t *testing.T) { - redisAddr, closer := testRedis(t) + redisOptions, closer := testRedis(t) defer closer() - flushConn, err := OpenConnection("conns-flush", "tcp", redisAddr, 1, nil) + flushConn, err := OpenConnectionWithRedisOptions("conns-flush", redisOptions, nil) assert.NoError(t, err) assert.NoError(t, flushConn.stopHeartbeat()) assert.Equal(t, ErrorNotFound, flushConn.stopHeartbeat()) assert.NoError(t, flushConn.flushDb()) - connection, err := OpenConnection("conns-conn", "tcp", redisAddr, 1, nil) + connection, err := OpenConnectionWithRedisOptions("conns-conn", redisOptions, nil) assert.NoError(t, err) require.NotNil(t, connection) _, err = NewCleaner(connection).Clean() @@ -33,14 +34,14 @@ func TestConnections(t *testing.T) { assert.NoError(t, err) assert.Len(t, connections, 1) // cleaner connection remains - conn1, err := OpenConnection("conns-conn1", "tcp", redisAddr, 1, nil) + conn1, err := OpenConnectionWithRedisOptions("conns-conn1", redisOptions, nil) assert.NoError(t, err) connections, err = connection.getConnections() assert.NoError(t, err) assert.Len(t, connections, 2) assert.Equal(t, ErrorNotFound, connection.hijackConnection("nope").checkHeartbeat()) assert.NoError(t, conn1.checkHeartbeat()) - conn2, err := OpenConnection("conns-conn2", "tcp", redisAddr, 1, nil) + conn2, err := OpenConnectionWithRedisOptions("conns-conn2", redisOptions, nil) assert.NoError(t, err) connections, err = connection.getConnections() assert.NoError(t, err) @@ -67,10 +68,10 @@ func TestConnections(t *testing.T) { } func TestConnectionQueues(t *testing.T) { - redisAddr, closer := testRedis(t) + redisOptions, closer := testRedis(t) defer closer() - connection, err := OpenConnection("conn-q-conn", "tcp", redisAddr, 1, nil) + connection, err := OpenConnectionWithRedisOptions("conn-q-conn", redisOptions, nil) assert.NoError(t, err) require.NotNil(t, connection) @@ -140,10 +141,10 @@ func TestConnectionQueues(t *testing.T) { } func TestQueueCommon(t *testing.T) { - redisAddr, closer := testRedis(t) + redisOptions, closer := testRedis(t) defer closer() - connection, err := OpenConnection("queue-conn", "tcp", redisAddr, 1, nil) + connection, err := OpenConnectionWithRedisOptions("queue-conn", redisOptions, nil) assert.NoError(t, err) require.NotNil(t, connection) @@ -188,10 +189,13 @@ func TestQueueCommon(t *testing.T) { } func TestConsumerCommon(t *testing.T) { - redisAddr, closer := testRedis(t) + redisOptions, closer := testRedis(t) defer closer() - connection, err := OpenConnection("cons-conn", "tcp", redisAddr, 1, nil) + // Note that we're using OpenClusterConnection with redis.NewClient (not redis.NewClusterClient). + // This is just like using OpenConnection, but just using the Redis hash tags {} instead of []. + // This is possible, but not really an expected use case. + connection, err := OpenClusterConnection("cons-conn", redis.NewClient(redisOptions), nil) assert.NoError(t, err) require.NotNil(t, connection) @@ -221,8 +225,10 @@ func TestConsumerCommon(t *testing.T) { assert.Equal(t, "cons-d2", consumer.Last().Payload()) assert.Equal(t, http.Header{"foo": []string{"bar2"}}, consumer.Last().(WithHeader).Header()) - assert.Regexp(t, `\[cons-d2 rmq::connection::cons-conn-\w{6}::queue::\[cons-q\]::unacked\]`, - fmt.Sprintf("%s", consumer.Last())) + assert.Regexp(t, // using {queue} + `\[cons-d2 rmq::connection::cons-conn-\w{6}::queue::\{cons-q\}::unacked\]`, + fmt.Sprintf("%s", consumer.Last()), + ) assert.NoError(t, consumer.Deliveries()[0].Ack()) eventuallyReady(t, queue1, 0) @@ -287,10 +293,10 @@ func TestConsumerCommon(t *testing.T) { } func TestMulti(t *testing.T) { - redisAddr, closer := testRedis(t) + redisOptions, closer := testRedis(t) defer closer() - connection, err := OpenConnection("multi-conn", "tcp", redisAddr, 1, nil) + connection, err := OpenConnectionWithRedisOptions("multi-conn", redisOptions, nil) assert.NoError(t, err) queue, err := connection.OpenQueue("multi-q") assert.NoError(t, err) @@ -381,10 +387,10 @@ func TestMulti(t *testing.T) { } func TestBatch(t *testing.T) { - redisAddr, closer := testRedis(t) + redisOptions, closer := testRedis(t) defer closer() - connection, err := OpenConnection("batch-conn", "tcp", redisAddr, 1, nil) + connection, err := OpenConnectionWithRedisOptions("batch-conn", redisOptions, nil) assert.NoError(t, err) queue, err := connection.OpenQueue("batch-q") assert.NoError(t, err) @@ -452,10 +458,10 @@ func TestBatch(t *testing.T) { } func TestReturnRejected(t *testing.T) { - redisAddr, closer := testRedis(t) + redisOptions, closer := testRedis(t) defer closer() - connection, err := OpenConnection("return-conn", "tcp", redisAddr, 1, nil) + connection, err := OpenConnectionWithRedisOptions("return-conn", redisOptions, nil) assert.NoError(t, err) queue, err := connection.OpenQueue("return-q") assert.NoError(t, err) @@ -514,10 +520,10 @@ func TestReturnRejected(t *testing.T) { } func TestRejectFaultyMessages(t *testing.T) { - redisAddr, closer := testRedis(t) + redisOptions, closer := testRedis(t) defer closer() - connection, err := OpenConnection("faulty-conn", "tcp", redisAddr, 1, nil) + connection, err := OpenConnectionWithRedisOptions("faulty-conn", redisOptions, nil) require.NoError(t, err) queue, err := connection.OpenQueue("faulty-q") require.NoError(t, err) @@ -554,10 +560,10 @@ func TestRejectFaultyMessages(t *testing.T) { } func TestPushQueue(t *testing.T) { - redisAddr, closer := testRedis(t) + redisOptions, closer := testRedis(t) defer closer() - connection, err := OpenConnection("push", "tcp", redisAddr, 1, nil) + connection, err := OpenConnectionWithRedisOptions("push", redisOptions, nil) assert.NoError(t, err) queue1, err := connection.OpenQueue("queue1") assert.NoError(t, err) @@ -594,10 +600,10 @@ func TestPushQueue(t *testing.T) { } func TestStopConsuming_Consumer(t *testing.T) { - redisAddr, closer := testRedis(t) + redisOptions, closer := testRedis(t) defer closer() - connection, err := OpenConnection("consume", "tcp", redisAddr, 1, nil) + connection, err := OpenConnectionWithRedisOptions("consume", redisOptions, nil) assert.NoError(t, err) queue, err := connection.OpenQueue("consume-q") assert.NoError(t, err) @@ -647,10 +653,10 @@ func TestStopConsuming_Consumer(t *testing.T) { } func TestStopConsuming_BatchConsumer(t *testing.T) { - redisAddr, closer := testRedis(t) + redisOptions, closer := testRedis(t) defer closer() - connection, err := OpenConnection("batchConsume", "tcp", redisAddr, 1, nil) + connection, err := OpenConnectionWithRedisOptions("batchConsume", redisOptions, nil) assert.NoError(t, err) queue, err := connection.OpenQueue("batchConsume-q") assert.NoError(t, err) @@ -701,10 +707,10 @@ func TestStopConsuming_BatchConsumer(t *testing.T) { } func TestConnection_StopAllConsuming_CantOpenQueue(t *testing.T) { - redisAddr, closer := testRedis(t) + redisOptions, closer := testRedis(t) defer closer() - connection, err := OpenConnection("consume", "tcp", redisAddr, 1, nil) + connection, err := OpenConnectionWithRedisOptions("consume", redisOptions, nil) assert.NoError(t, err) finishedChan := connection.StopAllConsuming() @@ -717,10 +723,10 @@ func TestConnection_StopAllConsuming_CantOpenQueue(t *testing.T) { } func TestConnection_StopAllConsuming_CantStartConsuming(t *testing.T) { - redisAddr, closer := testRedis(t) + redisOptions, closer := testRedis(t) defer closer() - connection, err := OpenConnection("consume", "tcp", redisAddr, 1, nil) + connection, err := OpenConnectionWithRedisOptions("consume", redisOptions, nil) assert.NoError(t, err) queue, err := connection.OpenQueue("consume-q") assert.NoError(t, err) @@ -736,10 +742,10 @@ func TestConnection_StopAllConsuming_CantStartConsuming(t *testing.T) { } func TestQueue_StopConsuming_CantStartConsuming(t *testing.T) { - redisAddr, closer := testRedis(t) + redisOptions, closer := testRedis(t) defer closer() - connection, err := OpenConnection("consume", "tcp", redisAddr, 1, nil) + connection, err := OpenConnectionWithRedisOptions("consume", redisOptions, nil) assert.NoError(t, err) queue, err := connection.OpenQueue("consume-q") assert.NoError(t, err) @@ -755,10 +761,10 @@ func TestQueue_StopConsuming_CantStartConsuming(t *testing.T) { } func TestConnection_StopAllConsuming_CantAddConsumer(t *testing.T) { - redisAddr, closer := testRedis(t) + redisOptions, closer := testRedis(t) defer closer() - connection, err := OpenConnection("consume", "tcp", redisAddr, 1, nil) + connection, err := OpenConnectionWithRedisOptions("consume", redisOptions, nil) assert.NoError(t, err) queue, err := connection.OpenQueue("consume-q") assert.NoError(t, err) @@ -776,10 +782,10 @@ func TestConnection_StopAllConsuming_CantAddConsumer(t *testing.T) { } func TestQueue_StopConsuming_CantAddConsumer(t *testing.T) { - redisAddr, closer := testRedis(t) + redisOptions, closer := testRedis(t) defer closer() - connection, err := OpenConnection("consume", "tcp", redisAddr, 1, nil) + connection, err := OpenConnectionWithRedisOptions("consume", redisOptions, nil) assert.NoError(t, err) queue, err := connection.OpenQueue("consume-q") assert.NoError(t, err) @@ -797,11 +803,11 @@ func TestQueue_StopConsuming_CantAddConsumer(t *testing.T) { } func BenchmarkQueue(b *testing.B) { - redisAddr, closer := testRedis(b) + redisOptions, closer := testRedis(b) defer closer() // open queue - connection, err := OpenConnection("bench-conn", "tcp", redisAddr, 1, nil) + connection, err := OpenConnectionWithRedisOptions("bench-conn", redisOptions, nil) assert.NoError(b, err) queueName := fmt.Sprintf("bench-q%d", b.N) queue, err := connection.OpenQueue(queueName) @@ -868,10 +874,10 @@ func Test_jitteredDuration(t *testing.T) { } func TestQueueDrain(t *testing.T) { - redisAddr, closer := testRedis(t) + redisOptions, closer := testRedis(t) defer closer() - connection, err := OpenConnection("drain-connection", "tcp", redisAddr, 1, nil) + connection, err := OpenConnectionWithRedisOptions("drain-connection", redisOptions, nil) assert.NoError(t, err) require.NotNil(t, connection) @@ -893,10 +899,10 @@ func TestQueueDrain(t *testing.T) { } func TestQueueHeader(t *testing.T) { - redisAddr, closer := testRedis(t) + redisOptions, closer := testRedis(t) defer closer() - connection, err := OpenConnection("queue-h-conn", "tcp", redisAddr, 1, nil) + connection, err := OpenConnectionWithRedisOptions("queue-h-conn", redisOptions, nil) assert.NoError(t, err) require.NotNil(t, connection) diff --git a/redis_keys.go b/redis_keys.go index 315a9f2..db4e782 100644 --- a/redis_keys.go +++ b/redis_keys.go @@ -1,17 +1,27 @@ package rmq +import "strings" + const ( - connectionsKey = "rmq::connections" // Set of connection names - connectionHeartbeatTemplate = "rmq::connection::{connection}::heartbeat" // expires after {connection} died - connectionQueuesTemplate = "rmq::connection::{connection}::queues" // Set of queues consumers of {connection} are consuming - connectionQueueConsumersTemplate = "rmq::connection::{connection}::queue::[{queue}]::consumers" // Set of all consumers from {connection} consuming from {queue} - connectionQueueUnackedTemplate = "rmq::connection::{connection}::queue::[{queue}]::unacked" // List of deliveries consumers of {connection} are currently consuming + connectionsKey = "rmq::connections" // Set of connection names + connectionHeartbeatTemplate = "rmq::connection::{connection}::heartbeat" // expires after {connection} died + connectionQueuesTemplate = "rmq::connection::{connection}::queues" // Set of queues consumers of {connection} are consuming + connectionQueueConsumersBaseTemplate = "rmq::connection::{connection}::queue::[{queue}]::consumers" // Set of all consumers from {connection} consuming from {queue} + connectionQueueUnackedBaseTemplate = "rmq::connection::{connection}::queue::[{queue}]::unacked" // List of deliveries consumers of {connection} are currently consuming - queuesKey = "rmq::queues" // Set of all open queues - queueReadyTemplate = "rmq::queue::[{queue}]::ready" // List of deliveries in that {queue} (right is first and oldest, left is last and youngest) - queueRejectedTemplate = "rmq::queue::[{queue}]::rejected" // List of rejected deliveries from that {queue} + queuesKey = "rmq::queues" // Set of all open queues + queueReadyBaseTemplate = "rmq::queue::[{queue}]::ready" // List of deliveries in that {queue} (right is first and oldest, left is last and youngest) + queueRejectedBaseTemplate = "rmq::queue::[{queue}]::rejected" // List of rejected deliveries from that {queue} phConnection = "{connection}" // connection name phQueue = "{queue}" // queue name phConsumer = "{consumer}" // consumer name (consisting of tag and token) ) + +func getTemplate(baseTemplate string, useRedisHashTags bool) string { + if !useRedisHashTags { + return baseTemplate + } + + return strings.Replace(baseTemplate, "[{queue}]", "{{queue}}", 1) +} diff --git a/redis_wrapper.go b/redis_wrapper.go index 551452b..9550d58 100644 --- a/redis_wrapper.go +++ b/redis_wrapper.go @@ -7,48 +7,46 @@ import ( "github.com/redis/go-redis/v9" ) -var unusedContext = context.TODO() - type RedisWrapper struct { rawClient redis.Cmdable } func (wrapper RedisWrapper) Set(key string, value string, expiration time.Duration) error { // NOTE: using Err() here because Result() string is always "OK" - return wrapper.rawClient.Set(unusedContext, key, value, expiration).Err() + return wrapper.rawClient.Set(context.TODO(), key, value, expiration).Err() } func (wrapper RedisWrapper) Del(key string) (affected int64, err error) { - return wrapper.rawClient.Del(unusedContext, key).Result() + return wrapper.rawClient.Del(context.TODO(), key).Result() } func (wrapper RedisWrapper) TTL(key string) (ttl time.Duration, err error) { - return wrapper.rawClient.TTL(unusedContext, key).Result() + return wrapper.rawClient.TTL(context.TODO(), key).Result() } func (wrapper RedisWrapper) LPush(key string, value ...string) (total int64, err error) { - return wrapper.rawClient.LPush(unusedContext, key, value).Result() + return wrapper.rawClient.LPush(context.TODO(), key, value).Result() } func (wrapper RedisWrapper) LLen(key string) (affected int64, err error) { - return wrapper.rawClient.LLen(unusedContext, key).Result() + return wrapper.rawClient.LLen(context.TODO(), key).Result() } func (wrapper RedisWrapper) LRem(key string, count int64, value string) (affected int64, err error) { - return wrapper.rawClient.LRem(unusedContext, key, int64(count), value).Result() + return wrapper.rawClient.LRem(context.TODO(), key, int64(count), value).Result() } func (wrapper RedisWrapper) LTrim(key string, start, stop int64) error { // NOTE: using Err() here because Result() string is always "OK" - return wrapper.rawClient.LTrim(unusedContext, key, int64(start), int64(stop)).Err() + return wrapper.rawClient.LTrim(context.TODO(), key, int64(start), int64(stop)).Err() } func (wrapper RedisWrapper) RPop(key string) (value string, err error) { - return wrapper.rawClient.RPop(unusedContext, key).Result() + return wrapper.rawClient.RPop(context.TODO(), key).Result() } func (wrapper RedisWrapper) RPopLPush(source, destination string) (value string, err error) { - value, err = wrapper.rawClient.RPopLPush(unusedContext, source, destination).Result() + value, err = wrapper.rawClient.RPopLPush(context.TODO(), source, destination).Result() // println("RPopLPush", source, destination, value, err) switch err { case nil: @@ -61,18 +59,18 @@ func (wrapper RedisWrapper) RPopLPush(source, destination string) (value string, } func (wrapper RedisWrapper) SAdd(key, value string) (total int64, err error) { - return wrapper.rawClient.SAdd(unusedContext, key, value).Result() + return wrapper.rawClient.SAdd(context.TODO(), key, value).Result() } func (wrapper RedisWrapper) SMembers(key string) (members []string, err error) { - return wrapper.rawClient.SMembers(unusedContext, key).Result() + return wrapper.rawClient.SMembers(context.TODO(), key).Result() } func (wrapper RedisWrapper) SRem(key, value string) (affected int64, err error) { - return wrapper.rawClient.SRem(unusedContext, key, value).Result() + return wrapper.rawClient.SRem(context.TODO(), key, value).Result() } func (wrapper RedisWrapper) FlushDb() error { // NOTE: using Err() here because Result() string is always "OK" - return wrapper.rawClient.FlushDB(unusedContext).Err() + return wrapper.rawClient.FlushDB(context.TODO()).Err() } diff --git a/stats_test.go b/stats_test.go index 98eb333..d08bb6e 100644 --- a/stats_test.go +++ b/stats_test.go @@ -9,17 +9,17 @@ import ( ) func TestStats(t *testing.T) { - redisAddr, closer := testRedis(t) + redisOptions, closer := testRedis(t) defer closer() - connection, err := OpenConnection("stats-conn", "tcp", redisAddr, 1, nil) + connection, err := OpenConnectionWithRedisOptions("stats-conn", redisOptions, nil) assert.NoError(t, err) _, err = NewCleaner(connection).Clean() require.NoError(t, err) - conn1, err := OpenConnection("stats-conn1", "tcp", redisAddr, 1, nil) + conn1, err := OpenConnectionWithRedisOptions("stats-conn1", redisOptions, nil) assert.NoError(t, err) - conn2, err := OpenConnection("stats-conn2", "tcp", redisAddr, 1, nil) + conn2, err := OpenConnectionWithRedisOptions("stats-conn2", redisOptions, nil) assert.NoError(t, err) q1, err := conn2.OpenQueue("stats-q1") assert.NoError(t, err) diff --git a/test_util.go b/test_util.go index 7da61d7..e1eb740 100644 --- a/test_util.go +++ b/test_util.go @@ -1,12 +1,60 @@ package rmq import ( + "os" + "strings" "testing" "time" + "github.com/alicebob/miniredis/v2" + "github.com/redis/go-redis/v9" "github.com/stretchr/testify/assert" ) +func testRedis(t testing.TB) (options *redis.Options, close func()) { + t.Helper() + + if redisAddr, ok := os.LookupEnv("REDIS_ADDR"); ok { + return &redis.Options{Addr: redisAddr}, func() {} + } + + mr := miniredis.RunT(t) + return &redis.Options{Addr: mr.Addr()}, mr.Close +} + +func testClusterRedis(t testing.TB) (options *redis.ClusterOptions, close func()) { + t.Helper() + + // Follow these steps to set up a local redis cluster: + // https://github.com/redis/redis/tree/unstable/utils/create-cluster + // Then run the tests like this: + // REDIS_CLUSTER_ADDR="localhost:30001,localhost:30002,localhost:30003,localhost:30004,localhost:30005,localhost:30006" go test + if redisAddrs, ok := os.LookupEnv("REDIS_CLUSTER_ADDR"); ok { + addrs := strings.Split(redisAddrs, ",") + return &redis.ClusterOptions{Addrs: addrs}, func() {} + } + + mr1 := miniredis.RunT(t) + mr2 := miniredis.RunT(t) + mr3 := miniredis.RunT(t) + + options = &redis.ClusterOptions{ + Addrs: []string{ + mr1.Addr(), + mr2.Addr(), + mr3.Addr(), + }, + } + + closeFunc := func() { + mr1.Close() + mr2.Close() + mr3.Close() + } + + return options, closeFunc +} + func eventuallyReady(t *testing.T, queue Queue, expectedReady int64) { t.Helper() assert.Eventually(t, func() bool { @@ -15,7 +63,7 @@ func eventuallyReady(t *testing.T, queue Queue, expectedReady int64) { return false } return count == expectedReady - }, 10*time.Second, 2*time.Millisecond) + }, 1*time.Second, 2*time.Millisecond) } func eventuallyUnacked(t *testing.T, queue Queue, expectedUnacked int64) { @@ -26,7 +74,7 @@ func eventuallyUnacked(t *testing.T, queue Queue, expectedUnacked int64) { return false } return count == expectedUnacked - }, 10*time.Second, 2*time.Millisecond) + }, 1*time.Second, 2*time.Millisecond) } func eventuallyRejected(t *testing.T, queue Queue, expectedRejected int64) { @@ -37,5 +85,5 @@ func eventuallyRejected(t *testing.T, queue Queue, expectedRejected int64) { return false } return count == expectedRejected - }, 10*time.Second, 2*time.Millisecond) + }, 1*time.Second, 2*time.Millisecond) } diff --git a/testdata/create-cluster.sh b/testdata/create-cluster.sh new file mode 100755 index 0000000..819140c --- /dev/null +++ b/testdata/create-cluster.sh @@ -0,0 +1,125 @@ +#!/bin/bash + +SCRIPT_DIR="$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )" + +# Settings +BIN_PATH="/usr/bin/" +CLUSTER_HOST=127.0.0.1 +PORT=30000 +TIMEOUT=2000 +NODES=6 +REPLICAS=1 +PROTECTED_MODE=yes +ADDITIONAL_OPTIONS="" + +# You may want to put the above config parameters into config.sh in order to +# override the defaults without modifying this script. + +if [ -a config.sh ] +then + source "config.sh" +fi + +# Computed vars +ENDPORT=$((PORT+NODES)) + +if [ "$1" == "start" ] +then + while [ $((PORT < ENDPORT)) != "0" ]; do + PORT=$((PORT+1)) + echo "Starting $PORT" + $BIN_PATH/redis-server --port $PORT --protected-mode $PROTECTED_MODE --cluster-enabled yes --cluster-config-file nodes-${PORT}.conf --cluster-node-timeout $TIMEOUT --appendonly yes --appendfilename appendonly-${PORT}.aof --appenddirname appendonlydir-${PORT} --dbfilename dump-${PORT}.rdb --logfile ${PORT}.log --daemonize yes ${ADDITIONAL_OPTIONS} + done + exit 0 +fi + +if [ "$1" == "create" ] +then + HOSTS="" + while [ $((PORT < ENDPORT)) != "0" ]; do + PORT=$((PORT+1)) + HOSTS="$HOSTS $CLUSTER_HOST:$PORT" + done + OPT_ARG="" + if [ "$2" == "-f" ]; then + OPT_ARG="--cluster-yes" + fi + $BIN_PATH/redis-cli --cluster create $HOSTS --cluster-replicas $REPLICAS $OPT_ARG + exit 0 +fi + +if [ "$1" == "stop" ] +then + while [ $((PORT < ENDPORT)) != "0" ]; do + PORT=$((PORT+1)) + echo "Stopping $PORT" + $BIN_PATH/redis-cli -p $PORT shutdown nosave + done + exit 0 +fi + +if [ "$1" == "watch" ] +then + PORT=$((PORT+1)) + while [ 1 ]; do + clear + date + $BIN_PATH/redis-cli -p $PORT cluster nodes | head -30 + sleep 1 + done + exit 0 +fi + +if [ "$1" == "tail" ] +then + INSTANCE=$2 + PORT=$((PORT+INSTANCE)) + tail -f ${PORT}.log + exit 0 +fi + +if [ "$1" == "tailall" ] +then + tail -f *.log + exit 0 +fi + +if [ "$1" == "call" ] +then + while [ $((PORT < ENDPORT)) != "0" ]; do + PORT=$((PORT+1)) + $BIN_PATH/redis-cli -p $PORT $2 $3 $4 $5 $6 $7 $8 $9 + done + exit 0 +fi + +if [ "$1" == "clean" ] +then + echo "Cleaning *.log" + rm -rf *.log + echo "Cleaning appendonlydir-*" + rm -rf appendonlydir-* + echo "Cleaning dump-*.rdb" + rm -rf dump-*.rdb + echo "Cleaning nodes-*.conf" + rm -rf nodes-*.conf + exit 0 +fi + +if [ "$1" == "clean-logs" ] +then + echo "Cleaning *.log" + rm -rf *.log + exit 0 +fi + +echo "Usage: $0 [start|create|stop|watch|tail|tailall|clean|clean-logs|call]" +echo "start -- Launch Redis Cluster instances." +echo "create [-f] -- Create a cluster using redis-cli --cluster create." +echo "stop -- Stop Redis Cluster instances." +echo "watch -- Show CLUSTER NODES output (first 30 lines) of first node." +echo "tail -- Run tail -f of instance at base port + ID." +echo "tailall -- Run tail -f for all the log files at once." +echo "clean -- Remove all instances data, logs, configs." +echo "clean-logs -- Remove just instances logs." +echo "call -- Call a command (up to 7 arguments) on all nodes."