From 006254b3e2e62a2f83d3724218188482358d64e6 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 2 Sep 2024 11:30:10 +0800 Subject: [PATCH] br: resolve stuck in backup (#54736) (#55657) close pingcap/tidb#53480 --- br/pkg/backup/BUILD.bazel | 2 + br/pkg/backup/client.go | 64 +++++++++++++++++++++++- br/pkg/backup/store_test.go | 98 +++++++++++++++++++++++++++++++++++++ 3 files changed, 163 insertions(+), 1 deletion(-) create mode 100644 br/pkg/backup/store_test.go diff --git a/br/pkg/backup/BUILD.bazel b/br/pkg/backup/BUILD.bazel index 22b3e60ae0d2f..9a0e52486e736 100644 --- a/br/pkg/backup/BUILD.bazel +++ b/br/pkg/backup/BUILD.bazel @@ -66,6 +66,7 @@ go_test( "client_test.go", "main_test.go", "schema_test.go", + "store_test.go", ], embed = [":backup"], flaky = True, @@ -97,6 +98,7 @@ go_test( "@com_github_tikv_client_go_v2//txnkv/txnlock", "@com_github_tikv_pd_client//:client", "@io_opencensus_go//stats/view", + "@org_golang_google_grpc//:grpc", "@org_uber_go_goleak//:goleak", ], ) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 2d6e7fd275c3d..baeda36d80b40 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -1218,12 +1218,73 @@ func (bc *Client) handleFineGrained( return backoffMill, nil } +// timeoutRecv cancel the context if `Refresh()` is not called within the specified time `timeout`. +type timeoutRecv struct { + wg sync.WaitGroup + parentCtx context.Context + cancel context.CancelFunc + + refresh chan struct{} +} + +// Refresh the timeout ticker +func (trecv *timeoutRecv) Refresh() { + select { + case <-trecv.parentCtx.Done(): + case trecv.refresh <- struct{}{}: + } +} + +// Stop the timeout ticker +func (trecv *timeoutRecv) Stop() { + close(trecv.refresh) + trecv.wg.Wait() +} + +var TimeoutOneResponse = time.Hour + +func (trecv *timeoutRecv) loop(timeout time.Duration) { + defer trecv.wg.Done() + ticker := time.NewTicker(timeout) + defer ticker.Stop() + for { + ticker.Reset(timeout) + select { + case <-trecv.parentCtx.Done(): + return + case _, ok := <-trecv.refresh: + if !ok { + return + } + case <-ticker.C: + log.Warn("receive a backup response timeout") + trecv.cancel() + } + } +} + +func StartTimeoutRecv(ctx context.Context, timeout time.Duration) (context.Context, *timeoutRecv) { + cctx, cancel := context.WithCancel(ctx) + trecv := &timeoutRecv{ + parentCtx: ctx, + cancel: cancel, + refresh: make(chan struct{}), + } + trecv.wg.Add(1) + go trecv.loop(timeout) + return cctx, trecv +} + func doSendBackup( - ctx context.Context, + pctx context.Context, client backuppb.BackupClient, req backuppb.BackupRequest, respFn func(*backuppb.BackupResponse) error, ) error { + // Backup might be stuck on GRPC `waitonHeader`, so start a timeout ticker to + // terminate the backup if it does not receive any new response for a long time. + ctx, timerecv := StartTimeoutRecv(pctx, TimeoutOneResponse) + defer timerecv.Stop() failpoint.Inject("hint-backup-start", func(v failpoint.Value) { logutil.CL(ctx).Info("failpoint hint-backup-start injected, " + "process will notify the shell.") @@ -1268,6 +1329,7 @@ func doSendBackup( for { resp, err := bCli.Recv() + timerecv.Refresh() if err != nil { if errors.Cause(err) == io.EOF { // nolint:errorlint logutil.CL(ctx).Debug("backup streaming finish", diff --git a/br/pkg/backup/store_test.go b/br/pkg/backup/store_test.go new file mode 100644 index 0000000000000..9ad928f01a635 --- /dev/null +++ b/br/pkg/backup/store_test.go @@ -0,0 +1,98 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package backup + +import ( + "context" + "io" + "testing" + "time" + + backuppb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" +) + +type MockBackupClient struct { + backuppb.BackupClient + + recvFunc func(context.Context) (*backuppb.BackupResponse, error) +} + +func (mbc *MockBackupClient) Backup(ctx context.Context, _ *backuppb.BackupRequest, _ ...grpc.CallOption) (backuppb.Backup_BackupClient, error) { + return &MockBackupBackupClient{ctx: ctx, recvFunc: mbc.recvFunc}, nil +} + +type MockBackupBackupClient struct { + backuppb.Backup_BackupClient + + ctx context.Context + recvFunc func(context.Context) (*backuppb.BackupResponse, error) +} + +func (mbbc *MockBackupBackupClient) CloseSend() error { + return nil +} + +func (mbbc *MockBackupBackupClient) Recv() (*backuppb.BackupResponse, error) { + if mbbc.recvFunc != nil { + return mbbc.recvFunc(mbbc.ctx) + } + return &backuppb.BackupResponse{}, nil +} + +func TestTimeoutRecv(t *testing.T) { + ctx := context.Background() + TimeoutOneResponse = time.Millisecond * 800 + // Just Timeout Once + { + err := doSendBackup(ctx, &MockBackupClient{ + recvFunc: func(ctx context.Context) (*backuppb.BackupResponse, error) { + time.Sleep(time.Second) + require.Error(t, ctx.Err()) + return nil, io.EOF + }, + }, backuppb.BackupRequest{}, func(br *backuppb.BackupResponse) error { return nil }) + require.NoError(t, err) + } + + // Timeout Not At First + { + count := 0 + err := doSendBackup(ctx, &MockBackupClient{ + recvFunc: func(ctx context.Context) (*backuppb.BackupResponse, error) { + require.NoError(t, ctx.Err()) + if count == 15 { + time.Sleep(time.Second) + require.Error(t, ctx.Err()) + return nil, io.EOF + } + count += 1 + time.Sleep(time.Millisecond * 80) + return &backuppb.BackupResponse{}, nil + }, + }, backuppb.BackupRequest{}, func(br *backuppb.BackupResponse) error { return nil }) + require.NoError(t, err) + } +} + +func TestTimeoutRecvCancel(t *testing.T) { + ctx := context.Background() + cctx, cancel := context.WithCancel(ctx) + + _, trecv := StartTimeoutRecv(cctx, time.Hour) + cancel() + trecv.wg.Wait() +}