prepare_snap: establish connection to all stores before pausing admin (#51449) #51730

Merged

In short, this cherry-pick reworks the snapshot-backup preparer so that it dials every live store first and only then opens the per-store streams that pause admin commands; a new AfterConnectionsEstablished hook lets callers (such as the pause-scheduler task) wait for that point.
2 changes: 1 addition & 1 deletion br/pkg/backup/prepare_snap/BUILD.bazel
@@ -35,7 +35,7 @@ go_test(
     timeout = "short",
     srcs = ["prepare_test.go"],
     flaky = True,
-    shard_count = 7,
+    shard_count = 9,
     deps = [
         ":prepare_snap",
         "//br/pkg/utils",
56 changes: 42 additions & 14 deletions br/pkg/backup/prepare_snap/prepare.go
@@ -91,6 +91,9 @@
 	RetryBackoff  time.Duration
 	RetryLimit    int
 	LeaseDuration time.Duration
+
+	/* Observers. Initialize them before starting. */
+	AfterConnectionsEstablished func()
 }
 
 func New(env Env) *Preparer {
@@ -155,10 +158,13 @@
 		zap.Int("retry_limit", p.RetryLimit),
 		zap.Duration("lease_duration", p.LeaseDuration))
 	p.retryTime = 0
-	if err := p.prepareConnections(ctx); err != nil {
+	if err := p.PrepareConnections(ctx); err != nil {
 		log.Error("failed to prepare connections", logutil.ShortError(err))
 		return errors.Annotate(err, "failed to prepare connections")
 	}
+	if p.AfterConnectionsEstablished != nil {
+		p.AfterConnectionsEstablished()
+	}
 	if err := p.AdvanceState(ctx); err != nil {
 		log.Error("failed to check the progress of our work", logutil.ShortError(err))
 		return errors.Annotate(err, "failed to begin step")
@@ -385,23 +391,31 @@
 }
 
 func (p *Preparer) streamOf(ctx context.Context, storeID uint64) (*prepareStream, error) {
-	s, ok := p.clients[storeID]
+	_, ok := p.clients[storeID]
 	if !ok {
 		log.Warn("stream of store found a store not established connection", zap.Uint64("store", storeID))
 		cli, err := p.env.ConnectToStore(ctx, storeID)
 		if err != nil {
 			return nil, errors.Annotatef(err, "failed to dial store %d", storeID)
 		}
-		s = new(prepareStream)
-		s.storeID = storeID
-		s.output = p.eventChan
-		s.leaseDuration = p.LeaseDuration
-		err = s.InitConn(ctx, cli)
-		if err != nil {
-			return nil, err
+		if err := p.createAndCacheStream(ctx, cli, storeID); err != nil {
return nil, errors.Annotatef(err, "failed to create and cache stream for store %d", storeID)
}
p.clients[storeID] = s
}
return s, nil
return p.clients[storeID], nil
}

func (p *Preparer) createAndCacheStream(ctx context.Context, cli PrepareClient, storeID uint64) error {
s := new(prepareStream)
s.storeID = storeID
s.output = p.eventChan
s.leaseDuration = p.LeaseDuration
err := s.InitConn(ctx, cli)
if err != nil {
return err
}
p.clients[storeID] = s

+	return nil
+}
+
 func (p *Preparer) pushWaitApply(reqs pendingRequests, region Region) {
@@ -414,17 +428,31 @@
 	p.inflightReqs[region.GetMeta().Id] = *region.GetMeta()
 }
 
-func (p *Preparer) prepareConnections(ctx context.Context) error {
+// PrepareConnections prepares the connections for each store.
+// This will pause the admin commands for each store.
+func (p *Preparer) PrepareConnections(ctx context.Context) error {
 	log.Info("Preparing connections to stores.")
 	stores, err := p.env.GetAllLiveStores(ctx)
 	if err != nil {
 		return errors.Annotate(err, "failed to get all live stores")
 	}
log.Info("Start to initialize the connections.", zap.Int("stores", len(stores)))
clients := map[uint64]PrepareClient{}
for _, store := range stores {
_, err := p.streamOf(ctx, store.Id)
cli, err := p.env.ConnectToStore(ctx, store.Id)
if err != nil {
return errors.Annotatef(err, "failed to prepare connection to store %d", store.Id)
return errors.Annotatef(err, "failed to dial the store %d", store.Id)
}
clients[store.Id] = cli

 	}
+
+	for id, cli := range clients {
+		log.Info("Start to pause the admin commands.", zap.Uint64("store", id))
+		if err := p.createAndCacheStream(ctx, cli, id); err != nil {
+			return errors.Annotatef(err, "failed to create and cache stream for store %d", id)
+		}
+	}
+
 	return nil
 }
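The net effect is a two-phase protocol: dial every store before any stream is opened, so a failure to reach one store no longer leaves other stores already rejecting admin commands. Below is a minimal, self-contained Go sketch of that pattern; the conn, dial, and pauseAdmin names are hypothetical stand-ins for the real PrepareClient, env.ConnectToStore, and createAndCacheStream.

package main

import (
	"context"
	"fmt"
)

type conn struct{ storeID uint64 }

// dial stands in for env.ConnectToStore: it only opens the connection.
func dial(ctx context.Context, storeID uint64) (*conn, error) {
	return &conn{storeID: storeID}, nil
}

// pauseAdmin stands in for createAndCacheStream: opening the stream is
// what actually makes the store start rejecting admin commands.
func pauseAdmin(ctx context.Context, c *conn) error {
	fmt.Printf("store %d: admin commands paused\n", c.storeID)
	return nil
}

func prepareConnections(ctx context.Context, storeIDs []uint64) error {
	// Phase 1: dial every store. If any dial fails, we bail out before
	// any store has been asked to pause admin commands.
	conns := map[uint64]*conn{}
	for _, id := range storeIDs {
		c, err := dial(ctx, id)
		if err != nil {
			return fmt.Errorf("failed to dial the store %d: %w", id, err)
		}
		conns[id] = c
	}
	// Phase 2: only now pause admin commands, so the window in which the
	// cluster rejects admin commands excludes connection setup.
	for id, c := range conns {
		if err := pauseAdmin(ctx, c); err != nil {
			return fmt.Errorf("failed to pause admin on store %d: %w", id, err)
		}
	}
	return nil
}

func main() {
	if err := prepareConnections(context.Background(), []uint64{1, 2, 3}); err != nil {
		panic(err)
	}
}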
90 changes: 86 additions & 4 deletions br/pkg/backup/prepare_snap/prepare_test.go
@@ -21,6 +21,7 @@ import (
 	"io"
 	"sort"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -110,15 +111,24 @@ type mockStores struct {
 	mu               sync.Mutex
 	stores           map[uint64]*mockStore
 	onCreateStore    func(*mockStore)
+	connectDelay     func(uint64) <-chan struct{}
 	onConnectToStore func(uint64) error
 
 	pdc *tikv.RegionCache
 }
 
 func newTestEnv(pdc pd.Client) *mockStores {
 	r := tikv.NewRegionCache(pdc)
+	stores, err := pdc.GetAllStores(context.Background())
+	if err != nil {
+		panic(err)
+	}
+	ss := map[uint64]*mockStore{}
+	for _, store := range stores {
+		ss[store.Id] = nil
+	}
 	ms := &mockStores{
-		stores:        map[uint64]*mockStore{},
+		stores:        ss,
 		pdc:           r,
 		onCreateStore: func(ms *mockStore) {},
 	}
@@ -138,7 +148,14 @@ func (m *mockStores) GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error) {
 
 func (m *mockStores) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) {
 	m.mu.Lock()
-	defer m.mu.Unlock()
+	defer func() {
+		m.mu.Unlock()
+		if m.connectDelay != nil {
+			if ch := m.connectDelay(storeID); ch != nil {
+				<-ch
+			}
+		}
+	}()
 
 	if m.onConnectToStore != nil {
 		err := m.onConnectToStore(storeID)
@@ -147,8 +164,8 @@ func (m *mockStores) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) {
 		}
 	}
 
-	_, ok := m.stores[storeID]
-	if !ok {
+	s, ok := m.stores[storeID]
+	if !ok || s == nil {
 		m.stores[storeID] = &mockStore{
 			output:         make(chan brpb.PrepareSnapshotBackupResponse, 16),
 			successRegions: []metapb.Region{},
@@ -456,3 +473,68 @@ func TestSplitEnv(t *testing.T) {
 	require.Equal(t, cc.PrepareClient.(*counterClient).send, 1)
 	require.ElementsMatch(t, cc.PrepareClient.(*counterClient).regions, tinyRequest.Regions)
 }
+
+func TestConnectionDelay(t *testing.T) {
+	req := require.New(t)
+	pdc := fakeCluster(t, 3, dummyRegions(100)...)
+	ms := newTestEnv(pdc)
+	called := 0
+	delayConn := make(chan struct{})
+	blocked := make(chan struct{}, 64)
+	ms.connectDelay = func(i uint64) <-chan struct{} {
+		called += 1
+		if called == 2 {
+			blocked <- struct{}{}
+			return delayConn
+		}
+		return nil
+	}
+	ctx := context.Background()
+	prep := New(ms)
+	connectionPrepareResult := make(chan error)
+	go func() {
+		connectionPrepareResult <- prep.PrepareConnections(ctx)
+	}()
+	<-blocked
+	ms.mu.Lock()
+	nonNilStore := 0
+	for id, store := range ms.stores {
+		// We must not create and lease (i.e. reject admin command from any tikv) here.
+		if store != nil {
+			req.True(store.leaseUntil.Before(time.Now()), "%d->%s", id, store.leaseUntil)
+			nonNilStore += 1
+		}
+	}
+	req.GreaterOrEqual(nonNilStore, 2)
+	ms.mu.Unlock()
+	delayConn <- struct{}{}
+	req.NoError(<-connectionPrepareResult)
+}
+
+func TestHooks(t *testing.T) {
+	req := require.New(t)
+	pdc := fakeCluster(t, 3, dummyRegions(100)...)
+	pauseWaitApply := make(chan struct{})
+	ms := newTestEnv(pdc)
+	ms.onCreateStore = func(ms *mockStore) {
+		ms.onWaitApply = func(r *metapb.Region) error {
+			<-pauseWaitApply
+			return nil
+		}
+	}
+	adv := New(ms)
+	connectionsEstablished := new(atomic.Bool)
+	adv.AfterConnectionsEstablished = func() {
+		connectionsEstablished.Store(true)
+	}
+	errCh := make(chan error, 1)
+	go func() {
+		errCh <- adv.DriveLoopAndWaitPrepare(context.Background())
+	}()
+	req.Eventually(connectionsEstablished.Load, 1*time.Second, 100*time.Millisecond)
+	close(pauseWaitApply)
+	req.NoError(<-errCh)
+	ms.AssertSafeForBackup(t)
+	req.NoError(adv.Finalize(context.Background()))
+	ms.AssertIsNormalMode(t)
+}
1 change: 1 addition & 0 deletions br/pkg/backup/prepare_snap/stream.go
@@ -70,6 +70,7 @@ func (p *prepareStream) InitConn(ctx context.Context, cli PrepareClient) error {
 	p.cli = cli
 	p.clientLoopHandle, ctx = errgroup.WithContext(ctx)
 	ctx, p.stopBgTasks = context.WithCancel(ctx)
+	log.Info("initializing", zap.Uint64("store", p.storeID))
 	return p.GoLeaseLoop(ctx, p.leaseDuration)
 }
 
16 changes: 13 additions & 3 deletions br/pkg/task/operator/cmd.go
@@ -135,9 +135,15 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error {
 	}
 	defer cx.Close()
 
+	initChan := make(chan struct{})
 	cx.run(func() error { return pauseGCKeeper(cx) })
-	cx.run(func() error { return pauseSchedulerKeeper(cx) })
-	cx.run(func() error { return pauseAdminAndWaitApply(cx) })
+	cx.run(func() error {
+		log.Info("Pause scheduler waiting all connections established.")
+		<-initChan
+		log.Info("Pause scheduler noticed connections established.")
+		return pauseSchedulerKeeper(cx)
+	})
+	cx.run(func() error { return pauseAdminAndWaitApply(cx, initChan) })
 	go func() {
 		cx.rdGrp.Wait()
 		if cfg.OnAllReady != nil {
@@ -154,7 +160,7 @@
 	return eg.Wait()
 }
 
-func pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext) error {
+func pauseAdminAndWaitApply(cx *AdaptEnvForSnapshotBackupContext, afterConnectionsEstablished chan<- struct{}) error {
 	env := preparesnap.CliEnv{
 		Cache: tikv.NewRegionCache(cx.pdMgr.GetPDClient()),
 		Mgr:   cx.kvMgr,
@@ -164,6 +170,10 @@
 	begin := time.Now()
 	prep := preparesnap.New(retryEnv)
 	prep.LeaseDuration = cx.cfg.TTL
+	prep.AfterConnectionsEstablished = func() {
+		log.Info("All connections are established.")
+		close(afterConnectionsEstablished)
+	}
 
 	defer cx.cleanUpWith(func(ctx context.Context) {
 		if err := prep.Finalize(ctx); err != nil {
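The initChan handoff above is the standard close-to-broadcast idiom: pauseAdminAndWaitApply closes the channel from the AfterConnectionsEstablished hook, and the pause-scheduler task blocks on a receive until that happens. A self-contained sketch of the idiom follows; the two goroutine bodies are illustrative stand-ins for the real tasks, not the BR implementation.

package main

import (
	"fmt"
	"sync"
)

func main() {
	initChan := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(2)

	// Stand-in for the pause-scheduler task: it must not start until all
	// store connections are established.
	go func() {
		defer wg.Done()
		<-initChan // blocks until initChan is closed
		fmt.Println("scheduler paused")
	}()

	// Stand-in for pauseAdminAndWaitApply: it closes initChan once the
	// connections are up, releasing every waiter at once.
	go func() {
		defer wg.Done()
		fmt.Println("connections established")
		close(initChan) // a closed channel unblocks all receivers
	}()

	wg.Wait()
}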