1 change: 1 addition & 0 deletions pkg/ddl/BUILD.bazel
@@ -112,6 +112,7 @@ go_library(
"//pkg/lightning/backend/local",
"//pkg/lightning/common",
"//pkg/lightning/config",
"//pkg/lightning/metric",
"//pkg/meta",
"//pkg/meta/autoid",
"//pkg/meta/metabuild",
46 changes: 26 additions & 20 deletions pkg/ddl/backfilling_import_cloud.go
@@ -28,7 +28,9 @@ import (
"github.com/pingcap/tidb/pkg/lightning/backend/local"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/config"
+ lightningmetric "github.com/pingcap/tidb/pkg/lightning/metric"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/logutil"
@@ -44,6 +46,7 @@ type cloudImportExecutor struct {
backendCtx ingest.BackendCtx
backend *local.Backend
taskConcurrency int
+ metric *lightningmetric.Common
}

func newCloudImportExecutor(
@@ -64,30 +67,32 @@ func newCloudImportExecutor(
}, nil
}

- func (m *cloudImportExecutor) Init(ctx context.Context) error {
+ func (e *cloudImportExecutor) Init(ctx context.Context) error {
logutil.Logger(ctx).Info("cloud import executor init subtask exec env")
- cfg, bd, err := ingest.CreateLocalBackend(ctx, m.store, m.job, false, m.taskConcurrency)
+ e.metric = metrics.RegisterLightningCommonMetricsForDDL(e.job.ID)
+ ctx = lightningmetric.WithCommonMetric(ctx, e.metric)
+ cfg, bd, err := ingest.CreateLocalBackend(ctx, e.store, e.job, false, e.taskConcurrency)
if err != nil {
return errors.Trace(err)
}
- bCtx, err := ingest.NewBackendCtxBuilder(ctx, m.store, m.job).Build(cfg, bd)
+ bCtx, err := ingest.NewBackendCtxBuilder(ctx, e.store, e.job).Build(cfg, bd)
if err != nil {
bd.Close()
return err
}
- m.backend = bd
- m.backendCtx = bCtx
+ e.backend = bd
+ e.backendCtx = bCtx
return nil
}

- func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Subtask) error {
+ func (e *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Subtask) error {
logutil.Logger(ctx).Info("cloud import executor run subtask")

- sm, err := decodeBackfillSubTaskMeta(ctx, m.cloudStoreURI, subtask.Meta)
+ sm, err := decodeBackfillSubTaskMeta(ctx, e.cloudStoreURI, subtask.Meta)
if err != nil {
return err
}
- local := m.backendCtx.GetLocalBackend()
+ local := e.backendCtx.GetLocalBackend()
if local == nil {
return errors.Errorf("local backend not found")
}
@@ -98,7 +103,7 @@ func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub
)
switch len(sm.EleIDs) {
case 1:
- for _, idx := range m.indexes {
+ for _, idx := range e.indexes {
if idx.ID == sm.EleIDs[0] {
currentIdx = idx
idxID = idx.ID
@@ -107,15 +112,15 @@ func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub
}
case 0:
// maybe this subtask is generated from an old version TiDB
- if len(m.indexes) == 1 {
- currentIdx = m.indexes[0]
+ if len(e.indexes) == 1 {
+ currentIdx = e.indexes[0]
}
- idxID = m.indexes[0].ID
+ idxID = e.indexes[0].ID
default:
return errors.Errorf("unexpected EleIDs count %v", sm.EleIDs)
}

- _, engineUUID := backend.MakeUUID(m.ptbl.Meta().Name.L, idxID)
+ _, engineUUID := backend.MakeUUID(e.ptbl.Meta().Name.L, idxID)

all := external.SortedKVMeta{}
for _, g := range sm.MetaGroups {
@@ -129,7 +134,7 @@ func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub
}
err = local.CloseEngine(ctx, &backend.EngineConfig{
External: &backend.ExternalEngineConfig{
- StorageURI: m.cloudStoreURI,
+ StorageURI: e.cloudStoreURI,
DataFiles: sm.DataFiles,
StatFiles: sm.StatFiles,
StartKey: all.StartKey,
@@ -139,7 +144,7 @@ func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub
TotalFileSize: int64(all.TotalKVSize),
TotalKVCount: 0,
CheckHotspot: true,
- MemCapacity: m.GetResource().Mem.Capacity(),
+ MemCapacity: e.GetResource().Mem.Capacity(),
},
TS: sm.TS,
}, engineUUID)
@@ -155,7 +160,7 @@ func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub
}

if currentIdx != nil {
- return ingest.TryConvertToKeyExistsErr(err, currentIdx, m.ptbl.Meta())
+ return ingest.TryConvertToKeyExistsErr(err, currentIdx, e.ptbl.Meta())
}

// cannot fill the index name for subtask generated from an old version TiDB
@@ -169,11 +174,12 @@ func (m *cloudImportExecutor) RunSubtask(ctx context.Context, subtask *proto.Sub
return kv.ErrKeyExists
}

- func (m *cloudImportExecutor) Cleanup(ctx context.Context) error {
+ func (e *cloudImportExecutor) Cleanup(ctx context.Context) error {
logutil.Logger(ctx).Info("cloud import executor clean up subtask env")
- if m.backendCtx != nil {
- m.backendCtx.Close()
+ if e.backendCtx != nil {
+ e.backendCtx.Close()
}
- m.backend.Close()
+ e.backend.Close()
+ metrics.UnregisterLightningCommonMetricsForDDL(e.job.ID, e.metric)
return nil
}
10 changes: 10 additions & 0 deletions pkg/ddl/backfilling_read_index.go
@@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/lightning/backend/local"
+ lightningmetric "github.com/pingcap/tidb/pkg/lightning/metric"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/table"
@@ -61,6 +62,8 @@ type readIndexStepExecutor struct {
backend *local.Backend
// pipeline of current running subtask, it's nil when no subtask is running.
currPipe atomic.Pointer[operator.AsyncPipeline]

+ metric *lightningmetric.Common
}

type readIndexSummary struct {
@@ -93,6 +96,10 @@ func (r *readIndexStepExecutor) Init(ctx context.Context) error {
logutil.DDLLogger().Info("read index executor init subtask exec env")
cfg := config.GetGlobalConfig()
if cfg.Store == config.StoreTypeTiKV {
+ if !r.isGlobalSort() {
+ r.metric = metrics.RegisterLightningCommonMetricsForDDL(r.job.ID)
+ ctx = lightningmetric.WithCommonMetric(ctx, r.metric)
+ }
cfg, bd, err := ingest.CreateLocalBackend(ctx, r.d.store, r.job, false, 0)
if err != nil {
return errors.Trace(err)
@@ -176,6 +183,9 @@ func (r *readIndexStepExecutor) Cleanup(ctx context.Context) error {
if r.backend != nil {
r.backend.Close()
}
+ if !r.isGlobalSort() {
+ metrics.UnregisterLightningCommonMetricsForDDL(r.job.ID, r.metric)
+ }
return nil
}

6 changes: 6 additions & 0 deletions pkg/ddl/index.go
@@ -51,6 +51,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/backend"
"github.com/pingcap/tidb/pkg/lightning/backend/local"
litconfig "github.com/pingcap/tidb/pkg/lightning/config"
+ lightningmetric "github.com/pingcap/tidb/pkg/lightning/metric"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/metabuild"
"github.com/pingcap/tidb/pkg/meta/model"
@@ -2494,6 +2495,11 @@ func (w *worker) addPhysicalTableIndex(
return w.writePhysicalTableRecord(ctx, w.sessPool, t, typeAddIndexMergeTmpWorker, reorgInfo)
}
logutil.DDLLogger().Info("start to add table index", zap.Stringer("job", reorgInfo.Job), zap.Stringer("reorgInfo", reorgInfo))
+ m := metrics.RegisterLightningCommonMetricsForDDL(reorgInfo.ID)
+ ctx = lightningmetric.WithCommonMetric(ctx, m)
+ defer func() {
+ metrics.UnregisterLightningCommonMetricsForDDL(reorgInfo.ID, m)
+ }()
return w.writePhysicalTableRecord(ctx, w.sessPool, t, typeAddIndexWorker, reorgInfo)
}

1 change: 1 addition & 0 deletions pkg/ddl/ingest/testutil/BUILD.bazel
@@ -9,6 +9,7 @@ go_library(
"//pkg/ddl/ingest",
"//pkg/kv",
"//pkg/meta/model",
"//pkg/metrics",
"//pkg/testkit",
"//pkg/testkit/testfailpoint",
],
5 changes: 5 additions & 0 deletions pkg/ddl/ingest/testutil/testutil.go
@@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
)
@@ -61,6 +62,10 @@ func CheckIngestLeakageForTest(exitCode int) {
fmt.Fprintf(os.Stderr, "add index leakage check failed: %s leak\n", leakObj)
os.Exit(1)
}
+ if registeredJob := metrics.GetRegisteredJob(); len(registeredJob) > 0 {
+ fmt.Fprintf(os.Stderr, "add index metrics leakage: %v\n", registeredJob)
+ os.Exit(1)
+ }
}
os.Exit(exitCode)
}
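The new check pairs with `metrics.GetRegisteredJob` to catch per-job metrics that were registered but never unregistered. A minimal sketch of how a test package might wire it up — the package name and the `TestMain` body are assumptions for illustration, not part of this diff:

```go
package ddl_test

import (
	"testing"

	"github.com/pingcap/tidb/pkg/ddl/ingest/testutil"
)

func TestMain(m *testing.M) {
	exitCode := m.Run()
	// CheckIngestLeakageForTest exits with code 1 if any ingest object or
	// per-job lightning metric is still registered; otherwise it exits
	// with the tests' own exit code.
	testutil.CheckIngestLeakageForTest(exitCode)
}
```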
42 changes: 42 additions & 0 deletions pkg/metrics/ddl.go
@@ -15,11 +15,21 @@
package metrics

import (
"maps"
"strconv"
"strings"
"sync"

"github.com/pingcap/tidb/pkg/lightning/metric"
"github.com/pingcap/tidb/pkg/util/promutil"
"github.com/prometheus/client_golang/prometheus"
)

+ var (
+ mu sync.Mutex
+ registeredJobMetrics = make(map[int64]*metric.Common, 64)
+ )

// Metrics for the DDL package.
var (
JobsGauge *prometheus.GaugeVec
@@ -264,3 +274,35 @@ func GetBackfillTotalByLabel(label, schemaName, tableName, optionalColOrIdxName
func GetBackfillProgressByLabel(label, schemaName, tableName, optionalColOrIdxName string) prometheus.Gauge {
return BackfillProgressGauge.WithLabelValues(generateReorgLabel(label, schemaName, tableName, optionalColOrIdxName))
}

+ // RegisterLightningCommonMetricsForDDL registers the lightning common metrics
+ // for a DDL job and returns them; if the job ID is already registered, the
+ // existing metrics are returned.
+ func RegisterLightningCommonMetricsForDDL(jobID int64) *metric.Common {
+ mu.Lock()
+ defer mu.Unlock()
+ if m, ok := registeredJobMetrics[jobID]; ok {
+ return m
+ }
+ metrics := metric.NewCommon(promutil.NewDefaultFactory(), TiDB, "ddl", prometheus.Labels{
+ "job_id": strconv.FormatInt(jobID, 10),
+ })
+ metrics.RegisterTo(prometheus.DefaultRegisterer)
+ registeredJobMetrics[jobID] = metrics
+ return metrics
+ }
+
+ // UnregisterLightningCommonMetricsForDDL unregisters the common metrics of a DDL job.
+ func UnregisterLightningCommonMetricsForDDL(jobID int64, metrics *metric.Common) {
+ mu.Lock()
+ defer mu.Unlock()
+ metrics.UnregisterFrom(prometheus.DefaultRegisterer)
+ delete(registeredJobMetrics, jobID)
+ }
+
+ // GetRegisteredJob returns a copy of the registered per-job metrics; it is
+ // only used in tests.
+ func GetRegisteredJob() map[int64]*metric.Common {
+ mu.Lock()
+ defer mu.Unlock()
+ ret := make(map[int64]*metric.Common, len(registeredJobMetrics))
+ maps.Copy(ret, registeredJobMetrics)
+ return ret
+ }
96 changes: 96 additions & 0 deletions pkg/metrics/grafana/tidb.json
@@ -16244,6 +16244,102 @@
"yaxis": {
"align": false
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "Add Index Backfill Import Speed of Each Job",
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 0,
"fillGradient": 0,
"gridPos": {
"h": 7,
"w": 12,
"x": 12,
"y": 46
},
"hiddenSeries": false,
"id": 23763573003,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.17",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "sum(rate(tidb_ddl_bytes{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", state=\"imported\"}[1m])) by(job_id)",
"hide": false,
"interval": "",
"legendFormat": "job-id {{job_id}}",
"refId": "C"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Add Index Backfill Import Speed",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "binBps",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"title": "DDL",