From 13c38eaf0cbfbcb714b80abd71521ee929bf7089 Mon Sep 17 00:00:00 2001
From: tiancaiamao
Date: Fri, 19 Jan 2024 15:57:17 +0800
Subject: [PATCH] meta/autoid: make autoid client ResetConn operation
 concurrency-safe (#50522)

close pingcap/tidb#50519
---
 pkg/meta/autoid/autoid_service.go | 47 ++++++++++++++++++++-----------
 1 file changed, 31 insertions(+), 16 deletions(-)

diff --git a/pkg/meta/autoid/autoid_service.go b/pkg/meta/autoid/autoid_service.go
index 3334e9b6af74a..123989fd8ab94 100644
--- a/pkg/meta/autoid/autoid_service.go
+++ b/pkg/meta/autoid/autoid_service.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/pingcap/errors"
@@ -56,6 +57,8 @@ type ClientDiscover struct {
 		// See https://github.com/grpc/grpc-go/issues/5321
 		*grpc.ClientConn
 	}
+	// version is increased on every ResetConn() so that a reset request carrying a stale version can be detected and skipped.
+	version uint64
 }
 
 const (
@@ -70,27 +73,27 @@ func NewClientDiscover(etcdCli *clientv3.Client) *ClientDiscover {
 }
 
 // GetClient gets the AutoIDAllocClient.
-func (d *ClientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClient, error) {
+func (d *ClientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClient, uint64, error) {
 	d.mu.RLock()
 	cli := d.mu.AutoIDAllocClient
 	if cli != nil {
 		d.mu.RUnlock()
-		return cli, nil
+		return cli, atomic.LoadUint64(&d.version), nil
 	}
 	d.mu.RUnlock()
 
 	d.mu.Lock()
 	defer d.mu.Unlock()
 	if d.mu.AutoIDAllocClient != nil {
-		return d.mu.AutoIDAllocClient, nil
+		return d.mu.AutoIDAllocClient, atomic.LoadUint64(&d.version), nil
 	}
 
 	resp, err := d.etcdCli.Get(ctx, autoIDLeaderPath, clientv3.WithFirstCreate()...)
 	if err != nil {
-		return nil, errors.Trace(err)
+		return nil, 0, errors.Trace(err)
 	}
 
 	if len(resp.Kvs) == 0 {
-		return nil, errors.New("autoid service leader not found")
+		return nil, 0, errors.New("autoid service leader not found")
 	}
 	addr := string(resp.Kvs[0].Value)
@@ -100,19 +103,19 @@ func (d *ClientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClien
 		clusterSecurity := security.ClusterSecurity()
 		tlsConfig, err := clusterSecurity.ToTLSConfig()
 		if err != nil {
-			return nil, errors.Trace(err)
+			return nil, 0, errors.Trace(err)
 		}
 		opt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
 	}
 	logutil.BgLogger().Info("connect to leader", zap.String("category", "autoid client"), zap.String("addr", addr))
 	grpcConn, err := grpc.Dial(addr, opt)
 	if err != nil {
-		return nil, errors.Trace(err)
+		return nil, 0, errors.Trace(err)
 	}
 	cli = autoid.NewAutoIDAllocClient(grpcConn)
 	d.mu.AutoIDAllocClient = cli
 	d.mu.ClientConn = grpcConn
-	return cli, nil
+	return cli, atomic.LoadUint64(&d.version), nil
 }
 
 // Alloc allocs N consecutive autoID for table with tableID, returning (min, max] of the allocated autoID batch.
@@ -130,7 +133,7 @@ func (sp *singlePointAlloc) Alloc(ctx context.Context, n uint64, increment, offs
 	}
 
 retry:
-	cli, err := sp.GetClient(ctx)
+	cli, ver, err := sp.GetClient(ctx)
 	if err != nil {
 		return 0, 0, errors.Trace(err)
 	}
@@ -149,7 +152,7 @@ retry:
 	if err != nil {
 		if strings.Contains(err.Error(), "rpc error") {
 			time.Sleep(backoffDuration)
-			sp.ResetConn(err)
+			sp.resetConn(ver, err)
 			goto retry
 		}
 		return 0, 0, errors.Trace(err)
@@ -166,6 +169,14 @@ retry:
 
 const backoffDuration = 200 * time.Millisecond
 
+func (d *ClientDiscover) resetConn(version uint64, reason error) {
+	// Avoid repeated reset operations: the CAS fails if another session has already reset the connection for this version.
+	if !atomic.CompareAndSwapUint64(&d.version, version, version+1) {
+		return
+	}
+	d.ResetConn(reason)
+}
+
 // ResetConn reset the AutoIDAllocClient and underlying grpc connection.
 // The next GetClient() call will recreate the client connecting to the correct leader by querying etcd.
 func (d *ClientDiscover) ResetConn(reason error) {
@@ -181,10 +192,14 @@ func (d *ClientDiscover) ResetConn(reason error) {
 	d.mu.Unlock()
 	// Close grpc.ClientConn to release resource.
 	if grpcConn != nil {
-		err := grpcConn.Close()
-		if err != nil {
-			logutil.BgLogger().Warn("close grpc connection error", zap.String("category", "autoid client"), zap.Error(err))
-		}
+		go func() {
+			// Don't close the conn immediately, in case other sessions are still using it.
+			time.Sleep(200 * time.Millisecond)
+			err := grpcConn.Close()
+			if err != nil {
+				logutil.BgLogger().Warn("close grpc connection error", zap.String("category", "autoid client"), zap.Error(err))
+			}
+		}()
 	}
 }
 
@@ -210,7 +225,7 @@ func (sp *singlePointAlloc) Rebase(ctx context.Context, newBase int64, _ bool) e
 
 func (sp *singlePointAlloc) rebase(ctx context.Context, newBase int64, force bool) error {
 retry:
-	cli, err := sp.GetClient(ctx)
+	cli, ver, err := sp.GetClient(ctx)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -225,7 +240,7 @@ retry:
 	if err != nil {
 		if strings.Contains(err.Error(), "rpc error") {
 			time.Sleep(backoffDuration)
-			sp.ResetConn(err)
+			sp.resetConn(ver, err)
 			goto retry
 		}
 		return errors.Trace(err)
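
Below is a minimal, self-contained sketch of the pattern this patch applies: GetClient hands back a version together with the client, and resetConn compare-and-swaps on that version, so when many sessions hit an RPC error on the same broken connection only the first one actually tears it down, and the old connection is closed on a delay so sessions still holding it are not cut off mid-call. All names in the sketch (discover, conn, the demo timings) are hypothetical stand-ins, not TiDB or gRPC APIs; the authoritative change is the diff above.

// A hypothetical, standalone demo (not TiDB code): versioned reset of a shared connection.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// conn stands in for the shared *grpc.ClientConn.
type conn struct{ id int32 }

// discover mirrors the shape of ClientDiscover: a mutex-guarded shared
// connection plus a version counter that is bumped on every reset.
type discover struct {
	mu struct {
		sync.RWMutex
		conn *conn
	}
	version uint64
	dials   int32 // how many times we "reconnected" (for the demo output)
}

// getClient returns the current connection and the version it was read under,
// dialing lazily if none exists (stand-in for the etcd lookup plus grpc.Dial).
func (d *discover) getClient() (*conn, uint64) {
	d.mu.RLock()
	c := d.mu.conn
	d.mu.RUnlock()
	if c != nil {
		return c, atomic.LoadUint64(&d.version)
	}
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.mu.conn == nil {
		d.mu.conn = &conn{id: atomic.AddInt32(&d.dials, 1)}
	}
	return d.mu.conn, atomic.LoadUint64(&d.version)
}

// resetConn only acts for the caller that still holds the current version:
// the first failing session wins the CAS and resets; the others lose it and
// return at once instead of tearing down the freshly re-established connection.
func (d *discover) resetConn(version uint64) {
	if !atomic.CompareAndSwapUint64(&d.version, version, version+1) {
		return
	}
	d.mu.Lock()
	old := d.mu.conn
	d.mu.conn = nil
	d.mu.Unlock()
	if old != nil {
		go func() {
			// Close on a delay so sessions still using the old connection
			// don't have their in-flight calls cut off.
			time.Sleep(200 * time.Millisecond)
			fmt.Printf("closed conn #%d\n", old.id)
		}()
	}
}

func main() {
	d := &discover{}
	var ready, done sync.WaitGroup
	start := make(chan struct{})
	for i := 0; i < 10; i++ {
		ready.Add(1)
		done.Add(1)
		go func() {
			defer done.Done()
			_, ver := d.getClient() // every session sees conn #1, version 0
			ready.Done()
			<-start          // all ten sessions hit an "rpc error" at once
			d.resetConn(ver) // only one CAS(0 -> 1) succeeds
			d.getClient()    // the retry path re-dials at most once in total
		}()
	}
	ready.Wait()
	close(start)
	done.Wait()
	time.Sleep(300 * time.Millisecond) // let the delayed close fire
	fmt.Printf("dials: %d\n", atomic.LoadInt32(&d.dials)) // always 2 here
}

With the version guard the demo always reports two dials, the initial connection plus a single reconnect, no matter how many sessions observe the failure. Calling ResetConn unconditionally from every failing session, as the code did before this patch, lets a slow session destroy the connection that another session has just re-established, which is exactly the repeated-reset behavior this change prevents.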