Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics(ticdc): changefeed metrics add namespace label #5311

Merged
merged 24 commits into from
May 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
3ee5318
add namespace info into ChangeFeedID
sdojjy Apr 27, 2022
065eeca
Merge remote-tracking branch 'upstream/master' into add-namespace-inf…
sdojjy Apr 27, 2022
4c6ef95
fix lint
sdojjy Apr 28, 2022
cfda39b
fix lint
sdojjy Apr 28, 2022
c1e26a0
Merge branch 'master' into add-namespace-info-into-changefeed
sdojjy Apr 28, 2022
c5c8668
default redo log do not add namespace
sdojjy Apr 28, 2022
e4be6cd
default redo log do not add namespace
sdojjy Apr 28, 2022
05644ee
Merge branch 'master' into add-namespace-info-into-changefeed
sdojjy Apr 29, 2022
2820daa
changefeed related metrics add namespace label
sdojjy Apr 29, 2022
0e2599b
fix test utils compile error
sdojjy Apr 29, 2022
e81fea4
Merge branch 'master' into add-namespace-info-into-changefeed
sdojjy May 5, 2022
eae9ced
Merge branch 'add-namespace-info-into-changefeed' into metrics-add-na…
sdojjy May 5, 2022
6a4149e
fix metrics error
sdojjy May 5, 2022
c74d78a
Merge branch 'master' into metrics-add-namespace-label
sdojjy May 5, 2022
9f21ea0
fix metrics error
sdojjy May 5, 2022
b36eb23
Update cdc/contextutil/ctx.go
sdojjy May 5, 2022
f3172ff
Update cdc/contextutil/ctx.go
sdojjy May 5, 2022
387a8a4
Merge remote-tracking branch 'origin/metrics-add-namespace-label' int…
sdojjy May 5, 2022
afe4db6
fix ut
sdojjy May 5, 2022
e7f1358
fix ut
sdojjy May 5, 2022
e6921da
Merge branch 'master' into metrics-add-namespace-label
sdojjy May 5, 2022
fb27ee1
fix lint
sdojjy May 5, 2022
955f038
Merge remote-tracking branch 'upstream/master' into metrics-add-names…
sdojjy May 5, 2022
bf308bf
Merge branch 'master' into metrics-add-namespace-label
ti-chi-bot May 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cdc/entry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ var (
Name: "unmarshal_and_mount",
Help: "Bucketed histogram of processing time (s) of unmarshal and mount in mounter.",
Buckets: prometheus.ExponentialBuckets(0.000001, 10, 10),
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
totalRowsCountGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "mounter",
Name: "total_rows_count",
Help: "The total count of rows that are processed by mounter",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
)

// InitMetrics registers all metrics in this file
Expand Down
14 changes: 8 additions & 6 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,14 @@ func NewMounter(schemaStorage SchemaStorage,
enableOldValue bool,
) Mounter {
return &mounterImpl{
schemaStorage: schemaStorage,
changefeedID: changefeedID,
enableOldValue: enableOldValue,
metricMountDuration: mountDuration.WithLabelValues(changefeedID.ID),
metricTotalRows: totalRowsCountGauge.WithLabelValues(changefeedID.ID),
tz: tz,
schemaStorage: schemaStorage,
changefeedID: changefeedID,
enableOldValue: enableOldValue,
metricMountDuration: mountDuration.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricTotalRows: totalRowsCountGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
tz: tz,
}
}

Expand Down
3 changes: 2 additions & 1 deletion cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1171,7 +1171,8 @@ func (s *eventFeedSession) receiveFromStream(
}()

changefeedID := contextutil.ChangefeedIDFromCtx(ctx)
metricSendEventBatchResolvedSize := batchResolvedEventSize.WithLabelValues(changefeedID.ID)
metricSendEventBatchResolvedSize := batchResolvedEventSize.
WithLabelValues(changefeedID.Namespace, changefeedID.ID)

