Skip to content

Commit

Permalink
113 Add support for Redis clusters (#148)
Browse files Browse the repository at this point in the history
* fix using another wrapper to fit redis cluster and redis single node;
fix using {} to replace [] - untested

* update test files with new openConn;

add queue_cluster_test.go

welle: Remove hardcoded connection options.

* Bring back the original OpenConnection() for compatibility

Add new function OpenConnectionWithOptions() for the new approach.

* Allow cluster tests to run against local cluster

* Remove RedisClusterWrapper again

Rename RedisSingleWrapper back to RedisWrapper

* Bring back original OpenConnectionWithRedisClient()

* Add OpenClusterConnection()

To allow opening RMQ connections which use the Redis hash tags {}
instead of []. This is required to make rmq work with Redis clusters.

This commit also reverts the behavior of all other OpenConnection[...]
functions to behave as before by still using [] instead of {}.

This switch is done by using different Redis key templates. For example
instead of

    rmq::connection::{connection}::queue::[{queue}]::consumers

we would use

    rmq::connection::{connection}::queue::{{queue}}::consumers

when using OpenClusterConnection()

* Document OpenClusterConnection() in README

* Use safe accessors in tests

* Update deps

* Create Redis cluster in CI (#150)

---------

Co-authored-by: zhanglei <luckforzhang@vip.qq.com>
Co-authored-by: Viacheslav Poturaev <vearutop@gmail.com>
  • Loading branch information
3 people authored Jul 20, 2023
1 parent 52c05b0 commit 93f1717
Show file tree
Hide file tree
Showing 14 changed files with 1,267 additions and 165 deletions.
33 changes: 12 additions & 21 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,21 @@ env:
COV_GO_VERSION: 1.19.x # Version of Go to collect coverage
TARGET_DELTA_COV: 90 # Target coverage of changed lines, in percents
REDIS_ADDR: "localhost:6379"
REDIS_CLUSTER_ADDR: "localhost:30001,localhost:30002,localhost:30003,localhost:30004,localhost:30005,localhost:30006"
jobs:
test:
strategy:
matrix:
go-version: [ 1.14.x, 1.15.x, 1.16.x, 1.17.x, 1.18.x, 1.19.x ]
runs-on: ubuntu-latest

services:
redis:
image: redis
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 6379:6379
steps:
- name: Install Go stable
- name: Install Go
if: matrix.go-version != 'tip'
uses: actions/setup-go@v3
with:
go-version: ${{ matrix.go-version }}

- name: Install Go tip
if: matrix.go-version == 'tip'
run: |
curl -sL https://storage.googleapis.com/go-build-snap/go/linux-amd64/$(git ls-remote https://github.com/golang/go.git HEAD | awk '{print $1;}').tar.gz -o gotip.tar.gz
ls -lah gotip.tar.gz
mkdir -p ~/sdk/gotip
tar -C ~/sdk/gotip -xzf gotip.tar.gz
~/sdk/gotip/bin/go version
echo "PATH=$HOME/go/bin:$HOME/sdk/gotip/bin/:$PATH" >> $GITHUB_ENV
- name: Checkout code
uses: actions/checkout@v2

Expand All @@ -77,6 +58,16 @@ jobs:
# Use base sha for PR or new commit hash for master/main push in test result key.
key: ${{ runner.os }}-unit-test-coverage-${{ (github.event.pull_request.base.sha != github.event.after) && github.event.pull_request.base.sha || github.event.after }}

- name: Prepare Redis
run: |
sudo apt-get update && sudo apt-get install -y lsb-release curl gpg
curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/redis.list
sudo apt-get update && sudo apt-get install -y redis
./testdata/create-cluster.sh start
yes yes | ./testdata/create-cluster.sh create
sleep 5
- name: Run test for base code
if: matrix.go-version == env.COV_GO_VERSION && env.RUN_BASE_COVERAGE == 'on' && steps.base-coverage.outputs.cache-hit != 'true' && github.event.pull_request.base.sha != ''
run: |
Expand Down
41 changes: 25 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,24 +1,10 @@
[![Build Status](https://github.com/adjust/rmq/workflows/test/badge.svg)](https://github.com/adjust/rmq/actions?query=branch%3Amaster+workflow%3Atest)
[![GoDoc](https://pkg.go.dev/badge/github.com/adjust/rmq)](https://pkg.go.dev/github.com/adjust/rmq)

---

**Note**: We recently updated rmq to expose Redis errors instead of panicking.
This is a major change as almost all functions now return errors. It's
recommended to switch to the latest version `rmq/v4` so rmq won't crash your
services anymore on Redis errors.

If you don't want to upgrade yet, you can continue using `rmq/v2`.

---

## Overview

rmq is short for Redis message queue. It's a message queue system written in Go
and backed by Redis. It's similar to [redismq][redismq], but implemented
independently with a different interface in mind.

[redismq]: https://github.com/adjust/redismq
and backed by Redis.

## Basic Usage

Expand Down Expand Up @@ -49,7 +35,11 @@ It's also possible to access a Redis listening on a Unix socket:
connection, err := rmq.OpenConnection("my service", "unix", "/tmp/redis.sock", 1, errChan)
```

For more flexible setup you can also create your own Redis client:
For more flexible setup you can pass Redis options or create your own Redis client:

```go
connection, err := OpenConnectionWithRedisOptions("my service", redisOptions, errChan)
```

```go
connection, err := OpenConnectionWithRedisClient("my service", redisClient, errChan)
Expand All @@ -63,6 +53,25 @@ the `OpenConnection()` functions rmq will send those background errors to this
channel so you can handle them asynchronously. For more details about this and
handling suggestions see the section about handling background errors below.

#### Connecting to a Redis cluster

In order to connect to a Redis cluster please use `OpenClusterConnection()`:

```go
redisClusterOptions := &redis.ClusterOptions{ /* ... */ }
redisClusterClient := redis.NewClusterClient(redisClusterOptions)
connection, err := OpenClusterConnection("my service", redisClusterClient, errChan)
```

Note that such an rmq cluster connection uses different Redis than rmq connections
opened by `OpenConnection()` or similar. If you have used a Redis instance
with `OpenConnection()` then it is NOT SAFE to reuse that rmq system by connecting
to it via `OpenClusterConnection()`. The cluster state won't be compatible and
this will likely lead to data loss.

If you've previously used `OpenConnection()` or similar you should only consider
using `OpenClusterConnection()` with a fresh Redis cluster.

### Queues

Once we have a connection we can use it to finally access queues. Each queue
Expand Down
25 changes: 6 additions & 19 deletions cleaner_test.go
Original file line number Diff line number Diff line change
@@ -1,36 +1,23 @@
package rmq

import (
"os"
"testing"
"time"

"github.com/alicebob/miniredis/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func testRedis(t testing.TB) (addr string, closer func()) {
t.Helper()

if redisAddr, ok := os.LookupEnv("REDIS_ADDR"); ok {
return redisAddr, func() {}
}

mr := miniredis.RunT(t)
return mr.Addr(), mr.Close
}

func TestCleaner(t *testing.T) {
redisAddr, closer := testRedis(t)
redisOptions, closer := testRedis(t)
defer closer()

flushConn, err := OpenConnection("cleaner-flush", "tcp", redisAddr, 1, nil)
flushConn, err := OpenConnectionWithRedisOptions("cleaner-flush", redisOptions, nil)
assert.NoError(t, err)
assert.NoError(t, flushConn.stopHeartbeat())
assert.NoError(t, flushConn.flushDb())

conn, err := OpenConnection("cleaner-conn1", "tcp", redisAddr, 1, nil)
conn, err := OpenConnectionWithRedisOptions("cleaner-conn1", redisOptions, nil)
assert.NoError(t, err)
queues, err := conn.GetOpenQueues()
assert.NoError(t, err)
Expand Down Expand Up @@ -91,7 +78,7 @@ func TestCleaner(t *testing.T) {
assert.NoError(t, conn.stopHeartbeat())
time.Sleep(time.Millisecond)

conn, err = OpenConnection("cleaner-conn1", "tcp", redisAddr, 1, nil)
conn, err = OpenConnectionWithRedisOptions("cleaner-conn1", redisOptions, nil)
assert.NoError(t, err)
queue, err = conn.OpenQueue("q1")
assert.NoError(t, err)
Expand Down Expand Up @@ -138,7 +125,7 @@ func TestCleaner(t *testing.T) {
assert.NoError(t, conn.stopHeartbeat())
time.Sleep(time.Millisecond)

cleanerConn, err := OpenConnection("cleaner-conn", "tcp", redisAddr, 1, nil)
cleanerConn, err := OpenConnectionWithRedisOptions("cleaner-conn", redisOptions, nil)
assert.NoError(t, err)
cleaner := NewCleaner(cleanerConn)
returned, err := cleaner.Clean()
Expand All @@ -149,7 +136,7 @@ func TestCleaner(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, queues, 2)

conn, err = OpenConnection("cleaner-conn1", "tcp", redisAddr, 1, nil)
conn, err = OpenConnectionWithRedisOptions("cleaner-conn1", redisOptions, nil)
assert.NoError(t, err)
queue, err = conn.OpenQueue("q1")
assert.NoError(t, err)
Expand Down
66 changes: 49 additions & 17 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,15 @@ type Connection interface {
// Connection is the entry point. Use a connection to access queues, consumers and deliveries
// Each connection has a single heartbeat shared among all consumers
type redisConnection struct {
Name string
heartbeatKey string // key to keep alive
queuesKey string // key to list of queues consumed by this connection
Name string
heartbeatKey string // key to keep alive
queuesKey string // key to list of queues consumed by this connection

consumersTemplate string
unackedTemplate string
readyTemplate string
rejectedTemplate string

redisClient RedisClient
errChan chan<- error
heartbeatStop chan chan struct{}
Expand All @@ -62,11 +68,16 @@ type redisConnection struct {

// OpenConnection opens and returns a new connection
func OpenConnection(tag string, network string, address string, db int, errChan chan<- error) (Connection, error) {
redisClient := redis.NewClient(&redis.Options{Network: network, Addr: address, DB: db})
return OpenConnectionWithRedisClient(tag, redisClient, errChan)
return OpenConnectionWithRedisOptions(tag, &redis.Options{Network: network, Addr: address, DB: db}, errChan)
}

// OpenConnectionWithRedisOptions allows you to pass more flexible options
func OpenConnectionWithRedisOptions(tag string, redisOption *redis.Options, errChan chan<- error) (Connection, error) {
return OpenConnectionWithRedisClient(tag, redis.NewClient(redisOption), errChan)
}

// OpenConnectionWithRedisClient opens and returns a new connection
// This can be used to passa redis.ClusterClient.
func OpenConnectionWithRedisClient(tag string, redisClient redis.Cmdable, errChan chan<- error) (Connection, error) {
return OpenConnectionWithRmqRedisClient(tag, RedisWrapper{redisClient}, errChan)
}
Expand All @@ -77,18 +88,31 @@ func OpenConnectionWithTestRedisClient(tag string, errChan chan<- error) (Connec
return OpenConnectionWithRmqRedisClient(tag, NewTestRedisClient(), errChan)
}

// If you would like to use a redis client other than the ones supported in the constructors above, you can implement
// the RedisClient interface yourself
// OpenConnectionWithRmqRedisClient: If you would like to use a redis client other than the ones
// supported in the constructors above, you can implement the RedisClient interface yourself
func OpenConnectionWithRmqRedisClient(tag string, redisClient RedisClient, errChan chan<- error) (Connection, error) {
return openConnection(tag, redisClient, false, errChan)
}

// OpenClusterConnection: Same as OpenConnectionWithRedisClient, but using Redis hash tags {} instead of [].
func OpenClusterConnection(tag string, redisClient redis.Cmdable, errChan chan<- error) (Connection, error) {
return openConnection(tag, RedisWrapper{redisClient}, true, errChan)
}

func openConnection(tag string, redisClient RedisClient, useRedisHashTags bool, errChan chan<- error) (Connection, error) {
name := fmt.Sprintf("%s-%s", tag, RandomString(6))

connection := &redisConnection{
Name: name,
heartbeatKey: strings.Replace(connectionHeartbeatTemplate, phConnection, name, 1),
queuesKey: strings.Replace(connectionQueuesTemplate, phConnection, name, 1),
redisClient: redisClient,
errChan: errChan,
heartbeatStop: make(chan chan struct{}, 1),
Name: name,
heartbeatKey: strings.Replace(connectionHeartbeatTemplate, phConnection, name, 1),
queuesKey: strings.Replace(connectionQueuesTemplate, phConnection, name, 1),
consumersTemplate: getTemplate(connectionQueueConsumersBaseTemplate, useRedisHashTags),
unackedTemplate: getTemplate(connectionQueueUnackedBaseTemplate, useRedisHashTags),
readyTemplate: getTemplate(queueReadyBaseTemplate, useRedisHashTags),
rejectedTemplate: getTemplate(queueRejectedBaseTemplate, useRedisHashTags),
redisClient: redisClient,
errChan: errChan,
heartbeatStop: make(chan chan struct{}, 1),
}

if err := connection.updateHeartbeat(); err != nil { // checks the connection
Expand Down Expand Up @@ -243,10 +267,14 @@ func (connection *redisConnection) getConnections() ([]string, error) {
// hijackConnection reopens an existing connection for inspection purposes without starting a heartbeat
func (connection *redisConnection) hijackConnection(name string) Connection {
return &redisConnection{
Name: name,
heartbeatKey: strings.Replace(connectionHeartbeatTemplate, phConnection, name, 1),
queuesKey: strings.Replace(connectionQueuesTemplate, phConnection, name, 1),
redisClient: connection.redisClient,
Name: name,
heartbeatKey: strings.Replace(connectionHeartbeatTemplate, phConnection, name, 1),
queuesKey: strings.Replace(connectionQueuesTemplate, phConnection, name, 1),
consumersTemplate: connection.consumersTemplate,
unackedTemplate: connection.unackedTemplate,
readyTemplate: connection.readyTemplate,
rejectedTemplate: connection.rejectedTemplate,
redisClient: connection.redisClient,
}
}

Expand Down Expand Up @@ -280,6 +308,10 @@ func (connection *redisConnection) openQueue(name string) Queue {
name,
connection.Name,
connection.queuesKey,
connection.consumersTemplate,
connection.unackedTemplate,
connection.readyTemplate,
connection.rejectedTemplate,
connection.redisClient,
connection.errChan,
)
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@ module github.com/adjust/rmq/v5
go 1.17

require (
github.com/alicebob/miniredis/v2 v2.30.0
github.com/alicebob/miniredis/v2 v2.30.4
github.com/redis/go-redis/v9 v9.0.3
github.com/stretchr/testify v1.7.0
)

require (
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/yuin/gopher-lua v1.0.0 // indirect
github.com/yuin/gopher-lua v1.1.0 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.30.0 h1:uA3uhDbCxfO9+DI/DuGeAMr9qI+noVWwGPNTFuKID5M=
github.com/alicebob/miniredis/v2 v2.30.0/go.mod h1:84TWKZlxYkfgMucPBf5SOQBYJceZeQRFIaQgNMiCX6Q=
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 h1:uvdUDbHQHO85qeSydJtItA4T55Pw6BtAejd0APRJOCE=
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.30.4 h1:8S4/o1/KoUArAGbGwPxcwf0krlzceva2XVOSchFS7Eo=
github.com/alicebob/miniredis/v2 v2.30.4/go.mod h1:b25qWj4fCEsBeAAR2mlb0ufImGC6uH3VlUfb/HS5zKg=
github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w=
github.com/bsm/gomega v1.26.0 h1:LhQm+AFcgV2M0WyKroMASzAzCAJVpAxQXv4SaI9a69Y=
Expand All @@ -27,9 +28,8 @@ github.com/redis/go-redis/v9 v9.0.3/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDO
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
github.com/yuin/gopher-lua v1.0.0 h1:pQCf0LN67Kf7M5u7vRd40A8M1I8IMLrxlqngUJgZ0Ow=
github.com/yuin/gopher-lua v1.0.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE=
github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
Expand Down
19 changes: 9 additions & 10 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ type redisQueue struct {
connectionName string
queuesKey string // key to list of queues consumed by this connection
consumersKey string // key to set of consumers using this connection
unackedKey string // key to list of currently consuming deliveries
readyKey string // key to list of ready deliveries
rejectedKey string // key to list of rejected deliveries
unackedKey string // key to list of currently consuming deliveries
pushKey string // key to list of pushed deliveries
redisClient RedisClient
errChan chan<- error
Expand All @@ -64,22 +64,21 @@ type redisQueue struct {
}

func newQueue(
name string,
connectionName string,
queuesKey string,
name, connectionName, queuesKey string,
consumersTemplate, unackedTemplate, readyTemplate, rejectedTemplate string,
redisClient RedisClient,
errChan chan<- error,
) *redisQueue {

consumersKey := strings.Replace(connectionQueueConsumersTemplate, phConnection, connectionName, 1)
consumersKey := strings.Replace(consumersTemplate, phConnection, connectionName, 1)
consumersKey = strings.Replace(consumersKey, phQueue, name, 1)

readyKey := strings.Replace(queueReadyTemplate, phQueue, name, 1)
rejectedKey := strings.Replace(queueRejectedTemplate, phQueue, name, 1)

unackedKey := strings.Replace(connectionQueueUnackedTemplate, phConnection, connectionName, 1)
unackedKey := strings.Replace(unackedTemplate, phConnection, connectionName, 1)
unackedKey = strings.Replace(unackedKey, phQueue, name, 1)

readyKey := strings.Replace(readyTemplate, phQueue, name, 1)
rejectedKey := strings.Replace(rejectedTemplate, phQueue, name, 1)

consumingStopped := make(chan struct{})
ackCtx, ackCancel := context.WithCancel(context.Background())

Expand All @@ -88,9 +87,9 @@ func newQueue(
connectionName: connectionName,
queuesKey: queuesKey,
consumersKey: consumersKey,
unackedKey: unackedKey,
readyKey: readyKey,
rejectedKey: rejectedKey,
unackedKey: unackedKey,
redisClient: redisClient,
errChan: errChan,
consumingStopped: consumingStopped,
Expand Down
Loading

0 comments on commit 93f1717

Please sign in to comment.