From bc045c8dae85e0aa1a86c4b0abaaf8f0dc84045e Mon Sep 17 00:00:00 2001 From: Olivia Chen Date: Thu, 28 Mar 2024 13:06:10 -0700 Subject: [PATCH] operator: make gRPC connections synced (#52051) (#53) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit close pingcap/tidb#52049 Co-authored-by: 山岚 <36239017+YuJuncen@users.noreply.github.com> --- br/pkg/backup/prepare_snap/env.go | 32 +++++++++++++++++++++- br/pkg/backup/prepare_snap/prepare_test.go | 2 +- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/br/pkg/backup/prepare_snap/env.go b/br/pkg/backup/prepare_snap/env.go index 8d4cdaccead01..717bf2e7f68e2 100644 --- a/br/pkg/backup/prepare_snap/env.go +++ b/br/pkg/backup/prepare_snap/env.go @@ -16,6 +16,8 @@ package preparesnap import ( "context" + "slices" + "sync" "time" "github.com/docker/go-units" @@ -128,6 +130,34 @@ func (c CliEnv) GetAllLiveStores(ctx context.Context) ([]*metapb.Store, error) { return withoutTiFlash, err } +func AdaptForGRPCInTest(p PrepareClient) PrepareClient { + return &gRPCGoAdapter{ + inner: p, + } +} + +// GrpcGoAdapter makes the `Send` call synchronous. +// grpc-go doesn't guarantee concurrency call to `Send` or `Recv` is safe. +// But concurrency call to `send` and `recv` is safe. +// This type is exported for testing. +type gRPCGoAdapter struct { + inner PrepareClient + sendMu sync.Mutex + recvMu sync.Mutex +} + +func (s *gRPCGoAdapter) Send(req *brpb.PrepareSnapshotBackupRequest) error { + s.sendMu.Lock() + defer s.sendMu.Unlock() + return s.inner.Send(req) +} + +func (s *gRPCGoAdapter) Recv() (*brpb.PrepareSnapshotBackupResponse, error) { + s.recvMu.Lock() + defer s.recvMu.Unlock() + return s.inner.Recv() +} + func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClient, error) { var cli brpb.Backup_PrepareSnapshotBackupClient err := c.Mgr.TryWithConn(ctx, storeID, func(cc *grpc.ClientConn) error { @@ -142,7 +172,7 @@ func (c CliEnv) ConnectToStore(ctx context.Context, storeID uint64) (PrepareClie if err != nil { return nil, err } - return cli, nil + return &gRPCGoAdapter{inner: cli}, nil } func (c CliEnv) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) { diff --git a/br/pkg/backup/prepare_snap/prepare_test.go b/br/pkg/backup/prepare_snap/prepare_test.go index 80128726c935a..f71b790edcc39 100644 --- a/br/pkg/backup/prepare_snap/prepare_test.go +++ b/br/pkg/backup/prepare_snap/prepare_test.go @@ -177,7 +177,7 @@ func (m *mockStores) ConnectToStore(ctx context.Context, storeID uint64) (Prepar } m.onCreateStore(m.stores[storeID]) } - return m.stores[storeID], nil + return AdaptForGRPCInTest(m.stores[storeID]), nil } func (m *mockStores) LoadRegionsInKeyRange(ctx context.Context, startKey []byte, endKey []byte) (regions []Region, err error) {