This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

backup: allow backup tolerate minor TiKV failure (#997) (#1019) #1062

Merged 7 commits on May 13, 2021
4 changes: 2 additions & 2 deletions Makefile
@@ -215,12 +215,12 @@ static: prepare tools
$(PACKAGE_DIRECTORIES)
# pingcap/errors APIs are mixed with multiple patterns 'pkg/errors',
# 'juju/errors' and 'pingcap/parser'. To avoid confusion and mistake,
# we only allow a subset of APIs, that's "Normalize|Annotate|Trace|Cause".
# we only allow a subset of APIs, that's "Normalize|Annotate|Trace|Cause|Find".
# TODO: check lightning packages.
@# TODO: allow more APIs when we need to support "workaround".
grep -Rn --include="*.go" --exclude="*_test.go" -E "(\t| )errors\.[A-Z]" \
$$($(PACKAGE_DIRECTORIES) | grep -vE "tests|lightning") | \
grep -vE "Normalize|Annotate|Trace|Cause|RedactLogEnabled" 2>&1 | $(CHECKER)
grep -vE "Normalize|Annotate|Trace|Cause|RedactLogEnabled|Find" 2>&1 | $(CHECKER)
# The package name of "github.com/pingcap/kvproto/pkg/backup" collides
# "github.com/pingcap/br/pkg/backup", so we rename kvproto to backuppb.
grep -Rn --include="*.go" -E '"github.com/pingcap/kvproto/pkg/backup"' \
5 changes: 5 additions & 0 deletions errors.toml
@@ -21,6 +21,11 @@ error = '''
backup no leader
'''

["BR:Common:ErrFailedToConnect"]
error = '''
failed to make gRPC channels
'''

["BR:Common:ErrInvalidArgument"]
error = '''
invalid argument
59 changes: 54 additions & 5 deletions pkg/backup/client.go
@@ -38,6 +38,7 @@ import (
"github.com/pingcap/br/pkg/conn"
berrors "github.com/pingcap/br/pkg/errors"
"github.com/pingcap/br/pkg/logutil"
"github.com/pingcap/br/pkg/redact"
"github.com/pingcap/br/pkg/rtree"
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/summary"
@@ -221,7 +222,7 @@ func (bc *Client) SaveBackupMeta(ctx context.Context, backupMeta *backuppb.Backu
if sigFile, ok := v.(string); ok {
file, err := os.Create(sigFile)
if err != nil {
log.Warn("failed to find shell to notify, skipping notify", zap.Error(err))
log.Warn("failed to create file for notifying, skipping notify", zap.Error(err))
}
if file != nil {
file.Close()
@@ -609,6 +610,21 @@ func (bc *Client) fineGrainedBackup(
rangeTree rtree.RangeTree,
progressCallBack func(ProgressUnit),
) error {
failpoint.Inject("hint-fine-grained-backup", func(v failpoint.Value) {
log.Info("failpoint hint-fine-grained-backup injected, "+
"process will sleep for 3s and notify the shell.", zap.String("file", v.(string)))
if sigFile, ok := v.(string); ok {
file, err := os.Create(sigFile)
if err != nil {
log.Warn("failed to create file for notifying, skipping notify", zap.Error(err))
}
if file != nil {
file.Close()
}
time.Sleep(3 * time.Second)
}
})

bo := tikv.NewBackoffer(ctx, backupFineGrainedMaxBackoff)
for {
// Step1, check whether there is any incomplete range
@@ -804,8 +820,15 @@ func (bc *Client) handleFineGrained(
lockResolver := bc.mgr.GetLockResolver()
client, err := bc.mgr.GetBackupClient(ctx, storeID)
if err != nil {
if berrors.Is(err, berrors.ErrFailedToConnect) {
// When the leader store is down, back off for 20s,
// the default max duration before the raft election timer fires.
log.Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID))
return 20000, nil
}

log.Error("fail to connect store", zap.Uint64("StoreID", storeID))
return 0, errors.Trace(err)
return 0, errors.Annotatef(err, "failed to connect to store %d", storeID)
}
hasProgress := false
backoffMill := 0
@@ -835,7 +858,15 @@
return bc.mgr.ResetBackupClient(ctx, storeID)
})
if err != nil {
return 0, errors.Trace(err)
if berrors.Is(err, berrors.ErrFailedToConnect) {
// When the leader store is down, back off for 20s,
// the default max duration before the raft election timer fires.
log.Warn("failed to connect to store, skipping", logutil.ShortError(err), zap.Uint64("storeID", storeID))
return 20000, nil
}
log.Error("failed to send fine-grained backup", zap.Uint64("storeID", storeID), logutil.ShortError(err))
return 0, errors.Annotatef(err, "failed to send fine-grained backup [%s, %s)",
redact.Key(req.StartKey), redact.Key(req.EndKey))
}

// If no progress, backoff 10s for debouncing.
@@ -866,6 +897,20 @@ backupLoop:
zap.Uint64("storeID", storeID),
zap.Int("retry time", retry),
)
failpoint.Inject("hint-backup-start", func(v failpoint.Value) {
log.Info("failpoint hint-backup-start injected, " +
"process will notify the shell.")
if sigFile, ok := v.(string); ok {
file, err := os.Create(sigFile)
if err != nil {
log.Warn("failed to create file for notifying, skipping notify", zap.Error(err))
}
if file != nil {
file.Close()
}
}
time.Sleep(3 * time.Second)
})
bcli, err := client.Backup(ctx, &req)
failpoint.Inject("reset-retryable-error", func(val failpoint.Value) {
if val.(bool) {
@@ -891,8 +936,11 @@
}
log.Error("fail to backup", zap.Uint64("StoreID", storeID),
zap.Int("retry time", retry))
return errors.Trace(err)
return berrors.ErrFailedToConnect.Wrap(err).GenWithStack("failed to create backup stream to store %d", storeID)
}
// It's strange that this passes errcheck in both release-5.0 and master
// nolint:errcheck
defer bcli.CloseSend()

for {
resp, err := bcli.Recv()
@@ -913,8 +961,9 @@
}
break
}
return errors.Annotatef(err, "failed to connect to store: %d with retry times:%d", storeID, retry)
return berrors.ErrFailedToConnect.Wrap(err).GenWithStack("failed to connect to store: %d with retry times:%d", storeID, retry)
}

// TODO: handle errors in the resp.
log.Info("range backuped",
logutil.Key("startKey", resp.GetStartKey()),
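The client.go changes above turn a lost store connection into a plain backoff hint instead of a fatal error. Below is a minimal, self-contained sketch of that decision; the function and constant names are illustrative only, while `berrors.Is` and `ErrFailedToConnect` come from this diff.

```go
package main

import (
	"fmt"

	berrors "github.com/pingcap/br/pkg/errors"
)

// raftElectionBackoffMs mirrors the 20s used above: by the time the backoff
// expires, a new region leader should have been elected on a healthy store.
const raftElectionBackoffMs = 20000

// classifyFineGrainedError restates the choice handleFineGrained now makes:
// connection failures are tolerated (retry after a backoff), while any other
// error still aborts the backup.
func classifyFineGrainedError(err error) (backoffMs int, fatal error) {
	switch {
	case err == nil:
		return 0, nil
	case berrors.Is(err, berrors.ErrFailedToConnect):
		return raftElectionBackoffMs, nil
	default:
		return 0, err
	}
}

func main() {
	connErr := berrors.ErrFailedToConnect.GenWithStackByArgs()
	ms, fatal := classifyFineGrainedError(connErr)
	fmt.Println(ms, fatal) // 20000 <nil>: skip the store for now, retry the range later
}
```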
20 changes: 17 additions & 3 deletions pkg/backup/push.go
@@ -14,6 +14,8 @@ import (
"go.uber.org/zap"

berrors "github.com/pingcap/br/pkg/errors"
"github.com/pingcap/br/pkg/logutil"
"github.com/pingcap/br/pkg/redact"
"github.com/pingcap/br/pkg/rtree"
"github.com/pingcap/br/pkg/utils"
)
@@ -43,6 +45,11 @@ func (push *pushDown) pushBackup(
) (rtree.RangeTree, error) {
// Push down backup tasks to all tikv instances.
res := rtree.NewRangeTree()
failpoint.Inject("noop-backup", func(_ failpoint.Value) {
log.Warn("skipping normal backup, jump to fine-grained backup, meow :3", logutil.Key("start-key", req.StartKey), logutil.Key("end-key", req.EndKey))
failpoint.Return(res, nil)
})

wg := new(sync.WaitGroup)
for _, s := range stores {
storeID := s.GetId()
@@ -52,8 +59,10 @@
}
client, err := push.mgr.GetBackupClient(ctx, storeID)
if err != nil {
log.Error("fail to connect store", zap.Uint64("StoreID", storeID))
return res, errors.Trace(err)
// BR should be able to back up even if some of the stores are disconnected.
// The regions managed by this store can then be retried by the fine-grained backup.
log.Warn("fail to connect store, skipping", zap.Uint64("StoreID", storeID), zap.Error(err))
return res, nil
}
wg.Add(1)
go func() {
@@ -69,6 +78,7 @@
log.Warn("reset the connection in push", zap.Uint64("storeID", storeID))
return push.mgr.ResetBackupClient(ctx, storeID)
})
// Disconnected stores can be ignored.
if err != nil {
push.errCh <- err
return
@@ -125,7 +135,11 @@
}
}
case err := <-push.errCh:
return res, errors.Trace(err)
if !berrors.Is(err, berrors.ErrFailedToConnect) {
return res, errors.Annotatef(err, "failed to backup range [%s, %s)", redact.Key(req.StartKey), redact.Key(req.EndKey))
}
log.Warn("skipping disconnected stores", logutil.ShortError(err))
return res, nil
}
}
}
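The `noop-backup` failpoint added to pushBackup is what lets tests exercise the fine-grained path directly. A hedged sketch of enabling it through the pingcap/failpoint runtime API follows; the full failpoint path is an assumption derived from the package path, and the test body is only an outline.

```go
package backup_test

import (
	"testing"

	"github.com/pingcap/failpoint"
)

// TestSkipCoarseGrainedBackup enables noop-backup so pushBackup returns an
// empty range tree immediately, forcing every range through fineGrainedBackup.
func TestSkipCoarseGrainedBackup(t *testing.T) {
	fp := "github.com/pingcap/br/pkg/backup/noop-backup" // assumed full failpoint path
	if err := failpoint.Enable(fp, "return(true)"); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := failpoint.Disable(fp); err != nil {
			t.Error(err)
		}
	}()

	// ... run a backup against a test cluster here; with the failpoint on,
	// the push-down phase is skipped and all data is expected to be collected
	// by the fine-grained phase.
}
```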
19 changes: 18 additions & 1 deletion pkg/conn/conn.go
@@ -5,11 +5,13 @@ package conn
import (
"context"
"crypto/tls"
"os"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
@@ -171,6 +173,20 @@ func NewMgr(
}

func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.ClientConn, error) {
failpoint.Inject("hint-get-backup-client", func(v failpoint.Value) {
log.Info("failpoint hint-get-backup-client injected, "+
"process will notify the shell.", zap.Uint64("store", storeID))
if sigFile, ok := v.(string); ok {
file, err := os.Create(sigFile)
if err != nil {
log.Warn("failed to create file for notifying, skipping notify", zap.Error(err))
}
if file != nil {
file.Close()
}
}
time.Sleep(3 * time.Second)
})
store, err := mgr.GetPDClient().GetStore(ctx, storeID)
if err != nil {
return nil, errors.Trace(err)
@@ -196,7 +212,7 @@ func (mgr *Mgr) getGrpcConnLocked(ctx context.Context, storeID uint64) (*grpc.Cl
)
cancel()
if err != nil {
return nil, errors.Trace(err)
return nil, berrors.ErrFailedToConnect.Wrap(err).GenWithStack("failed to make connection to store %d", storeID)
}
return conn, nil
}
@@ -209,6 +225,7 @@ func (mgr *Mgr) GetBackupClient(ctx context.Context, storeID uint64) (backuppb.B

mgr.grpcClis.mu.Lock()
defer mgr.grpcClis.mu.Unlock()

if conn, ok := mgr.grpcClis.clis[storeID]; ok {
// Find a cached backup client.
return backuppb.NewBackupClient(conn), nil
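The `hint-get-backup-client` failpoint above exists so an external script can synchronize with BR: when the failpoint value is a file path, BR creates that file and then sleeps for 3 seconds, giving the script a window to kill a TiKV store. A sketch of driving it from a Go test is shown below; the failpoint path and the use of a temporary signal file are assumptions.

```go
package conn_test

import (
	"path/filepath"
	"testing"

	"github.com/pingcap/failpoint"
)

func TestHintGetBackupClient(t *testing.T) {
	sig := filepath.Join(t.TempDir(), "get-backup-client.sig")    // hypothetical signal file
	fp := "github.com/pingcap/br/pkg/conn/hint-get-backup-client" // assumed failpoint path

	// return("<path>") makes the injected failpoint.Value a string, matching
	// the v.(string) assertion in getGrpcConnLocked above.
	if err := failpoint.Enable(fp, `return("`+sig+`")`); err != nil {
		t.Fatal(err)
	}
	defer func() { _ = failpoint.Disable(fp) }()

	// ... start a backup in another goroutine, then poll for sig; once the
	// file exists, BR is inside getGrpcConnLocked and will sleep for 3s,
	// which is the window for injecting a store failure.
}
```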
11 changes: 11 additions & 0 deletions pkg/errors/errors.go
@@ -6,11 +6,22 @@ import (
"github.com/pingcap/errors"
)

// Is tests whether the specified error is a cause of the error `err`.
func Is(err error, is *errors.Error) bool {
errorFound := errors.Find(err, func(e error) bool {
//nolint:errorlint
normalizedErr, ok := e.(*errors.Error)
return ok && normalizedErr.ID() == is.ID()
})
return errorFound != nil
}

// BR errors.
var (
ErrUnknown = errors.Normalize("internal error", errors.RFCCodeText("BR:Common:ErrUnknown"))
ErrInvalidArgument = errors.Normalize("invalid argument", errors.RFCCodeText("BR:Common:ErrInvalidArgument"))
ErrVersionMismatch = errors.Normalize("version mismatch", errors.RFCCodeText("BR:Common:ErrVersionMismatch"))
ErrFailedToConnect = errors.Normalize("failed to make gRPC channels", errors.RFCCodeText("BR:Common:ErrFailedToConnect"))

ErrPDUpdateFailed = errors.Normalize("failed to update PD", errors.RFCCodeText("BR:PD:ErrPDUpdateFailed"))
ErrPDLeaderNotFound = errors.Normalize("PD leader not found", errors.RFCCodeText("BR:PD:ErrPDLeaderNotFound"))
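The new `Is` helper is the piece the rest of the PR hangs on: it classifies an error by its RFC code using `errors.Find` (which is why the Makefile now whitelists `Find`). A small test-style sketch of the intended behaviour follows, assuming `errors.Find` traverses annotation layers as the doc comment implies; the wrapping pattern mirrors the one used in conn.go above.

```go
package errors_test

import (
	"io"
	"testing"

	"github.com/pingcap/errors"

	berrors "github.com/pingcap/br/pkg/errors"
)

func TestIsMatchesByErrorCode(t *testing.T) {
	// Wrap a low-level error the same way conn.go does.
	base := berrors.ErrFailedToConnect.Wrap(io.ErrUnexpectedEOF).
		GenWithStack("failed to make connection to store %d", 1)
	// Callers may add more context; matching should still work through it.
	annotated := errors.Annotatef(base, "while backing up store %d", 1)

	if !berrors.Is(annotated, berrors.ErrFailedToConnect) {
		t.Fatal("expected the wrapped error to match ErrFailedToConnect")
	}
	if berrors.Is(annotated, berrors.ErrPDUpdateFailed) {
		t.Fatal("unexpected match against an unrelated error class")
	}
}
```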