Skip to content

Commit

Permalink
log-backup: fix the issue that gc-worker owner cannot check log-backu…
Browse files Browse the repository at this point in the history
…p task existed. (#41824) (#41848)

close #41806
  • Loading branch information
ti-chi-bot authored Mar 2, 2023
1 parent 8b48e91 commit ee46b2b
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 20 deletions.
8 changes: 6 additions & 2 deletions br/pkg/streamhelper/advancer_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@ func (c *CheckpointAdvancer) OnTick(ctx context.Context) (err error) {
return c.tick(ctx)
}

// OnStart implements daemon.Interface.
// OnStart implements daemon.Interface, which will be called when log backup service starts.
func (c *CheckpointAdvancer) OnStart(ctx context.Context) {
metrics.AdvancerOwner.Set(1.0)
c.StartTaskListener(ctx)
}

// OnBecomeOwner implements daemon.Interface. If the tidb-server become owner, this function will be called.
func (c *CheckpointAdvancer) OnBecomeOwner(ctx context.Context) {
metrics.AdvancerOwner.Set(1.0)
c.spawnSubscriptionHandler(ctx)
go func() {
<-ctx.Done()
Expand Down
6 changes: 4 additions & 2 deletions br/pkg/streamhelper/daemon/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import "context"

// Interface describes the lifetime hook of a daemon application.
type Interface interface {
// OnStart would be called once become the owner.
// The context passed in would be canceled once it is no more the owner.
// OnStart start the service whatever the tidb-server is owner or not.
OnStart(ctx context.Context)
// OnBecomeOwner would be called once become the owner.
// The context passed in would be canceled once it is no more the owner.
OnBecomeOwner(ctx context.Context)
// OnTick would be called periodically.
// The error can be recorded.
OnTick(ctx context.Context) error
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/streamhelper/daemon/owner_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (od *OwnerDaemon) ownerTick(ctx context.Context) {
od.cancel = cancel
log.Info("daemon became owner", zap.String("id", od.manager.ID()), zap.String("daemon-id", od.daemon.Name()))
// Note: maybe save the context so we can cancel the tick when we are not owner?
od.daemon.OnStart(cx)
od.daemon.OnBecomeOwner(cx)
}

// Tick anyway.
Expand All @@ -72,6 +72,10 @@ func (od *OwnerDaemon) Begin(ctx context.Context) (func(), error) {
return nil, err
}

// start the service.
od.daemon.OnStart(ctx)

// tick starts.
tick := time.NewTicker(od.tickInterval)
loop := func() {
log.Info("begin running daemon", zap.String("id", od.manager.ID()), zap.String("daemon-id", od.daemon.Name()))
Expand Down
18 changes: 15 additions & 3 deletions br/pkg/streamhelper/daemon/owner_daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import (

type anApp struct {
sync.Mutex
begun bool
serviceStart bool
begun bool

tickingMessenger chan struct{}
tickingMessengerOnce *sync.Once
Expand All @@ -33,9 +34,14 @@ func newTestApp(t *testing.T) *anApp {
}
}

// OnStart would be called once become the owner.
// The context passed in would be canceled once it is no more the owner.
// OnStart implements daemon.Interface.
func (a *anApp) OnStart(ctx context.Context) {
a.serviceStart = true
}

// OOnBecomeOwner would be called once become the owner.
// The context passed in would be canceled once it is no more the owner.
func (a *anApp) OnBecomeOwner(ctx context.Context) {
a.Lock()
defer a.Unlock()
if a.begun {
Expand Down Expand Up @@ -87,6 +93,10 @@ func (a *anApp) Running() bool {
return a.begun
}

func (a *anApp) AssertService(req *require.Assertions, serviceStart bool) {
req.True(a.serviceStart == serviceStart)
}

func (a *anApp) AssertTick(timeout time.Duration) {
a.Lock()
messenger := a.tickingMessenger
Expand Down Expand Up @@ -129,8 +139,10 @@ func TestDaemon(t *testing.T) {
ow := owner.NewMockManager(ctx, "owner_daemon_test")
d := daemon.New(app, ow, 100*time.Millisecond)

app.AssertService(req, false)
f, err := d.Begin(ctx)
req.NoError(err)
app.AssertService(req, true)
go f()
app.AssertStart(1 * time.Second)
app.AssertTick(1 * time.Second)
Expand Down
1 change: 1 addition & 0 deletions br/pkg/utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ go_library(
"@org_golang_google_grpc//status",
"@org_golang_x_net//http/httpproxy",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_multierr//:multierr",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
Expand Down
19 changes: 8 additions & 11 deletions br/pkg/utils/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"context"
"database/sql"
"strings"
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/atomic"
"go.uber.org/zap"
)

Expand All @@ -25,8 +25,7 @@ var (
_ DBExecutor = &sql.DB{}
_ DBExecutor = &sql.Conn{}

LogBackupTaskMutex sync.Mutex
logBackupTaskCount int
logBackupTaskCount = atomic.NewInt32(0)
)

// QueryExecutor is a interface for exec query
Expand Down Expand Up @@ -134,26 +133,24 @@ func SetGcRatio(ctx sqlexec.RestrictedSQLExecutor, ratio string) error {

// LogBackupTaskCountInc increases the count of log backup task.
func LogBackupTaskCountInc() {
LogBackupTaskMutex.Lock()
logBackupTaskCount++
LogBackupTaskMutex.Unlock()
logBackupTaskCount.Inc()
log.Info("inc log backup task", zap.Int32("count", logBackupTaskCount.Load()))
}

// LogBackupTaskCountDec decreases the count of log backup task.
func LogBackupTaskCountDec() {
LogBackupTaskMutex.Lock()
logBackupTaskCount--
LogBackupTaskMutex.Unlock()
logBackupTaskCount.Dec()
log.Info("dec log backup task", zap.Int32("count", logBackupTaskCount.Load()))
}

// CheckLogBackupTaskExist checks that whether log-backup is existed.
func CheckLogBackupTaskExist() bool {
return logBackupTaskCount > 0
return logBackupTaskCount.Load() > 0
}

// IsLogBackupInUse checks the log backup task existed.
func IsLogBackupInUse(ctx sessionctx.Context) bool {
return CheckLogBackupEnabled(ctx) && CheckLogBackupTaskExist()
return CheckLogBackupTaskExist()
}

// GetTidbNewCollationEnabled returns the variable name of NewCollationEnabled.
Expand Down
11 changes: 10 additions & 1 deletion br/tests/br_restore_log_task_enable/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ TABLE="usertable"

# start log task
run_br log start --task-name 1234 -s "local://$TEST_DIR/$DB/log" --pd $PD_ADDR
if ! grep -i "inc log backup task" "$TEST_DIR/tidb.log"; then
echo "TEST: [$TEST_NAME] log start failed!"
exit 1
fi

run_sql "CREATE DATABASE $DB;"
run_sql "CREATE TABLE $DB.$TABLE (id int);"
Expand Down Expand Up @@ -47,7 +51,12 @@ run_br restore full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR && exit 1
run_br restore point -s "local://$TEST_DIR/$DB/log" --pd $PD_ADDR && exit 1

# stop log task
run_br log stop --task-name 1234 --pd $PD_ADDR
unset BR_LOG_TO_TERM
run_br log stop --task-name 1234 --pd $PD_ADDR
if ! grep -i "dec log backup task" "$TEST_DIR/tidb.log"; then
echo "TEST: [$TEST_NAME] log stop failed!"
exit 1
fi

# restore full (should be success)
run_br restore full -s "local://$TEST_DIR/$DB/full" --pd $PD_ADDR
Expand Down

0 comments on commit ee46b2b

Please sign in to comment.