Skip to content

Commit

Permalink
Merge branch 'master' into support_param_1
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer authored Sep 27, 2022
2 parents 388c5f4 + a4a58b8 commit 42a151f
Show file tree
Hide file tree
Showing 107 changed files with 11,853 additions and 9,316 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -4453,8 +4453,8 @@ def go_deps():
name = "org_uber_go_goleak",
build_file_proto_mode = "disable_global",
importpath = "go.uber.org/goleak",
sum = "h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=",
version = "v1.1.12",
sum = "h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=",
version = "v1.2.0",
)
go_repository(
name = "org_uber_go_multierr",
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ go_test(
flaky = True,
deps = [
":config",
"//br/pkg/lightning/common",
"//parser/mysql",
"@com_github_burntsushi_toml//:toml",
"@com_github_stretchr_testify//require",
Expand Down
13 changes: 10 additions & 3 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ const (
var (
supportedStorageTypes = []string{"file", "local", "s3", "noop", "gcs", "gs"}

DefaultFilter = []string{
defaultFilter = []string{
"*.*",
"!mysql.*",
"!sys.*",
Expand All @@ -109,6 +109,13 @@ var (
}
)

// GetDefaultFilter gets the default table filter used in Lightning.
// It clones the original default filter,
// so that the original value won't be changed when the returned slice's element is changed.
func GetDefaultFilter() []string {
return append([]string{}, defaultFilter...)
}

type DBStore struct {
Host string `toml:"host" json:"host"`
Port int `toml:"port" json:"port"`
Expand Down Expand Up @@ -715,7 +722,7 @@ func NewConfig() *Config {
},
StrictFormat: false,
MaxRegionSize: MaxRegionSize,
Filter: DefaultFilter,
Filter: GetDefaultFilter(),
DataCharacterSet: defaultCSVDataCharacterSet,
DataInvalidCharReplace: string(defaultCSVDataInvalidCharReplace),
},
Expand Down Expand Up @@ -890,7 +897,7 @@ func (cfg *Config) Adjust(ctx context.Context) error {
// mydumper.filter and black-white-list cannot co-exist.
if cfg.HasLegacyBlackWhiteList() {
log.L().Warn("the config `black-white-list` has been deprecated, please replace with `mydumper.filter`")
if !common.StringSliceEqual(cfg.Mydumper.Filter, DefaultFilter) {
if !common.StringSliceEqual(cfg.Mydumper.Filter, defaultFilter) {
return common.ErrInvalidConfig.GenWithStack("`mydumper.filter` and `black-white-list` cannot be simultaneously defined")
}
}
Expand Down
43 changes: 41 additions & 2 deletions br/pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"github.com/BurntSushi/toml"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -750,7 +751,7 @@ func TestCronEncodeDecode(t *testing.T) {
func TestAdjustWithLegacyBlackWhiteList(t *testing.T) {
cfg := config.NewConfig()
assignMinimalLegalValue(cfg)
require.Equal(t, config.DefaultFilter, cfg.Mydumper.Filter)
require.Equal(t, config.GetDefaultFilter(), cfg.Mydumper.Filter)
require.False(t, cfg.HasLegacyBlackWhiteList())

ctx := context.Background()
Expand All @@ -762,7 +763,7 @@ func TestAdjustWithLegacyBlackWhiteList(t *testing.T) {
cfg.BWList.DoDBs = []string{"test"}
require.EqualError(t, cfg.Adjust(ctx), "[Lightning:Config:ErrInvalidConfig]`mydumper.filter` and `black-white-list` cannot be simultaneously defined")

cfg.Mydumper.Filter = config.DefaultFilter
cfg.Mydumper.Filter = config.GetDefaultFilter()
require.NoError(t, cfg.Adjust(ctx))
require.True(t, cfg.HasLegacyBlackWhiteList())
}
Expand Down Expand Up @@ -955,3 +956,41 @@ func TestCheckAndAdjustForLocalBackend(t *testing.T) {
cfg.TikvImporter.SortedKVDir = base
require.NoError(t, cfg.CheckAndAdjustForLocalBackend())
}

func TestCreateSeveralConfigsWithDifferentFilters(t *testing.T) {
originalDefaultCfg := append([]string{}, config.GetDefaultFilter()...)
cfg1 := config.NewConfig()
require.NoError(t, cfg1.LoadFromTOML([]byte(`
[mydumper]
filter = ["db1.tbl1", "db2.*", "!db2.tbl1"]
`)))
require.Equal(t, 3, len(cfg1.Mydumper.Filter))
require.True(t, common.StringSliceEqual(
cfg1.Mydumper.Filter,
[]string{"db1.tbl1", "db2.*", "!db2.tbl1"},
))
require.True(t, common.StringSliceEqual(config.GetDefaultFilter(), originalDefaultCfg))

cfg2 := config.NewConfig()
require.True(t, common.StringSliceEqual(
cfg2.Mydumper.Filter,
originalDefaultCfg,
))
require.True(t, common.StringSliceEqual(config.GetDefaultFilter(), originalDefaultCfg))

gCfg1, err := config.LoadGlobalConfig([]string{"-f", "db1.tbl1", "-f", "db2.*", "-f", "!db2.tbl1"}, nil)
require.NoError(t, err)
require.True(t, common.StringSliceEqual(
gCfg1.Mydumper.Filter,
[]string{"db1.tbl1", "db2.*", "!db2.tbl1"},
))
require.True(t, common.StringSliceEqual(config.GetDefaultFilter(), originalDefaultCfg))

gCfg2, err := config.LoadGlobalConfig([]string{}, nil)
require.NoError(t, err)
require.True(t, common.StringSliceEqual(
gCfg2.Mydumper.Filter,
originalDefaultCfg,
))
require.True(t, common.StringSliceEqual(config.GetDefaultFilter(), originalDefaultCfg))
}
2 changes: 1 addition & 1 deletion br/pkg/lightning/config/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func NewGlobalConfig() *GlobalConfig {
LogLevel: "error",
},
Mydumper: GlobalMydumper{
Filter: DefaultFilter,
Filter: GetDefaultFilter(),
},
TikvImporter: GlobalImporter{
Backend: "",
Expand Down
17 changes: 0 additions & 17 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/streamhelper/config"
"github.com/pingcap/tidb/br/pkg/utils"
Expand Down Expand Up @@ -461,22 +460,6 @@ func (c *CheckpointAdvancer) advanceCheckpointBy(ctx context.Context, getCheckpo
return nil
}

// OnTick advances the inner logic clock for the advancer.
// It's synchronous: this would only return after the events triggered by the clock has all been done.
// It's generally panic-free, you may not need to trying recover a panic here.
func (c *CheckpointAdvancer) OnTick(ctx context.Context) (err error) {
defer c.recordTimeCost("tick")()
defer func() {
e := recover()
if e != nil {
log.Error("panic during handing tick", zap.Stack("stack"), logutil.ShortError(err))
err = errors.Annotatef(berrors.ErrUnknown, "panic during handling tick: %s", e)
}
}()
err = c.tick(ctx)
return
}

func (c *CheckpointAdvancer) onConsistencyCheckTick(s *updateSmallTree) error {
if s.consistencyCheckTick > 0 {
s.consistencyCheckTick--
Expand Down
82 changes: 25 additions & 57 deletions br/pkg/streamhelper/advancer_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,80 +4,48 @@ package streamhelper

import (
"context"
"time"

"github.com/google/uuid"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

const (
ownerPrompt = "log-backup"
ownerPath = "/tidb/br-stream/owner"
)

// AdvancerDaemon is a "high-availability" version of advancer.
// It involved the manager for electing a owner and doing things.
// You can embed it into your code by simply call:
//
// ad := NewAdvancerDaemon(adv, mgr)
// loop, err := ad.Begin(ctx)
//
// if err != nil {
// return err
// }
//
// loop()
type AdvancerDaemon struct {
adv *CheckpointAdvancer
manager owner.Manager
// OnTick advances the inner logic clock for the advancer.
// It's synchronous: this would only return after the events triggered by the clock has all been done.
// It's generally panic-free, you may not need to trying recover a panic here.
func (c *CheckpointAdvancer) OnTick(ctx context.Context) (err error) {
defer c.recordTimeCost("tick")()
defer utils.PanicToErr(&err)
return c.tick(ctx)
}

func NewAdvancerDaemon(adv *CheckpointAdvancer, manager owner.Manager) *AdvancerDaemon {
return &AdvancerDaemon{
adv: adv,
manager: manager,
}
// OnStart implements daemon.Interface.
func (c *CheckpointAdvancer) OnStart(ctx context.Context) {
metrics.AdvancerOwner.Set(1.0)
c.StartTaskListener(ctx)
go func() {
<-ctx.Done()
c.onStop()
}()
}

func OwnerManagerForLogBackup(ctx context.Context, etcdCli *clientv3.Client) owner.Manager {
id := uuid.New()
return owner.NewOwnerManager(ctx, etcdCli, ownerPrompt, id.String(), ownerPath)
// Name implements daemon.Interface.
func (c *CheckpointAdvancer) Name() string {
return "LogBackup::Advancer"
}

// Begin starts the daemon.
// It would do some bootstrap task, and return a closure that would begin the main loop.
func (ad *AdvancerDaemon) Begin(ctx context.Context) (func(), error) {
log.Info("begin advancer daemon", zap.String("id", ad.manager.ID()))
if err := ad.manager.CampaignOwner(); err != nil {
return nil, err
}
func (c *CheckpointAdvancer) onStop() {
metrics.AdvancerOwner.Set(0.0)
}

ad.adv.StartTaskListener(ctx)
tick := time.NewTicker(ad.adv.cfg.TickDuration)
loop := func() {
log.Info("begin advancer daemon loop", zap.String("id", ad.manager.ID()))
for {
select {
case <-ctx.Done():
log.Info("advancer loop exits", zap.String("id", ad.manager.ID()))
return
case <-tick.C:
log.Debug("deamon tick start", zap.Bool("is-owner", ad.manager.IsOwner()))
if ad.manager.IsOwner() {
metrics.AdvancerOwner.Set(1.0)
if err := ad.adv.OnTick(ctx); err != nil {
log.Warn("failed on tick", logutil.ShortError(err))
}
} else {
metrics.AdvancerOwner.Set(0.0)
}
}
}
}
return loop, nil
func OwnerManagerForLogBackup(ctx context.Context, etcdCli *clientv3.Client) owner.Manager {
id := uuid.New()
return owner.NewOwnerManager(ctx, etcdCli, ownerPrompt, id.String(), ownerPath)
}
29 changes: 29 additions & 0 deletions br/pkg/streamhelper/daemon/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "daemon",
srcs = [
"interface.go",
"owner_daemon.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/streamhelper/daemon",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/logutil",
"//owner",
"@com_github_pingcap_log//:log",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "daemon_test",
srcs = ["owner_daemon_test.go"],
flaky = True,
deps = [
":daemon",
"//owner",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
],
)
17 changes: 17 additions & 0 deletions br/pkg/streamhelper/daemon/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.

package daemon

import "context"

// Interface describes the lifetime hook of a daemon application.
type Interface interface {
// OnStart would be called once become the owner.
// The context passed in would be canceled once it is no more the owner.
OnStart(ctx context.Context)
// OnTick would be called periodically.
// The error can be recorded.
OnTick(ctx context.Context) error
// Name returns the name which is used for tracing the daemon.
Name() string
}
Loading

0 comments on commit 42a151f

Please sign in to comment.