From efe553385b4392007070844da6438feb7fdc3c50 Mon Sep 17 00:00:00 2001 From: Zak Zhao <57036248+joccau@users.noreply.github.com> Date: Thu, 2 Mar 2023 12:23:09 +0800 Subject: [PATCH] log-backup: fix the issue that gc-worker owner cannot check log-backup task existed. (#41824) close pingcap/tidb#41806 --- br/pkg/streamhelper/advancer_daemon.go | 8 ++++++-- br/pkg/streamhelper/daemon/interface.go | 6 ++++-- br/pkg/streamhelper/daemon/owner_daemon.go | 6 +++++- .../streamhelper/daemon/owner_daemon_test.go | 18 +++++++++++++++--- br/pkg/utils/BUILD.bazel | 1 + br/pkg/utils/db.go | 19 ++++++++----------- br/tests/br_restore_log_task_enable/run.sh | 11 ++++++++++- 7 files changed, 49 insertions(+), 20 deletions(-) diff --git a/br/pkg/streamhelper/advancer_daemon.go b/br/pkg/streamhelper/advancer_daemon.go index 10f43e105ccbe..4e3b68eb3fbf5 100644 --- a/br/pkg/streamhelper/advancer_daemon.go +++ b/br/pkg/streamhelper/advancer_daemon.go @@ -26,10 +26,14 @@ func (c *CheckpointAdvancer) OnTick(ctx context.Context) (err error) { return c.tick(ctx) } -// OnStart implements daemon.Interface. +// OnStart implements daemon.Interface, which will be called when log backup service starts. func (c *CheckpointAdvancer) OnStart(ctx context.Context) { - metrics.AdvancerOwner.Set(1.0) c.StartTaskListener(ctx) +} + +// OnBecomeOwner implements daemon.Interface. If the tidb-server become owner, this function will be called. +func (c *CheckpointAdvancer) OnBecomeOwner(ctx context.Context) { + metrics.AdvancerOwner.Set(1.0) c.spawnSubscriptionHandler(ctx) go func() { <-ctx.Done() diff --git a/br/pkg/streamhelper/daemon/interface.go b/br/pkg/streamhelper/daemon/interface.go index 544d67b153a0a..9f651d9488fb6 100644 --- a/br/pkg/streamhelper/daemon/interface.go +++ b/br/pkg/streamhelper/daemon/interface.go @@ -6,9 +6,11 @@ import "context" // Interface describes the lifetime hook of a daemon application. type Interface interface { - // OnStart would be called once become the owner. - // The context passed in would be canceled once it is no more the owner. + // OnStart start the service whatever the tidb-server is owner or not. OnStart(ctx context.Context) + // OnBecomeOwner would be called once become the owner. + // The context passed in would be canceled once it is no more the owner. + OnBecomeOwner(ctx context.Context) // OnTick would be called periodically. // The error can be recorded. OnTick(ctx context.Context) error diff --git a/br/pkg/streamhelper/daemon/owner_daemon.go b/br/pkg/streamhelper/daemon/owner_daemon.go index 3f14315957c43..533e38b7296c1 100644 --- a/br/pkg/streamhelper/daemon/owner_daemon.go +++ b/br/pkg/streamhelper/daemon/owner_daemon.go @@ -55,7 +55,7 @@ func (od *OwnerDaemon) ownerTick(ctx context.Context) { od.cancel = cancel log.Info("daemon became owner", zap.String("id", od.manager.ID()), zap.String("daemon-id", od.daemon.Name())) // Note: maybe save the context so we can cancel the tick when we are not owner? - od.daemon.OnStart(cx) + od.daemon.OnBecomeOwner(cx) } // Tick anyway. @@ -72,6 +72,10 @@ func (od *OwnerDaemon) Begin(ctx context.Context) (func(), error) { return nil, err } + // start the service. + od.daemon.OnStart(ctx) + + // tick starts. tick := time.NewTicker(od.tickInterval) loop := func() { log.Info("begin running daemon", zap.String("id", od.manager.ID()), zap.String("daemon-id", od.daemon.Name())) diff --git a/br/pkg/streamhelper/daemon/owner_daemon_test.go b/br/pkg/streamhelper/daemon/owner_daemon_test.go index 74251d0b410a1..7ae6ebf38e59e 100644 --- a/br/pkg/streamhelper/daemon/owner_daemon_test.go +++ b/br/pkg/streamhelper/daemon/owner_daemon_test.go @@ -16,7 +16,8 @@ import ( type anApp struct { sync.Mutex - begun bool + serviceStart bool + begun bool tickingMessenger chan struct{} tickingMessengerOnce *sync.Once @@ -33,9 +34,14 @@ func newTestApp(t *testing.T) *anApp { } } -// OnStart would be called once become the owner. -// The context passed in would be canceled once it is no more the owner. +// OnStart implements daemon.Interface. func (a *anApp) OnStart(ctx context.Context) { + a.serviceStart = true +} + +// OOnBecomeOwner would be called once become the owner. +// The context passed in would be canceled once it is no more the owner. +func (a *anApp) OnBecomeOwner(ctx context.Context) { a.Lock() defer a.Unlock() if a.begun { @@ -87,6 +93,10 @@ func (a *anApp) Running() bool { return a.begun } +func (a *anApp) AssertService(req *require.Assertions, serviceStart bool) { + req.True(a.serviceStart == serviceStart) +} + func (a *anApp) AssertTick(timeout time.Duration) { a.Lock() messenger := a.tickingMessenger @@ -129,8 +139,10 @@ func TestDaemon(t *testing.T) { ow := owner.NewMockManager(ctx, "owner_daemon_test") d := daemon.New(app, ow, 100*time.Millisecond) + app.AssertService(req, false) f, err := d.Begin(ctx) req.NoError(err) + app.AssertService(req, true) go f() app.AssertStart(1 * time.Second) app.AssertTick(1 * time.Second) diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel index a453370d25c46..806fc394b761a 100644 --- a/br/pkg/utils/BUILD.bazel +++ b/br/pkg/utils/BUILD.bazel @@ -58,6 +58,7 @@ go_library( "@org_golang_google_grpc//status", "@org_golang_x_net//http/httpproxy", "@org_golang_x_sync//errgroup", + "@org_uber_go_atomic//:atomic", "@org_uber_go_multierr//:multierr", "@org_uber_go_zap//:zap", "@org_uber_go_zap//zapcore", diff --git a/br/pkg/utils/db.go b/br/pkg/utils/db.go index 060df603d16cb..92e9b8e9a4303 100644 --- a/br/pkg/utils/db.go +++ b/br/pkg/utils/db.go @@ -7,7 +7,6 @@ import ( "database/sql" "strconv" "strings" - "sync" "github.com/docker/go-units" "github.com/pingcap/errors" @@ -16,6 +15,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util/sqlexec" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -28,8 +28,7 @@ var ( _ DBExecutor = &sql.DB{} _ DBExecutor = &sql.Conn{} - LogBackupTaskMutex sync.Mutex - logBackupTaskCount int + logBackupTaskCount = atomic.NewInt32(0) ) // QueryExecutor is a interface for exec query @@ -203,26 +202,24 @@ func SetGcRatio(ctx sqlexec.RestrictedSQLExecutor, ratio string) error { // LogBackupTaskCountInc increases the count of log backup task. func LogBackupTaskCountInc() { - LogBackupTaskMutex.Lock() - logBackupTaskCount++ - LogBackupTaskMutex.Unlock() + logBackupTaskCount.Inc() + log.Info("inc log backup task", zap.Int32("count", logBackupTaskCount.Load())) } // LogBackupTaskCountDec decreases the count of log backup task. func LogBackupTaskCountDec() { - LogBackupTaskMutex.Lock() - logBackupTaskCount-- - LogBackupTaskMutex.Unlock() + logBackupTaskCount.Dec() + log.Info("dec log backup task", zap.Int32("count", logBackupTaskCount.Load())) } // CheckLogBackupTaskExist checks that whether log-backup is existed. func CheckLogBackupTaskExist() bool { - return logBackupTaskCount > 0 + return logBackupTaskCount.Load() > 0 } // IsLogBackupInUse checks the log backup task existed. func IsLogBackupInUse(ctx sessionctx.Context) bool { - return CheckLogBackupEnabled(ctx) && CheckLogBackupTaskExist() + return CheckLogBackupTaskExist() } // GetTidbNewCollationEnabled returns the variable name of NewCollationEnabled. diff --git a/br/tests/br_restore_log_task_enable/run.sh b/br/tests/br_restore_log_task_enable/run.sh index 923f8fe7c2b33..5525123f74b54 100644 --- a/br/tests/br_restore_log_task_enable/run.sh +++ b/br/tests/br_restore_log_task_enable/run.sh @@ -20,6 +20,10 @@ TABLE="usertable" # start log task run_br log start --task-name 1234 -s "local://$TEST_DIR/$DB/log" --pd $PD_ADDR +if ! grep -i "inc log backup task" "$TEST_DIR/tidb.log"; then + echo "TEST: [$TEST_NAME] log start failed!" + exit 1 +fi run_sql "CREATE DATABASE $DB;" run_sql "CREATE TABLE $DB.$TABLE (id int);" @@ -47,7 +51,12 @@ run_br restore full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR && exit 1 run_br restore point -s "local://$TEST_DIR/$DB/log" --pd $PD_ADDR && exit 1 # stop log task -run_br log stop --task-name 1234 --pd $PD_ADDR +unset BR_LOG_TO_TERM +run_br log stop --task-name 1234 --pd $PD_ADDR +if ! grep -i "dec log backup task" "$TEST_DIR/tidb.log"; then + echo "TEST: [$TEST_NAME] log stop failed!" + exit 1 +fi # restore full (should be success) run_br restore full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR