Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
module github.com/minghsu0107/watermill-redistream
module github.com/chris576/watermill-redispubsub

go 1.15

require (
github.com/Rican7/retry v0.3.1
github.com/ThreeDotsLabs/watermill v1.1.1
github.com/go-redis/redis/v8 v8.11.4
github.com/pkg/errors v0.9.1
github.com/redis/go-redis/v9 v9.14.0
github.com/renstrom/shortuuid v3.0.0+incompatible
github.com/stretchr/testify v1.7.0
github.com/vmihailenco/msgpack v4.0.4+incompatible
google.golang.org/appengine v1.6.8 // indirect
)
93 changes: 28 additions & 65 deletions go.sum

Large diffs are not rendered by default.

66 changes: 66 additions & 0 deletions pkg/redispubsub/marshaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package redispubsub

import (
"github.com/ThreeDotsLabs/watermill/message"
"github.com/pkg/errors"
"github.com/redis/go-redis/v9"
"github.com/vmihailenco/msgpack"
)

const UUIDHeaderKey = "_watermill_message_uuid"

type Marshaller interface {
Marshal(topic string, msg *message.Message) ([]byte, error)
}

type Unmarshaller interface {
Unmarshal(rmsg *redis.Message) (*message.Message, error)
}

type MarshalerUnmarshaler interface {
Marshaller
Unmarshaller
}

type DefaultMarshaller struct{}

type redisEnvelope struct {
UUID string `msgpack:"uuid"`
Metadata message.Metadata `msgpack:"metadata,omitempty"`
Payload []byte `msgpack:"payload"`
}

// Marshal encodes a Watermill message into MsgPack
func (DefaultMarshaller) Marshal(_ string, msg *message.Message) ([]byte, error) {
if value := msg.Metadata.Get(UUIDHeaderKey); value != "" {
return nil, errors.Errorf("metadata %s is reserved by watermill for message UUID", UUIDHeaderKey)
}

env := redisEnvelope{
UUID: msg.UUID,
Metadata: msg.Metadata,
Payload: msg.Payload,
}

b, err := msgpack.Marshal(env)
if err != nil {
return nil, errors.Wrap(err, "marshal to MsgPack failed")
}

return b, nil
}

// Unmarshal decodes a MsgPack payload into a Watermill message
func (DefaultMarshaller) Unmarshal(rmsg *redis.Message) (*message.Message, error) {
var env redisEnvelope
if err := msgpack.Unmarshal([]byte(rmsg.Payload), &env); err != nil {
return nil, errors.Wrap(err, "unmarshal MsgPack failed")
}

msg := message.NewMessage(env.UUID, env.Payload)
if env.Metadata != nil {
msg.Metadata = env.Metadata
}

return msg, nil
}
72 changes: 72 additions & 0 deletions pkg/redispubsub/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package redispubsub

import (
"context"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/pkg/errors"
"github.com/redis/go-redis/v9"
)

// Publisher publishes messages to Redis Pub/Sub channels.
type Publisher struct {
ctx context.Context
rc redis.UniversalClient
marshaller Marshaller
logger watermill.LoggerAdapter
closed bool
}

// NewPublisher creates a new Redis Pub/Sub publisher.
func NewPublisher(ctx context.Context, rc redis.UniversalClient, marshaller Marshaller, logger watermill.LoggerAdapter) (message.Publisher, error) {
if logger == nil {
logger = &watermill.NopLogger{}
}
return &Publisher{
ctx: ctx,
rc: rc,
marshaller: marshaller,
logger: logger,
closed: false,
}, nil
}

// Publish sends messages to a Redis Pub/Sub channel.
// Blocking call: waits for Redis PUBLISH response.
func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
if p.closed {
return errors.New("publisher closed")
}

logFields := watermill.LogFields{"topic": topic}

for _, msg := range msgs {
logFields["message_uuid"] = msg.UUID
p.logger.Trace("Publishing message to Redis Pub/Sub", logFields)

payload, err := p.marshaller.Marshal(topic, msg)
if err != nil {
return errors.Wrapf(err, "cannot marshal message %s", msg.UUID)
}

// Redis Pub/Sub Publish
cmd := p.rc.Publish(p.ctx, topic, payload)
count, err := cmd.Result()
if err != nil {
return errors.Wrapf(err, "cannot publish message %s", msg.UUID)
}

logFields["subscribers_received"] = count
p.logger.Trace("Message published to Redis Pub/Sub", logFields)
}

return nil
}

// Close marks the publisher as closed. Does NOT close the Redis client.
func (p *Publisher) Close() error {
p.closed = true
p.logger.Debug("Redis Pub/Sub publisher closed", nil)
return nil
}
44 changes: 44 additions & 0 deletions pkg/redispubsub/pubsub_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package redispubsub

