Skip to content

Commit

Permalink
operator: pause scheduler after all connections established (pingcap#…
Browse files Browse the repository at this point in the history
  • Loading branch information
YuJuncen committed May 6, 2024
1 parent 0954ee0 commit f2db32c
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 5 deletions.
2 changes: 1 addition & 1 deletion br/pkg/backup/prepare_snap/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ go_test(
timeout = "short",
srcs = ["prepare_test.go"],
flaky = True,
shard_count = 8,
shard_count = 9,
deps = [
":prepare_snap",
"//br/pkg/utils",
Expand Down
6 changes: 6 additions & 0 deletions br/pkg/backup/prepare_snap/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ type Preparer struct {
RetryBackoff time.Duration
RetryLimit int
LeaseDuration time.Duration

/* Observers. Initialize them before starting.*/
AfterConnectionsEstablished func()
}

func New(env Env) *Preparer {
Expand Down Expand Up @@ -159,6 +162,9 @@ func (p *Preparer) DriveLoopAndWaitPrepare(ctx context.Context) error {
log.Error("failed to prepare connections", logutil.ShortError(err))
return errors.Annotate(err, "failed to prepare connections")
}
if p.AfterConnectionsEstablished != nil {
p.AfterConnectionsEstablished()
}
if err := p.AdvanceState(ctx); err != nil {
log.Error("failed to check the progress of our work", logutil.ShortError(err))
return errors.Annotate(err, "failed to begin step")
Expand Down
30 changes: 29 additions & 1 deletion br/pkg/backup/prepare_snap/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io"
"slices"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -474,7 +475,6 @@ func TestSplitEnv(t *testing.T) {
}

func TestConnectionDelay(t *testing.T) {
log.SetLevel(zapcore.DebugLevel)
req := require.New(t)
pdc := fakeCluster(t, 3, dummyRegions(100)...)
ms := newTestEnv(pdc)
Expand Down Expand Up @@ -510,3 +510,31 @@ func TestConnectionDelay(t *testing.T) {
delayConn <- struct{}{}
req.NoError(<-connectionPrepareResult)
}

func TestHooks(t *testing.T) {
req := require.New(t)
pdc := fakeCluster(t, 3, dummyRegions(100)...)
pauseWaitApply := make(chan struct{})
ms := newTestEnv(pdc)
ms.onCreateStore = func(ms *mockStore) {
ms.onWaitApply = func(r *metapb.Region) error {
<-pauseWaitApply
return nil
}
}
adv := New(ms)
connectionsEstablished := new(atomic.Bool)
adv.AfterConnectionsEstablished = func() {
connectionsEstablished.Store(true)
}
errCh := make(chan error, 1)
go func() {
errCh <- adv.DriveLoopAndWaitPrepare(context.Background())
}()
req.Eventually(connectionsEstablished.Load, 1*time.Second, 100*time.Millisecond)
close(pauseWaitApply)
req.NoError(<-errCh)
ms.AssertSafeForBackup(t)
req.NoError(adv.Finalize(context.Background()))
ms.AssertIsNormalMode(t)
}
16 changes: 13 additions & 3 deletions br/pkg/task/operator/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,15 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error {
}
defer cx.Close()

initChan := make(chan struct{})
cx.run(func() error { return pauseGCKeeper(cx) })
cx.run(func() error { return pauseSchedulerKeeper(cx) })
cx.run(func() error { return pauseAdminAndWaitApply(cx) })
cx.run(func() error {
log.Info("Pause scheduler waiting all connections established.")
<-initChan
log.Info("Pause scheduler noticed connections established.")
return pauseSchedulerKeeper(cx)
})
cx.run(func() error { return pauseAdminAndWaitApply(cx, initChan) })
go func() {
cx.rdGrp.Wait()
if cfg.OnAllReady != nil {
Expand All @@ -154,7 +160,7 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error {
return eg.Wait()
}

func pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext) error {
func pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext, afterConnectionsEstablished chan<- struct{}) error {
env := preparesnap.CliEnv{
Cache: tikv.NewRegionCache(cx.pdMgr.GetPDClient()),
Mgr: cx.kvMgr,
Expand All @@ -164,6 +170,10 @@ func pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext) error {
begin := time.Now()
prep := preparesnap.New(retryEnv)
prep.LeaseDuration = cx.cfg.TTL
prep.AfterConnectionsEstablished = func() {
log.Info("All connections are stablished.")
close(afterConnectionsEstablished)
}

defer cx.cleanUpWith(func(ctx context.Context) {
if err := prep.Finalize(ctx); err != nil {
Expand Down

0 comments on commit f2db32c

Please sign in to comment.