Skip to content

Commit

Permalink
lightning: add precheck about downstream CDC/PiTR (#39338)
Browse files Browse the repository at this point in the history
close #39346
  • Loading branch information
lance6716 authored Nov 30, 2022
1 parent 1de2bf6 commit 328aef8
Show file tree
Hide file tree
Showing 10 changed files with 272 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ bazel_golangcilinter:
-- run $$($(PACKAGE_DIRECTORIES)) --config ./.golangci.yaml

bazel_brietest: failpoint-enable bazel_ci_prepare
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --remote_download_minimal --test_arg=-with-real-tikv \
bazel $(BAZEL_GLOBAL_CONFIG) test $(BAZEL_CMD_CONFIG) --test_arg=-with-real-tikv \
-- //tests/realtikvtest/brietest/...

bazel_pessimistictest: failpoint-enable bazel_ci_prepare
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var (
ErrCheckTableEmpty = errors.Normalize("check table empty error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckTableEmpty"))
ErrCheckCSVHeader = errors.Normalize("check csv header error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckCSVHeader"))
ErrCheckDataSource = errors.Normalize("check data source error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckDataSource"))
ErrCheckCDCPiTR = errors.Normalize("check TiCDC/PiTR task error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckCDCPiTR"))

ErrOpenCheckpoint = errors.Normalize("open checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrOpenCheckpoint"))
ErrReadCheckpoint = errors.Normalize("read checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrReadCheckpoint"))
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/lightning/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ go_library(
"//br/pkg/pdutil",
"//br/pkg/redact",
"//br/pkg/storage",
"//br/pkg/streamhelper",
"//br/pkg/utils",
"//br/pkg/version",
"//br/pkg/version/build",
Expand Down Expand Up @@ -77,6 +78,9 @@ go_library(
"@com_github_pingcap_tipb//go-tipb",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_pd_client//:client",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//keepalive",
"@org_golang_x_exp//maps",
"@org_golang_x_exp//slices",
"@org_golang_x_sync//errgroup",
Expand Down Expand Up @@ -124,6 +128,7 @@ go_test(
"//br/pkg/lightning/worker",
"//br/pkg/mock",
"//br/pkg/storage",
"//br/pkg/streamhelper",
"//br/pkg/version/build",
"//ddl",
"//errno",
Expand Down Expand Up @@ -158,6 +163,8 @@ go_test(
"@com_github_tikv_pd_client//:client",
"@com_github_xitongsys_parquet_go//writer",
"@com_github_xitongsys_parquet_go_source//buffer",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_tests_v3//integration",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,10 @@ func (rc *Controller) checkSourceSchema(ctx context.Context) error {
}
return rc.doPreCheckOnItem(ctx, CheckSourceSchemaValid)
}

func (rc *Controller) checkCDCPiTR(ctx context.Context) error {
if rc.cfg.TikvImporter.Backend == config.BackendTiDB {
return nil
}
return rc.doPreCheckOnItem(ctx, CheckTargetUsingCDCPITR)
}
3 changes: 3 additions & 0 deletions br/pkg/lightning/restore/precheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
CheckTargetClusterVersion CheckItemID = "CHECK_TARGET_CLUSTER_VERSION"
CheckLocalDiskPlacement CheckItemID = "CHECK_LOCAL_DISK_PLACEMENT"
CheckLocalTempKVDir CheckItemID = "CHECK_LOCAL_TEMP_KV_DIR"
CheckTargetUsingCDCPITR CheckItemID = "CHECK_TARGET_USING_CDC_PITR"
)

type CheckResult struct {
Expand Down Expand Up @@ -139,6 +140,8 @@ func (b *PrecheckItemBuilder) BuildPrecheckItem(checkID CheckItemID) (PrecheckIt
return NewLocalDiskPlacementCheckItem(b.cfg), nil
case CheckLocalTempKVDir:
return NewLocalTempKVDirCheckItem(b.cfg, b.preInfoGetter), nil
case CheckTargetUsingCDCPITR:
return NewCDCPITRCheckItem(b.cfg), nil
default:
return nil, errors.Errorf("unsupported check item: %v", checkID)
}
Expand Down
154 changes: 154 additions & 0 deletions br/pkg/lightning/restore/precheck_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
package restore

import (
"bytes"
"context"
"fmt"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
"time"

"github.com/docker/go-units"
"github.com/pingcap/errors"
Expand All @@ -32,6 +34,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/store/pdtypes"
Expand All @@ -40,9 +43,12 @@ import (
"github.com/pingcap/tidb/util/engine"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/set"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

type clusterResourceCheckItem struct {
Expand Down Expand Up @@ -672,6 +678,154 @@ func (ci *checkpointCheckItem) checkpointIsValid(ctx context.Context, tableInfo
return msgs, nil
}

// CDCPITRCheckItem check downstream has enabled CDC or PiTR. It's exposed to let
// caller override the Instruction message.
type CDCPITRCheckItem struct {
cfg *config.Config
Instruction string
// used in test
etcdCli *clientv3.Client
}

// NewCDCPITRCheckItem creates a checker to check downstream has enabled CDC or PiTR.
func NewCDCPITRCheckItem(cfg *config.Config) PrecheckItem {
return &CDCPITRCheckItem{
cfg: cfg,
Instruction: "local backend is not compatible with them. Please switch to tidb backend then try again.",
}
}

// GetCheckItemID implements PrecheckItem interface.
func (ci *CDCPITRCheckItem) GetCheckItemID() CheckItemID {
return CheckTargetUsingCDCPITR
}

func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client, error) {
cfg2, err := cfg.ToTLS()
if err != nil {
return nil, err
}
tlsConfig := cfg2.TLSConfig()

return clientv3.New(clientv3.Config{
TLS: tlsConfig,
Endpoints: []string{cfg.TiDB.PdAddr},
AutoSyncInterval: 30 * time.Second,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 3 * time.Second,
PermitWithoutStream: false,
}),
grpc.WithBlock(),
grpc.WithReturnConnectionError(),
},
Context: ctx,
})
}

// Check implements PrecheckItem interface.
func (ci *CDCPITRCheckItem) Check(ctx context.Context) (*CheckResult, error) {
theResult := &CheckResult{
Item: ci.GetCheckItemID(),
Severity: Critical,
}

if ci.cfg.TikvImporter.Backend != config.BackendLocal {
theResult.Passed = true
theResult.Message = "TiDB Lightning is not using local backend, skip this check"
return theResult, nil
}

if ci.etcdCli == nil {
var err error
ci.etcdCli, err = dialEtcdWithCfg(ctx, ci.cfg)
if err != nil {
return nil, errors.Trace(err)
}
//nolint: errcheck
defer ci.etcdCli.Close()
}

errorMsg := make([]string, 0, 2)

pitrCli := streamhelper.NewMetaDataClient(ci.etcdCli)
tasks, err := pitrCli.GetAllTasks(ctx)
if err != nil {
return nil, errors.Trace(err)
}
if len(tasks) > 0 {
names := make([]string, 0, len(tasks))
for _, task := range tasks {
names = append(names, task.Info.GetName())
}
errorMsg = append(errorMsg, fmt.Sprintf("found PiTR log streaming task(s): %v,", names))
}

// check etcd KV of CDC >= v6.2
cdcPrefix := "/tidb/cdc/"
capturePath := []byte("/__cdc_meta__/capture/")
nameSet := make(map[string][]string, 1)
resp, err := ci.etcdCli.Get(ctx, cdcPrefix, clientv3.WithPrefix(), clientv3.WithKeysOnly())
if err != nil {
return nil, errors.Trace(err)
}
for _, kv := range resp.Kvs {
// example: /tidb/cdc/<clusterID>/__cdc_meta__/capture/<captureID>
k := kv.Key[len(cdcPrefix):]
clusterID, captureID, found := bytes.Cut(k, capturePath)
if found {
nameSet[string(clusterID)] = append(nameSet[string(clusterID)], string(captureID))
}
}
if len(nameSet) == 0 {
// check etcd KV of CDC <= v6.1
cdcPrefixV61 := "/tidb/cdc/capture/"
resp, err = ci.etcdCli.Get(ctx, cdcPrefixV61, clientv3.WithPrefix(), clientv3.WithKeysOnly())
if err != nil {
return nil, errors.Trace(err)
}
for _, kv := range resp.Kvs {
// example: /tidb/cdc/capture/<captureID>
k := kv.Key[len(cdcPrefixV61):]
if len(k) == 0 {
continue
}
nameSet["<nil>"] = append(nameSet["<nil>"], string(k))
}
}

if len(nameSet) > 0 {
var captureMsgBuf strings.Builder
captureMsgBuf.WriteString("found CDC capture(s): ")
isFirst := true
for clusterID, captureIDs := range nameSet {
if !isFirst {
captureMsgBuf.WriteString(", ")
}
isFirst = false
captureMsgBuf.WriteString("clusterID: ")
captureMsgBuf.WriteString(clusterID)
captureMsgBuf.WriteString(" captureID(s): ")
captureMsgBuf.WriteString(fmt.Sprintf("%v", captureIDs))
}
captureMsgBuf.WriteString(",")
errorMsg = append(errorMsg, captureMsgBuf.String())
}

if len(errorMsg) > 0 {
errorMsg = append(errorMsg, ci.Instruction)
theResult.Passed = false
theResult.Message = strings.Join(errorMsg, "\n")
} else {
theResult.Passed = true
theResult.Message = "no CDC or PiTR task found"
}

return theResult, nil
}

type schemaCheckItem struct {
cfg *config.Config
preInfoGetter PreRestoreInfoGetter
Expand Down
87 changes: 87 additions & 0 deletions br/pkg/lightning/restore/precheck_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/restore/mock"
ropts "github.com/pingcap/tidb/br/pkg/lightning/restore/opts"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/stretchr/testify/suite"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/integration"
)

type precheckImplSuite struct {
Expand Down Expand Up @@ -581,3 +585,86 @@ func (s *precheckImplSuite) TestTableEmptyCheckBasic() {
s.T().Logf("check result message: %s", result.Message)
s.Require().False(result.Passed)
}

func (s *precheckImplSuite) TestCDCPITRCheckItem() {
integration.BeforeTestExternal(s.T())
testEtcdCluster := integration.NewClusterV3(s.T(), &integration.ClusterConfig{Size: 1})
defer testEtcdCluster.Terminate(s.T())

ctx := context.Background()
cfg := &config.Config{
TikvImporter: config.TikvImporter{
Backend: config.BackendLocal,
},
}
ci := NewCDCPITRCheckItem(cfg)
checker := ci.(*CDCPITRCheckItem)
checker.etcdCli = testEtcdCluster.RandClient()
result, err := ci.Check(ctx)
s.Require().NoError(err)
s.Require().NotNil(result)
s.Require().Equal(ci.GetCheckItemID(), result.Item)
s.Require().Equal(Critical, result.Severity)
s.Require().True(result.Passed)
s.Require().Equal("no CDC or PiTR task found", result.Message)

cli := testEtcdCluster.RandClient()
brCli := streamhelper.NewMetaDataClient(cli)
backend, _ := storage.ParseBackend("noop://", nil)
taskInfo, err := streamhelper.NewTaskInfo("br_name").
FromTS(1).
UntilTS(1000).
WithTableFilter("*.*", "!mysql").
ToStorage(backend).
Check()
s.Require().NoError(err)
err = brCli.PutTask(ctx, *taskInfo)
s.Require().NoError(err)
checkEtcdPut := func(key string) {
_, err := cli.Put(ctx, key, "")
s.Require().NoError(err)
}
// TiCDC >= v6.2
checkEtcdPut("/tidb/cdc/default/__cdc_meta__/capture/3ecd5c98-0148-4086-adfd-17641995e71f")
checkEtcdPut("/tidb/cdc/default/__cdc_meta__/meta/meta-version")
checkEtcdPut("/tidb/cdc/default/__cdc_meta__/meta/ticdc-delete-etcd-key-count")
checkEtcdPut("/tidb/cdc/default/__cdc_meta__/owner/22318498f4dd6639")
checkEtcdPut("/tidb/cdc/default/default/changefeed/info/test")
checkEtcdPut("/tidb/cdc/default/default/changefeed/info/test-1")
checkEtcdPut("/tidb/cdc/default/default/changefeed/status/test")
checkEtcdPut("/tidb/cdc/default/default/changefeed/status/test-1")
checkEtcdPut("/tidb/cdc/default/default/task/position/3ecd5c98-0148-4086-adfd-17641995e71f/test-1")
checkEtcdPut("/tidb/cdc/default/default/upstream/7168358383033671922")

result, err = ci.Check(ctx)
s.Require().NoError(err)
s.Require().False(result.Passed)
s.Require().Equal("found PiTR log streaming task(s): [br_name],\n"+
"found CDC capture(s): clusterID: default captureID(s): [3ecd5c98-0148-4086-adfd-17641995e71f],\n"+
"local backend is not compatible with them. Please switch to tidb backend then try again.",
result.Message)

_, err = cli.Delete(ctx, "/tidb/cdc/", clientv3.WithPrefix())
s.Require().NoError(err)

// TiCDC <= v6.1
checkEtcdPut("/tidb/cdc/capture/f14cb04d-5ba1-410e-a59b-ccd796920e9d")
checkEtcdPut("/tidb/cdc/changefeed/info/test")
checkEtcdPut("/tidb/cdc/job/test")
checkEtcdPut("/tidb/cdc/owner/223184ad80a88b0b")
checkEtcdPut("/tidb/cdc/task/position/f14cb04d-5ba1-410e-a59b-ccd796920e9d/test")

result, err = ci.Check(ctx)
s.Require().NoError(err)
s.Require().False(result.Passed)
s.Require().Equal("found PiTR log streaming task(s): [br_name],\n"+
"found CDC capture(s): clusterID: <nil> captureID(s): [f14cb04d-5ba1-410e-a59b-ccd796920e9d],\n"+
"local backend is not compatible with them. Please switch to tidb backend then try again.",
result.Message)

checker.cfg.TikvImporter.Backend = config.BackendTiDB
result, err = ci.Check(ctx)
s.Require().NoError(err)
s.Require().True(result.Passed)
s.Require().Equal("TiDB Lightning is not using local backend, skip this check", result.Message)
}
4 changes: 4 additions & 0 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2136,6 +2136,10 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
return common.ErrCheckClusterRegion.Wrap(err).GenWithStackByArgs()
}
}
// even if checkpoint exists, we still need to make sure CDC/PiTR task is not running.
if err := rc.checkCDCPiTR(ctx); err != nil {
return common.ErrCheckCDCPiTR.Wrap(err).GenWithStackByArgs()
}
}
}

Expand Down
Loading

0 comments on commit 328aef8

Please sign in to comment.