Skip to content

Commit

Permalink
*: fix grpc client leak bug for AUTO_ID_CACHE=1 tables (#48870)
Browse files Browse the repository at this point in the history
close #48869
  • Loading branch information
tiancaiamao authored Nov 24, 2023
1 parent e9d9062 commit 8eb1913
Show file tree
Hide file tree
Showing 13 changed files with 79 additions and 42 deletions.
1 change: 0 additions & 1 deletion br/pkg/lightning/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ go_test(
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_tikv_pd_client//http",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_uber_go_goleak//:goleak",
Expand Down
3 changes: 1 addition & 2 deletions br/pkg/lightning/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pingcap/tidb/pkg/store/mockstore"
tmock "github.com/pingcap/tidb/pkg/util/mock"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
)

func newTableInfo(t *testing.T,
Expand Down Expand Up @@ -169,7 +168,7 @@ func (r mockRequirement) Store() kv.Storage {
return r.Storage
}

func (r mockRequirement) GetEtcdClient() *clientv3.Client {
// AutoIDClient returns nil; this mock requirement carries no autoid service client.
func (r mockRequirement) AutoIDClient() *autoid.ClientDiscover {
return nil
}

Expand Down
9 changes: 6 additions & 3 deletions br/pkg/lightning/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type TableImporter struct {
logger log.Logger
kvStore tidbkv.Storage
etcdCli *clientv3.Client
autoidCli *autoid.ClientDiscover

// dupIgnoreRows tracks the rowIDs of rows that are duplicated and should be ignored.
dupIgnoreRows extsort.ExternalSorter
Expand All @@ -95,6 +96,7 @@ func NewTableImporter(
if err != nil {
return nil, errors.Annotatef(err, "failed to tables.TableFromMeta %s", tableName)
}
autoidCli := autoid.NewClientDiscover(etcdCli)

return &TableImporter{
tableName: tableName,
Expand All @@ -105,6 +107,7 @@ func NewTableImporter(
alloc: idAlloc,
kvStore: kvStore,
etcdCli: etcdCli,
autoidCli: autoidCli,
logger: logger.With(zap.String("table", tableName)),
ignoreColumns: ignoreColumns,
}, nil
Expand Down Expand Up @@ -328,9 +331,9 @@ func (tr *TableImporter) Store() tidbkv.Storage {
return tr.kvStore
}

// GetEtcdClient implements the autoid.Requirement interface.
func (tr *TableImporter) GetEtcdClient() *clientv3.Client {
return tr.etcdCli
// AutoIDClient implements the autoid.Requirement interface.
// It returns the ClientDiscover that NewTableImporter built from the importer's etcd client.
func (tr *TableImporter) AutoIDClient() *autoid.ClientDiscover {
return tr.autoidCli
}

// RebaseChunkRowIDs rebases the row IDs of the chunks.
Expand Down
5 changes: 2 additions & 3 deletions pkg/ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ import (
"github.com/pingcap/tidb/pkg/util/rowcodec"
"github.com/pingcap/tidb/pkg/util/sqlexec"
kvutil "github.com/tikv/client-go/v2/util"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -1705,8 +1704,8 @@ func (r *asAutoIDRequirement) Store() kv.Storage {
return r.store
}

func (r *asAutoIDRequirement) GetEtcdClient() *clientv3.Client {
return r.etcdCli
// AutoIDClient returns the autoid client held by r.
// Together with Store it matches the method set of autoid.Requirement.
func (r *asAutoIDRequirement) AutoIDClient() *autoid.ClientDiscover {
return r.autoidCli
}

// applyNewAutoRandomBits set auto_random bits to TableInfo and
Expand Down
3 changes: 3 additions & 0 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/owner"
"github.com/pingcap/tidb/pkg/parser/ast"
Expand Down Expand Up @@ -357,6 +358,7 @@ type ddlCtx struct {
statsHandle *handle.Handle
tableLockCkr util.DeadTableLockChecker
etcdCli *clientv3.Client
autoidCli *autoid.ClientDiscover

*waitSchemaSyncedController
*schemaVersionManager
Expand Down Expand Up @@ -655,6 +657,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
infoCache: opt.InfoCache,
tableLockCkr: deadLockCkr,
etcdCli: opt.EtcdCli,
autoidCli: opt.AutoIDClient,
schemaVersionManager: newSchemaVersionManager(),
waitSchemaSyncedController: newWaitSchemaSyncedController(),
runningJobIDs: make([]string, 0, jobRecordCapacity),
Expand Down
19 changes: 14 additions & 5 deletions pkg/ddl/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/autoid"
clientv3 "go.etcd.io/etcd/client/v3"
)

Expand All @@ -27,11 +28,12 @@ type Option func(*Options)

// Options represents all the options the DDL module needs.
type Options struct {
EtcdCli *clientv3.Client
Store kv.Storage
InfoCache *infoschema.InfoCache
Hook Callback
Lease time.Duration
EtcdCli *clientv3.Client
Store kv.Storage
AutoIDClient *autoid.ClientDiscover
InfoCache *infoschema.InfoCache
Hook Callback
Lease time.Duration
}

// WithEtcdClient specifies the `clientv3.Client` of DDL used to request the etcd service
Expand All @@ -55,6 +57,13 @@ func WithInfoCache(ic *infoschema.InfoCache) Option {
}
}

// WithAutoIDClient returns an Option that stores cli in Options.AutoIDClient.
// cli is the autoid client used by the autoid service for AUTO_ID_CACHE=1 tables.
func WithAutoIDClient(cli *autoid.ClientDiscover) Option {
	var setClient Option = func(opts *Options) {
		opts.AutoIDClient = cli
	}
	return setClient
}

// WithHook specifies the `Callback` of DDL used to notify the outer module when events are triggered
func WithHook(callback Callback) Option {
return func(options *Options) {
Expand Down
13 changes: 8 additions & 5 deletions pkg/disttask/importinto/subtask_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/pkg/executor/importer"
"github.com/pingcap/tidb/pkg/keyspace"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/util/etcd"
Expand Down Expand Up @@ -246,16 +247,16 @@ func setBackoffWeight(se sessionctx.Context, taskMeta *TaskMeta, logger *zap.Log
}

type autoIDRequirement struct {
store kv.Storage
etcdCli *clientv3.Client
store kv.Storage
autoidCli *autoid.ClientDiscover
}

// Store returns the kv.Storage held by r.
func (r *autoIDRequirement) Store() kv.Storage {
return r.store
}

func (r *autoIDRequirement) GetEtcdClient() *clientv3.Client {
return r.etcdCli
// AutoIDClient returns the autoid client held by r.
func (r *autoIDRequirement) AutoIDClient() *autoid.ClientDiscover {
return r.autoidCli
}

func rebaseAllocatorBases(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *PostProcessStepMeta, logger *zap.Logger) (err error) {
Expand Down Expand Up @@ -295,10 +296,12 @@ func rebaseAllocatorBases(ctx context.Context, taskMeta *TaskMeta, subtaskMeta *
return errors.Trace(err)
}
etcd.SetEtcdCliByNamespace(etcdCli, keyspace.MakeKeyspaceEtcdNamespace(kvStore.GetCodec()))
r := autoIDRequirement{store: kvStore, etcdCli: etcdCli}
autoidCli := autoid.NewClientDiscover(etcdCli)
r := autoIDRequirement{store: kvStore, autoidCli: autoidCli}
err = common.RebaseTableAllocators(ctx, subtaskMeta.MaxIDs, &r, taskMeta.Plan.DBID, taskMeta.Plan.DesiredTableInfo)
if err1 := etcdCli.Close(); err1 != nil {
logger.Info("close etcd client error", zap.Error(err1))
}
autoidCli.ResetConn(nil)
return errors.Trace(err)
}
1 change: 1 addition & 0 deletions pkg/domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ go_library(
"//pkg/keyspace",
"//pkg/kv",
"//pkg/meta",
"//pkg/meta/autoid",
"//pkg/metrics",
"//pkg/owner",
"//pkg/parser/ast",
Expand Down
15 changes: 13 additions & 2 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/pingcap/tidb/pkg/keyspace"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/owner"
"github.com/pingcap/tidb/pkg/parser/ast"
Expand Down Expand Up @@ -142,6 +143,8 @@ type Domain struct {
exit chan struct{}
// `etcdClient` must be used when keyspace is not set, or when the logic to each etcd path needs to be separated by keyspace.
etcdClient *clientv3.Client
// autoidClient is the client to the autoid service; it is used when there are tables with AUTO_ID_CACHE=1.
autoidClient *autoid.ClientDiscover
// `unprefixedEtcdCli` will never set the etcd namespace prefix by keyspace.
// It is only used in storeMinStartTS and RemoveMinStartTS now.
// It must be used when the etcd path isn't needed to separate by keyspace.
Expand Down Expand Up @@ -1139,6 +1142,8 @@ func (do *Domain) Init(

do.etcdClient = cli

do.autoidClient = autoid.NewClientDiscover(cli)

unprefixedEtcdCli, err := newEtcdCli(addrs, ebd)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -1170,6 +1175,7 @@ func (do *Domain) Init(
ctx,
ddl.WithEtcdClient(do.etcdClient),
ddl.WithStore(do.store),
ddl.WithAutoIDClient(do.autoidClient),
ddl.WithInfoCache(do.infoCache),
ddl.WithHook(callback),
ddl.WithLease(ddlLease),
Expand Down Expand Up @@ -1622,6 +1628,11 @@ func (do *Domain) GetEtcdClient() *clientv3.Client {
return do.etcdClient
}

// AutoIDClient returns the Domain's autoid service client,
// which Init creates from the Domain's etcd client.
func (do *Domain) AutoIDClient() *autoid.ClientDiscover {
return do.autoidClient
}

// GetPDClient returns the PD client.
func (do *Domain) GetPDClient() pd.Client {
if store, ok := do.store.(kv.StorageWithPD); ok {
Expand Down Expand Up @@ -1916,7 +1927,7 @@ func (do *Domain) handleEvolvePlanTasksLoop(ctx sessionctx.Context, owner owner.
// in BootstrapSession.
func (do *Domain) TelemetryReportLoop(ctx sessionctx.Context) {
ctx.GetSessionVars().InRestrictedSQL = true
err := telemetry.InitialRun(ctx, do.GetEtcdClient())
err := telemetry.InitialRun(ctx, do.etcdClient)
if err != nil {
logutil.BgLogger().Warn("Initial telemetry run failed", zap.Error(err))
}
Expand All @@ -1937,7 +1948,7 @@ func (do *Domain) TelemetryReportLoop(ctx sessionctx.Context) {
if !owner.IsOwner() {
continue
}
err := telemetry.ReportUsageData(ctx, do.GetEtcdClient())
err := telemetry.ReportUsageData(ctx, do.etcdClient)
if err != nil {
// Only status update errors will be printed out
logutil.BgLogger().Warn("TelemetryReportLoop status update failed", zap.Error(err))
Expand Down
1 change: 0 additions & 1 deletion pkg/meta/autoid/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ go_test(
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@io_etcd_go_etcd_client_v3//:client",
"@org_uber_go_goleak//:goleak",
],
)
9 changes: 4 additions & 5 deletions pkg/meta/autoid/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/pingcap/tidb/pkg/util/tracing"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
tikvutil "github.com/tikv/client-go/v2/util"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -574,12 +573,12 @@ func newSinglePointAlloc(r Requirement, dbID, tblID int64, isUnsigned bool) *sin
isUnsigned: isUnsigned,
keyspaceID: keyspaceID,
}
if r.GetEtcdClient() == nil {
if r.AutoIDClient() == nil {
// Only for test in mockstore
spa.clientDiscover = clientDiscover{}
spa.ClientDiscover = &ClientDiscover{}
spa.mu.AutoIDAllocClient = MockForTest(r.Store())
} else {
spa.clientDiscover = clientDiscover{etcdCli: r.GetEtcdClient()}
spa.ClientDiscover = r.AutoIDClient()
}

// mockAutoIDChange failpoint is not implemented in this allocator, so fallback to use the default one.
Expand All @@ -594,7 +593,7 @@ func newSinglePointAlloc(r Requirement, dbID, tblID int64, isUnsigned bool) *sin
// Requirement is the parameter required by NewAllocator
type Requirement interface {
Store() kv.Storage
GetEtcdClient() *clientv3.Client
AutoIDClient() *ClientDiscover
}

// NewAllocator returns a new auto increment id generator on the store.
Expand Down
39 changes: 26 additions & 13 deletions pkg/meta/autoid/autoid_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ type singlePointAlloc struct {
tblID int64
lastAllocated int64
isUnsigned bool
clientDiscover
*ClientDiscover
keyspaceID uint32
}

type clientDiscover struct {
// ClientDiscover is used to get the AutoIDAllocClient; it creates the grpc connection to the autoid service leader.
type ClientDiscover struct {
// This is the etcd client for service discovery
etcdCli *clientv3.Client
// This is the real client for the AutoIDAlloc service
Expand All @@ -61,7 +62,15 @@ const (
autoIDLeaderPath = "tidb/autoid/leader"
)

func (d *clientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClient, error) {
// NewClientDiscover builds a ClientDiscover that uses etcdCli for service discovery.
// Only the etcd client is set here; every other field keeps its zero value.
func NewClientDiscover(etcdCli *clientv3.Client) *ClientDiscover {
	discover := new(ClientDiscover)
	discover.etcdCli = etcdCli
	return discover
}

// GetClient gets the AutoIDAllocClient.
func (d *ClientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClient, error) {
d.mu.RLock()
cli := d.mu.AutoIDAllocClient
if cli != nil {
Expand Down Expand Up @@ -140,7 +149,7 @@ retry:
if err != nil {
if strings.Contains(err.Error(), "rpc error") {
time.Sleep(backoffDuration)
sp.resetConn(err)
sp.ResetConn(err)
goto retry
}
return 0, 0, errors.Trace(err)
Expand All @@ -157,15 +166,19 @@ retry:

const backoffDuration = 200 * time.Millisecond

func (sp *singlePointAlloc) resetConn(reason error) {
logutil.BgLogger().Info("reset grpc connection", zap.String("category", "autoid client"),
zap.String("reason", reason.Error()))
// ResetConn resets the AutoIDAllocClient and the underlying grpc connection.
// The next GetClient() call will recreate the client connecting to the correct leader by querying etcd.
func (d *ClientDiscover) ResetConn(reason error) {
if reason != nil {
logutil.BgLogger().Info("reset grpc connection", zap.String("category", "autoid client"),
zap.String("reason", reason.Error()))
}
var grpcConn *grpc.ClientConn
sp.mu.Lock()
grpcConn = sp.mu.ClientConn
sp.mu.AutoIDAllocClient = nil
sp.mu.ClientConn = nil
sp.mu.Unlock()
d.mu.Lock()
grpcConn = d.mu.ClientConn
d.mu.AutoIDAllocClient = nil
d.mu.ClientConn = nil
d.mu.Unlock()
// Close grpc.ClientConn to release resource.
if grpcConn != nil {
err := grpcConn.Close()
Expand Down Expand Up @@ -212,7 +225,7 @@ retry:
if err != nil {
if strings.Contains(err.Error(), "rpc error") {
time.Sleep(backoffDuration)
sp.resetConn(err)
sp.ResetConn(err)
goto retry
}
return errors.Trace(err)
Expand Down
3 changes: 1 addition & 2 deletions pkg/meta/autoid/autoid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/pingcap/tidb/pkg/store/mockstore"
"github.com/pingcap/tidb/pkg/util"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
)

type mockRequirement struct {
Expand All @@ -44,7 +43,7 @@ func (r mockRequirement) Store() kv.Storage {
return r.Storage
}

func (r mockRequirement) GetEtcdClient() *clientv3.Client {
// AutoIDClient returns nil; this mock requirement carries no autoid service client.
func (r mockRequirement) AutoIDClient() *autoid.ClientDiscover {
return nil
}

Expand Down

0 comments on commit 8eb1913

Please sign in to comment.