Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ go_library(
"//pkg/util/httputil",
"//pkg/util/humanizeutil",
"//pkg/util/intsets",
"//pkg/util/iterutil",
"//pkg/util/json",
"//pkg/util/log",
"//pkg/util/log/channel",
Expand Down
29 changes: 21 additions & 8 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1783,7 +1783,7 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
// The feed's checkpoint is tracked in a map which is used to inform the
// checkpoint_progress metric which will return the lowest timestamp across
// all feeds in the scope.
cf.sliMetrics.setCheckpoint(cf.sliMetricsID, cf.frontier.Frontier())
cf.sliMetrics.setCheckpoint(cf.sliMetricsID, newResolved)

return cf.maybeEmitResolved(cf.Ctx(), newResolved)
}
Expand Down Expand Up @@ -2226,16 +2226,29 @@ func (cf *changeFrontier) maybeEmitResolved(ctx context.Context, newResolved hlc

// updateProgressSkewMetrics updates the progress skew metrics.
func (cf *changeFrontier) updateProgressSkewMetrics() {
maxSpanTS := cf.frontier.LatestTS()
maxTableTS := cf.frontier.Frontier()
for _, f := range cf.frontier.Frontiers() {
tableTS := f.Frontier()
if tableTS.After(maxTableTS) {
maxTableTS = tableTS
fastestSpanTS := cf.frontier.LatestTS()
fastestTableTS := func() hlc.Timestamp {
var maxTS hlc.Timestamp
for _, f := range cf.frontier.Frontiers() {
if f.Frontier().After(maxTS) {
maxTS = f.Frontier()
}
}
return maxTS
}()

slowestTS := cf.frontier.Frontier()
var spanSkew, tableSkew int64
if slowestTS.IsSet() {
if fastestSpanTS.IsSet() {
spanSkew = fastestSpanTS.WallTime - slowestTS.WallTime
}
if fastestTableTS.IsSet() {
tableSkew = fastestTableTS.WallTime - slowestTS.WallTime
}
}

cf.sliMetrics.setFastestTS(cf.sliMetricsID, maxSpanTS, maxTableTS)
cf.sliMetrics.setProgressSkew(cf.sliMetricsID, spanSkew, tableSkew)
}

func frontierIsBehind(frontier hlc.Timestamp, sv *settings.Values) bool {
Expand Down
3 changes: 1 addition & 2 deletions pkg/ccl/changefeedccl/changefeed_progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,7 @@ WITH no_initial_scan, min_checkpoint_frequency='1s', resolved, metrics_label='%s
}
if !perTableTracking {
if tableSkew != 0 {
// TODO(#155083): Return an error here.
return nil
return errors.Newf("expected table skew to be 0, got %d", tableSkew)
}
return nil
}
Expand Down
64 changes: 27 additions & 37 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
package changefeedccl

import (
"cmp"
"context"
"maps"
"slices"
"strings"
"sync/atomic"
Expand All @@ -25,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/util/cidr"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
Expand Down Expand Up @@ -188,11 +191,11 @@ type sliMetrics struct {

mu struct {
syncutil.Mutex
id int64
resolved map[int64]hlc.Timestamp
checkpoint map[int64]hlc.Timestamp
fastestSpan map[int64]hlc.Timestamp
fastestTable map[int64]hlc.Timestamp
id int64
resolved map[int64]hlc.Timestamp
checkpoint map[int64]hlc.Timestamp
spanSkew map[int64]int64
tableSkew map[int64]int64
}
NetMetrics *cidr.NetMetrics

Expand All @@ -206,8 +209,8 @@ func (m *sliMetrics) closeId(id int64) {
defer m.mu.Unlock()
delete(m.mu.checkpoint, id)
delete(m.mu.resolved, id)
delete(m.mu.fastestSpan, id)
delete(m.mu.fastestTable, id)
delete(m.mu.spanSkew, id)
delete(m.mu.tableSkew, id)
}

// setResolved writes a resolved timestamp entry for the given id.
Expand All @@ -228,15 +231,15 @@ func (m *sliMetrics) setCheckpoint(id int64, ts hlc.Timestamp) {
}
}

// setFastestTS saves the fastest span/table timestamps for a given id.
func (m *sliMetrics) setFastestTS(id int64, spanTS hlc.Timestamp, tableTS hlc.Timestamp) {
// setProgressSkew saves the span skew/table skew for a given ID.
func (m *sliMetrics) setProgressSkew(id int64, spanSkew int64, tableSkew int64) {
m.mu.Lock()
defer m.mu.Unlock()
if _, ok := m.mu.fastestSpan[id]; ok {
m.mu.fastestSpan[id] = spanTS
if _, ok := m.mu.spanSkew[id]; ok {
m.mu.spanSkew[id] = spanSkew
}
if _, ok := m.mu.fastestTable[id]; ok {
m.mu.fastestTable[id] = tableTS
if _, ok := m.mu.tableSkew[id]; ok {
m.mu.tableSkew[id] = tableSkew
}
}

Expand All @@ -249,8 +252,8 @@ func (m *sliMetrics) claimId() int64 {
// ignored until a nonzero timestamp is written.
m.mu.checkpoint[id] = hlc.Timestamp{}
m.mu.resolved[id] = hlc.Timestamp{}
m.mu.fastestSpan[id] = hlc.Timestamp{}
m.mu.fastestTable[id] = hlc.Timestamp{}
m.mu.spanSkew[id] = 0
m.mu.tableSkew[id] = 0
m.mu.id++
return id
}
Expand Down Expand Up @@ -1296,8 +1299,8 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
}
sm.mu.resolved = make(map[int64]hlc.Timestamp)
sm.mu.checkpoint = make(map[int64]hlc.Timestamp)
sm.mu.fastestSpan = make(map[int64]hlc.Timestamp)
sm.mu.fastestTable = make(map[int64]hlc.Timestamp)
sm.mu.spanSkew = make(map[int64]int64)
sm.mu.tableSkew = make(map[int64]int64)
sm.mu.id = 1 // start the first id at 1 so we can detect initialization

minTimestampGetter := func(m map[int64]hlc.Timestamp) func() int64 {
Expand Down Expand Up @@ -1328,34 +1331,21 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
}
}

maxTimestampSkewGetter := func(
base map[int64]hlc.Timestamp, ahead map[int64]hlc.Timestamp,
) func() int64 {
maxTimestampSkewGetter := func(m map[int64]int64) func() int64 {
return func() int64 {
sm.mu.Lock()
defer sm.mu.Unlock()
var maxSkew int64
for id, b := range base {
a := ahead[id]
if a.IsEmpty() || b.IsEmpty() {
continue
}
skew := a.WallTime - b.WallTime
if skew > maxSkew {
maxSkew = skew
}
}
return maxSkew
return iterutil.MaxFunc(maps.Values(m), cmp.Compare)
}
}

sm.AggregatorProgress = a.AggregatorProgress.AddFunctionalChild(minTimestampGetter(sm.mu.resolved), scope)
sm.CheckpointProgress = a.CheckpointProgress.AddFunctionalChild(minTimestampGetter(sm.mu.checkpoint), scope)
sm.MaxBehindNanos = a.MaxBehindNanos.AddFunctionalChild(maxBehindNanosGetter(sm.mu.resolved), scope)
sm.SpanProgressSkew = a.SpanProgressSkew.AddFunctionalChild(
maxTimestampSkewGetter(sm.mu.checkpoint, sm.mu.fastestSpan), scope)
maxTimestampSkewGetter(sm.mu.spanSkew), scope)
sm.TableProgressSkew = a.TableProgressSkew.AddFunctionalChild(
maxTimestampSkewGetter(sm.mu.checkpoint, sm.mu.fastestTable), scope)
maxTimestampSkewGetter(sm.mu.tableSkew), scope)

a.mu.sliMetrics[scope] = sm
return sm, nil
Expand All @@ -1364,7 +1354,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
// getLaggingRangesCallback returns a function which can be called to update the
// lagging ranges metric. It should be called with the current number of lagging
// ranges.
func (s *sliMetrics) getLaggingRangesCallback() func(lagging int64, total int64) {
func (m *sliMetrics) getLaggingRangesCallback() func(lagging int64, total int64) {
// Because this gauge is shared between changefeeds in the same metrics scope,
// we must instead modify it using `Inc` and `Dec` (as opposed to `Update`) to
// ensure values written by others are not overwritten. The code below is used
Expand All @@ -1388,10 +1378,10 @@ func (s *sliMetrics) getLaggingRangesCallback() func(lagging int64, total int64)
last.Lock()
defer last.Unlock()

s.LaggingRanges.Dec(last.lagging - lagging)
m.LaggingRanges.Dec(last.lagging - lagging)
last.lagging = lagging

s.TotalRanges.Dec(last.total - total)
m.TotalRanges.Dec(last.total - total)
last.total = total
}
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/util/iterutil/iterutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,15 @@ func MinFunc[E any](seq iter.Seq[E], cmp func(E, E) int) E {
}
return m
}

// MaxFunc returns the maximum element in seq, using cmp to compare elements.
// If seq has no values, the zero value is returned.
func MaxFunc[E any](seq iter.Seq[E], cmp func(E, E) int) E {
var m E
for i, v := range Enumerate(seq) {
if i == 0 || cmp(v, m) > 0 {
m = v
}
}
return m
}
32 changes: 32 additions & 0 deletions pkg/util/iterutil/iterutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,35 @@ func TestMinFunc(t *testing.T) {
})
}
}

// TestMaxFunc checks iterutil.MaxFunc against slices.MaxFunc for non-empty
// inputs and against the zero value for an empty input.
func TestMaxFunc(t *testing.T) {
	compareInts := func(x, y int) int {
		return x - y
	}

	testCases := map[string]struct {
		input []int
	}{
		"empty":                             {input: nil},
		"one element":                       {input: []int{1}},
		"multiple elements":                 {input: []int{1, 3, 2}},
		"multiple elements with zero value": {input: []int{1, 0, 3, 2}},
	}

	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			actual := iterutil.MaxFunc(slices.Values(tc.input), compareInts)

			// slices.MaxFunc panics on an empty slice, so the reference value
			// is only computed for non-empty inputs; otherwise the zero value
			// is expected.
			var expected int
			if len(tc.input) != 0 {
				expected = slices.MaxFunc(tc.input, compareInts)
			}
			require.Equal(t, expected, actual)
		})
	}
}