Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions br/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ GO := GO111MODULE=on go
PACKAGES := go list ./...
DIRECTORIES := $(PACKAGES) | sed 's|github.com/tikv/migration/br/||'

# test
# build & test
BR_BIN_PATH ?= bin/tikv-br
COVERAGE_DIR ?= build
TEST_PARALLEL ?= 8
PD_ADDR ?= 127.0.0.1:2379
Expand Down Expand Up @@ -55,7 +56,7 @@ test: tools/bin/gocov tools/bin/gocov-xml
make failpoint/disable

test/integration: build build/rawkv-helper
./tests/br_rawkv/run.py --test-helper=bin/rawkv --pd=$(PD_ADDR) --br=bin/br --br-storage=local://$(BR_LOCAL_STORE)
./tests/rawkv/run.py --test-helper=bin/rawkv --pd=$(PD_ADDR) --br=$(BR_BIN_PATH) --br-storage=local://$(BR_LOCAL_STORE)

failpoint/enable: tools/bin/failpoint-ctl
find `pwd` -type d | grep -vE "(\.git|tools)" | xargs tools/bin/failpoint-ctl enable
Expand All @@ -76,10 +77,10 @@ tools/bin/failpoint-ctl: tools/check/go.mod
cd tools/check && $(GO) build -o ../bin/failpoint-ctl github.com/pingcap/failpoint/failpoint-ctl

build:
CGO_ENABLED=1 $(GO) build -tags codes -ldflags '$(LDFLAGS)' -o bin/br cmd/br/*.go
CGO_ENABLED=1 $(GO) build -tags codes -ldflags '$(LDFLAGS)' -o $(BR_BIN_PATH) cmd/br/*.go

build/rawkv-helper:
cd tests/br_rawkv && $(GO) build -mod=mod -o ../../bin/rawkv client.go
cd tests/rawkv && $(GO) build -mod=mod -o ../../bin/rawkv client.go

clean:
go clean -i ./...
Expand Down
7 changes: 5 additions & 2 deletions br/cmd/br/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,15 @@ func startPProf(cmd *cobra.Command) error {
if err != nil {
return errors.Trace(err)
}
ca, cert, key, err := task.ParseTLSTripleFromFlags(cmd.Flags())

tlsConfig := &task.TLSConfig{}
err = tlsConfig.ParseFromFlags(cmd.Flags())
if err != nil {
return errors.Trace(err)
}

// Host isn't used here.
tls, err := tidbutils.NewTLS(ca, cert, key, "localhost", nil)
tls, err := tidbutils.NewTLS(tlsConfig.CA, tlsConfig.Cert, tlsConfig.Key, "localhost", nil)
if err != nil {
return errors.Trace(err)
}
Expand Down
131 changes: 0 additions & 131 deletions br/pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,8 @@
package task

import (
"strconv"
"time"

"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/types"
"github.com/spf13/pflag"
"github.com/tikv/client-go/v2/oracle"
berrors "github.com/tikv/migration/br/pkg/errors"
"github.com/tikv/migration/br/pkg/utils"
)

Expand All @@ -36,20 +27,6 @@ type CompressionConfig struct {
CompressionLevel int32 `json:"compression-level" toml:"compression-level"`
}

// BackupConfig is the configuration specific for backup tasks.
type BackupConfig struct {
Config

TimeAgo time.Duration `json:"time-ago" toml:"time-ago"`
BackupTS uint64 `json:"backup-ts" toml:"backup-ts"`
LastBackupTS uint64 `json:"last-backup-ts" toml:"last-backup-ts"`
GCTTL int64 `json:"gc-ttl" toml:"gc-ttl"`
RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"`
IgnoreStats bool `json:"ignore-stats" toml:"ignore-stats"`
UseBackupMetaV2 bool `json:"use-backupmeta-v2"`
CompressionConfig
}

// DefineBackupFlags defines common flags for the backup command.
func DefineBackupFlags(flags *pflag.FlagSet) {
flags.Duration(
Expand Down Expand Up @@ -90,111 +67,3 @@ func DefineBackupFlags(flags *pflag.FlagSet) {
// finally v4.0.17 will set this flag to true, and generate v2 meta.
_ = flags.MarkHidden(flagUseBackupMetaV2)
}

// ParseFromFlags parses the backup-related flags from the flag set.
func (cfg *BackupConfig) ParseFromFlags(flags *pflag.FlagSet) error {
timeAgo, err := flags.GetDuration(flagBackupTimeago)
if err != nil {
return errors.Trace(err)
}
if timeAgo < 0 {
return errors.Annotate(berrors.ErrInvalidArgument, "negative timeago is not allowed")
}
cfg.TimeAgo = timeAgo
cfg.LastBackupTS, err = flags.GetUint64(flagLastBackupTS)
if err != nil {
return errors.Trace(err)
}
backupTS, err := flags.GetString(flagBackupTS)
if err != nil {
return errors.Trace(err)
}
cfg.BackupTS, err = parseTSString(backupTS)
if err != nil {
return errors.Trace(err)
}
gcTTL, err := flags.GetInt64(flagGCTTL)
if err != nil {
return errors.Trace(err)
}
cfg.GCTTL = gcTTL

compressionCfg, err := parseCompressionFlags(flags)
if err != nil {
return errors.Trace(err)
}
cfg.CompressionConfig = *compressionCfg

if err = cfg.Config.ParseFromFlags(flags); err != nil {
return errors.Trace(err)
}
cfg.RemoveSchedulers, err = flags.GetBool(flagRemoveSchedulers)
if err != nil {
return errors.Trace(err)
}
cfg.IgnoreStats, err = flags.GetBool(flagIgnoreStats)
if err != nil {
return errors.Trace(err)
}
cfg.UseBackupMetaV2, err = flags.GetBool(flagUseBackupMetaV2)
return errors.Trace(err)
}

// parseCompressionFlags parses the backup-related flags from the flag set.
func parseCompressionFlags(flags *pflag.FlagSet) (*CompressionConfig, error) {
compressionStr, err := flags.GetString(flagCompressionType)
if err != nil {
return nil, errors.Trace(err)
}
compressionType, err := parseCompressionType(compressionStr)
if err != nil {
return nil, errors.Trace(err)
}
level, err := flags.GetInt32(flagCompressionLevel)
if err != nil {
return nil, errors.Trace(err)
}
return &CompressionConfig{
CompressionLevel: level,
CompressionType: compressionType,
}, nil
}

// parseTSString port from tidb setSnapshotTS.
func parseTSString(ts string) (uint64, error) {
if len(ts) == 0 {
return 0, nil
}
if tso, err := strconv.ParseUint(ts, 10, 64); err == nil {
return tso, nil
}

loc := time.Local
sc := &stmtctx.StatementContext{
TimeZone: loc,
}
t, err := types.ParseTime(sc, ts, mysql.TypeTimestamp, types.MaxFsp)
if err != nil {
return 0, errors.Trace(err)
}
t1, err := t.GoTime(loc)
if err != nil {
return 0, errors.Trace(err)
}
return oracle.GoTimeToTS(t1), nil
}

func parseCompressionType(s string) (backuppb.CompressionType, error) {
var ct backuppb.CompressionType
switch s {
case "lz4":
ct = backuppb.CompressionType_LZ4
case "snappy":
ct = backuppb.CompressionType_SNAPPY
case "zstd":
ct = backuppb.CompressionType_ZSTD
default:
return backuppb.CompressionType_UNKNOWN, errors.Annotatef(berrors.ErrInvalidArgument, "invalid compression type '%s'", s)
}
return ct, nil
}
109 changes: 22 additions & 87 deletions br/pkg/task/backup_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,118 +3,52 @@
package task

import (
"bytes"
"context"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/tikv/migration/br/pkg/backup"
berrors "github.com/tikv/migration/br/pkg/errors"
"github.com/tikv/migration/br/pkg/glue"
"github.com/tikv/migration/br/pkg/metautil"
"github.com/tikv/migration/br/pkg/rtree"
"github.com/tikv/migration/br/pkg/storage"
"github.com/tikv/migration/br/pkg/summary"
"github.com/tikv/migration/br/pkg/utils"
"go.uber.org/zap"
)

const (
flagKeyFormat = "format"
flagTiKVColumnFamily = "cf"
flagStartKey = "start"
flagEndKey = "end"
flagKeyFormat = "format"
flagStartKey = "start"
flagEndKey = "end"
flagDstAPIVersion = "dst-api-version"
)

// RawKvConfig is the common config for rawkv backup and restore.
type RawKvConfig struct {
Config

StartKey []byte `json:"start-key" toml:"start-key"`
EndKey []byte `json:"end-key" toml:"end-key"`
CF string `json:"cf" toml:"cf"`
CompressionConfig
RemoveSchedulers bool `json:"remove-schedulers" toml:"remove-schedulers"`
}

// DefineRawBackupFlags defines common flags for the backup command.
func DefineRawBackupFlags(command *cobra.Command) {
command.Flags().StringP(flagKeyFormat, "", "hex", "start/end key format, support raw|escaped|hex")
command.Flags().StringP(flagTiKVColumnFamily, "", "default", "backup specify cf, correspond to tikv cf")
command.Flags().StringP(flagStartKey, "", "", "backup raw kv start key, key is inclusive")
command.Flags().StringP(flagEndKey, "", "", "backup raw kv end key, key is exclusive")
command.Flags().String(flagCompressionType, "zstd",
"backup sst file compression algorithm, value can be one of 'lz4|zstd|snappy'")
command.Flags().Bool(flagRemoveSchedulers, false,
"disable the balance, shuffle and region-merge schedulers in PD to speed up backup")
// This flag can impact the online cluster, so hide it in case of abuse.
_ = command.Flags().MarkHidden(flagRemoveSchedulers)
}
command.Flags().StringP(flagStartKey, "", "",
"The start key of the backup task, key is inclusive.")

// ParseFromFlags parses the raw kv backup&restore common flags from the flag set.
func (cfg *RawKvConfig) ParseFromFlags(flags *pflag.FlagSet) error {
format, err := flags.GetString(flagKeyFormat)
if err != nil {
return errors.Trace(err)
}
start, err := flags.GetString(flagStartKey)
if err != nil {
return errors.Trace(err)
}
cfg.StartKey, err = utils.ParseKey(format, start)
if err != nil {
return errors.Trace(err)
}
end, err := flags.GetString(flagEndKey)
if err != nil {
return errors.Trace(err)
}
cfg.EndKey, err = utils.ParseKey(format, end)
if err != nil {
return errors.Trace(err)
}
command.Flags().StringP(flagEndKey, "", "",
"The end key of the backup task, key is exclusive.")

if len(cfg.StartKey) > 0 && len(cfg.EndKey) > 0 && bytes.Compare(cfg.StartKey, cfg.EndKey) >= 0 {
return errors.Annotate(berrors.ErrBackupInvalidRange, "endKey must be greater than startKey")
}
cfg.CF, err = flags.GetString(flagTiKVColumnFamily)
if err != nil {
return errors.Trace(err)
}
if err = cfg.Config.ParseFromFlags(flags); err != nil {
return errors.Trace(err)
}
return nil
}
command.Flags().StringP(flagKeyFormat, "", "hex",
"The format of start and end key. Available options: \"raw\", \"escaped\", \"hex\".")

// ParseBackupConfigFromFlags parses the backup-related flags from the flag set.
func (cfg *RawKvConfig) ParseBackupConfigFromFlags(flags *pflag.FlagSet) error {
err := cfg.ParseFromFlags(flags)
if err != nil {
return errors.Trace(err)
}
command.Flags().StringP(flagDstAPIVersion, "", "",
"The encoding method of backuped SST files for destination TiKV cluster, default to the source TiKV cluster. Available options: \"v1\", \"v1ttl\", \"v2\".")

compressionCfg, err := parseCompressionFlags(flags)
if err != nil {
return errors.Trace(err)
}
cfg.CompressionConfig = *compressionCfg
command.Flags().String(flagCompressionType, "zstd",
"The compression algorithm of the backuped SST files. Available options: \"lz4\", \"zstd\", \"snappy\".")

cfg.RemoveSchedulers, err = flags.GetBool(flagRemoveSchedulers)
if err != nil {
return errors.Trace(err)
}
level, err := flags.GetInt32(flagCompressionLevel)
if err != nil {
return errors.Trace(err)
}
cfg.CompressionLevel = level
command.Flags().Bool(flagRemoveSchedulers, false,
"disable the balance, shuffle and region-merge schedulers in PD to speed up backup.")

return nil
// This flag can impact the online cluster, so hide it in case of abuse.
_ = command.Flags().MarkHidden(flagRemoveSchedulers)
}

// RunBackupRaw starts a backup task inside the current goroutine.
Expand Down Expand Up @@ -204,7 +138,8 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
RateLimit: cfg.RateLimit,
Concurrency: cfg.Concurrency,
IsRawKv: true,
Cf: cfg.CF,
Cf: "default",
DstApiVersion: kvrpcpb.APIVersion(kvrpcpb.APIVersion_value[cfg.DstAPIVersion]),
CompressionType: cfg.CompressionType,
CompressionLevel: cfg.CompressionLevel,
CipherInfo: &cfg.CipherInfo,
Expand All @@ -217,7 +152,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
}
// Backup has finished
updateCh.Close()
rawRanges := []*backuppb.RawRange{{StartKey: backupRange.StartKey, EndKey: backupRange.EndKey, Cf: cfg.CF}}
rawRanges := []*backuppb.RawRange{{StartKey: backupRange.StartKey, EndKey: backupRange.EndKey, Cf: "default"}}
metaWriter.Update(func(m *backuppb.BackupMeta) {
m.StartVersion = req.StartVersion
m.EndVersion = req.EndVersion
Expand Down
Loading