// always create a new region worker, because `receiveFromStream` is ensured
// to call exactly once from outter code logic
Expand Down
10 changes: 5 additions & 5 deletions cdc/kv/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ var (
Subsystem: "kvclient",
Name: "pull_event_count",
Help: "event count received by this puller",
}, []string{"type", "changefeed"})
}, []string{"type", "namespace", "changefeed"})
sendEventCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "kvclient",
Name: "send_event_count",
Help: "event count sent to event channel by this puller",
}, []string{"type", "changefeed"})
}, []string{"type", "namespace", "changefeed"})
clientChannelSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Expand All @@ -78,22 +78,22 @@ var (
Subsystem: "kvclient",
Name: "region_token",
Help: "size of region token in kv client",
}, []string{"store", "changefeed"})
}, []string{"store", "namespace", "changefeed"})
cachedRegionSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "kvclient",
Name: "cached_region",
Help: "cached region that has not requested to TiKV in kv client",
}, []string{"store", "changefeed"})
}, []string{"store", "namespace", "changefeed"})
batchResolvedEventSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "kvclient",
Name: "batch_resolved_event_size",
Help: "The number of region in one batch resolved ts event",
Buckets: prometheus.ExponentialBuckets(2, 2, 16),
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
grpcPoolStreamGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Expand Down
16 changes: 8 additions & 8 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,21 +179,21 @@ func (w *regionWorker) initMetrics(ctx context.Context) {
metrics.metricReceivedEventSize = eventSize.WithLabelValues("received")
metrics.metricDroppedEventSize = eventSize.WithLabelValues("dropped")
metrics.metricPullEventInitializedCounter = pullEventCounter.
WithLabelValues(cdcpb.Event_INITIALIZED.String(), changefeedID.ID)
WithLabelValues(cdcpb.Event_INITIALIZED.String(), changefeedID.Namespace, changefeedID.ID)
metrics.metricPullEventCommittedCounter = pullEventCounter.
WithLabelValues(cdcpb.Event_COMMITTED.String(), changefeedID.ID)
WithLabelValues(cdcpb.Event_COMMITTED.String(), changefeedID.Namespace, changefeedID.ID)
metrics.metricPullEventCommitCounter = pullEventCounter.
WithLabelValues(cdcpb.Event_COMMIT.String(), changefeedID.ID)
WithLabelValues(cdcpb.Event_COMMIT.String(), changefeedID.Namespace, changefeedID.ID)
metrics.metricPullEventPrewriteCounter = pullEventCounter.
WithLabelValues(cdcpb.Event_PREWRITE.String(), changefeedID.ID)
WithLabelValues(cdcpb.Event_PREWRITE.String(), changefeedID.Namespace, changefeedID.ID)
metrics.metricPullEventRollbackCounter = pullEventCounter.
WithLabelValues(cdcpb.Event_ROLLBACK.String(), changefeedID.ID)
WithLabelValues(cdcpb.Event_ROLLBACK.String(), changefeedID.Namespace, changefeedID.ID)
metrics.metricSendEventResolvedCounter = sendEventCounter.
WithLabelValues("native-resolved", changefeedID.ID)
WithLabelValues("native-resolved", changefeedID.Namespace, changefeedID.ID)
metrics.metricSendEventCommitCounter = sendEventCounter.
WithLabelValues("commit", changefeedID.ID)
WithLabelValues("commit", changefeedID.Namespace, changefeedID.ID)
metrics.metricSendEventCommittedCounter = sendEventCounter.
WithLabelValues("committed", changefeedID.ID)
WithLabelValues("committed", changefeedID.Namespace, changefeedID.ID)

w.metrics = metrics
}
Expand Down
8 changes: 5 additions & 3 deletions cdc/kv/token_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (r *sizedRegionRouter) AddRegion(sri singleRegionInfo) {
r.buffer[id] = append(r.buffer[id], sri)
if _, ok := r.metrics.cachedRegions[id]; !ok {
r.metrics.cachedRegions[id] = cachedRegionSize.
WithLabelValues(id, r.metrics.changefeed.ID)
WithLabelValues(id, r.metrics.changefeed.Namespace, r.metrics.changefeed.ID)
}
r.metrics.cachedRegions[id].Inc()
}
Expand All @@ -116,7 +116,8 @@ func (r *sizedRegionRouter) Acquire(id string) {
defer r.lock.Unlock()
r.tokens[id]++
if _, ok := r.metrics.tokens[id]; !ok {
r.metrics.tokens[id] = clientRegionTokenSize.WithLabelValues(id, r.metrics.changefeed.ID)
r.metrics.tokens[id] = clientRegionTokenSize.
WithLabelValues(id, r.metrics.changefeed.Namespace, r.metrics.changefeed.ID)
}
r.metrics.tokens[id].Inc()
}
Expand All @@ -128,7 +129,8 @@ func (r *sizedRegionRouter) Release(id string) {
defer r.lock.Unlock()
r.tokens[id]--
if _, ok := r.metrics.tokens[id]; !ok {
r.metrics.tokens[id] = clientRegionTokenSize.WithLabelValues(id, r.metrics.changefeed.ID)
r.metrics.tokens[id] = clientRegionTokenSize.
WithLabelValues(id, r.metrics.changefeed.Namespace, r.metrics.changefeed.ID)
}
r.metrics.tokens[id].Dec()
}
Expand Down
24 changes: 12 additions & 12 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,17 +354,17 @@ LOOP:

