Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/redisstream/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func (c *PublisherConfig) Validate() error {

// Publish publishes message to redis stream
//
// Publish is blocking and wait for redis response
// When one of messages delivery fails - function is interrupted.
// Publish is blocking and waits for redis response.
// When any of messages delivery fails - function is interrupted.
func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
if p.closed {
return errors.New("publisher closed")
Expand Down
49 changes: 20 additions & 29 deletions pkg/redisstream/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,14 @@ const (

DefaultBlockTime = time.Millisecond * 100

// How often to claim pending messages
DefaultClaimInterval = time.Second * 5

DefaultClaimBatchSize = int64(100)

// Default max idle time for pending message
// After timeout, the message will be claimed
DefaultMaxIdleTime = time.Second * 60

// How often to check for dead consumers
DefaultCheckConsumersInterval = time.Second * 300

// Default consumer timeout
// After being idle longer than timeout and having no pending messages, it will be removed from the consumer group
DefaultConsumerTimeout = time.Second * 600
DefaultConsumerTimeout = time.Second * 600
)

type Subscriber struct {
Expand All @@ -51,7 +44,7 @@ type Subscriber struct {
closeMutex sync.Mutex
}

// NewSubscriber creates a new redis stream Subscriber
// NewSubscriber creates a new redis stream Subscriber.
func NewSubscriber(config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error) {
config.setDefaults()

Expand All @@ -76,46 +69,44 @@ type SubscriberConfig struct {

Unmarshaller Unmarshaller

// Redis stream consumer id, paired with ConsumerGroup
// Redis stream consumer id, paired with ConsumerGroup.
Consumer string
// When empty, fan-out mode will be used
// When empty, fan-out mode will be used.
ConsumerGroup string

// How long after Nack message should be redelivered
// How long after Nack message should be redelivered.
NackResendSleep time.Duration

// Block to wait next redis stream message
// Block to wait next redis stream message.
BlockTime time.Duration

// Claim idle pending message interval
// Claim idle pending message interval.
ClaimInterval time.Duration

// How many pending messages are claimed at most each claim interval
// How many pending messages are claimed at most each claim interval.
ClaimBatchSize int64

// How long should we treat a pending message as claimable
// How long should we treat a pending message as claimable.
MaxIdleTime time.Duration

// Check consumer status interval
// Check consumer status interval.
CheckConsumersInterval time.Duration

// After which time an idle consumer with no pending messages will be removed from the consumer group
// After this timeout an idle consumer with no pending messages will be removed from the consumer group.
ConsumerTimeout time.Duration

// Start consumption from the specified message ID
// When using "0", the consumer group will consume from the very first message
// When using "$", the consumer group will consume from the latest message
// Start consumption from the specified message ID.
// When using "0", the consumer group will consume from the very first message.
// When using "$", the consumer group will consume from the latest message.
OldestId string

// If this is set, it will be called to decide whether messages that
// have been idle for longer than MaxIdleTime should actually be re-claimed,
// and the consumer that had previously claimed it should be kicked out.
// If this is not set, then all messages that have been idle for longer
// than MaxIdleTime will be re-claimed.
// If this is set, it will be called to decide whether a pending message that
// has been idle for more than MaxIdleTime should actually be claimed.
// If this is not set, then all pending messages that have been idle for more than MaxIdleTime will be claimed.
// This can be useful e.g. for tasks where the processing time can be very variable -
// so we can't just use a short MaxIdleTime; but where at the same time dead
// workers should be spotted quickly - so we can't just use a long MaxIdleTime either.
// In such cases, if we have another way of checking for workers' health, then we can
// so we can't just use a short MaxIdleTime; but at the same time dead
// consumers should be spotted quickly - so we can't just use a long MaxIdleTime either.
// In such cases, if we have another way for checking consumers' health, then we can
// leverage that in this callback.
ShouldClaimPendingMessage func(redis.XPendingExt) bool
}
Expand Down