Skip to content

Commit

Permalink
[ADDED] FetchHeartbeat option for Fetch and FetchBytes (#1548)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio authored Feb 9, 2024
1 parent 1c24aa7 commit 2ec933c
Show file tree
Hide file tree
Showing 4 changed files with 301 additions and 17 deletions.
16 changes: 13 additions & 3 deletions jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ type (
// defaults to 30 seconds and can be configured using FetchMaxWait
// option.
//
// By default, Fetch uses a 5s idle heartbeat for requests longer than
// 10 seconds. For shorter requests, the idle heartbeat is disabled.
// This can be configured using FetchHeartbeat option. If a client does
// not receive a heartbeat message from a stream for more than 2 times
// the idle heartbeat setting, Fetch will return [ErrNoHeartbeat].
//
// Fetch is non-blocking and returns MessageBatch, exposing a channel
// for delivered messages.
//
Expand All @@ -65,6 +71,12 @@ type (
// timeout defaults to 30 seconds and can be configured using
// FetchMaxWait option.
//
// By default, FetchBytes uses a 5s idle heartbeat for requests longer than
// 10 seconds. For shorter requests, the idle heartbeat is disabled.
// This can be configured using FetchHeartbeat option. If a client does
// not receive a heartbeat message from a stream for more than 2 times
// the idle heartbeat setting, Fetch will return ErrNoHeartbeat.
//
// FetchBytes is non-blocking and returns MessageBatch, exposing a channel
// for delivered messages.
//
Expand All @@ -75,9 +87,7 @@ type (
// FetchNoWait is used to retrieve up to a provided number of messages
// from a stream. Unlike Fetch, FetchNoWait will only deliver messages
// that are currently available in the stream and will not wait for new
// messages to arrive, even if batch size is not met. FetchNoWait
// timeout defaults to 30 seconds and can be configured using
// FetchMaxWait option.
// messages to arrive, even if batch size is not met.
//
// FetchNoWait is non-blocking and returns MessageBatch, exposing a
// channel for delivered messages.
Expand Down
20 changes: 20 additions & 0 deletions jetstream/jetstream_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ func WithMessagesErrOnMissingHeartbeat(hbErr bool) PullMessagesOpt {
}

// FetchMaxWait sets custom timeout for fetching predefined batch of messages.
//
// If not provided, a default of 30 seconds will be used.
func FetchMaxWait(timeout time.Duration) FetchOpt {
return func(req *pullRequest) error {
if timeout <= 0 {
Expand All @@ -270,6 +272,24 @@ func FetchMaxWait(timeout time.Duration) FetchOpt {
}
}

// FetchHeartbeat sets custom heartbeat for individual fetch request. If a
// client does not receive a heartbeat message from a stream for more than 2
// times the idle heartbeat setting, Fetch will return [ErrNoHeartbeat].
//
// Heartbeat value has to be lower than FetchMaxWait / 2.
//
// If not provided, heartbeat will is set to 5s for requests with FetchMaxWait > 30s
// and disabled otherwise.
func FetchHeartbeat(hb time.Duration) FetchOpt {
return func(req *pullRequest) error {
if hb <= 0 {
return fmt.Errorf("%w: timeout value must be greater than 0", ErrInvalidOption)
}
req.Heartbeat = hb
return nil
}
}

// WithDeletedDetails can be used to display the information about messages
// deleted from a stream on a stream info request
func WithDeletedDetails(deletedDetails bool) StreamInfoOpt {
Expand Down
48 changes: 35 additions & 13 deletions jetstream/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,17 +729,26 @@ func (s *pullSubscription) Drain() {
// It will wait up to provided expiry time if not all messages are available.
func (p *pullConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, error) {
req := &pullRequest{
Batch: batch,
Expires: DefaultExpires,
Batch: batch,
Expires: DefaultExpires,
Heartbeat: unset,
}
for _, opt := range opts {
if err := opt(req); err != nil {
return nil, err
}
}
// for longer pulls, set heartbeat value
if req.Expires >= 10*time.Second {
req.Heartbeat = 5 * time.Second
// if heartbeat was not explicitly set, set it to 5 seconds for longer pulls
// and disable it for shorter pulls
if req.Heartbeat == unset {
if req.Expires >= 10*time.Second {
req.Heartbeat = 5 * time.Second
} else {
req.Heartbeat = 0
}
}
if req.Expires < 2*req.Heartbeat {
return nil, fmt.Errorf("%w: expiry time should be at least 2 times the heartbeat", ErrInvalidOption)
}

return p.fetch(req)
Expand All @@ -748,26 +757,35 @@ func (p *pullConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, error)
// FetchBytes is used to retrieve up to a provided bytes from the stream.
func (p *pullConsumer) FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch, error) {
req := &pullRequest{
Batch: 1000000,
MaxBytes: maxBytes,
Expires: DefaultExpires,
Batch: 1000000,
MaxBytes: maxBytes,
Expires: DefaultExpires,
Heartbeat: unset,
}
for _, opt := range opts {
if err := opt(req); err != nil {
return nil, err
}
}
// for longer pulls, set heartbeat value
if req.Expires >= 10*time.Second {
req.Heartbeat = 5 * time.Second
// if heartbeat was not explicitly set, set it to 5 seconds for longer pulls
// and disable it for shorter pulls
if req.Heartbeat == unset {
if req.Expires >= 10*time.Second {
req.Heartbeat = 5 * time.Second
} else {
req.Heartbeat = 0
}
}
if req.Expires < 2*req.Heartbeat {
return nil, fmt.Errorf("%w: expiry time should be at least 2 times the heartbeat", ErrInvalidOption)
}

return p.fetch(req)
}

// FetchNoWait sends a single request to retrieve given number of messages.
// If there are any messages available at the time of sending request,
// FetchNoWait will return immediately.
// FetchNoWait will only return messages that are available at the time of the
// request. It will not wait for more messages to arrive.
func (p *pullConsumer) FetchNoWait(batch int) (MessageBatch, error) {
req := &pullRequest{
Batch: batch,
Expand Down Expand Up @@ -842,6 +860,10 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) {
return
}
p.Unlock()
case err := <-sub.errs:
res.err = err
res.done = true
return
case <-time.After(req.Expires + 1*time.Second):
res.done = true
return
Expand Down
Loading

0 comments on commit 2ec933c

Please sign in to comment.