Commit

fix conflict
Defined2014 committed Jan 9, 2023
1 parent 157abe9 commit 8f7d8cd
Showing 3 changed files with 2 additions and 353 deletions.
156 changes: 0 additions & 156 deletions domain/domain.go
@@ -734,12 +734,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
infoCache: infoschema.NewCache(16),
slowQuery: newTopNSlowQueries(30, time.Hour*24*7, 500),
indexUsageSyncLease: idxUsageSyncLease,
<<<<<<< HEAD
dumpFileGcChecker: &dumpFileGcChecker{gcLease: dumpFileGcLease, paths: []string{GetPlanReplayerDirName(), GetOptimizerTraceDirName()}},
onClose: onClose,
=======
dumpFileGcChecker: &dumpFileGcChecker{gcLease: dumpFileGcLease, paths: []string{replayer.GetPlanReplayerDirName(), GetOptimizerTraceDirName()}},
>>>>>>> 408a46654d (session: fix deadlock when init domain failed (#40409))
expiredTimeStamp4PC: types.NewTime(types.ZeroCoreTime, mysql.TypeTimestamp, types.DefaultFsp),
}

@@ -887,162 +882,11 @@ func (do *Domain) Init(ddlLease time.Duration, sysExecutorFactory func(*Domain)
return nil
}

<<<<<<< HEAD
=======
// SetOnClose used to set do.onClose func.
func (do *Domain) SetOnClose(onClose func()) {
do.onClose = onClose
}

func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error {
cfg := config.GetGlobalConfig()
if pdClient == nil || do.etcdClient == nil {
log.Warn("pd / etcd client not provided, won't begin Advancer.")
return nil
}
env, err := streamhelper.TiDBEnv(pdClient, do.etcdClient, cfg)
if err != nil {
return err
}
adv := streamhelper.NewCheckpointAdvancer(env)
do.logBackupAdvancer = daemon.New(adv, streamhelper.OwnerManagerForLogBackup(ctx, do.etcdClient), adv.Config().TickDuration)
loop, err := do.logBackupAdvancer.Begin(ctx)
if err != nil {
return err
}
do.wg.Run(loop)
return nil
}

// when tidb_replica_read = 'closest-adaptive', check whether the tidb and tikv zone labels match.
// if they don't match, disable replica_read to avoid uneven read traffic distribution.
func (do *Domain) closestReplicaReadCheckLoop(ctx context.Context, pdClient pd.Client) {
defer util.Recover(metrics.LabelDomain, "closestReplicaReadCheckLoop", nil, false)

// trigger check once instantly.
if err := do.checkReplicaRead(ctx, pdClient); err != nil {
logutil.BgLogger().Warn("refresh replicaRead flag failed", zap.Error(err))
}

ticker := time.NewTicker(time.Minute)
defer func() {
ticker.Stop()
do.wg.Done()
logutil.BgLogger().Info("closestReplicaReadCheckLoop exited.")
}()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := do.checkReplicaRead(ctx, pdClient); err != nil {
logutil.BgLogger().Warn("refresh replicaRead flag failed", zap.Error(err))
}
}
}
}

// Periodically check and update the replica-read status when `tidb_replica_read` is set to "closest-adaptive"
// We disable "closest-adaptive" in the following conditions to ensure the read traffic is evenly distributed across
// all AZs:
// - There are no TiKV servers in the AZ of this tidb instance
// - The AZ of this tidb contains more tidb instances than other AZs and this tidb's ID is among the bigger ones.
func (do *Domain) checkReplicaRead(ctx context.Context, pdClient pd.Client) error {
do.sysVarCache.RLock()
replicaRead := do.sysVarCache.global[variable.TiDBReplicaRead]
do.sysVarCache.RUnlock()

if !strings.EqualFold(replicaRead, "closest-adaptive") {
logutil.BgLogger().Debug("closest replica read is not enabled, skip check!", zap.String("mode", replicaRead))
return nil
}

serverInfo, err := infosync.GetServerInfo()
if err != nil {
return err
}
zone := ""
for k, v := range serverInfo.Labels {
if k == placement.DCLabelKey && v != "" {
zone = v
break
}
}
if zone == "" {
logutil.BgLogger().Debug("server contains no 'zone' label, disable closest replica read", zap.Any("labels", serverInfo.Labels))
variable.SetEnableAdaptiveReplicaRead(false)
return nil
}

stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return err
}

storeZones := make(map[string]int)
for _, s := range stores {
// skip tombstone stores or tiflash
if s.NodeState == metapb.NodeState_Removing || s.NodeState == metapb.NodeState_Removed || engine.IsTiFlash(s) {
continue
}
for _, label := range s.Labels {
if label.Key == placement.DCLabelKey && label.Value != "" {
storeZones[label.Value] = 0
break
}
}
}

// no stores in this AZ
if _, ok := storeZones[zone]; !ok {
variable.SetEnableAdaptiveReplicaRead(false)
return nil
}

servers, err := infosync.GetAllServerInfo(ctx)
if err != nil {
return err
}
svrIdsInThisZone := make([]string, 0)
for _, s := range servers {
if v, ok := s.Labels[placement.DCLabelKey]; ok && v != "" {
if _, ok := storeZones[v]; ok {
storeZones[v] += 1
if v == zone {
svrIdsInThisZone = append(svrIdsInThisZone, s.ID)
}
}
}
}
enabledCount := math.MaxInt
for _, count := range storeZones {
if count < enabledCount {
enabledCount = count
}
}
// sort tidb instances in the same AZ by ID and disable the ones with bigger IDs;
// because IDs never change, this is a simple and stable way to select
// instances across all tidb servers.
if enabledCount < len(svrIdsInThisZone) {
sort.Slice(svrIdsInThisZone, func(i, j int) bool {
return strings.Compare(svrIdsInThisZone[i], svrIdsInThisZone[j]) < 0
})
}
enabled := true
for _, s := range svrIdsInThisZone[enabledCount:] {
if s == serverInfo.ID {
enabled = false
break
}
}

if variable.SetEnableAdaptiveReplicaRead(enabled) {
logutil.BgLogger().Info("tidb server adaptive closest replica read is changed", zap.Bool("enable", enabled))
}
return nil
}

>>>>>>> 408a46654d (session: fix deadlock when init domain failed (#40409))
type sessionPool struct {
resources chan pools.Resource
factory pools.Factory
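Aside: the checkReplicaRead function removed above implements the selection rule described in its comments. Below is a minimal, standalone sketch of that rule with illustrative names only (selectEnabledIDs and tidbByZone are not the real tidb API): every zone that has TiKV stores keeps at most min(tidb-per-zone) instances with closest-adaptive reads enabled, chosen by the smallest server IDs so the choice is stable.

package main

import (
	"fmt"
	"sort"
)

// selectEnabledIDs reports, per tidb server ID, whether closest-adaptive
// replica read should stay enabled. tidbByZone maps each zone that has TiKV
// stores to the tidb server IDs running in it.
func selectEnabledIDs(tidbByZone map[string][]string) map[string]bool {
	// The per-zone cap is the smallest tidb count found in any such zone.
	minCount := -1
	for _, ids := range tidbByZone {
		if minCount == -1 || len(ids) < minCount {
			minCount = len(ids)
		}
	}
	enabled := make(map[string]bool)
	for _, ids := range tidbByZone {
		sorted := append([]string(nil), ids...)
		sort.Strings(sorted)
		// IDs never change, so sorting them gives a stable selection:
		// instances beyond the cap (the bigger IDs) are disabled.
		for i, id := range sorted {
			enabled[id] = i < minCount
		}
	}
	return enabled
}

func main() {
	fmt.Println(selectEnabledIDs(map[string][]string{
		"zone1": {"s1"},
		"zone2": {"s2", "s22"},
	}))
	// Output: map[s1:true s2:true s22:false]
}

With one tidb in zone1 and two in zone2, the cap is 1, so s22 (the bigger ID in zone2) falls back to non-adaptive reads, matching the expectations in the test below.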
181 changes: 0 additions & 181 deletions domain/domain_test.go
@@ -215,184 +215,3 @@ func (mebd *mockEtcdBackend) TLSConfig() *tls.Config { return nil }
func (mebd *mockEtcdBackend) StartGCWorker() error {
panic("not implemented")
}
<<<<<<< HEAD
=======

func TestClosestReplicaReadChecker(t *testing.T) {
store, err := mockstore.NewMockStore()
require.NoError(t, err)

ddlLease := 80 * time.Millisecond
dom := NewDomain(store, ddlLease, 0, 0, 0, mockFactory)
defer func() {
dom.Close()
require.Nil(t, store.Close())
}()
dom.sysVarCache.Lock()
dom.sysVarCache.global = map[string]string{
variable.TiDBReplicaRead: "closest-adaptive",
}
dom.sysVarCache.Unlock()

makeFailpointRes := func(v interface{}) string {
bytes, err := json.Marshal(v)
require.NoError(t, err)
return fmt.Sprintf("return(`%s`)", string(bytes))
}

mockedAllServerInfos := map[string]*infosync.ServerInfo{
"s1": {
ID: "s1",
Labels: map[string]string{
"zone": "zone1",
},
},
"s2": {
ID: "s2",
Labels: map[string]string{
"zone": "zone2",
},
},
}

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/mockGetAllServerInfo", makeFailpointRes(mockedAllServerInfos)))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/mockGetServerInfo", makeFailpointRes(mockedAllServerInfos["s2"])))

stores := []*metapb.Store{
{
Labels: []*metapb.StoreLabel{
{
Key: "zone",
Value: "zone1",
},
},
},
{
Labels: []*metapb.StoreLabel{
{
Key: "zone",
Value: "zone2",
},
},
},
{
Labels: []*metapb.StoreLabel{
{
Key: "zone",
Value: "zone3",
},
},
},
}

enabled := variable.IsAdaptiveReplicaReadEnabled()

ctx := context.Background()
pdClient := &mockInfoPdClient{}

// check error
pdClient.err = errors.New("mock error")
err = dom.checkReplicaRead(ctx, pdClient)
require.Error(t, err)
require.Equal(t, enabled, variable.IsAdaptiveReplicaReadEnabled())

// labels matches, should be enabled
pdClient.err = nil
pdClient.stores = stores[:2]
variable.SetEnableAdaptiveReplicaRead(false)
err = dom.checkReplicaRead(ctx, pdClient)
require.Nil(t, err)
require.True(t, variable.IsAdaptiveReplicaReadEnabled())

// labels don't match, should disable the flag
for _, i := range []int{0, 1, 3} {
pdClient.stores = stores[:i]
variable.SetEnableAdaptiveReplicaRead(true)
err = dom.checkReplicaRead(ctx, pdClient)
require.Nil(t, err)
require.False(t, variable.IsAdaptiveReplicaReadEnabled())
}

// partial matches
mockedAllServerInfos = map[string]*infosync.ServerInfo{
"s1": {
ID: "s1",
Labels: map[string]string{
"zone": "zone1",
},
},
"s2": {
ID: "s2",
Labels: map[string]string{
"zone": "zone2",
},
},
"s22": {
ID: "s22",
Labels: map[string]string{
"zone": "zone2",
},
},
"s3": {
ID: "s3",
Labels: map[string]string{
"zone": "zone3",
},
},
"s4": {
ID: "s4",
Labels: map[string]string{
"zone": "zone4",
},
},
}
pdClient.stores = stores
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/mockGetAllServerInfo", makeFailpointRes(mockedAllServerInfos)))
cases := []struct {
id string
matches bool
}{
{
id: "s1",
matches: true,
},
{
id: "s2",
matches: true,
},
{
id: "s22",
matches: false,
},
{
id: "s3",
matches: true,
},
{
id: "s4",
matches: false,
},
}
for _, c := range cases {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/mockGetServerInfo", makeFailpointRes(mockedAllServerInfos[c.id])))
variable.SetEnableAdaptiveReplicaRead(!c.matches)
err = dom.checkReplicaRead(ctx, pdClient)
require.Nil(t, err)
require.Equal(t, c.matches, variable.IsAdaptiveReplicaReadEnabled())
}

variable.SetEnableAdaptiveReplicaRead(true)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/infosync/mockGetAllServerInfo"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/infosync/mockGetServerInfo"))
}

type mockInfoPdClient struct {
pd.Client
stores []*metapb.Store
err error
}

func (c *mockInfoPdClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error) {
return c.stores, c.err
}
>>>>>>> 408a46654d (session: fix deadlock when init domain failed (#40409))
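The mockInfoPdClient in the removed test relies on Go's embed-and-override idiom. A short restatement of that pattern, assuming the pd client and kvproto import paths used elsewhere in this repository: embedding the pd.Client interface satisfies it without implementing every method, and only the single overridden method is meant to be called; anything else would panic on the nil embedded value, which is acceptable for a test that exercises exactly one call site.

package domain_test

import (
	"context"

	"github.com/pingcap/kvproto/pkg/metapb"
	pd "github.com/tikv/pd/client"
)

// fakePDClient satisfies pd.Client by embedding the interface; only
// GetAllStores is implemented, returning canned stores or a canned error.
type fakePDClient struct {
	pd.Client // nil embedded value: calling any other method panics
	stores    []*metapb.Store
	err       error
}

// GetAllStores overrides the embedded method with test-controlled results.
func (c *fakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error) {
	return c.stores, c.err
}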
18 changes: 2 additions & 16 deletions session/tidb.go
@@ -40,12 +40,11 @@ import (
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/syncutil"
"go.uber.org/zap"
)

type domainMap struct {
mu syncutil.Mutex
mu sync.Mutex
domains map[string]*domain.Domain
}

@@ -80,21 +79,8 @@ func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) {
zap.Stringer("index usage sync lease", idxUsageSyncLease))
factory := createSessionFunc(store)
sysFactory := createSessionWithDomainFunc(store)
<<<<<<< HEAD
onClose := func() {
dm.Delete(store)
}
d = domain.NewDomain(store, ddlLease, statisticLease, idxUsageSyncLease, planReplayerGCLease, factory, onClose)
err1 = d.Init(ddlLease, sysFactory)
=======
d = domain.NewDomain(store, ddlLease, statisticLease, idxUsageSyncLease, planReplayerGCLease, factory)

var ddlInjector func(ddl.DDL) *schematracker.Checker
if injector, ok := store.(schematracker.StorageDDLInjector); ok {
ddlInjector = injector.Injector
}
err1 = d.Init(ddlLease, sysFactory, ddlInjector)
>>>>>>> 408a46654d (session: fix deadlock when init domain failed (#40409))
err1 = d.Init(ddlLease, sysFactory)
if err1 != nil {
// If we don't clean it, there will be dirty data left when retrying Init.
d.Close()
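For context, the branch kept here wires domain cleanup through an onClose callback: the domain map hands NewDomain a closure that deletes the map entry when the domain is closed, and a failed Init is followed by Close so a retry does not see a half-initialized domain. The following is a simplified, self-contained sketch of that shape, not the real tidb types; the real Get also holds the map lock across initialization, which this sketch deliberately avoids.

package main

import (
	"fmt"
	"sync"
)

var errInit = fmt.Errorf("init failed")

// Domain stands in for domain.Domain; it only carries the onClose hook.
type Domain struct {
	onClose func()
}

// Init pretends to fail so the cleanup path below is exercised.
func (d *Domain) Init() error { return errInit }

// Close runs the hook so the owning map forgets this instance.
func (d *Domain) Close() {
	if d.onClose != nil {
		d.onClose()
	}
}

type domainMap struct {
	mu      sync.Mutex
	domains map[string]*Domain
}

func (dm *domainMap) Delete(key string) {
	dm.mu.Lock()
	delete(dm.domains, key)
	dm.mu.Unlock()
}

// Get builds a Domain wired with an onClose that evicts it from the map; if
// Init fails, the half-built Domain is closed so a later call starts clean.
func (dm *domainMap) Get(key string) (*Domain, error) {
	d := &Domain{}
	d.onClose = func() { dm.Delete(key) }
	if err := d.Init(); err != nil {
		d.Close()
		return nil, err
	}
	dm.mu.Lock()
	dm.domains[key] = d
	dm.mu.Unlock()
	return d, nil
}

func main() {
	dm := &domainMap{domains: map[string]*Domain{}}
	if _, err := dm.Get("store-1"); err != nil {
		fmt.Println("retry is safe, map is empty:", len(dm.domains))
	}
}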
