Skip to content

Commit

Permalink
[FIXED] Stuck consumer after leader change
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
MauriceVanVeen authored and neilalexander committed Feb 10, 2025
1 parent 19389e3 commit 66cfd50
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 4 deletions.
11 changes: 7 additions & 4 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2812,17 +2812,20 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
if sseq >= o.sseq {
// Let's make sure this is valid.
// This is only received on the consumer leader, so should never be higher
// than the last stream sequence.
// than the last stream sequence. But could happen if we've just become
// consumer leader, and we are not up-to-date on the stream yet.
var ss StreamState
mset.store.FastState(&ss)
if sseq > ss.LastSeq {
o.srv.Warnf("JetStream consumer '%s > %s > %s' ACK sequence %d past last stream sequence of %d",
o.acc.Name, o.stream, o.name, sseq, ss.LastSeq)
// FIXME(dlc) - For 2.11 onwards should we return an error here to the caller?
o.mu.Unlock()
return false
}
o.sseq = sseq + 1
// Even though another leader must have delivered a message with this sequence, we must not adjust
// the current pointer. This could otherwise result in a stuck consumer, where messages below this
// sequence can't be redelivered, and we'll have incorrect pending state and ack floors.
o.mu.Unlock()
return false
}

// Let the owning stream know if we are interest or workqueue retention based.
Expand Down
72 changes: 72 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7327,6 +7327,78 @@ func TestJetStreamClusterPeerRemoveStreamConsumerDesync(t *testing.T) {
})
}

func TestJetStreamClusterStuckConsumerAfterLeaderChangeWithUnknownDeliveries(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)

// Publish some messages into the stream.
for i := 0; i < 3; i++ {
_, err = js.Publish("foo", nil)
require_NoError(t, err)
}

// Ensure all servers are up-to-date.
checkFor(t, 2*time.Second, 500*time.Millisecond, func() error {
return checkState(t, c, globalAccountName, "TEST")
})

sub, err := js.PullSubscribe("foo", "CONSUMER")
require_NoError(t, err)
defer sub.Unsubscribe()

// We only fetch 1 message here, since the condition is hard to trigger otherwise.
// But, we're simulating fetching 3 messages and the consumer leader changing while
// deliveries are happening. This will result in the new consumer leader not knowing
// that the last two messages were also delivered (since we don't wait for quorum before delivering).
msgs, err := sub.Fetch(1)
require_NoError(t, err)
require_Len(t, len(msgs), 1)

// The client could send an acknowledgement, while the new consumer leader doesn't know about it
// ever being delivered. It must NOT adjust any state and ignore the request to remain consistent.
_, err = nc.Request("$JS.ACK.TEST.CONSUMER.1.3.3.0.0", nil, time.Second)
require_Error(t, err, nats.ErrTimeout)

// Acknowledging a message that is known to be delivered is accepted still.
_, err = nc.Request("$JS.ACK.TEST.CONSUMER.1.1.1.0.0", nil, time.Second)
require_NoError(t, err)

// Check for consistent consumer info.
ci, err := js.ConsumerInfo("TEST", "CONSUMER")
require_NoError(t, err)
require_Equal(t, ci.Delivered.Consumer, 1)
require_Equal(t, ci.Delivered.Stream, 1)
require_Equal(t, ci.AckFloor.Consumer, 1)
require_Equal(t, ci.AckFloor.Stream, 1)

// Fetching for new messages MUST return the two messages the new consumer leader didn't
// know were delivered before. If we wouldn't deliver these we'd have a stuck consumer.
msgs, err = sub.Fetch(2)
require_NoError(t, err)
require_Len(t, len(msgs), 2)
for _, msg := range msgs {
require_NoError(t, msg.AckSync())
}

// Check for consistent consumer info.
ci, err = js.ConsumerInfo("TEST", "CONSUMER")
require_NoError(t, err)
require_Equal(t, ci.Delivered.Consumer, 3)
require_Equal(t, ci.Delivered.Stream, 3)
require_Equal(t, ci.AckFloor.Consumer, 3)
require_Equal(t, ci.AckFloor.Stream, 3)
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
Expand Down

0 comments on commit 66cfd50

Please sign in to comment.