Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: add precheck about downstream CDC/PiTR #39338

Merged
merged 14 commits into from
Nov 30, 2022
Next Next commit
lightning: add precheck about downstream CDC/PiTR
Signed-off-by: lance6716 <lance6716@gmail.com>
  • Loading branch information
lance6716 committed Nov 23, 2022
commit 4f924c66111366ed76d283a81dfe13fd0be3291b
3 changes: 3 additions & 0 deletions br/pkg/lightning/restore/precheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
CheckTargetClusterVersion CheckItemID = "CHECK_TARGET_CLUSTER_VERSION"
CheckLocalDiskPlacement CheckItemID = "CHECK_LOCAL_DISK_PLACEMENT"
CheckLocalTempKVDir CheckItemID = "CHECK_LOCAL_TEMP_KV_DIR"
CheckTargetUsingCDCPITR CheckItemID = "CHECK_TARGET_USING_CDC_PITR"
)

type CheckResult struct {
Expand Down Expand Up @@ -139,6 +140,8 @@ func (b *PrecheckItemBuilder) BuildPrecheckItem(checkID CheckItemID) (PrecheckIt
return NewLocalDiskPlacementCheckItem(b.cfg), nil
case CheckLocalTempKVDir:
return NewLocalTempKVDirCheckItem(b.cfg, b.preInfoGetter), nil
case CheckTargetUsingCDCPITR:
return NewCDCPITRCheckItem(b.cfg), nil
default:
return nil, errors.Errorf("unsupported check item: %v", checkID)
}
Expand Down
134 changes: 134 additions & 0 deletions br/pkg/lightning/restore/precheck_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@
package restore

import (
"bytes"
"context"
"crypto/tls"
"fmt"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
"time"

"github.com/docker/go-units"
"github.com/pingcap/errors"
Expand All @@ -32,6 +35,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/streamhelper"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/store/pdtypes"
Expand All @@ -40,9 +44,12 @@ import (
"github.com/pingcap/tidb/util/engine"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/set"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

type clusterResourceCheckItem struct {
Expand Down Expand Up @@ -672,6 +679,133 @@ func (ci *checkpointCheckItem) checkpointIsValid(ctx context.Context, tableInfo
return msgs, nil
}

type cdcPITRCheckItem struct {
cfg *config.Config
// used in test
etcdCli *clientv3.Client
}

// NewCDCPITRCheckItem creates a checker to check downstream has enabled CDC or PiTR.
func NewCDCPITRCheckItem(cfg *config.Config) PrecheckItem {
return &cdcPITRCheckItem{
cfg: cfg,
}
}

// GetCheckItemID implements PrecheckItem interface.
func (ci *cdcPITRCheckItem) GetCheckItemID() CheckItemID {
return CheckTargetUsingCDCPITR
}

func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client, error) {
var (
tlsConfig *tls.Config
)

if cfg2, err2 := cfg.ToTLS(); err2 != nil {
return nil, err2
} else {
tlsConfig = cfg2.TLSConfig()
}

return clientv3.New(clientv3.Config{
TLS: tlsConfig,
Endpoints: []string{cfg.TiDB.PdAddr},
AutoSyncInterval: 30 * time.Second,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 3 * time.Second,
PermitWithoutStream: false,
}),
grpc.WithBlock(),
grpc.WithReturnConnectionError(),
},
Context: ctx,
})
}

