From 2ec933c8b1b0b18c4736cadf9b5002af63bc0c43 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Fri, 9 Feb 2024 10:36:12 +0100 Subject: [PATCH] [ADDED] FetchHeartbeat option for Fetch and FetchBytes (#1548) Signed-off-by: Piotr Piotrowski --- jetstream/consumer.go | 16 ++- jetstream/jetstream_options.go | 20 +++ jetstream/pull.go | 48 +++++-- jetstream/test/pull_test.go | 234 ++++++++++++++++++++++++++++++++- 4 files changed, 301 insertions(+), 17 deletions(-) diff --git a/jetstream/consumer.go b/jetstream/consumer.go index 3023413db..7792ced6d 100644 --- a/jetstream/consumer.go +++ b/jetstream/consumer.go @@ -52,6 +52,12 @@ type ( // defaults to 30 seconds and can be configured using FetchMaxWait // option. // + // By default, Fetch uses a 5s idle heartbeat for requests longer than + // 10 seconds. For shorter requests, the idle heartbeat is disabled. + // This can be configured using FetchHeartbeat option. If a client does + // not receive a heartbeat message from a stream for more than 2 times + // the idle heartbeat setting, Fetch will return [ErrNoHeartbeat]. + // // Fetch is non-blocking and returns MessageBatch, exposing a channel // for delivered messages. // @@ -65,6 +71,12 @@ type ( // timeout defaults to 30 seconds and can be configured using // FetchMaxWait option. // + // By default, FetchBytes uses a 5s idle heartbeat for requests longer than + // 10 seconds. For shorter requests, the idle heartbeat is disabled. + // This can be configured using FetchHeartbeat option. If a client does + // not receive a heartbeat message from a stream for more than 2 times + // the idle heartbeat setting, FetchBytes will return [ErrNoHeartbeat]. + // // FetchBytes is non-blocking and returns MessageBatch, exposing a channel // for delivered messages. // @@ -75,9 +87,7 @@ type ( // FetchNoWait is used to retrieve up to a provided number of messages // from a stream. 
Unlike Fetch, FetchNoWait will only deliver messages // that are currently available in the stream and will not wait for new - // messages to arrive, even if batch size is not met. FetchNoWait - // timeout defaults to 30 seconds and can be configured using - // FetchMaxWait option. + // messages to arrive, even if batch size is not met. // // FetchNoWait is non-blocking and returns MessageBatch, exposing a // channel for delivered messages. diff --git a/jetstream/jetstream_options.go b/jetstream/jetstream_options.go index 53fbd71ad..699a9013e 100644 --- a/jetstream/jetstream_options.go +++ b/jetstream/jetstream_options.go @@ -260,6 +260,8 @@ func WithMessagesErrOnMissingHeartbeat(hbErr bool) PullMessagesOpt { } // FetchMaxWait sets custom timeout for fetching predefined batch of messages. +// +// If not provided, a default of 30 seconds will be used. func FetchMaxWait(timeout time.Duration) FetchOpt { return func(req *pullRequest) error { if timeout <= 0 { @@ -270,6 +272,24 @@ func FetchMaxWait(timeout time.Duration) FetchOpt { } } +// FetchHeartbeat sets custom heartbeat for individual fetch request. If a +// client does not receive a heartbeat message from a stream for more than 2 +// times the idle heartbeat setting, Fetch will return [ErrNoHeartbeat]. +// +// Heartbeat value cannot be greater than FetchMaxWait / 2. +// +// If not provided, heartbeat is set to 5s for requests with FetchMaxWait >= 10s +// and disabled otherwise. 
+func FetchHeartbeat(hb time.Duration) FetchOpt { + return func(req *pullRequest) error { + if hb <= 0 { + return fmt.Errorf("%w: timeout value must be greater than 0", ErrInvalidOption) + } + req.Heartbeat = hb + return nil + } +} + // WithDeletedDetails can be used to display the information about messages // deleted from a stream on a stream info request func WithDeletedDetails(deletedDetails bool) StreamInfoOpt { diff --git a/jetstream/pull.go b/jetstream/pull.go index f7c1f05da..bb5479aa0 100644 --- a/jetstream/pull.go +++ b/jetstream/pull.go @@ -729,17 +729,26 @@ func (s *pullSubscription) Drain() { // It will wait up to provided expiry time if not all messages are available. func (p *pullConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, error) { req := &pullRequest{ - Batch: batch, - Expires: DefaultExpires, + Batch: batch, + Expires: DefaultExpires, + Heartbeat: unset, } for _, opt := range opts { if err := opt(req); err != nil { return nil, err } } - // for longer pulls, set heartbeat value - if req.Expires >= 10*time.Second { - req.Heartbeat = 5 * time.Second + // if heartbeat was not explicitly set, set it to 5 seconds for longer pulls + // and disable it for shorter pulls + if req.Heartbeat == unset { + if req.Expires >= 10*time.Second { + req.Heartbeat = 5 * time.Second + } else { + req.Heartbeat = 0 + } + } + if req.Expires < 2*req.Heartbeat { + return nil, fmt.Errorf("%w: expiry time should be at least 2 times the heartbeat", ErrInvalidOption) } return p.fetch(req) @@ -748,26 +757,35 @@ func (p *pullConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, error) // FetchBytes is used to retrieve up to a provided bytes from the stream. 
func (p *pullConsumer) FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch, error) { req := &pullRequest{ - Batch: 1000000, - MaxBytes: maxBytes, - Expires: DefaultExpires, + Batch: 1000000, + MaxBytes: maxBytes, + Expires: DefaultExpires, + Heartbeat: unset, } for _, opt := range opts { if err := opt(req); err != nil { return nil, err } } - // for longer pulls, set heartbeat value - if req.Expires >= 10*time.Second { - req.Heartbeat = 5 * time.Second + // if heartbeat was not explicitly set, set it to 5 seconds for longer pulls + // and disable it for shorter pulls + if req.Heartbeat == unset { + if req.Expires >= 10*time.Second { + req.Heartbeat = 5 * time.Second + } else { + req.Heartbeat = 0 + } + } + if req.Expires < 2*req.Heartbeat { + return nil, fmt.Errorf("%w: expiry time should be at least 2 times the heartbeat", ErrInvalidOption) } return p.fetch(req) } // FetchNoWait sends a single request to retrieve given number of messages. -// If there are any messages available at the time of sending request, -// FetchNoWait will return immediately. +// FetchNoWait will only return messages that are available at the time of the +// request. It will not wait for more messages to arrive. func (p *pullConsumer) FetchNoWait(batch int) (MessageBatch, error) { req := &pullRequest{ Batch: batch, @@ -842,6 +860,10 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) { return } p.Unlock() + case err := <-sub.errs: + res.err = err + res.done = true + return case <-time.After(req.Expires + 1*time.Second): res.done = true return diff --git a/jetstream/test/pull_test.go b/jetstream/test/pull_test.go index 300d544de..b46697e7a 100644 --- a/jetstream/test/pull_test.go +++ b/jetstream/test/pull_test.go @@ -1,4 +1,4 @@ -// Copyright 2022-2023 The NATS Authors +// Copyright 2022-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
// You may obtain a copy of the License at @@ -351,6 +351,130 @@ func TestPullConsumerFetch(t *testing.T) { t.Fatalf("Expected error: %v; got: %v", jetstream.ErrInvalidOption, err) } }) + + t.Run("with missing heartbeat", func(t *testing.T) { + srv := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, srv) + nc, err := nats.Connect(srv.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + publishTestMsgs(t, nc) + // fetch 5 messages, should return normally + msgs, err := c.Fetch(5, jetstream.FetchHeartbeat(50*time.Millisecond)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + var i int + for range msgs.Messages() { + i++ + } + if i != len(testMsgs) { + t.Fatalf("Expected 5 messages; got: %d", i) + } + if msgs.Error() != nil { + t.Fatalf("Unexpected error during fetch: %v", msgs.Error()) + } + + // fetch again, should timeout without any error + msgs, err = c.Fetch(5, jetstream.FetchHeartbeat(50*time.Millisecond), jetstream.FetchMaxWait(200*time.Millisecond)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + select { + case _, ok := <-msgs.Messages(): + if ok { + t.Fatalf("Expected channel to be closed") + } + case <-time.After(1 * time.Second): + t.Fatalf("Expected channel to be closed") + } + if msgs.Error() != nil { + t.Fatalf("Unexpected error during fetch: %v", msgs.Error()) + } + + // delete the consumer, at this point server should stop sending heartbeats for 
pull requests + if err := s.DeleteConsumer(ctx, c.CachedInfo().Name); err != nil { + t.Fatalf("Error deleting consumer: %s", err) + } + msgs, err = c.Fetch(5, jetstream.FetchHeartbeat(50*time.Millisecond)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + select { + case _, ok := <-msgs.Messages(): + if ok { + t.Fatalf("Expected channel to be closed") + } + case <-time.After(1 * time.Second): + t.Fatalf("Expected channel to be closed") + } + if !errors.Is(msgs.Error(), jetstream.ErrNoHeartbeat) { + t.Fatalf("Expected error: %v; got: %v", jetstream.ErrNoHeartbeat, err) + } + }) + + t.Run("with invalid heartbeat value", func(t *testing.T) { + srv := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, srv) + nc, err := nats.Connect(srv.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // default expiry (30s), hb too large + _, err = c.Fetch(5, jetstream.FetchHeartbeat(20*time.Second)) + if !errors.Is(err, jetstream.ErrInvalidOption) { + t.Fatalf("Expected error: %v; got: %v", jetstream.ErrInvalidOption, err) + } + + // custom expiry, hb too large + _, err = c.Fetch(5, jetstream.FetchHeartbeat(2*time.Second), jetstream.FetchMaxWait(3*time.Second)) + if !errors.Is(err, jetstream.ErrInvalidOption) { + t.Fatalf("Expected error: %v; got: %v", jetstream.ErrInvalidOption, err) + } + + // negative heartbeat + _, err = c.Fetch(5, jetstream.FetchHeartbeat(-2*time.Second)) + if 
!errors.Is(err, jetstream.ErrInvalidOption) { + t.Fatalf("Expected error: %v; got: %v", jetstream.ErrInvalidOption, err) + } + }) } func TestPullConsumerFetchBytes(t *testing.T) { @@ -542,6 +666,114 @@ func TestPullConsumerFetchBytes(t *testing.T) { t.Fatalf("Unexpected error during fetch: %v", msgs.Error()) } }) + + t.Run("with missing heartbeat", func(t *testing.T) { + srv := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, srv) + nc, err := nats.Connect(srv.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // fetch again, should timeout without any error + msgs, err := c.FetchBytes(5, jetstream.FetchHeartbeat(50*time.Millisecond), jetstream.FetchMaxWait(200*time.Millisecond)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + select { + case _, ok := <-msgs.Messages(): + if ok { + t.Fatalf("Expected channel to be closed") + } + case <-time.After(1 * time.Second): + t.Fatalf("Expected channel to be closed") + } + if msgs.Error() != nil { + t.Fatalf("Unexpected error during fetch: %v", msgs.Error()) + } + + // delete the consumer, at this point server should stop sending heartbeats for pull requests + if err := s.DeleteConsumer(ctx, c.CachedInfo().Name); err != nil { + t.Fatalf("Error deleting consumer: %s", err) + } + msgs, err = c.FetchBytes(5, jetstream.FetchHeartbeat(50*time.Millisecond)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + select { + case 
_, ok := <-msgs.Messages(): + if ok { + t.Fatalf("Expected channel to be closed") + } + case <-time.After(1 * time.Second): + t.Fatalf("Expected channel to be closed") + } + if !errors.Is(msgs.Error(), jetstream.ErrNoHeartbeat) { + t.Fatalf("Expected error: %v; got: %v", jetstream.ErrNoHeartbeat, err) + } + }) + + t.Run("with invalid heartbeat value", func(t *testing.T) { + srv := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, srv) + nc, err := nats.Connect(srv.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // default expiry (30s), hb too large + _, err = c.FetchBytes(5, jetstream.FetchHeartbeat(20*time.Second)) + if !errors.Is(err, jetstream.ErrInvalidOption) { + t.Fatalf("Expected error: %v; got: %v", jetstream.ErrInvalidOption, err) + } + + // custom expiry, hb too large + _, err = c.FetchBytes(5, jetstream.FetchHeartbeat(2*time.Second), jetstream.FetchMaxWait(3*time.Second)) + if !errors.Is(err, jetstream.ErrInvalidOption) { + t.Fatalf("Expected error: %v; got: %v", jetstream.ErrInvalidOption, err) + } + + // negative heartbeat + _, err = c.FetchBytes(5, jetstream.FetchHeartbeat(-2*time.Second)) + if !errors.Is(err, jetstream.ErrInvalidOption) { + t.Fatalf("Expected error: %v; got: %v", jetstream.ErrInvalidOption, err) + } + }) } func TestPullConsumerFetch_WithCluster(t *testing.T) {