-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
meta/autoid: make autoid client ResetConn operation concurrency-safe #50522
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ import ( | |
"context" | ||
"strings" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
|
||
"github.com/pingcap/errors" | ||
|
@@ -56,6 +57,8 @@ type ClientDiscover struct { | |
// See https://github.com/grpc/grpc-go/issues/5321 | ||
*grpc.ClientConn | ||
} | ||
// version is increased in every ResetConn() to make the operation safe. | ||
version uint64 | ||
} | ||
|
||
const ( | ||
|
@@ -70,27 +73,27 @@ func NewClientDiscover(etcdCli *clientv3.Client) *ClientDiscover { | |
} | ||
|
||
// GetClient gets the AutoIDAllocClient. | ||
func (d *ClientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClient, error) { | ||
func (d *ClientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClient, uint64, error) { | ||
d.mu.RLock() | ||
cli := d.mu.AutoIDAllocClient | ||
if cli != nil { | ||
d.mu.RUnlock() | ||
return cli, nil | ||
return cli, atomic.LoadUint64(&d.version), nil | ||
} | ||
d.mu.RUnlock() | ||
|
||
d.mu.Lock() | ||
defer d.mu.Unlock() | ||
if d.mu.AutoIDAllocClient != nil { | ||
return d.mu.AutoIDAllocClient, nil | ||
return d.mu.AutoIDAllocClient, atomic.LoadUint64(&d.version), nil | ||
} | ||
|
||
resp, err := d.etcdCli.Get(ctx, autoIDLeaderPath, clientv3.WithFirstCreate()...) | ||
if err != nil { | ||
return nil, errors.Trace(err) | ||
return nil, 0, errors.Trace(err) | ||
} | ||
if len(resp.Kvs) == 0 { | ||
return nil, errors.New("autoid service leader not found") | ||
return nil, 0, errors.New("autoid service leader not found") | ||
} | ||
|
||
addr := string(resp.Kvs[0].Value) | ||
|
@@ -100,19 +103,19 @@ func (d *ClientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClien | |
clusterSecurity := security.ClusterSecurity() | ||
tlsConfig, err := clusterSecurity.ToTLSConfig() | ||
if err != nil { | ||
return nil, errors.Trace(err) | ||
return nil, 0, errors.Trace(err) | ||
} | ||
opt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)) | ||
} | ||
logutil.BgLogger().Info("connect to leader", zap.String("category", "autoid client"), zap.String("addr", addr)) | ||
grpcConn, err := grpc.Dial(addr, opt) | ||
if err != nil { | ||
return nil, errors.Trace(err) | ||
return nil, 0, errors.Trace(err) | ||
} | ||
cli = autoid.NewAutoIDAllocClient(grpcConn) | ||
d.mu.AutoIDAllocClient = cli | ||
d.mu.ClientConn = grpcConn | ||
return cli, nil | ||
return cli, atomic.LoadUint64(&d.version), nil | ||
} | ||
|
||
// Alloc allocates N consecutive autoIDs for the table with tableID, returning the (min, max] range of the allocated autoID batch. | ||
|
@@ -131,7 +134,7 @@ func (sp *singlePointAlloc) Alloc(ctx context.Context, n uint64, increment, offs | |
|
||
var bo backoffer | ||
retry: | ||
cli, err := sp.GetClient(ctx) | ||
cli, ver, err := sp.GetClient(ctx) | ||
if err != nil { | ||
return 0, 0, errors.Trace(err) | ||
} | ||
|
@@ -149,7 +152,7 @@ retry: | |
metrics.AutoIDHistogram.WithLabelValues(metrics.TableAutoIDAlloc, metrics.RetLabel(err)).Observe(time.Since(start).Seconds()) | ||
if err != nil { | ||
if strings.Contains(err.Error(), "rpc error") { | ||
sp.ResetConn(err) | ||
sp.resetConn(ver, err) | ||
bo.Backoff() | ||
goto retry | ||
} | ||
|
@@ -188,6 +191,14 @@ func (b *backoffer) Backoff() { | |
time.Sleep(b.Duration) | ||
} | ||
|
||
func (d *ClientDiscover) resetConn(version uint64, reason error) { | ||
// Avoid repeated Reset operation | ||
if !atomic.CompareAndSwapUint64(&d.version, version, version+1) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Introduce a |
||
return | ||
} | ||
d.ResetConn(reason) | ||
} | ||
|
||
// ResetConn resets the AutoIDAllocClient and the underlying grpc connection. | ||
// The next GetClient() call will recreate the client connecting to the correct leader by querying etcd. | ||
func (d *ClientDiscover) ResetConn(reason error) { | ||
|
@@ -205,10 +216,14 @@ func (d *ClientDiscover) ResetConn(reason error) { | |
d.mu.Unlock() | ||
// Close grpc.ClientConn to release resource. | ||
if grpcConn != nil { | ||
err := grpcConn.Close() | ||
if err != nil { | ||
logutil.BgLogger().Warn("close grpc connection error", zap.String("category", "autoid client"), zap.Error(err)) | ||
} | ||
go func() { | ||
// Don't close the conn immediately, in case the other sessions are still using it. | ||
time.Sleep(200 * time.Millisecond) | ||
err := grpcConn.Close() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Delay the Close() operation, so that other sessions that still reference this grpcConn do not get the "grpc: the client connection is closing" error. |
||
if err != nil { | ||
logutil.BgLogger().Warn("close grpc connection error", zap.String("category", "autoid client"), zap.Error(err)) | ||
} | ||
}() | ||
} | ||
} | ||
|
||
|
@@ -235,7 +250,7 @@ func (sp *singlePointAlloc) Rebase(ctx context.Context, newBase int64, _ bool) e | |
func (sp *singlePointAlloc) rebase(ctx context.Context, newBase int64, force bool) error { | ||
var bo backoffer | ||
retry: | ||
cli, err := sp.GetClient(ctx) | ||
cli, ver, err := sp.GetClient(ctx) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
|
@@ -249,7 +264,7 @@ retry: | |
}) | ||
if err != nil { | ||
if strings.Contains(err.Error(), "rpc error") { | ||
sp.ResetConn(err) | ||
sp.resetConn(ver, err) | ||
bo.Backoff() | ||
goto retry | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use atomic.Uint64?