// init metrics
c.metricsChangefeedBarrierTsGauge = changefeedBarrierTsGauge.
WithLabelValues(c.id.ID)
WithLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedCheckpointTsGauge = changefeedCheckpointTsGauge.
WithLabelValues(c.id.ID)
WithLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedCheckpointTsLagGauge = changefeedCheckpointTsLagGauge.
WithLabelValues(c.id.ID)
WithLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedResolvedTsGauge = changefeedResolvedTsGauge.
WithLabelValues(c.id.ID)
WithLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedResolvedTsLagGauge = changefeedResolvedTsLagGauge.
WithLabelValues(c.id.ID)
WithLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedTickDuration = changefeedTickDuration.
WithLabelValues(c.id.ID)
WithLabelValues(c.id.Namespace, c.id.ID)

// create scheduler
c.scheduler, err = c.newScheduler(ctx, checkpointTs)
Expand Down Expand Up @@ -402,20 +402,20 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
c.wg.Wait()
c.scheduler.Close(ctx)

changefeedCheckpointTsGauge.DeleteLabelValues(c.id.ID)
changefeedCheckpointTsLagGauge.DeleteLabelValues(c.id.ID)
changefeedCheckpointTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
changefeedCheckpointTsLagGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedCheckpointTsGauge = nil
c.metricsChangefeedCheckpointTsLagGauge = nil

changefeedResolvedTsGauge.DeleteLabelValues(c.id.ID)
changefeedResolvedTsLagGauge.DeleteLabelValues(c.id.ID)
changefeedResolvedTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
changefeedResolvedTsLagGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedResolvedTsGauge = nil
c.metricsChangefeedResolvedTsLagGauge = nil

changefeedTickDuration.DeleteLabelValues(c.id.ID)
changefeedTickDuration.DeleteLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedTickDuration = nil

changefeedBarrierTsGauge.DeleteLabelValues(c.id.ID)
changefeedBarrierTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedBarrierTsGauge = nil

c.initialized = false
Expand Down
16 changes: 8 additions & 8 deletions cdc/owner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,36 +26,36 @@ var (
Subsystem: "owner",
Name: "barrier_ts",
Help: "barrier ts of changefeeds",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})

changefeedCheckpointTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "checkpoint_ts",
Help: "checkpoint ts of changefeeds",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
changefeedCheckpointTsLagGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "checkpoint_ts_lag",
Help: "checkpoint ts lag of changefeeds in seconds",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
changefeedResolvedTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "resolved_ts",
Help: "resolved ts of changefeeds",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
changefeedResolvedTsLagGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "resolved_ts_lag",
Help: "resolved ts lag of changefeeds in seconds",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
ownershipCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "ticdc",
Expand All @@ -69,22 +69,22 @@ var (
Subsystem: "owner",
Name: "maintain_table_num",
Help: "number of replicated tables maintained in owner",
}, []string{"changefeed", "capture", "type"})
}, []string{"namespace", "changefeed", "capture", "type"})
changefeedStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "status",
Help: "The status of changefeeds",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
changefeedTickDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "changefeed_tick_duration",
Help: "Bucketed histogram of owner tick changefeed reactor time (s).",
Buckets: prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18),
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
changefeedCloseDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "ticdc",
Expand Down
14 changes: 8 additions & 6 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (o *ownerImpl) updateMetrics(state *orchestrator.GlobalReactorState) {
if conf.Debug != nil && conf.Debug.EnableNewScheduler {
for cfID, cf := range o.changefeeds {
if cf.state != nil && cf.state.Info != nil {
changefeedStatusGauge.WithLabelValues(cfID.ID).
changefeedStatusGauge.WithLabelValues(cfID.Namespace, cfID.ID).
Set(float64(cf.state.Info.State.ToInt()))
}

Expand All @@ -374,11 +374,11 @@ func (o *ownerImpl) updateMetrics(state *orchestrator.GlobalReactorState) {

for captureID, info := range o.captures {
ownerMaintainTableNumGauge.
WithLabelValues(cfID.ID,
WithLabelValues(cfID.Namespace, cfID.ID,
info.AdvertiseAddr, maintainTableTypeTotal).
Set(float64(totalCounts[captureID]))
ownerMaintainTableNumGauge.
WithLabelValues(cfID.ID,
WithLabelValues(cfID.Namespace, cfID.ID,
info.AdvertiseAddr, maintainTableTypeWip).
Set(float64(pendingCounts[captureID]))
}
Expand All @@ -393,13 +393,15 @@ func (o *ownerImpl) updateMetrics(state *orchestrator.GlobalReactorState) {
continue
}
ownerMaintainTableNumGauge.
WithLabelValues(changefeedID.ID, captureInfo.AdvertiseAddr, maintainTableTypeTotal).
WithLabelValues(changefeedID.Namespace, changefeedID.ID,
captureInfo.AdvertiseAddr, maintainTableTypeTotal).
Set(float64(len(taskStatus.Tables)))
ownerMaintainTableNumGauge.
WithLabelValues(changefeedID.ID, captureInfo.AdvertiseAddr, maintainTableTypeWip).
WithLabelValues(changefeedID.Namespace, changefeedID.ID,
captureInfo.AdvertiseAddr, maintainTableTypeWip).
Set(float64(len(taskStatus.Operation)))
if changefeedState.Info != nil {
changefeedStatusGauge.WithLabelValues(changefeedID.ID).
changefeedStatusGauge.WithLabelValues(changefeedID.Namespace, changefeedID.ID).
Set(float64(changefeedState.Info.State.ToInt()))
}
}
Expand Down
20 changes: 10 additions & 10 deletions cdc/processor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,71 +24,71 @@ var (
Subsystem: "processor",
Name: "resolved_ts",
Help: "local resolved ts of processor",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
resolvedTsLagGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "resolved_ts_lag",
Help: "local resolved ts lag of processor",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
resolvedTsMinTableIDGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "min_resolved_table_id",
Help: "ID of the minimum resolved table",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
checkpointTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "checkpoint_ts",
Help: "global checkpoint ts of processor",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
checkpointTsLagGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "checkpoint_ts_lag",
Help: "global checkpoint ts lag of processor",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
checkpointTsMinTableIDGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "min_checkpoint_table_id",
Help: "ID of the minimum checkpoint table",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
syncTableNumGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "num_of_tables",
Help: "number of synchronized table of processor",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
processorErrorCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "exit_with_error_count",
Help: "counter for processor exits with error",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
processorSchemaStorageGcTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "schema_storage_gc_ts",
Help: "the TS of the currently maintained oldest snapshot in SchemaStorage",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
processorTickDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "processor_tick_duration",
Help: "Bucketed histogram of processorManager tick processor time (s).",
Buckets: prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18),
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
processorCloseDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "ticdc",
Expand Down
Loading