// Check implements PrecheckItem interface.
func (ci *cdcPITRCheckItem) Check(ctx context.Context) (*CheckResult, error) {
theResult := &CheckResult{
Item: ci.GetCheckItemID(),
Severity: Critical,
}

if ci.cfg.TikvImporter.Backend != config.BackendLocal {
theResult.Passed = true
theResult.Message = "TiDB Lightning is not using local backend, skip this check"
return theResult, nil
}

if ci.etcdCli == nil {
var err error
ci.etcdCli, err = dialEtcdWithCfg(ctx, ci.cfg)
if err != nil {
return nil, errors.Trace(err)
}
}

errorMsg := make([]string, 0, 2)

pitrCli := streamhelper.NewMetaDataClient(ci.etcdCli)
tasks, err := pitrCli.GetAllTasks(ctx)
if err != nil {
return nil, errors.Trace(err)
}
if len(tasks) > 0 {
names := make([]string, 0, len(tasks))
for _, task := range tasks {
names = append(names, task.Info.GetName())
}
errorMsg = append(errorMsg, fmt.Sprintf("found PiTR log streaming task(s): %v, local backend is not compatible with them", names))
}

cdcPrefix := "/tidb/cdc/"
capturePath := []byte("/__cdc_meta__/capture/")
nameSet := make(map[string][]string, 1)
resp, err := ci.etcdCli.Get(ctx, cdcPrefix, clientv3.WithPrefix(), clientv3.WithKeysOnly())
if err != nil {
return nil, errors.Trace(err)
}
for _, kv := range resp.Kvs {
// example: /tidb/cdc/<clusterID>/__cdc_meta__/capture/<captureID>
k := kv.Key[len(cdcPrefix):]
clusterID, captureID, found := bytes.Cut(k, capturePath)
if found {
nameSet[string(clusterID)] = append(nameSet[string(clusterID)], string(captureID))
}
}
if len(nameSet) > 0 {
var captureMsgBuf strings.Builder
captureMsgBuf.WriteString("found CDC capture(s): ")
isFirst := true
for clusterID, captureIDs := range nameSet {
if !isFirst {
captureMsgBuf.WriteString(", ")
}
isFirst = false
captureMsgBuf.WriteString("clusterID: ")
captureMsgBuf.WriteString(clusterID)
captureMsgBuf.WriteString(" captureID(s): ")
captureMsgBuf.WriteString(fmt.Sprintf("%v", captureIDs))
}
captureMsgBuf.WriteString("local backend is not compatible with them")
errorMsg = append(errorMsg, captureMsgBuf.String())
}

if len(errorMsg) > 0 {
theResult.Passed = false
theResult.Message = strings.Join(errorMsg, "\n")
} else {
theResult.Passed = true
theResult.Message = "no CDC or PiTR task found"
}

return theResult, nil
}

type schemaCheckItem struct {
cfg *config.Config
preInfoGetter PreRestoreInfoGetter
Expand Down
28 changes: 28 additions & 0 deletions br/pkg/lightning/restore/precheck_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/restore/mock"
ropts "github.com/pingcap/tidb/br/pkg/lightning/restore/opts"
"github.com/stretchr/testify/suite"
"go.etcd.io/etcd/tests/v3/integration"
)

type precheckImplSuite struct {
Expand Down Expand Up @@ -581,3 +582,30 @@ func (s *precheckImplSuite) TestTableEmptyCheckBasic() {
s.T().Logf("check result message: %s", result.Message)
s.Require().False(result.Passed)
}

func (s *precheckImplSuite) TestCDCPITRCheckItem() {
integration.BeforeTestExternal(s.T())
testEtcdCluster := integration.NewClusterV3(s.T(), &integration.ClusterConfig{Size: 1})
defer testEtcdCluster.Terminate(s.T())

ctx := context.Background()
cfg := &config.Config{
TikvImporter: config.TikvImporter{
Backend: config.BackendLocal,
},
}
ci := NewCDCPITRCheckItem(cfg)
checker := ci.(*cdcPITRCheckItem)
checker.etcdCli = testEtcdCluster.RandClient()
result, err := ci.Check(ctx)
s.Require().NoError(err)
s.Require().NotNil(result)
s.Require().Equal(ci.GetCheckItemID(), result.Item)
s.Require().Equal(Critical, result.Severity)
s.T().Logf("check result message: %s", result.Message)
s.Require().True(result.Passed)

cli := testEtcdCluster.RandClient()
// TODO: what's real data?
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
_, err = cli.Put(ctx, "/tidb/cdc/pitr/changefeed/info/1", "test")
}
5 changes: 3 additions & 2 deletions br/pkg/utils/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,14 @@ func IsLogBackupEnabled(ctx sqlexec.RestrictedSQLExecutor) (bool, error) {
return true, nil
}

// CheckLogBackupTaskExist increases the count of log backup task.
// LogBackupTaskCountInc increases the count of log backup task.
func LogBackupTaskCountInc() {
LogBackupTaskMutex.Lock()
logBackupTaskCount++
LogBackupTaskMutex.Unlock()
}

// CheckLogBackupTaskExist decreases the count of log backup task.
// LogBackupTaskCountDec decreases the count of log backup task.
func LogBackupTaskCountDec() {
LogBackupTaskMutex.Lock()
logBackupTaskCount--
Expand All @@ -122,6 +122,7 @@ func IsLogBackupInUse(ctx sessionctx.Context) bool {
return CheckLogBackupEnabled(ctx) && CheckLogBackupTaskExist()
}

// GetTidbNewCollationEnabled returns the variable name of NewCollationEnabled.
func GetTidbNewCollationEnabled() string {
return tidbNewCollationEnabled
}