Redis driver for https://godoc.org/gocloud.dev/pubsub package.
A great alternative to using Kafka, with the ability to quickly switch to it. You can use this driver for MVP, local, small or medium projects. When your project grows you can simply switch to another driver from https://pkg.go.dev/gocloud.dev/pubsub#section-directories.
Using Redis Streams, this driver supports at-least-once
delivery.
The driver uses these Redis commands:
- XADD
- XGROUP CREATE
- XREADGROUP (with pending and then new messages - only this library actually supports it)
- XACK
- XAUTOCLAIM
Many other queuing implementations with Redis Streams contain a big bug. They incorrectly support reconnecting a consumer to a topic if a message has been received but not acknowledged. They use ">" streaming strategy, which does not deliver unacknowledged messages more than once. And you miss messages when microservices are restarted. This library does not have this disadvantage.
The connection string must be defined in the REDIS_URL
environment value.
All consumers already have a group, even if there is only one consumer in the group.
Consumer groups receive the same messages from the topic, and consumers within the group receive these messages exclusively.
This driver supports new consumers joining with a new group name after the publisher has sent multiple messages to a topic before the group was created. These consumers will receive all previous non-ACK-ed messages from the beginning of the topic.
import (
_ "github.com/covrom/redispubsub"
"gocloud.dev/pubsub"
)
ctx := context.Background()
topic, err := pubsub.OpenTopic(ctx, "redis://topics/1")
if err != nil {
return fmt.Errorf("could not open topic: %v", err)
}
defer topic.Shutdown(ctx)
m := &pubsub.Message{
Body: []byte("Hello, World!\n"),
// Metadata is optional and can be nil.
Metadata: map[string]string{
// These are examples of metadata.
// There is nothing special about the key names.
"language": "en",
"importance": "high",
},
}
err = topic.Send(ctx, m)
if err != nil {
return err
}
OpenTopic connection string host/path is required and must contain the topic name.
Connection string query parameters:
maxlen
is MAXLEN parameter for XADD (limit queue length), unlimited if not set.
import (
_ "github.com/covrom/redispubsub"
"gocloud.dev/pubsub"
)
subs, err := pubsub.OpenSubscription(ctx, "redis://group1?consumer=cons1&topic=topics/1")
if err != nil {
return err
}
defer subs.Shutdown(ctx)
msg, err := subs.Receive(ctx)
if err != nil {
// Errors from Receive indicate that Receive will no longer succeed.
return fmt.Errorf("Receiving message: %v", err)
}
// Do work based on the message, for example:
fmt.Printf("Got message: %q\n", msg.Body)
// Messages must always be acknowledged with Ack.
msg.Ack()
OpenSubscription connection string host(path) is required and must contain the consumer group name.
Connection string query parameters:
topic
is topic name, requiredconsumer
is unique consumer name, requiredfrom
is FROM option for creating a consumer group (if not exists) with XGROUP CREATE, default is '0'autoclaim
is min-idle-time option for XAUTOCLAIM, 30 min by defaultnoack
is NOACK option for XREADGROUP, not used by default
See basic_test.go for full usage example.
Use redis-exporter prometheus exporter with check-streams
option.
See streams.go for details.