diff --git a/domain/domain.go b/domain/domain.go
index eea5ae356e33f..fbc9d723141d3 100644
--- a/domain/domain.go
+++ b/domain/domain.go
@@ -734,12 +734,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
 		infoCache:           infoschema.NewCache(16),
 		slowQuery:           newTopNSlowQueries(30, time.Hour*24*7, 500),
 		indexUsageSyncLease: idxUsageSyncLease,
-<<<<<<< HEAD
 		dumpFileGcChecker:   &dumpFileGcChecker{gcLease: dumpFileGcLease, paths: []string{GetPlanReplayerDirName(), GetOptimizerTraceDirName()}},
-		onClose:             onClose,
-=======
-		dumpFileGcChecker:   &dumpFileGcChecker{gcLease: dumpFileGcLease, paths: []string{replayer.GetPlanReplayerDirName(), GetOptimizerTraceDirName()}},
->>>>>>> 408a46654d (session: fix deadlock when init domain failed (#40409))
 		expiredTimeStamp4PC: types.NewTime(types.ZeroCoreTime, mysql.TypeTimestamp, types.DefaultFsp),
 	}

@@ -887,162 +882,11 @@ func (do *Domain) Init(ddlLease time.Duration, sysExecutorFactory func(*Domain)
 	return nil
 }

-<<<<<<< HEAD
-=======
 // SetOnClose used to set do.onClose func.
 func (do *Domain) SetOnClose(onClose func()) {
 	do.onClose = onClose
 }

-func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error {
-	cfg := config.GetGlobalConfig()
-	if pdClient == nil || do.etcdClient == nil {
-		log.Warn("pd / etcd client not provided, won't begin Advancer.")
-		return nil
-	}
-	env, err := streamhelper.TiDBEnv(pdClient, do.etcdClient, cfg)
-	if err != nil {
-		return err
-	}
-	adv := streamhelper.NewCheckpointAdvancer(env)
-	do.logBackupAdvancer = daemon.New(adv, streamhelper.OwnerManagerForLogBackup(ctx, do.etcdClient), adv.Config().TickDuration)
-	loop, err := do.logBackupAdvancer.Begin(ctx)
-	if err != nil {
-		return err
-	}
-	do.wg.Run(loop)
-	return nil
-}
-
-// when tidb_replica_read = 'closest-adaptive', check tidb and tikv's zone label matches.
-// if not match, disable replica_read to avoid uneven read traffic distribution.
-func (do *Domain) closestReplicaReadCheckLoop(ctx context.Context, pdClient pd.Client) {
-	defer util.Recover(metrics.LabelDomain, "closestReplicaReadCheckLoop", nil, false)
-
-	// trigger check once instantly.
-	if err := do.checkReplicaRead(ctx, pdClient); err != nil {
-		logutil.BgLogger().Warn("refresh replicaRead flag failed", zap.Error(err))
-	}
-
-	ticker := time.NewTicker(time.Minute)
-	defer func() {
-		ticker.Stop()
-		do.wg.Done()
-		logutil.BgLogger().Info("closestReplicaReadCheckLoop exited.")
-	}()
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case <-ticker.C:
-			if err := do.checkReplicaRead(ctx, pdClient); err != nil {
-				logutil.BgLogger().Warn("refresh replicaRead flag failed", zap.Error(err))
-			}
-		}
-	}
-}
-
-// Periodically check and update the replica-read status when `tidb_replica_read` is set to "closest-adaptive"
-// We disable "closest-adaptive" in following conditions to ensure the read traffic is evenly distributed across
-// all AZs:
-//   - There are no TiKV servers in the AZ of this tidb instance
-//   - The AZ if this tidb contains more tidb than other AZ and this tidb's id is the bigger one.
-func (do *Domain) checkReplicaRead(ctx context.Context, pdClient pd.Client) error {
-	do.sysVarCache.RLock()
-	replicaRead := do.sysVarCache.global[variable.TiDBReplicaRead]
-	do.sysVarCache.RUnlock()
-
-	if !strings.EqualFold(replicaRead, "closest-adaptive") {
-		logutil.BgLogger().Debug("closest replica read is not enabled, skip check!", zap.String("mode", replicaRead))
-		return nil
-	}
-
-	serverInfo, err := infosync.GetServerInfo()
-	if err != nil {
-		return err
-	}
-	zone := ""
-	for k, v := range serverInfo.Labels {
-		if k == placement.DCLabelKey && v != "" {
-			zone = v
-			break
-		}
-	}
-	if zone == "" {
-		logutil.BgLogger().Debug("server contains no 'zone' label, disable closest replica read", zap.Any("labels", serverInfo.Labels))
-		variable.SetEnableAdaptiveReplicaRead(false)
-		return nil
-	}
-
-	stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
-	if err != nil {
-		return err
-	}
-
-	storeZones := make(map[string]int)
-	for _, s := range stores {
-		// skip tumbstone stores or tiflash
-		if s.NodeState == metapb.NodeState_Removing || s.NodeState == metapb.NodeState_Removed || engine.IsTiFlash(s) {
-			continue
-		}
-		for _, label := range s.Labels {
-			if label.Key == placement.DCLabelKey && label.Value != "" {
-				storeZones[label.Value] = 0
-				break
-			}
-		}
-	}
-
-	// no stores in this AZ
-	if _, ok := storeZones[zone]; !ok {
-		variable.SetEnableAdaptiveReplicaRead(false)
-		return nil
-	}
-
-	servers, err := infosync.GetAllServerInfo(ctx)
-	if err != nil {
-		return err
-	}
-	svrIdsInThisZone := make([]string, 0)
-	for _, s := range servers {
-		if v, ok := s.Labels[placement.DCLabelKey]; ok && v != "" {
-			if _, ok := storeZones[v]; ok {
-				storeZones[v] += 1
-				if v == zone {
-					svrIdsInThisZone = append(svrIdsInThisZone, s.ID)
-				}
-			}
-		}
-	}
-	enabledCount := math.MaxInt
-	for _, count := range storeZones {
-		if count < enabledCount {
-			enabledCount = count
-		}
-	}
-	// sort tidb in the same AZ by ID and disable the tidb with bigger ID
-	// because ID is unchangeable, so this is a simple and stable algorithm to select
-	// some instances across all tidb servers.
-	if enabledCount < len(svrIdsInThisZone) {
-		sort.Slice(svrIdsInThisZone, func(i, j int) bool {
-			return strings.Compare(svrIdsInThisZone[i], svrIdsInThisZone[j]) < 0
-		})
-	}
-	enabled := true
-	for _, s := range svrIdsInThisZone[enabledCount:] {
-		if s == serverInfo.ID {
-			enabled = false
-			break
-		}
-	}
-
-	if variable.SetEnableAdaptiveReplicaRead(enabled) {
-		logutil.BgLogger().Info("tidb server adaptive closest replica read is changed", zap.Bool("enable", enabled))
-	}
-	return nil
-}
-
->>>>>>> 408a46654d (session: fix deadlock when init domain failed (#40409))
 type sessionPool struct {
 	resources chan pools.Resource
 	factory   pools.Factory
diff --git a/domain/domain_test.go b/domain/domain_test.go
index 004c3ddf97f1d..604a94f0fed92 100644
--- a/domain/domain_test.go
+++ b/domain/domain_test.go
@@ -215,184 +215,3 @@ func (mebd *mockEtcdBackend) TLSConfig() *tls.Config { return nil }
 func (mebd *mockEtcdBackend) StartGCWorker() error { panic("not implemented") }
-<<<<<<< HEAD
-=======
-
-func TestClosestReplicaReadChecker(t *testing.T) {
-	store, err := mockstore.NewMockStore()
-	require.NoError(t, err)
-
-	ddlLease := 80 * time.Millisecond
-	dom := NewDomain(store, ddlLease, 0, 0, 0, mockFactory)
-	defer func() {
-		dom.Close()
-		require.Nil(t, store.Close())
-	}()
-	dom.sysVarCache.Lock()
-	dom.sysVarCache.global = map[string]string{
-		variable.TiDBReplicaRead: "closest-adaptive",
-	}
-	dom.sysVarCache.Unlock()
-
-	makeFailpointRes := func(v interface{}) string {
-		bytes, err := json.Marshal(v)
-		require.NoError(t, err)
-		return fmt.Sprintf("return(`%s`)", string(bytes))
-	}
-
-	mockedAllServerInfos := map[string]*infosync.ServerInfo{
-		"s1": {
-			ID: "s1",
-			Labels: map[string]string{
-				"zone": "zone1",
-			},
-		},
-		"s2": {
-			ID: "s2",
-			Labels: map[string]string{
-				"zone": "zone2",
-			},
-		},
-	}
-
-	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/mockGetAllServerInfo", makeFailpointRes(mockedAllServerInfos)))
-	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/mockGetServerInfo", makeFailpointRes(mockedAllServerInfos["s2"])))
-
-	stores := []*metapb.Store{
-		{
-			Labels: []*metapb.StoreLabel{
-				{
-					Key:   "zone",
-					Value: "zone1",
-				},
-			},
-		},
-		{
-			Labels: []*metapb.StoreLabel{
-				{
-					Key:   "zone",
-					Value: "zone2",
-				},
-			},
-		},
-		{
-			Labels: []*metapb.StoreLabel{
-				{
-					Key:   "zone",
-					Value: "zone3",
-				},
-			},
-		},
-	}
-
-	enabled := variable.IsAdaptiveReplicaReadEnabled()
-
-	ctx := context.Background()
-	pdClient := &mockInfoPdClient{}
-
-	// check error
-	pdClient.err = errors.New("mock error")
-	err = dom.checkReplicaRead(ctx, pdClient)
-	require.Error(t, err)
-	require.Equal(t, enabled, variable.IsAdaptiveReplicaReadEnabled())
-
-	// labels matches, should be enabled
-	pdClient.err = nil
-	pdClient.stores = stores[:2]
-	variable.SetEnableAdaptiveReplicaRead(false)
-	err = dom.checkReplicaRead(ctx, pdClient)
-	require.Nil(t, err)
-	require.True(t, variable.IsAdaptiveReplicaReadEnabled())
-
-	// labels don't match, should disable the flag
-	for _, i := range []int{0, 1, 3} {
-		pdClient.stores = stores[:i]
-		variable.SetEnableAdaptiveReplicaRead(true)
-		err = dom.checkReplicaRead(ctx, pdClient)
-		require.Nil(t, err)
-		require.False(t, variable.IsAdaptiveReplicaReadEnabled())
-	}
-
-	// partial matches
-	mockedAllServerInfos = map[string]*infosync.ServerInfo{
-		"s1": {
-			ID: "s1",
-			Labels: map[string]string{
-				"zone": "zone1",
-			},
-		},
-		"s2": {
-			ID: "s2",
-			Labels: map[string]string{
-				"zone": "zone2",
-			},
-		},
-		"s22": {
-			ID: "s22",
-			Labels: map[string]string{
-				"zone": "zone2",
-			},
-		},
-		"s3": {
-			ID: "s3",
-			Labels: map[string]string{
-				"zone": "zone3",
-			},
-		},
-		"s4": {
-			ID: "s4",
-			Labels: map[string]string{
-				"zone": "zone4",
-			},
-		},
-	}
-	pdClient.stores = stores
-	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/mockGetAllServerInfo", makeFailpointRes(mockedAllServerInfos)))
-	cases := []struct {
-		id      string
-		matches bool
-	}{
-		{
-			id:      "s1",
-			matches: true,
-		},
-		{
-			id:      "s2",
-			matches: true,
-		},
-		{
-			id:      "s22",
-			matches: false,
-		},
-		{
-			id:      "s3",
-			matches: true,
-		},
-		{
-			id:      "s4",
-			matches: false,
-		},
-	}
-	for _, c := range cases {
-		require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/mockGetServerInfo", makeFailpointRes(mockedAllServerInfos[c.id])))
-		variable.SetEnableAdaptiveReplicaRead(!c.matches)
-		err = dom.checkReplicaRead(ctx, pdClient)
-		require.Nil(t, err)
-		require.Equal(t, c.matches, variable.IsAdaptiveReplicaReadEnabled())
-	}
-
-	variable.SetEnableAdaptiveReplicaRead(true)
-	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/infosync/mockGetAllServerInfo"))
-	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/infosync/mockGetServerInfo"))
-}
-
-type mockInfoPdClient struct {
-	pd.Client
-	stores []*metapb.Store
-	err    error
-}
-
-func (c *mockInfoPdClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error) {
-	return c.stores, c.err
-}
->>>>>>> 408a46654d (session: fix deadlock when init domain failed (#40409))
diff --git a/session/tidb.go b/session/tidb.go
index 68d3a8b0f3fff..9ba9c63d9fcc3 100644
--- a/session/tidb.go
+++ b/session/tidb.go
@@ -40,12 +40,11 @@ import (
 	"github.com/pingcap/tidb/util/dbterror"
 	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/sqlexec"
-	"github.com/pingcap/tidb/util/syncutil"
 	"go.uber.org/zap"
 )

 type domainMap struct {
-	mu      syncutil.Mutex
+	mu      sync.Mutex
 	domains map[string]*domain.Domain
 }

@@ -80,21 +79,8 @@ func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) {
 			zap.Stringer("index usage sync lease", idxUsageSyncLease))
 		factory := createSessionFunc(store)
 		sysFactory := createSessionWithDomainFunc(store)
-<<<<<<< HEAD
-		onClose := func() {
-			dm.Delete(store)
-		}
-		d = domain.NewDomain(store, ddlLease, statisticLease, idxUsageSyncLease, planReplayerGCLease, factory, onClose)
-		err1 = d.Init(ddlLease, sysFactory)
-=======
 		d = domain.NewDomain(store, ddlLease, statisticLease, idxUsageSyncLease, planReplayerGCLease, factory)
-
-		var ddlInjector func(ddl.DDL) *schematracker.Checker
-		if injector, ok := store.(schematracker.StorageDDLInjector); ok {
-			ddlInjector = injector.Injector
-		}
-		err1 = d.Init(ddlLease, sysFactory, ddlInjector)
->>>>>>> 408a46654d (session: fix deadlock when init domain failed (#40409))
+		err1 = d.Init(ddlLease, sysFactory)
 		if err1 != nil {
 			// If we don't clean it, there are some dirty data when retrying the function of Init.
 			d.Close()