Skip to content

Commit 7cd5ea6

Browse files
authored
kgo: fix mark <=> epoch interaction, make epoch handling more resilient (#1203)
MarkCommit{Records,Offsets} compared the epoch/offset you were marking against the existing head epoch/offset. If there was no internally existing epoch/offset, or only `dirty` was set (via being consumed) while `head` / `committed` were both the default struct value {0,0}, then trying to mark a record with a negative epoch would be ignored. Note that returning -1 via the broker epoch requires a broker to both (a) SUPPORT epochs, i.e. implement all the Kafka APIs with leader epoch support, and then (b) NOT SUPPORT epochs, i.e. return / use -1 everywhere. This has only been seen against Azure Event Hubs. Anyway, now, when initializing an EpochOffset internally (for an uncommit {dirty,head,committed}), the epoch is explicitly initialized with -1. Further, for added robustness, MarkCommit{Records,Offsets} only compares against existing values -- if a value does not exist, we auto-accept the mark. This commit also: * Improves EpochOffset.Less to assume all negative epochs are -1 * Simplifies the logic in CommitRecords to use EpochOffset.Less * Adds debug lines in metadata when new partitions are added * Adds the epoch to an existing debug log line when updating uncommitted * Sets the LeaderEpoch to -1 for records when producing, and clarifies in docs that -1 should have always been the case.
1 parent 8906743 commit 7cd5ea6

File tree

4 files changed

+38
-15
lines changed

4 files changed

+38
-15
lines changed

pkg/kgo/consumer_group.go

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1878,7 +1878,8 @@ type EpochOffset struct {
18781878
// than the other if this one's epoch is less, or the epoch's are equal and
18791879
// this one's offset is less.
18801880
func (e EpochOffset) Less(o EpochOffset) bool {
1881-
return e.Epoch < o.Epoch || e.Epoch == o.Epoch && e.Offset < o.Offset
1881+
ee, oe := max(e.Epoch, -1), max(o.Epoch, -1)
1882+
return ee < oe || ee == oe && e.Offset < o.Offset
18821883
}
18831884

18841885
type uncommitted map[string]map[int32]uncommit
@@ -1925,13 +1926,18 @@ func (g *groupConsumer) updateUncommitted(fetches Fetches) {
19251926
final.LeaderEpoch, // -1 if old message / unknown
19261927
final.Offset + 1,
19271928
}
1928-
prior := topicOffsets[partition.Partition]
1929+
prior, ok := topicOffsets[partition.Partition]
1930+
if !ok {
1931+
uninit := EpochOffset{-1, 0}
1932+
uncommit := uncommit{uninit, uninit, uninit}
1933+
prior, topicOffsets[partition.Partition] = uncommit, uncommit
1934+
}
19291935

19301936
if debug {
19311937
if setHead {
1932-
fmt.Fprintf(&b, "%d{%d=>%d r%d}, ", partition.Partition, prior.head.Offset, set.Offset, len(partition.Records))
1938+
fmt.Fprintf(&b, "%d{%d=>%d r%d e%d}, ", partition.Partition, prior.head.Offset, set.Offset, len(partition.Records), set.Epoch)
19331939
} else {
1934-
fmt.Fprintf(&b, "%d{%d=>%d=>%d r%d}, ", partition.Partition, prior.head.Offset, prior.dirty.Offset, set.Offset, len(partition.Records))
1940+
fmt.Fprintf(&b, "%d{%d=>%d=>%d r%d e%d}, ", partition.Partition, prior.head.Offset, prior.dirty.Offset, set.Offset, len(partition.Records), set.Epoch)
19351941
}
19361942
}
19371943

@@ -2400,15 +2406,18 @@ func (cl *Client) CommitRecords(ctx context.Context, rs ...*Record) error {
24002406
offsets[r.Topic] = toffsets
24012407
}
24022408

2409+
set := EpochOffset{
2410+
r.LeaderEpoch,
2411+
r.Offset + 1, // need to advice to next offset to move forward
2412+
}
2413+
24032414
if at, exists := toffsets[r.Partition]; exists {
2404-
if at.Epoch > r.LeaderEpoch || at.Epoch == r.LeaderEpoch && at.Offset > r.Offset {
2415+
if set.Less(at) {
24052416
continue
24062417
}
24072418
}
2408-
toffsets[r.Partition] = EpochOffset{
2409-
r.LeaderEpoch,
2410-
r.Offset + 1, // need to advice to next offset to move forward
2411-
}
2419+
2420+
toffsets[r.Partition] = set
24122421
}
24132422

24142423
var rerr error // return error
@@ -2469,11 +2478,11 @@ func (cl *Client) MarkCommitRecords(rs ...*Record) {
24692478
curTopic = r.Topic
24702479
}
24712480

2472-
current := curPartitions[r.Partition]
2481+
current, ok := curPartitions[r.Partition]
24732482
if newHead := (EpochOffset{
24742483
r.LeaderEpoch,
24752484
r.Offset + 1,
2476-
}); current.head.Less(newHead) {
2485+
}); !ok || current.head.Less(newHead) {
24772486
curPartitions[r.Partition] = uncommit{
24782487
dirty: current.dirty,
24792488
committed: current.committed,
@@ -2509,8 +2518,8 @@ func (cl *Client) MarkCommitOffsets(unmarked map[string]map[int32]EpochOffset) {
25092518
}
25102519

25112520
for partition, newHead := range partitions {
2512-
current := curPartitions[partition]
2513-
if current.head.Less(newHead) {
2521+
current, ok := curPartitions[partition]
2522+
if !ok || current.head.Less(newHead) {
25142523
curPartitions[partition] = uncommit{
25152524
dirty: current.dirty,
25162525
committed: current.committed,

pkg/kgo/metadata.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -926,8 +926,20 @@ func (cl *Client) mergeTopicPartitions(
926926
for _, newTP := range newPartitions {
927927
if isProduce && newTP.records.recBufsIdx == -1 {
928928
newTP.records.sink.addRecBuf(newTP.records)
929+
cl.cfg.logger.Log(LogLevelDebug, "metadata refresh new produce partition",
930+
"topic", topic,
931+
"partition", newTP.partition(),
932+
"leader", newTP.leader,
933+
"leader_epoch", newTP.leaderEpoch,
934+
)
929935
} else if !isProduce && newTP.cursor.cursorsIdx == -1 {
930936
newTP.cursor.source.addCursor(newTP.cursor)
937+
cl.cfg.logger.Log(LogLevelDebug, "metadata refresh new consume partition",
938+
"topic", topic,
939+
"partition", newTP.partition(),
940+
"leader", newTP.leader,
941+
"leader_epoch", newTP.leaderEpoch,
942+
)
931943
}
932944
}
933945
}

pkg/kgo/producer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -624,7 +624,7 @@ func (p *producer) finishPromises(b batchPromise) {
624624
}()
625625
start:
626626
for i, pr := range b.recs {
627-
pr.LeaderEpoch = 0
627+
pr.LeaderEpoch = -1
628628
if b.baseOffset == -1 {
629629
// if the base offset is invalid/unknown (-1), all record offsets should
630630
// be treated as unknown

pkg/kgo/record_and_fetch.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,9 @@ type Record struct {
130130
ProducerID int64
131131

132132
// LeaderEpoch is the leader epoch of the broker at the time this
133-
// record was written, or -1 if on message sets.
133+
// record was written, or -1 if on message sets. When producing,
134+
// this is always set to -1 (producers do not use this field and the
135+
// broker does not reply with the epoch).
134136
//
135137
// For committing records, it is not recommended to modify the
136138
// LeaderEpoch. Clients use the LeaderEpoch for data loss detection.

0 commit comments

Comments
 (0)