Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

import into: switch tikv mode during import #44067

Merged
merged 8 commits into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ mock_s3iface:
mock_lightning:
@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend Backend,EngineWriter,TargetInfoGetter,ChunkFlushStatus > br/pkg/mock/backend.go
@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend/encode Encoder,EncodingBuilder,Rows,Row > br/pkg/mock/encode.go
@mockgen -package mocklocal github.com/pingcap/tidb/br/pkg/lightning/backend/local DiskUsage > br/pkg/mock/mocklocal/local.go
@mockgen -package mocklocal github.com/pingcap/tidb/br/pkg/lightning/backend/local DiskUsage,TiKVModeSwitcher > br/pkg/mock/mocklocal/local.go

# There is no FreeBSD environment for GitHub actions. So cross-compile on Linux
# but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have
Expand Down
22 changes: 14 additions & 8 deletions br/pkg/lightning/backend/local/tikv_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,38 @@ import (
)

// TiKVModeSwitcher is used to switch TiKV nodes between Import and Normal mode.
type TiKVModeSwitcher struct {
type TiKVModeSwitcher interface {
// ToImportMode switches all TiKV nodes to Import mode.
ToImportMode(ctx context.Context)
// ToNormalMode switches all TiKV nodes to Normal mode.
ToNormalMode(ctx context.Context)
}

// TiKVModeSwitcher is used to switch TiKV nodes between Import and Normal mode.
type switcher struct {
tls *common.TLS
pdAddr string
logger *zap.Logger
}

// NewTiKVModeSwitcher creates a new TiKVModeSwitcher.
func NewTiKVModeSwitcher(tls *common.TLS, pdAddr string, logger *zap.Logger) *TiKVModeSwitcher {
return &TiKVModeSwitcher{
func NewTiKVModeSwitcher(tls *common.TLS, pdAddr string, logger *zap.Logger) TiKVModeSwitcher {
return &switcher{
tls: tls,
pdAddr: pdAddr,
logger: logger,
}
}

// ToImportMode switches all TiKV nodes to Import mode.
func (rc *TiKVModeSwitcher) ToImportMode(ctx context.Context) {
func (rc *switcher) ToImportMode(ctx context.Context) {
rc.switchTiKVMode(ctx, sstpb.SwitchMode_Import)
}

// ToNormalMode switches all TiKV nodes to Normal mode.
func (rc *TiKVModeSwitcher) ToNormalMode(ctx context.Context) {
func (rc *switcher) ToNormalMode(ctx context.Context) {
rc.switchTiKVMode(ctx, sstpb.SwitchMode_Normal)
}

func (rc *TiKVModeSwitcher) switchTiKVMode(ctx context.Context, mode sstpb.SwitchMode) {
func (rc *switcher) switchTiKVMode(ctx context.Context, mode sstpb.SwitchMode) {
rc.logger.Info("switch tikv mode", zap.Stringer("mode", mode))

// It is fine if we miss some stores which did not switch to Import mode,
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ const (

defaultCSVDataCharacterSet = "binary"
defaultCSVDataInvalidCharReplace = utf8.RuneError

DefaultSwitchTiKVModeInterval = 5 * time.Minute
)

var (
Expand Down Expand Up @@ -929,7 +931,7 @@ func NewConfig() *Config {
ChecksumTableConcurrency: defaultChecksumTableConcurrency,
},
Cron: Cron{
SwitchMode: Duration{Duration: 5 * time.Minute},
SwitchMode: Duration{Duration: DefaultSwitchTiKVModeInterval},
LogProgress: Duration{Duration: 5 * time.Minute},
CheckDiskQuota: Duration{Duration: 1 * time.Minute},
},
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ type Controller struct {
preInfoGetter PreImportInfoGetter
precheckItemBuilder *PrecheckItemBuilder
encBuilder encode.EncodingBuilder
tikvModeSwitcher *local.TiKVModeSwitcher
tikvModeSwitcher local.TiKVModeSwitcher

keyspaceName string
}
Expand Down Expand Up @@ -1212,6 +1212,7 @@ func (rc *Controller) buildRunPeriodicActionAndCancelFunc(ctx context.Context, s

case <-switchModeChan:
// periodically switch to import mode, as requested by TiKV 3.0
// TiKV will switch back to normal mode if we didn't call this again within 10 minutes
rc.tikvModeSwitcher.ToImportMode(ctx)

case <-logProgressChan:
Expand Down
50 changes: 49 additions & 1 deletion br/pkg/mock/mocklocal/local.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions ddl/disttask_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ func NewLitBackfillFlowHandle(d DDL) dispatcher.TaskFlowHandle {
}
}

func (*litBackfillFlowHandle) OnTicker(_ context.Context, _ *proto.Task) {
}

// ProcessNormalFlow processes the normal flow.
func (h *litBackfillFlowHandle) ProcessNormalFlow(_ context.Context, _ dispatcher.TaskHandle, gTask *proto.Task) (metas [][]byte, err error) {
var globalTaskMeta BackfillGlobalMeta
Expand Down
4 changes: 3 additions & 1 deletion disttask/framework/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func (d *dispatcher) DispatchTaskLoop() {
if d.isRunningGTask(gTask.ID) {
continue
}
// owner changed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does it mean?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tidb owner changed, so just setRunningGTask

if gTask.State == proto.TaskStateRunning || gTask.State == proto.TaskStateReverting || gTask.State == proto.TaskStateCancelling {
d.setRunningGTask(gTask)
cnt++
Expand Down Expand Up @@ -273,8 +274,9 @@ func (d *dispatcher) detectTask(gTask *proto.Task) {
case <-ticker.C:
// TODO: Consider actively obtaining information about task completion.
stepIsFinished, errStr := d.probeTask(gTask)
// The global task isn't finished and failed.
// The global task isn't finished and not failed.
if !stepIsFinished && len(errStr) == 0 {
GetTaskFlowHandle(gTask.Type).OnTicker(d.ctx, gTask)
logutil.BgLogger().Debug("detect task, this task keeps current state",
zap.Int64("taskID", gTask.ID), zap.String("state", gTask.State))
break
Expand Down
3 changes: 3 additions & 0 deletions disttask/framework/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ type NumberExampleHandle struct{}

var _ dispatcher.TaskFlowHandle = (*NumberExampleHandle)(nil)

func (NumberExampleHandle) OnTicker(_ context.Context, _ *proto.Task) {
}

func (n NumberExampleHandle) ProcessNormalFlow(_ context.Context, _ dispatcher.TaskHandle, gTask *proto.Task) (metas [][]byte, err error) {
if gTask.State == proto.TaskStatePending {
gTask.Step = proto.StepInit
Expand Down
4 changes: 4 additions & 0 deletions disttask/framework/dispatcher/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import (

// TaskFlowHandle is used to control the process operations for each global task.
type TaskFlowHandle interface {
// OnTicker is used to handle the ticker event, if business impl need to do some periodical work, you can
// do it here, but don't do too much work here, because the ticker interval is small, and it will block
// the event is generated every checkTaskRunningInterval, and only when the task NOT FINISHED and NO ERROR.
OnTicker(ctx context.Context, gTask *proto.Task)
ProcessNormalFlow(ctx context.Context, h TaskHandle, gTask *proto.Task) (subtaskMetas [][]byte, err error)
ProcessErrFlow(ctx context.Context, h TaskHandle, gTask *proto.Task, receiveErr [][]byte) (subtaskMeta []byte, err error)
// GetEligibleInstances is used to get the eligible instances for the global task.
Expand Down
3 changes: 3 additions & 0 deletions disttask/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type testFlowHandle struct{}

var _ dispatcher.TaskFlowHandle = (*testFlowHandle)(nil)

func (*testFlowHandle) OnTicker(_ context.Context, _ *proto.Task) {
}

func (*testFlowHandle) ProcessNormalFlow(_ context.Context, _ dispatcher.TaskHandle, gTask *proto.Task) (metas [][]byte, err error) {
if gTask.State == proto.TaskStatePending {
gTask.Step = proto.StepOne
Expand Down
2 changes: 2 additions & 0 deletions disttask/loaddata/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ go_library(
"//table/tables",
"//util/logutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
)
Expand Down
66 changes: 54 additions & 12 deletions disttask/loaddata/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package loaddata
import (
"context"
"encoding/json"
"sync"
"time"

"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
Expand All @@ -29,16 +31,42 @@ import (
"github.com/pingcap/tidb/executor/importer"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/atomic"
"go.uber.org/zap"
)

// FlowHandle is the dispatcher for load data.
type FlowHandle struct{}
type flowHandle struct {
mu sync.RWMutex
// the last time we switch TiKV into IMPORT mode, this is a global operation, do it for one task makes
// no difference to do it for all tasks. So we do not need to record the switch time for each task.
lastSwitchTime atomic.Time
}

var _ dispatcher.TaskFlowHandle = (*flowHandle)(nil)

func (h *flowHandle) OnTicker(ctx context.Context, task *proto.Task) {
// only switch TiKV mode when task is running and reach the interval
if task.State != proto.TaskStateRunning || time.Since(h.lastSwitchTime.Load()) < config.DefaultSwitchTiKVModeInterval {
return
}

var _ dispatcher.TaskFlowHandle = (*FlowHandle)(nil)
h.mu.Lock()
defer h.mu.Unlock()
if time.Since(h.lastSwitchTime.Load()) < config.DefaultSwitchTiKVModeInterval {
return
}

// ProcessNormalFlow implements dispatcher.TaskFlowHandle interface.
func (*FlowHandle) ProcessNormalFlow(ctx context.Context, handle dispatcher.TaskHandle, gTask *proto.Task) ([][]byte, error) {
logger := logutil.BgLogger().With(zap.Int64("task_id", task.ID))
switcher, err := importer.GetTiKVModeSwitcher(logger)
if err != nil {
logger.Warn("get tikv mode switcher failed", zap.Error(err))
return
}
switcher.ToImportMode(ctx)
h.lastSwitchTime.Store(time.Now())
}

func (h *flowHandle) ProcessNormalFlow(ctx context.Context, handle dispatcher.TaskHandle, gTask *proto.Task) ([][]byte, error) {
logger := logutil.BgLogger().With(zap.String("component", "dispatcher"), zap.String("type", gTask.Type), zap.Int64("ID", gTask.ID))
taskMeta := &TaskMeta{}
err := json.Unmarshal(gTask.Meta, taskMeta)
Expand All @@ -49,6 +77,7 @@ func (*FlowHandle) ProcessNormalFlow(ctx context.Context, handle dispatcher.Task

switch gTask.Step {
case Import:
h.switchTiKV2NormalMode(ctx, logutil.BgLogger())
if err := postProcess(ctx, handle, gTask, logger); err != nil {
return nil, err
}
Expand Down Expand Up @@ -78,16 +107,15 @@ func (*FlowHandle) ProcessNormalFlow(ctx context.Context, handle dispatcher.Task
return metaBytes, nil
}

// ProcessErrFlow implements dispatcher.ProcessErrFlow interface.
func (*FlowHandle) ProcessErrFlow(_ context.Context, _ dispatcher.TaskHandle, gTask *proto.Task, receiveErr [][]byte) ([]byte, error) {
func (h *flowHandle) ProcessErrFlow(ctx context.Context, _ dispatcher.TaskHandle, gTask *proto.Task, receiveErr [][]byte) ([]byte, error) {
logger := logutil.BgLogger().With(zap.String("component", "dispatcher"), zap.String("type", gTask.Type), zap.Int64("ID", gTask.ID))
logger.Info("process error flow", zap.ByteStrings("error message", receiveErr))
h.switchTiKV2NormalMode(ctx, logger)
gTask.Error = receiveErr[0]
return nil, nil
}

// GetEligibleInstances implements dispatcher.TaskFlowHandle interface.
func (*FlowHandle) GetEligibleInstances(ctx context.Context, gTask *proto.Task) ([]*infosync.ServerInfo, error) {
func (*flowHandle) GetEligibleInstances(ctx context.Context, gTask *proto.Task) ([]*infosync.ServerInfo, error) {
taskMeta := &TaskMeta{}
err := json.Unmarshal(gTask.Meta, taskMeta)
if err != nil {
Expand All @@ -99,12 +127,26 @@ func (*FlowHandle) GetEligibleInstances(ctx context.Context, gTask *proto.Task)
return dispatcher.GenerateSchedulerNodes(ctx)
}

// IsRetryableErr implements dispatcher.IsRetryableErr interface.
func (*FlowHandle) IsRetryableErr(error) bool {
func (*flowHandle) IsRetryableErr(error) bool {
// TODO: check whether the error is retryable.
return false
}

func (h *flowHandle) switchTiKV2NormalMode(ctx context.Context, logger *zap.Logger) {
h.mu.Lock()
defer h.mu.Unlock()

switcher, err := importer.GetTiKVModeSwitcher(logger)
if err != nil {
logger.Warn("get tikv mode switcher failed", zap.Error(err))
return
}
switcher.ToNormalMode(ctx)

// clear it, so next task can switch TiKV mode again.
h.lastSwitchTime.Store(time.Time{})
}

// postProcess does the post processing for the task.
func postProcess(ctx context.Context, handle dispatcher.TaskHandle, gTask *proto.Task, logger *zap.Logger) error {
taskMeta := &TaskMeta{}
Expand Down Expand Up @@ -237,5 +279,5 @@ func generateSubtaskMetas(ctx context.Context, taskMeta *TaskMeta) (subtaskMetas
}

func init() {
dispatcher.RegisterTaskFlowHandle(proto.LoadData, &FlowHandle{})
dispatcher.RegisterTaskFlowHandle(proto.LoadData, &flowHandle{})
}
2 changes: 1 addition & 1 deletion disttask/loaddata/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (s *loadDataSuite) TestFlowHandleGetEligibleInstances() {
}
mockedAllServerInfos := makeFailpointRes(serverInfoMap)

h := FlowHandle{}
h := flowHandle{}
gTask := &proto.Task{Meta: []byte("{}")}
s.enableFailPoint("github.com/pingcap/tidb/domain/infosync/mockGetAllServerInfo", mockedAllServerInfos)
eligibleInstances, err := h.GetEligibleInstances(context.Background(), gTask)
Expand Down
3 changes: 2 additions & 1 deletion disttask/loaddata/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewDistImporter(param *importer.JobImportParam, plan *importer.Plan, stmt s
JobImportParam: param,
plan: plan,
stmt: stmt,
logger: logutil.BgLogger().With(zap.String("component", "distribute importer"), zap.Int("id", int(param.Job.ID))),
logger: logutil.BgLogger().With(zap.String("component", "importer"), zap.Int("id", int(param.Job.ID))),
}, nil
}

Expand All @@ -67,6 +67,7 @@ func NewDistImporterCurrNode(param *importer.JobImportParam, plan *importer.Plan
plan: plan,
stmt: stmt,
instance: serverInfo,
logger: logutil.BgLogger().With(zap.String("component", "importer"), zap.Int("id", int(param.Job.ID))),
}, nil
}

Expand Down
1 change: 1 addition & 0 deletions disttask/loaddata/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

// TaskStep of LoadData.
const (
// Import we sort source data and ingest it into TiKV in this step.
Import int64 = 1
)

Expand Down
Loading