Skip to content

Commit

Permalink
changefeed(ticdc): add metrics to track lag distribution (#7032)
Browse files Browse the repository at this point in the history
ref #4757
  • Loading branch information
3AceShowHand authored Sep 9, 2022
1 parent ab9e5ab commit 24f7f18
Show file tree
Hide file tree
Showing 4 changed files with 294 additions and 60 deletions.
40 changes: 30 additions & 10 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,16 @@ type changefeed struct {
// `ddlWg` is used to manage this backend goroutine.
ddlWg sync.WaitGroup

metricsChangefeedBarrierTsGauge prometheus.Gauge
metricsChangefeedCheckpointTsGauge prometheus.Gauge
metricsChangefeedCheckpointTsLagGauge prometheus.Gauge
metricsChangefeedResolvedTsGauge prometheus.Gauge
metricsChangefeedResolvedTsLagGauge prometheus.Gauge
metricsChangefeedTickDuration prometheus.Observer
metricsChangefeedCheckpointTsGauge prometheus.Gauge
metricsChangefeedCheckpointTsLagGauge prometheus.Gauge
metricsChangefeedCheckpointLagDuration prometheus.Observer

metricsChangefeedResolvedTsGauge prometheus.Gauge
metricsChangefeedResolvedTsLagGauge prometheus.Gauge
metricsChangefeedResolvedTsLagDuration prometheus.Observer

metricsChangefeedBarrierTsGauge prometheus.Gauge
metricsChangefeedTickDuration prometheus.Observer

newDDLPuller func(ctx context.Context,
replicaConfig *config.ReplicaConfig,
Expand Down Expand Up @@ -532,16 +536,22 @@ LOOP:
}

func (c *changefeed) initMetrics() {
c.metricsChangefeedBarrierTsGauge = changefeedBarrierTsGauge.
WithLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedCheckpointTsGauge = changefeedCheckpointTsGauge.
WithLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedCheckpointTsLagGauge = changefeedCheckpointTsLagGauge.
WithLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedCheckpointLagDuration = changefeedCheckpointLagDuration.
WithLabelValues(c.id.Namespace, c.id.ID)

c.metricsChangefeedResolvedTsGauge = changefeedResolvedTsGauge.
WithLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedResolvedTsLagGauge = changefeedResolvedTsLagGauge.
WithLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedResolvedTsLagDuration = changefeedResolvedTsLagDuration.
WithLabelValues(c.id.Namespace, c.id.ID)

c.metricsChangefeedBarrierTsGauge = changefeedBarrierTsGauge.
WithLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedTickDuration = changefeedTickDuration.
WithLabelValues(c.id.Namespace, c.id.ID)
}
Expand Down Expand Up @@ -597,13 +607,17 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
func (c *changefeed) cleanupMetrics() {
changefeedCheckpointTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
changefeedCheckpointTsLagGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
changefeedCheckpointLagDuration.DeleteLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedCheckpointTsGauge = nil
c.metricsChangefeedCheckpointTsLagGauge = nil
c.metricsChangefeedCheckpointLagDuration = nil

changefeedResolvedTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
changefeedResolvedTsLagGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
changefeedResolvedTsLagDuration.DeleteLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedResolvedTsGauge = nil
c.metricsChangefeedResolvedTsLagGauge = nil
c.metricsChangefeedResolvedTsLagDuration = nil

changefeedTickDuration.DeleteLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedTickDuration = nil
Expand Down Expand Up @@ -844,11 +858,17 @@ func (c *changefeed) asyncExecDDLEvent(ctx cdcContext.Context,
func (c *changefeed) updateMetrics(currentTs int64, checkpointTs, resolvedTs model.Ts) {
phyCkpTs := oracle.ExtractPhysical(checkpointTs)
c.metricsChangefeedCheckpointTsGauge.Set(float64(phyCkpTs))
c.metricsChangefeedCheckpointTsLagGauge.Set(float64(currentTs-phyCkpTs) / 1e3)

checkpointLag := float64(currentTs-phyCkpTs) / 1e3
c.metricsChangefeedCheckpointTsLagGauge.Set(checkpointLag)
c.metricsChangefeedCheckpointLagDuration.Observe(checkpointLag)

phyRTs := oracle.ExtractPhysical(resolvedTs)
c.metricsChangefeedResolvedTsGauge.Set(float64(phyRTs))
c.metricsChangefeedResolvedTsLagGauge.Set(float64(currentTs-phyRTs) / 1e3)

resolvedLag := float64(currentTs-phyRTs) / 1e3
c.metricsChangefeedResolvedTsLagGauge.Set(resolvedLag)
c.metricsChangefeedResolvedTsLagDuration.Observe(resolvedLag)
}

func (c *changefeed) updateStatus(checkpointTs, resolvedTs model.Ts) {
Expand Down
27 changes: 26 additions & 1 deletion cdc/owner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ var (
Name: "checkpoint_ts_lag",
Help: "checkpoint ts lag of changefeeds in seconds",
}, []string{"namespace", "changefeed"})

changefeedCheckpointLagDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "checkpoint_lag_histogram",
Help: "checkpoint lag histogram of changefeeds",
Buckets: prometheus.ExponentialBuckets(0.5, 2, 10),
}, []string{"namespace", "changefeed"})

changefeedResolvedTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Expand All @@ -56,6 +66,16 @@ var (
Name: "resolved_ts_lag",
Help: "resolved ts lag of changefeeds in seconds",
}, []string{"namespace", "changefeed"})

changefeedResolvedTsLagDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "resolved_ts_lag_histogram",
Help: "resolved_ts lag histogram of changefeeds",
Buckets: prometheus.ExponentialBuckets(0.5, 2, 10),
}, []string{"namespace", "changefeed"})

ownershipCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "ticdc",
Expand Down Expand Up @@ -117,10 +137,15 @@ const (
// InitMetrics registers all metrics used in owner
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(changefeedBarrierTsGauge)

registry.MustRegister(changefeedCheckpointTsGauge)
registry.MustRegister(changefeedResolvedTsGauge)
registry.MustRegister(changefeedCheckpointTsLagGauge)
registry.MustRegister(changefeedCheckpointLagDuration)

registry.MustRegister(changefeedResolvedTsGauge)
registry.MustRegister(changefeedResolvedTsLagGauge)
registry.MustRegister(changefeedResolvedTsLagDuration)

registry.MustRegister(ownershipCounter)
registry.MustRegister(ownerMaintainTableNumGauge)
registry.MustRegister(changefeedStatusGauge)
Expand Down
4 changes: 2 additions & 2 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (o *ownerImpl) Tick(stdCtx context.Context, rawState orchestrator.ReactorSt
}

o.captures = state.Captures
o.updateMetrics(state)
o.updateMetrics()

// handleJobs() should be called before clusterVersionConsistent(), because
// when there are different versions of cdc nodes in the cluster,
Expand Down Expand Up @@ -341,7 +341,7 @@ func (o *ownerImpl) cleanStaleMetrics() {
changefeedStatusGauge.Reset()
}

func (o *ownerImpl) updateMetrics(state *orchestrator.GlobalReactorState) {
func (o *ownerImpl) updateMetrics() {
// Keep the value of prometheus expression `rate(counter)` = 1
// Please also change alert rule in ticdc.rules.yml when change the expression value.
now := time.Now()
Expand Down
Loading

0 comments on commit 24f7f18

Please sign in to comment.