import (
"context"
"testing"
"time"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/tests"
)

func BenchmarkSubscriber(b *testing.B) {

ctx := context.Background()
rc, err := redisClient(ctx)
if err != nil {
b.Fatal(err)
}
tests.BenchSubscriber(b, func(n int) (message.Publisher, message.Subscriber) {
logger := watermill.NewStdLogger(true, true)

publisher, err := NewPublisher(ctx, rc, &DefaultMarshaller{}, logger)
if err != nil {
panic(err)
}

subscriber, err := NewSubscriber(
ctx,
rc,
&DefaultMarshaller{},
logger,
1,
)
if err != nil {
panic(err)
}

// Warten, damit Subscriber bereit ist (wichtig für Redis Pub/Sub)
time.Sleep(100 * time.Millisecond)

return publisher, subscriber
})
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//go:build stress
// +build stress

package redis
package nonpersistent

import (
"testing"
Expand Down
130 changes: 130 additions & 0 deletions pkg/redispubsub/pubsub_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package redispubsub

import (
"context"
"strconv"
"testing"
"time"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/pkg/errors"
"github.com/redis/go-redis/v9"
"github.com/renstrom/shortuuid"
"github.com/stretchr/testify/require"
)

// Redis client singleton
var client redis.UniversalClient

func redisClient(ctx context.Context) (redis.UniversalClient, error) {
if client == nil {
client = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
DB: 0,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
MinIdleConns: 10,
})
if err := client.Ping(ctx).Err(); err != nil {
return nil, errors.Wrap(err, "redis connect fail")
}
}
return client, nil
}

// helper function to create publisher and subscriber
func newPubSub(t *testing.T, marshaler MarshalerUnmarshaler) (message.Publisher, message.Subscriber) {
logger := watermill.NewStdLogger(true, true)
ctx := context.Background()

rc, err := redisClient(ctx)
require.NoError(t, err)

publisher, err := NewPublisher(ctx, rc, marshaler, logger)
require.NoError(t, err)

subscriber, err := NewSubscriber(ctx, rc, marshaler, logger, 1)
require.NoError(t, err)

return publisher, subscriber
}

func TestPublishSubscribe(t *testing.T) {
publisher, subscriber := newPubSub(t, &DefaultMarshaller{})
topic := "test-pubsub-topic"
msgCount := 20

messages, err := subscriber.Subscribe(context.Background(), topic)
require.NoError(t, err)

time.Sleep(200 * time.Millisecond) // Subscriber sicher aktiv

for i := 0; i < msgCount; i++ {
payload := []byte("msg-" + strconv.Itoa(i))
require.NoError(t, publisher.Publish(topic, message.NewMessage(shortuuid.New(), payload)))
}

for i := 0; i < msgCount; i++ {
msg := <-messages
require.NotNil(t, msg)

expected := "msg-" + strconv.Itoa(i)
require.Equal(t, expected, string(msg.Payload))

msg.Ack()
}

require.NoError(t, subscriber.Close())
}

func TestFanOut(t *testing.T) {
publisher, _ := newPubSub(t, &DefaultMarshaller{})

topic := "test-pubsub-fanout"
subscriberCount := 2
msgCount := 20

ctx := context.Background()
rc, _ := redisClient(ctx)

subscribers := make([]message.Subscriber, subscriberCount)
messagesCh := make([]<-chan *message.Message, subscriberCount)

// Subscriber starten
for i := 0; i < subscriberCount; i++ {
s, err := NewSubscriber(ctx, rc, &DefaultMarshaller{}, watermill.NewStdLogger(true, false), 1)
require.NoError(t, err)
subscribers[i] = s

ch, err := s.Subscribe(ctx, topic)
require.NoError(t, err)
messagesCh[i] = ch
}

// kurz warten, dass Subscriber bereit sind
time.Sleep(500 * time.Millisecond)

// Publisher Nachrichten senden (direkt Binär-Payload)
for i := 0; i < msgCount; i++ {
rawPayload := "fanout-msg-" + strconv.Itoa(i)
payload := []byte(rawPayload)
require.NoError(t, publisher.Publish(topic, message.NewMessage(shortuuid.New(), payload)))
}
require.NoError(t, publisher.Close())

// Nachrichten prüfen
for i := 0; i < subscriberCount; i++ {
for j := 0; j < msgCount; j++ {
msg := <-messagesCh[i]
require.NotNil(t, msg)

expected := "fanout-msg-" + strconv.Itoa(j)
require.Equal(t, expected, string(msg.Payload))
t.Logf("Subscriber %d received: %s", i+1, string(msg.Payload))

msg.Ack()
}
require.NoError(t, subscribers[i].Close())
}
}
Loading