Lightning: add retry if transaction failed while fetching task metas #53041

Status: Open. Wants to merge 7 commits into base: release-7.5.
9 changes: 0 additions & 9 deletions .dockerignore

This file was deleted.

3 changes: 0 additions & 3 deletions .gitmodules
@@ -1,3 +0,0 @@
[submodule "extension/enterprise"]
path = pkg/extension/enterprise
url = git@github.com:pingcap-inc/enterprise-extensions.git
26 changes: 2 additions & 24 deletions br/cmd/br/main.go
@@ -2,39 +2,17 @@ package main

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/spf13/cobra"
"go.uber.org/zap"
)

func main() {
gCtx := context.Background()
ctx, cancel := context.WithCancel(gCtx)
defer cancel()

sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

go func() {
sig := <-sc
fmt.Printf("\nGot signal [%v] to exit.\n", sig)
log.Warn("received signal to exit", zap.Stringer("signal", sig))
cancel()
fmt.Fprintln(os.Stderr, "gracefully shutting down, press ^C again to force exit")
<-sc
// Even if the user exits with SIGTERM, there isn't any checkpoint for resuming,
// hence return a failure exit code.
os.Exit(1)
}()
ctx, cancel := utils.StartExitSingleListener(gCtx)

rootCmd := &cobra.Command{
Use: "br",
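The deleted signal-handling block is now expected to live in `utils.StartExitSingleListener`, whose implementation is not shown in this diff. A minimal sketch, assuming the helper simply factors out the removed code:

```go
// Hypothetical sketch of a StartExitSingleListener-style helper; the real
// implementation lives in br/pkg/utils and is not part of this diff.
package utils

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/pingcap/log"
	"go.uber.org/zap"
)

// StartExitSingleListener derives a cancellable context from ctx and cancels it
// on the first termination signal; a second signal forces a failure exit.
func StartExitSingleListener(ctx context.Context) (context.Context, context.CancelFunc) {
	cctx, cancel := context.WithCancel(ctx)
	sc := make(chan os.Signal, 1)
	signal.Notify(sc,
		syscall.SIGHUP,
		syscall.SIGINT,
		syscall.SIGTERM,
		syscall.SIGQUIT)
	go func() {
		sig := <-sc
		fmt.Printf("\nGot signal [%v] to exit.\n", sig)
		log.Warn("received signal to exit", zap.Stringer("signal", sig))
		cancel()
		fmt.Fprintln(os.Stderr, "gracefully shutting down, press ^C again to force exit")
		<-sc
		// Even if the user exits with SIGTERM, there is no checkpoint to resume
		// from, hence return a failure exit code.
		os.Exit(1)
	}()
	return cctx, cancel
}
```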
2 changes: 1 addition & 1 deletion br/pkg/backup/prepare_snap/BUILD.bazel
@@ -35,7 +35,7 @@ go_test(
timeout = "short",
srcs = ["prepare_test.go"],
flaky = True,
shard_count = 7,
shard_count = 8,
deps = [
":prepare_snap",
"//br/pkg/utils",
31 changes: 30 additions & 1 deletion br/pkg/backup/prepare_snap/env.go
@@ -17,6 +17,7 @@ package preparesnap
import (
"context"
"slices"
"sync"
"time"

"github.com/docker/go-units"
@@ -110,6 +111,34 @@ func (c CliEnv) GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error) {
return withoutTiFlash, err
}

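// AdaptForGRPCInTest wraps a PrepareClient with the same Send/Recv
// serialization that ConnectToStore applies in production, so tests
// exercise the adapter as well.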
func AdaptForGRPCInTest(p PrepareClient) PrepareClient {
return &gRPCGoAdapter{
inner: p,
}
}

// gRPCGoAdapter makes calls to `Send` synchronous.
// grpc-go doesn't guarantee that concurrent calls to `Send` (or to `Recv`) are safe,
// but one concurrent `Send` and one concurrent `Recv` are safe.
// Tests can wrap a client in this adapter via `AdaptForGRPCInTest`.
type gRPCGoAdapter struct {
inner PrepareClient
sendMu sync.Mutex
recvMu sync.Mutex
}

func (s *gRPCGoAdapter) Send(req *brpb.PrepareSnapshotBackupRequest) error {
s.sendMu.Lock()
defer s.sendMu.Unlock()
return s.inner.Send(req)
}

func (s *gRPCGoAdapter) Recv() (*brpb.PrepareSnapshotBackupResponse, error) {
s.recvMu.Lock()
defer s.recvMu.Unlock()
return s.inner.Recv()
}

func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) {
var cli brpb.Backup_PrepareSnapshotBackupClient
err := c.Mgr.TryWithConn(ctx, storeID, func(cc *grpc.ClientConn) error {
@@ -124,7 +153,7 @@ func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClie
if err != nil {
return nil, err
}
return cli, nil
return &gRPCGoAdapter{inner: cli}, nil
}

func (c CliEnv) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) {
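For reference, the `gRPCGoAdapter` above is a small wrapper pattern: one mutex serializes `Send` calls and another serializes `Recv` calls, so any number of goroutines can share a single stream without breaking grpc-go's rule that at most one `Send` and one `Recv` may be in flight at a time. A self-contained sketch with hypothetical types (not the real `brpb` stream):

```go
// Toy illustration of the Send/Recv serialization used by gRPCGoAdapter.
package main

import (
	"fmt"
	"sync"
)

// streamClient stands in for a grpc-go stream, which tolerates at most one
// in-flight Send and one in-flight Recv at a time.
type streamClient interface {
	Send(msg string) error
	Recv() (string, error)
}

// echoStream is a trivial implementation that makes the example runnable.
type echoStream struct{ ch chan string }

func (e *echoStream) Send(msg string) error { e.ch <- msg; return nil }
func (e *echoStream) Recv() (string, error) { return <-e.ch, nil }

// safeStream serializes Send calls and Recv calls separately, mirroring gRPCGoAdapter.
type safeStream struct {
	inner  streamClient
	sendMu sync.Mutex
	recvMu sync.Mutex
}

func (s *safeStream) Send(msg string) error {
	s.sendMu.Lock()
	defer s.sendMu.Unlock()
	return s.inner.Send(msg)
}

func (s *safeStream) Recv() (string, error) {
	s.recvMu.Lock()
	defer s.recvMu.Unlock()
	return s.inner.Recv()
}

func main() {
	s := &safeStream{inner: &echoStream{ch: make(chan string, 16)}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = s.Send(fmt.Sprintf("req-%d", i)) // concurrent Sends are safe through the wrapper
		}(i)
	}
	wg.Wait()
	for i := 0; i < 4; i++ {
		msg, _ := s.Recv()
		fmt.Println("got", msg)
	}
}
```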
53 changes: 38 additions & 15 deletions br/pkg/backup/prepare_snap/prepare.go
@@ -155,7 +155,7 @@ func (p *Preparer) DriveLoopAndWaitPrepare(ctx context.Context) error {
zap.Int("retry_limit", p.RetryLimit),
zap.Duration("lease_duration", p.LeaseDuration))
p.retryTime = 0
if err := p.prepareConnections(ctx); err != nil {
if err := p.PrepareConnections(ctx); err != nil {
log.Error("failed to prepare connections", logutil.ShortError(err))
return errors.Annotate(err, "failed to prepare connections")
}
@@ -370,7 +370,9 @@ func (p *Preparer) workOnPendingRanges(ctx context.Context) error {
}

func (p *Preparer) sendWaitApply(ctx context.Context, reqs pendingRequests) error {
logutil.CL(ctx).Info("about to send wait apply to stores", zap.Int("to-stores", len(reqs)))
for store, req := range reqs {
logutil.CL(ctx).Info("sending wait apply requests to store", zap.Uint64("store", store), zap.Int("regions", len(req.Regions)))
stream, err := p.streamOf(ctx, store)
if err != nil {
return errors.Annotatef(err, "failed to dial the store %d", store)
@@ -379,29 +381,36 @@ func (p *Preparer) sendWaitApply(ctx context.Context, reqs pendingRequests) erro
if err != nil {
return errors.Annotatef(err, "failed to send message to the store %d", store)
}
logutil.CL(ctx).Info("sent wait apply requests to store", zap.Uint64("store", store), zap.Int("regions", len(req.Regions)))
}
return nil
}

func (p *Preparer) streamOf(ctx context.Context, storeID uint64) (*prepareStream, error) {
s, ok := p.clients[storeID]
_, ok := p.clients[storeID]
if !ok {
log.Warn("stream of store found a store not established connection", zap.Uint64("store", storeID))
cli, err := p.env.ConnectToStore(ctx, storeID)
if err != nil {
return nil, errors.Annotatef(err, "failed to dial store %d", storeID)
}
s = new(prepareStream)
s.storeID = storeID
s.output = p.eventChan
s.leaseDuration = p.LeaseDuration
err = s.InitConn(ctx, cli)
if err != nil {
return nil, err
if err := p.createAndCacheStream(ctx, cli, storeID); err != nil {
return nil, errors.Annotatef(err, "failed to create and cache stream for store %d", storeID)
}
p.clients[storeID] = s
}
return s, nil
return p.clients[storeID], nil
}

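// createAndCacheStream initializes the wait-apply stream for the given store
// (which starts its lease and pauses admin commands) and records it in p.clients.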
func (p *Preparer) createAndCacheStream(ctx context.Context, cli PrepareClient, storeID uint64) error {
s := new(prepareStream)
s.storeID = storeID
s.output = p.eventChan
s.leaseDuration = p.LeaseDuration
err := s.InitConn(ctx, cli)
if err != nil {
return err
}
p.clients[storeID] = s
return nil
}

func (p *Preparer) pushWaitApply(reqs pendingRequests, region Region) {
@@ -414,17 +423,31 @@ func (p *Preparer) pushWaitApply(reqs pendingRequests, region Region) {
p.inflightReqs[region.GetMeta().Id] = *region.GetMeta()
}

func (p *Preparer) prepareConnections(ctx context.Context) error {
// PrepareConnections prepares the connections for each store.
// This will pause the admin commands for each store.
func (p *Preparer) PrepareConnections(ctx context.Context) error {
log.Info("Preparing connections to stores.")
stores, err := p.env.GetAllLiveStores(ctx)
if err != nil {
return errors.Annotate(err, "failed to get all live stores")
}

log.Info("Start to initialize the connections.", zap.Int("stores", len(stores)))
clients := map[uint64]PrepareClient{}
for _, store := range stores {
_, err := p.streamOf(ctx, store.Id)
cli, err := p.env.ConnectToStore(ctx, store.Id)
if err != nil {
return errors.Annotatef(err, "failed to prepare connection to store %d", store.Id)
return errors.Annotatef(err, "failed to dial the store %d", store.Id)
}
clients[store.Id] = cli
}

for id, cli := range clients {
log.Info("Start to pause the admin commands.", zap.Uint64("store", id))
if err := p.createAndCacheStream(ctx, cli, id); err != nil {
return errors.Annotatef(err, "failed to create and cache stream for store %d", id)
}
}

return nil
}
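The rewritten `PrepareConnections` dials every store before it creates any stream, so the lease that pauses admin commands only starts once all stores are reachable (this is what `TestConnectionDelay` below verifies). A hypothetical, self-contained sketch of that two-phase shape, using illustrative names rather than the real API:

```go
// Sketch of the "dial everything first, then start side effects" layout.
package main

import (
	"context"
	"fmt"
)

type conn struct{ id uint64 }

// dial stands in for env.ConnectToStore.
func dial(ctx context.Context, id uint64) (*conn, error) { return &conn{id: id}, nil }

// startLease stands in for createAndCacheStream: once called, the store starts
// rejecting admin commands until the lease expires.
func startLease(ctx context.Context, c *conn) error {
	fmt.Printf("store %d: admin commands paused\n", c.id)
	return nil
}

func prepareConnections(ctx context.Context, stores []uint64) error {
	conns := make(map[uint64]*conn, len(stores))
	// Phase 1: establish every connection. A failure here returns before any
	// store has been asked to pause admin commands.
	for _, id := range stores {
		c, err := dial(ctx, id)
		if err != nil {
			return fmt.Errorf("failed to dial the store %d: %w", id, err)
		}
		conns[id] = c
	}
	// Phase 2: start the leases only after all stores are reachable.
	for id, c := range conns {
		if err := startLease(ctx, c); err != nil {
			return fmt.Errorf("failed to start lease for store %d: %w", id, err)
		}
	}
	return nil
}

func main() {
	_ = prepareConnections(context.Background(), []uint64{1, 2, 3})
}
```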
64 changes: 59 additions & 5 deletions br/pkg/backup/prepare_snap/prepare_test.go
@@ -110,15 +110,24 @@ type mockStores struct {
mu sync.Mutex
stores map[uint64]*mockStore
onCreateStore func(*mockStore)
connectDelay func(uint64) <-chan struct{}
onConnectToStore func(uint64) error

pdc *tikv.RegionCache
}

func newTestEnv(pdc pd.Client) *mockStores {
r := tikv.NewRegionCache(pdc)
stores, err := pdc.GetAllStores(context.Background())
if err != nil {
panic(err)
}
ss := map[uint64]*mockStore{}
for _, store := range stores {
ss[store.Id] = nil
}
ms := &mockStores{
stores: map[uint64]*mockStore{},
stores: ss,
pdc: r,
onCreateStore: func(ms *mockStore) {},
}
@@ -138,7 +147,14 @@ func (m *mockStores) GetAllLiveStores(ctx context.Context) ([]*metapb.Store, err

func (m *mockStores) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) {
m.mu.Lock()
defer m.mu.Unlock()
defer func() {
m.mu.Unlock()
if m.connectDelay != nil {
if ch := m.connectDelay(storeID); ch != nil {
<-ch
}
}
}()

if m.onConnectToStore != nil {
err := m.onConnectToStore(storeID)
@@ -147,8 +163,8 @@ func (m *mockStores) ConnectToStore(ctx context.Context, storeID uint64) (Prepar
}
}

_, ok := m.stores[storeID]
if !ok {
s, ok := m.stores[storeID]
if !ok || s == nil {
m.stores[storeID] = &mockStore{
output: make(chan brpb.PrepareSnapshotBackupResponse, 16),
successRegions: []metapb.Region{},
@@ -161,7 +177,7 @@ func (m *mockStores) ConnectToStore(ctx context.Context, storeID uint64) (Prepar
}
m.onCreateStore(m.stores[storeID])
}
return m.stores[storeID], nil
return AdaptForGRPCInTest(m.stores[storeID]), nil
}

func (m *mockStores) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) {
@@ -456,3 +472,41 @@ func TestSplitEnv(t *testing.T) {
require.Equal(t, cc.PrepareClient.(*counterClient).send, 1)
require.ElementsMatch(t, cc.PrepareClient.(*counterClient).regions, tinyRequest.Regions)
}

func TestConnectionDelay(t *testing.T) {
log.SetLevel(zapcore.DebugLevel)
req := require.New(t)
pdc := fakeCluster(t, 3, dummyRegions(100)...)
ms := newTestEnv(pdc)
called := 0
delayConn := make(chan struct{})
blocked := make(chan struct{}, 64)
ms.connectDelay = func(i uint64) <-chan struct{} {
called += 1
if called == 2 {
blocked <- struct{}{}
return delayConn
}
return nil
}
ctx := context.Background()
prep := New(ms)
connectionPrepareResult := make(chan error)
go func() {
connectionPrepareResult <- prep.PrepareConnections(ctx)
}()
<-blocked
ms.mu.Lock()
nonNilStore := 0
for id, store := range ms.stores {
// We must not have created any lease here (i.e. no TiKV should be rejecting admin commands yet).
if store != nil {
req.True(store.leaseUntil.Before(time.Now()), "%d->%s", id, store.leaseUntil)
nonNilStore += 1
}
}
req.GreaterOrEqual(nonNilStore, 2)
ms.mu.Unlock()
delayConn <- struct{}{}
req.NoError(<-connectionPrepareResult)
}
1 change: 1 addition & 0 deletions br/pkg/backup/prepare_snap/stream.go
@@ -70,6 +70,7 @@ func (p *prepareStream) InitConn(ctx context.Context, cli PrepareClient) error {
p.cli = cli
p.clientLoopHandle, ctx = errgroup.WithContext(ctx)
ctx, p.stopBgTasks = context.WithCancel(ctx)
log.Info("initializing", zap.Uint64("store", p.storeID))
return p.GoLeaseLoop(ctx, p.leaseDuration)
}
