Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

record all records and validate leader epoch monotonicity #58

Merged
merged 2 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/google/uuid v1.1.1
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0
github.com/twmb/franz-go v1.15.4
github.com/twmb/franz-go/pkg/kadm v0.0.0-20211116225244-e97ad6b8ef3e
github.com/twmb/franz-go/pkg/kmsg v1.7.0
Expand All @@ -15,6 +16,7 @@ require (
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/icza/dyno v0.0.0-20200205103839-49cb13720835 // indirect
Expand All @@ -24,6 +26,7 @@ require (
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/pelletier/go-toml v1.2.0 // indirect
github.com/pierrec/lz4/v4 v4.1.19 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/afero v1.6.0 // indirect
github.com/spf13/cast v1.3.0 // indirect
github.com/spf13/cobra v1.1.3 // indirect
Expand All @@ -37,4 +40,5 @@ require (
golang.org/x/text v0.14.0 // indirect
gopkg.in/ini.v1 v1.51.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
52 changes: 34 additions & 18 deletions pkg/worker/verifier/validator_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,66 +48,81 @@ type ValidatorStatus struct {

// Last consumed offset per partition. Used to assert monotonicity and check for gaps.
lastOffsetConsumed map[int32]int64

// Last leader epoch per partition. Used to assert monotonicity.
lastLeaderEpoch map[int32]int32
}

// ValidateRecord checks a consumed record against the expected header value,
// asserts per-partition offset monotonicity (panicking on reorders, warning on
// gaps), and — in the post-change version — asserts leader-epoch monotonicity.
//
// NOTE(review): this span is a unified-diff render without +/- markers, so it
// interleaves the pre-change and post-change lines of the function and is not
// compilable as-is. Lines are annotated below as (old) / (new) based on the
// diff structure; confirm against the repository before relying on them.
func (cs *ValidatorStatus) ValidateRecord(r *kgo.Record, validRanges *TopicOffsetRanges) {
// The producer stamps each record's header with its intended offset in this
// fixed-width "%06d.%018d" format; consumption validates against it.
expect_header_value := fmt.Sprintf("%06d.%018d", 0, r.Offset)
// (old) debug line without leader epoch.
log.Debugf("Consumed %s on p=%d at o=%d", r.Key, r.Partition, r.Offset)
// (new) debug line additionally reporting the record's leader epoch.
log.Debugf("Consumed %s on p=%d at o=%d leaderEpoch=%d", r.Key, r.Partition, r.Offset, r.LeaderEpoch)
cs.lock.Lock()
defer cs.lock.Unlock()

// (new) Leader epoch must never decrease within a partition.
if r.LeaderEpoch < cs.lastLeaderEpoch[r.Partition] {
log.Panicf("Out of order leader epoch on p=%d at o=%d leaderEpoch=%d. Previous leaderEpoch=%d",
r.Partition, r.Offset, r.LeaderEpoch, cs.lastLeaderEpoch[r.Partition])
}

// (new) Monotonicity check hoisted here so it runs for every record, not
// only for records whose header validated (compare the (old) copy below).
currentMax, present := cs.lastOffsetConsumed[r.Partition]
if present {
if currentMax < r.Offset {
expected := currentMax + 1
if r.Offset != expected {
log.Warnf("Gap detected in consumed offsets. Expected %d, but got %d", expected, r.Offset)
}
} else {
log.Panicf("Out of order read. Max consumed offset(partition=%d)=%d; Current record offset=%d", r.Partition, currentMax, r.Offset)
}
}

// Records without headers validate against the empty string (and so fail
// unless the offset is outside the valid ranges).
var got_header_value string
if len(r.Headers) > 0 {
got_header_value = string(r.Headers[0].Value)
}

// (old) direct comparison in the condition.
if expect_header_value != got_header_value {
// (new) comparison captured in a named flag, later passed to recordOffset.
recordExpected := expect_header_value == got_header_value
if !recordExpected {
shouldBeValid := validRanges.Contains(r.Partition, r.Offset)

if shouldBeValid {
cs.InvalidReads += 1
// (old) util.Die vs (new) log.Panicf — same message, panic is recoverable in tests.
util.Die("Bad read at offset %d on partition %s/%d. Expect '%s', found '%s'", r.Offset, r.Topic, r.Partition, expect_header_value, got_header_value)
log.Panicf("Bad read at offset %d on partition %s/%d. Expect '%s', found '%s'", r.Offset, r.Topic, r.Partition, expect_header_value, got_header_value)
} else {
cs.OutOfScopeInvalidReads += 1
log.Infof("Ignoring read validation at offset outside valid range %s/%d %d", r.Topic, r.Partition, r.Offset)
}
} else {
// (old) removed block: monotonicity check and recordOffset were previously
// only performed for records whose header validated.
currentMax, present := cs.lastOffsetConsumed[r.Partition]
if present {
if currentMax < r.Offset {
expected := currentMax + 1
if r.Offset != expected {
log.Warnf("Gap detected in consumed offsets. Expected %d, but got %d", expected, r.Offset)
}
} else {
log.Fatalf("Out of order read. Max consumed offset(partition=%d)=%d; Current record offset=%d", r.Partition, currentMax, r.Offset)
}
}
cs.recordOffset(r)

cs.ValidReads += 1
log.Debugf("Read OK (%s) on p=%d at o=%d", r.Headers[0].Value, r.Partition, r.Offset)
}

// (new) Offsets/epochs are now recorded for every record; the flag controls
// whether the max-consumed offset is bumped.
cs.recordOffset(r, recordExpected)

// Periodically persist progress; 5s cadence presumably bounds checkpoint
// cost — TODO confirm against Checkpoint's implementation.
if time.Since(cs.lastCheckpoint) > time.Second*5 {
cs.Checkpoint()
cs.lastCheckpoint = time.Now()
}
}

// recordOffset lazily initializes the per-partition tracking maps and records
// the consumed offset and leader epoch for r's partition.
//
// NOTE(review): diff render — both the (old) and (new) signature/condition
// lines appear below without +/- markers; the span is not compilable as-is.
// (old) signature:
func (cs *ValidatorStatus) recordOffset(r *kgo.Record) {
// (new) signature adds recordExpected, which gates the max-offset bump:
func (cs *ValidatorStatus) recordOffset(r *kgo.Record, recordExpected bool) {
if cs.MaxOffsetsConsumed == nil {
cs.MaxOffsetsConsumed = make(map[int32]int64)
}
if cs.lastOffsetConsumed == nil {
cs.lastOffsetConsumed = make(map[int32]int64)
}
if cs.lastLeaderEpoch == nil {
cs.lastLeaderEpoch = make(map[int32]int32)
}

// (old) condition: bumped for any record with a higher offset.
if r.Offset > cs.MaxOffsetsConsumed[r.Partition] {
// We bump highest offset only for valid records.
// (new) condition:
if r.Offset > cs.MaxOffsetsConsumed[r.Partition] && recordExpected {
cs.MaxOffsetsConsumed[r.Partition] = r.Offset
}

// Monotonicity state is updated unconditionally, even for invalid records.
cs.lastOffsetConsumed[r.Partition] = r.Offset
cs.lastLeaderEpoch[r.Partition] = r.LeaderEpoch
}

func (cs *ValidatorStatus) RecordLostOffsets(p int32, count int64) {
Expand All @@ -126,6 +141,7 @@ func (cs *ValidatorStatus) ResetMonotonicityTestState() {
defer cs.lock.Unlock()

cs.lastOffsetConsumed = make(map[int32]int64)
cs.lastLeaderEpoch = make(map[int32]int32)
}

func (cs *ValidatorStatus) SetMonotonicityTestStateForPartition(partition int32, offset int64) {
Expand Down
126 changes: 126 additions & 0 deletions pkg/worker/verifier/validator_status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package verifier_test

import (
"testing"

"github.com/redpanda-data/kgo-verifier/pkg/worker/verifier"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/twmb/franz-go/pkg/kgo"
)

// TestValidatorStatus_ValidateRecordHappyPath feeds the validator two records
// whose headers match their offsets (counted as valid reads) followed by a
// headerless record at an offset outside the valid ranges (counted as an
// out-of-scope invalid read), then checks the counters and the max consumed
// offset, which only valid reads may advance.
func TestValidatorStatus_ValidateRecordHappyPath(t *testing.T) {
	validator := verifier.NewValidatorStatus()
	validRanges := verifier.NewTopicOffsetRanges("topic", 1)
	validRanges.Insert(0, 41)
	validRanges.Insert(0, 42)

	records := []*kgo.Record{
		{
			Offset:      41,
			LeaderEpoch: 0,
			Headers:     []kgo.RecordHeader{{Key: "key", Value: []byte("000000.000000000000000041")}},
		},
		{
			Offset:      42,
			LeaderEpoch: 0,
			Headers:     []kgo.RecordHeader{{Key: "key", Value: []byte("000000.000000000000000042")}},
		},
		{
			// No header: fails validation, but offset 43 lies outside the
			// valid ranges, so it is only counted as out-of-scope.
			Offset:      43,
			LeaderEpoch: 1,
		},
	}
	for _, record := range records {
		validator.ValidateRecord(record, &validRanges)
	}

	assert.Equal(t, int64(2), validator.ValidReads)
	assert.Equal(t, int64(0), validator.InvalidReads)
	assert.Equal(t, int64(1), validator.OutOfScopeInvalidReads)

	// Only valid reads increment the max offset consumed.
	assert.Equal(t, int64(42), validator.MaxOffsetsConsumed[0])
}

// TestValidatorStatus_ValidateRecordInvalidRead verifies that a record whose
// header does not match its offset, at an offset the valid ranges claim is
// good, makes ValidateRecord panic with a descriptive message.
func TestValidatorStatus_ValidateRecordInvalidRead(t *testing.T) {
	validator := verifier.NewValidatorStatus()
	validRanges := verifier.NewTopicOffsetRanges("topic", 1)
	validRanges.Insert(0, 41)

	// Mismatch between the expected offset as recorded in the header and the
	// actual offset.
	func() {
		defer func() {
			r := recover()
			// Previously a nil recover() was silently ignored, so the test
			// passed even if ValidateRecord never panicked. Assert the panic
			// actually happened.
			if assert.NotNil(t, r, "expected ValidateRecord to panic on a bad read") {
				assert.Equal(t, "Bad read at offset 41 on partition /0. Expect '000000.000000000000000041', found '000000.000000000000000040'", r.(*logrus.Entry).Message)
			}
		}()

		validator.ValidateRecord(&kgo.Record{
			Offset:      41,
			LeaderEpoch: 0,
			Headers:     []kgo.RecordHeader{{Key: "key", Value: []byte("000000.000000000000000040")}},
		}, &validRanges)
	}()
}

// TestValidatorStatus_ValidateRecordNonMonotonicOffset verifies that re-reading
// an already-consumed offset, or reading a lower offset after a higher one,
// makes ValidateRecord panic with a descriptive message.
func TestValidatorStatus_ValidateRecordNonMonotonicOffset(t *testing.T) {
	validator := verifier.NewValidatorStatus()
	validRanges := verifier.NewTopicOffsetRanges("topic", 1)

	validator.ValidateRecord(&kgo.Record{
		Offset:      41,
		LeaderEpoch: 0,
	}, &validRanges)

	// Same offset read again.
	func() {
		defer func() {
			r := recover()
			// A nil recover() used to be silently ignored, letting the test
			// pass even when no panic fired. Assert the panic happened.
			if assert.NotNil(t, r, "expected a panic on a repeated offset") {
				assert.Equal(t, "Out of order read. Max consumed offset(partition=0)=41; Current record offset=41", r.(*logrus.Entry).Message)
			}
		}()

		validator.ValidateRecord(&kgo.Record{
			Offset:      41,
			LeaderEpoch: 0,
		}, &validRanges)
	}()

	// Lower offset read after a higher offset.
	func() {
		defer func() {
			r := recover()
			if assert.NotNil(t, r, "expected a panic on an out of order offset") {
				assert.Equal(t, "Out of order read. Max consumed offset(partition=0)=41; Current record offset=40", r.(*logrus.Entry).Message)
			}
		}()

		validator.ValidateRecord(&kgo.Record{
			Offset:      40,
			LeaderEpoch: 0,
		}, &validRanges)
	}()
}


// TestValidatorStatus_ValidateRecordNonMonotonicLeaderEpoch verifies that a
// record carrying a lower leader epoch than the last one seen on the same
// partition makes ValidateRecord panic with a descriptive message.
func TestValidatorStatus_ValidateRecordNonMonotonicLeaderEpoch(t *testing.T) {
	validator := verifier.NewValidatorStatus()
	validRanges := verifier.NewTopicOffsetRanges("topic", 1)

	validator.ValidateRecord(&kgo.Record{
		Offset:      41,
		LeaderEpoch: 1,
	}, &validRanges)

	func() {
		defer func() {
			r := recover()
			// A nil recover() used to be silently ignored, letting the test
			// pass even when no panic fired. Assert the panic happened.
			if assert.NotNil(t, r, "expected a panic on a decreasing leader epoch") {
				assert.Equal(t, "Out of order leader epoch on p=0 at o=42 leaderEpoch=0. Previous leaderEpoch=1", r.(*logrus.Entry).Message)
			}
		}()

		validator.ValidateRecord(&kgo.Record{
			Offset:      42,
			LeaderEpoch: 0,
		}, &validRanges)
	}()
}
Loading