lightning: add more retryable err (#36673) (#36720)
close #36674
ti-srebot authored Jul 29, 2022
1 parent 66bc814 commit f0d4a96
Showing 6 changed files with 209 additions and 95 deletions.
16 changes: 15 additions & 1 deletion br/pkg/lightning/backend/local/local.go
@@ -18,6 +18,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"math"
"os"
"path/filepath"
@@ -1307,6 +1308,19 @@ const (
retryIngest
)

func (local *local) isRetryableTiKVWriteError(err error) bool {
err = errors.Cause(err)
// io.EOF is not retryable in the normal case, but when TiKV restarts
// while we're writing to it through gRPC, the stream may return io.EOF
// (it's gRPC Unavailable in most cases), so we need to retry on this error.
// See SendMsg in https://pkg.go.dev/google.golang.org/grpc#ClientStream
if err == io.EOF {
return true
}
return common.IsRetryableError(err)
}

func (local *local) writeAndIngestPairs(
ctx context.Context,
engine *Engine,
@@ -1324,7 +1338,7 @@ loopWrite:
var rangeStats rangeStats
metas, finishedRange, rangeStats, err = local.WriteToTiKV(ctx, engine, region, start, end, regionSplitSize, regionSplitKeys)
if err != nil {
if !common.IsRetryableError(err) {
if !local.isRetryableTiKVWriteError(err) {
return err
}

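For context, here is a minimal standalone sketch of the unwrap-then-compare pattern used by isRetryableTiKVWriteError above. It is not the production code path: it assumes only the github.com/pingcap/errors package and omits the common.IsRetryableError fallback. The point it illustrates is that a wrapped io.EOF coming back from the gRPC write path would not compare equal to io.EOF without errors.Cause.

package main

import (
	"fmt"
	"io"

	"github.com/pingcap/errors"
)

// isRetryableTiKVWriteError mirrors the check in the diff: unwrap first, then
// compare against io.EOF; the real code falls back to common.IsRetryableError.
func isRetryableTiKVWriteError(err error) bool {
	err = errors.Cause(err)
	return err == io.EOF
}

func main() {
	wrapped := errors.Annotate(io.EOF, "write to tikv failed")
	fmt.Println(isRetryableTiKVWriteError(io.EOF))  // true
	fmt.Println(isRetryableTiKVWriteError(wrapped)) // true: Cause unwraps the annotation
}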
7 changes: 7 additions & 0 deletions br/pkg/lightning/backend/local/local_test.go
@@ -18,6 +18,7 @@ import (
"bytes"
"context"
"encoding/binary"
"io"
"math"
"math/rand"
"os"
@@ -1250,3 +1251,9 @@ func TestGetRegionSplitSizeKeys(t *testing.T) {
require.Equal(t, int64(1), splitSize)
require.Equal(t, int64(2), splitKeys)
}

func TestLocalIsRetryableTiKVWriteError(t *testing.T) {
l := local{}
require.True(t, l.isRetryableTiKVWriteError(io.EOF))
require.True(t, l.isRetryableTiKVWriteError(errors.Trace(io.EOF)))
}
31 changes: 23 additions & 8 deletions br/pkg/lightning/common/retry.go
@@ -20,7 +20,7 @@ import (
"io"
"net"
"os"
"regexp"
"strings"
"syscall"

"github.com/go-sql-driver/mysql"
@@ -31,7 +31,27 @@ import (
"google.golang.org/grpc/status"
)

var regionNotFullyReplicatedRe = regexp.MustCompile(`region \d+ is not fully replicated`)
// Some components don't have an accurately named error, or turn a named error
// into a plain string, so we have to check by error message. For example,
// distsql.Checksum turns a TiKV other-error into its own error.
var retryableErrorMsgList = []string{
"is not fully replicated",
// for clusters >= 4.x, lightning calls distsql.Checksum to do the checksum
// this error happens when distsql.Checksum calls TiKV
// see https://github.com/pingcap/tidb/blob/2c3d4f1ae418881a95686e8b93d4237f2e76eec6/store/copr/coprocessor.go#L941
"coprocessor task terminated due to exceeding the deadline",
}

func isRetryableFromErrorMessage(err error) bool {
msg := err.Error()
msgLower := strings.ToLower(msg)
for _, errStr := range retryableErrorMsgList {
if strings.Contains(msgLower, errStr) {
return true
}
}
return false
}

// IsRetryableError returns whether the error is transient (e.g. network
// connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This
@@ -91,16 +111,11 @@ func isSingleRetryableError(err error) bool {
}
return false
default:
if regionNotFullyReplicatedRe.MatchString(err.Error()) {
return true
}
switch status.Code(err) {
case codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss:
return true
case codes.Unknown:
return false
default:
return false
return isRetryableFromErrorMessage(err)
}
}
}
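A self-contained illustration of the message-based fallback added above (a sketch, not lightning's own test or API; the sample error string is taken from the test below): when a component flattens a typed TiKV error into a plain string, only a case-insensitive substring match is left to decide retryability.

package main

import (
	"errors"
	"fmt"
	"strings"
)

// Same list and matching rule as the diff: messages are compared lowercased,
// so every entry must be a lowercase substring.
var retryableErrorMsgList = []string{
	"is not fully replicated",
	"coprocessor task terminated due to exceeding the deadline",
}

func isRetryableFromErrorMessage(err error) bool {
	msgLower := strings.ToLower(err.Error())
	for _, errStr := range retryableErrorMsgList {
		if strings.Contains(msgLower, errStr) {
			return true
		}
	}
	return false
}

func main() {
	// distsql.Checksum can surface a TiKV "other-error" as a plain string.
	err := errors.New("other error: Coprocessor task terminated due to exceeding the deadline")
	fmt.Println(isRetryableFromErrorMessage(err)) // true
}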
1 change: 1 addition & 0 deletions br/pkg/lightning/common/retry_test.go
@@ -96,4 +96,5 @@ func TestIsRetryableError(t *testing.T) {
require.False(t, IsRetryableError(multierr.Combine(context.Canceled, &net.DNSError{IsTimeout: true})))

require.True(t, IsRetryableError(errors.Errorf("region %d is not fully replicated", 1234)))
require.True(t, IsRetryableError(errors.New("other error: Coprocessor task terminated due to exceeding the deadline")))
}
186 changes: 109 additions & 77 deletions br/pkg/lightning/restore/meta_manager.go
@@ -7,6 +7,8 @@ import (
"database/sql"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/common"
@@ -20,6 +22,11 @@ import (
"go.uber.org/zap"
)

const (
maxRetryOnStatusConflict = 30
maxBackoffTime = 30 * time.Second
)

type metaMgrBuilder interface {
Init(ctx context.Context) error
TaskMetaMgr(pd *pdutil.PdController) taskMetaMgr
@@ -180,101 +187,126 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
}

needAutoID := common.TableHasAutoID(m.tr.tableInfo.Core)
err = exec.Transact(ctx, "init table allocator base", func(ctx context.Context, tx *sql.Tx) error {
rows, err := tx.QueryContext(
ctx,
fmt.Sprintf("SELECT task_id, row_id_base, row_id_max, total_kvs_base, total_bytes_base, checksum_base, status from %s WHERE table_id = ? FOR UPDATE", m.tableName),
m.tr.tableInfo.ID,
)
if err != nil {
return errors.Trace(err)
}
defer rows.Close()
var (
metaTaskID, rowIDBase, rowIDMax, maxRowIDMax int64
totalKvs, totalBytes, checksum uint64
statusValue string
)
for rows.Next() {
if err = rows.Scan(&metaTaskID, &rowIDBase, &rowIDMax, &totalKvs, &totalBytes, &checksum, &statusValue); err != nil {
return errors.Trace(err)
}
status, err := parseMetaStatus(statusValue)
tableChecksumingMsg := "Target table is calculating checksum. Please wait until the checksum is finished and try again."
doAllocTableRowIDsFn := func() error {
return exec.Transact(ctx, "init table allocator base", func(ctx context.Context, tx *sql.Tx) error {
rows, err := tx.QueryContext(
ctx,
fmt.Sprintf("SELECT task_id, row_id_base, row_id_max, total_kvs_base, total_bytes_base, checksum_base, status from %s WHERE table_id = ? FOR UPDATE", m.tableName),
m.tr.tableInfo.ID,
)
if err != nil {
return err
return errors.Trace(err)
}
defer rows.Close()
var (
metaTaskID, rowIDBase, rowIDMax, maxRowIDMax int64
totalKvs, totalBytes, checksum uint64
statusValue string
)
for rows.Next() {
if err = rows.Scan(&metaTaskID, &rowIDBase, &rowIDMax, &totalKvs, &totalBytes, &checksum, &statusValue); err != nil {
return errors.Trace(err)
}
status, err := parseMetaStatus(statusValue)
if err != nil {
return err
}

// skip finished meta
if status >= metaStatusFinished {
continue
}
// skip finished meta
if status >= metaStatusFinished {
continue
}

if status == metaStatusChecksuming {
return common.ErrAllocTableRowIDs.GenWithStack("Target table is calculating checksum. Please wait until the checksum is finished and try again.")
}
if status == metaStatusChecksuming {
return common.ErrAllocTableRowIDs.GenWithStack(tableChecksumingMsg)
}

if metaTaskID == m.taskID {
curStatus = status
baseChecksum = checksum
baseTotalKvs = totalKvs
baseTotalBytes = totalBytes
if status >= metaStatusRowIDAllocated {
if rowIDMax-rowIDBase != rawRowIDMax {
return common.ErrAllocTableRowIDs.GenWithStack("verify allocator base failed. local: '%d', meta: '%d'", rawRowIDMax, rowIDMax-rowIDBase)
if metaTaskID == m.taskID {
curStatus = status
baseChecksum = checksum
baseTotalKvs = totalKvs
baseTotalBytes = totalBytes
if status >= metaStatusRowIDAllocated {
if rowIDMax-rowIDBase != rawRowIDMax {
return common.ErrAllocTableRowIDs.GenWithStack("verify allocator base failed. local: '%d', meta: '%d'", rawRowIDMax, rowIDMax-rowIDBase)
}
newRowIDBase = rowIDBase
newRowIDMax = rowIDMax
break
}
newRowIDBase = rowIDBase
newRowIDMax = rowIDMax
break
continue
}
continue
}

// other tasks have finished this logic, we needn't do it again.
if status >= metaStatusRowIDAllocated {
newStatus = metaStatusRestoreStarted
}
// other tasks have finished this logic, we needn't do it again.
if status >= metaStatusRowIDAllocated {
newStatus = metaStatusRestoreStarted
}

if rowIDMax > maxRowIDMax {
maxRowIDMax = rowIDMax
if rowIDMax > maxRowIDMax {
maxRowIDMax = rowIDMax
}
}
if err := rows.Err(); err != nil {
return errors.Trace(err)
}
}
if err := rows.Err(); err != nil {
return errors.Trace(err)
}

// not enough info is available, fetch row_id max for the table
if curStatus == metaStatusInitial {
if needAutoID {
// maxRowIDMax is the max row_id that other tasks have allocated, we need to rebase the global autoid base first.
if err := rebaseGlobalAutoID(ctx, maxRowIDMax, m.tr.kvStore, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil {
return errors.Trace(err)
// not enough info is available, fetch row_id max for the table
if curStatus == metaStatusInitial {
if needAutoID {
// maxRowIDMax is the max row_id that other tasks have allocated, we need to rebase the global autoid base first.
if err := rebaseGlobalAutoID(ctx, maxRowIDMax, m.tr.kvStore, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil {
return errors.Trace(err)
}
newRowIDBase, newRowIDMax, err = allocGlobalAutoID(ctx, rawRowIDMax, m.tr.kvStore, m.tr.dbInfo.ID, m.tr.tableInfo.Core)
if err != nil {
return errors.Trace(err)
}
} else {
// Though we don't need auto ID, we still guarantee that the row ID is unique across all lightning instances.
newRowIDBase = maxRowIDMax
newRowIDMax = newRowIDBase + rawRowIDMax
}

// table contains no data, can skip checksum
if needAutoID && newRowIDBase == 0 && newStatus < metaStatusRestoreStarted {
newStatus = metaStatusRestoreStarted
}
newRowIDBase, newRowIDMax, err = allocGlobalAutoID(ctx, rawRowIDMax, m.tr.kvStore, m.tr.dbInfo.ID, m.tr.tableInfo.Core)

// nolint:gosec
query := fmt.Sprintf("update %s set row_id_base = ?, row_id_max = ?, status = ? where table_id = ? and task_id = ?", m.tableName)
_, err := tx.ExecContext(ctx, query, newRowIDBase, newRowIDMax, newStatus.String(), m.tr.tableInfo.ID, m.taskID)
if err != nil {
return errors.Trace(err)
}
} else {
// Though we don't need auto ID, we still guarantee that the row ID is unique across all lightning instances.
newRowIDBase = maxRowIDMax
newRowIDMax = newRowIDBase + rawRowIDMax
}

// table contains no data, can skip checksum
if needAutoID && newRowIDBase == 0 && newStatus < metaStatusRestoreStarted {
newStatus = metaStatusRestoreStarted
}

// nolint:gosec
query := fmt.Sprintf("update %s set row_id_base = ?, row_id_max = ?, status = ? where table_id = ? and task_id = ?", m.tableName)
_, err := tx.ExecContext(ctx, query, newRowIDBase, newRowIDMax, newStatus.String(), m.tr.tableInfo.ID, m.taskID)
if err != nil {
return errors.Trace(err)
curStatus = newStatus
}

curStatus = newStatus
return nil
})
}
// TODO: the retry logic duplicates the code in local.writeAndIngestByRanges; encapsulate it later.
// max retry backoff time: 2+4+8+16+30*26=810s
backOffTime := time.Second
for i := 0; i < maxRetryOnStatusConflict; i++ {
err = doAllocTableRowIDsFn()
if err == nil || !strings.Contains(err.Error(), tableChecksumingMsg) {
break
}
return nil
})
// we only retry on the tableChecksuming error, which happens during parallel import.
// for details see https://docs.pingcap.com/tidb/stable/tidb-lightning-distributed-import
log.FromContext(ctx).Warn("target table is doing checksum, will try again",
zap.Int("retry time", i+1), log.ShortError(err))
backOffTime *= 2
if backOffTime > maxBackoffTime {
backOffTime = maxBackoffTime
}
select {
case <-time.After(backOffTime):
case <-ctx.Done():
return nil, 0, errors.Trace(ctx.Err())
}
}
if err != nil {
return nil, 0, errors.Trace(err)
}
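The TODO above notes that this retry loop duplicates the one in local.writeAndIngestByRanges; a generic capped-exponential-backoff helper along these lines could factor it out. This is a sketch under that assumption, not an existing lightning API: retryWithBackoff and its parameters are hypothetical names. With 30 attempts, a 1s starting backoff doubled after every failure and capped at 30s, the waits sum to 2+4+8+16+30*26 = 810s, matching the comment in the diff.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff retries fn until it succeeds, returns a non-retryable
// error, or maxAttempts is reached. The backoff starts at 1s, doubles after
// every failed attempt, and is capped at maxBackoff.
func retryWithBackoff(ctx context.Context, maxAttempts int, maxBackoff time.Duration,
	retryable func(error) bool, fn func() error) error {
	backoff := time.Second
	var err error
	for i := 0; i < maxAttempts; i++ {
		err = fn()
		if err == nil || !retryable(err) {
			break
		}
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
		select {
		case <-time.After(backoff):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return err
}

func main() {
	attempts := 0
	err := retryWithBackoff(context.Background(), 5, 30*time.Second,
		func(error) bool { return true }, // retry everything in this demo
		func() error {
			attempts++
			if attempts < 3 {
				return errors.New("Target table is calculating checksum")
			}
			return nil
		})
	fmt.Println(attempts, err) // 3 <nil>
}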