Skip to content

Commit

Permalink
cherry pick pingcap#19586 to release-3.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
crazycs520 authored and ti-srebot committed Sep 16, 2020
1 parent d23d8bd commit 584bc2e
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 0 deletions.
73 changes: 73 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
Expand All @@ -42,7 +43,11 @@ import (
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
<<<<<<< HEAD
tidbutil "github.com/pingcap/tidb/util"
=======
goutil "github.com/pingcap/tidb/util"
>>>>>>> 8ae5b1c... ddl: fix panic tidb-server doesn't release table lock (#19586)
"github.com/pingcap/tidb/util/logutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
Expand Down Expand Up @@ -312,6 +317,7 @@ type ddlCtx struct {
lease time.Duration // lease is schema lease.
binlogCli *pumpcli.PumpsClient // binlogCli is used for Binlog.
infoHandle *infoschema.Handle
tableLockCkr util.DeadTableLockChecker

// hook may be modified.
mu struct {
Expand Down Expand Up @@ -375,14 +381,25 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
ctx, cancelFunc := context.WithCancel(ctx)
var manager owner.Manager
var syncer util.SchemaSyncer
<<<<<<< HEAD
if etcdCli == nil {
=======
var deadLockCkr util.DeadTableLockChecker
if etcdCli := opt.EtcdCli; etcdCli == nil {
>>>>>>> 8ae5b1c... ddl: fix panic tidb-server doesn't release table lock (#19586)
// The etcdCli is nil if the store is localstore which is only used for testing.
// So we use mockOwnerManager and MockSchemaSyncer.
manager = owner.NewMockManager(id, cancelFunc)
syncer = NewMockSchemaSyncer()
} else {
<<<<<<< HEAD
manager = owner.NewOwnerManager(etcdCli, ddlPrompt, id, DDLOwnerKey, cancelFunc)
syncer = util.NewSchemaSyncer(etcdCli, id, manager)
=======
manager = owner.NewOwnerManager(ctx, etcdCli, ddlPrompt, id, DDLOwnerKey)
syncer = util.NewSchemaSyncer(ctx, etcdCli, id, manager)
deadLockCkr = util.NewDeadTableLockChecker(etcdCli)
>>>>>>> 8ae5b1c... ddl: fix panic tidb-server doesn't release table lock (#19586)
}

ddlCtx := &ddlCtx{
Expand All @@ -393,7 +410,12 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
ownerManager: manager,
schemaSyncer: syncer,
binlogCli: binloginfo.GetPumpsClient(),
<<<<<<< HEAD
infoHandle: infoHandle,
=======
infoHandle: opt.InfoHandle,
tableLockCkr: deadLockCkr,
>>>>>>> 8ae5b1c... ddl: fix panic tidb-server doesn't release table lock (#19586)
}
ddlCtx.mu.hook = hook
ddlCtx.mu.interceptor = &BaseInterceptor{}
Expand Down Expand Up @@ -481,6 +503,7 @@ func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) {
asyncNotify(worker.ddlJobCh)
}

<<<<<<< HEAD
go tidbutil.WithRecovery(
func() { d.schemaSyncer.StartCleanWork() },
func(r interface{}) {
Expand All @@ -491,6 +514,14 @@ func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) {
}
})
metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s", metrics.StartCleanWork)).Inc()
=======
go d.schemaSyncer.StartCleanWork()
if config.TableLockEnabled() {
d.wg.Add(1)
go d.startCleanDeadTableLock()
}
metrics.DDLCounter.WithLabelValues(metrics.StartCleanWork).Inc()
>>>>>>> 8ae5b1c... ddl: fix panic tidb-server doesn't release table lock (#19586)
}
}

Expand Down Expand Up @@ -681,6 +712,7 @@ func (d *ddl) GetHook() Callback {
return d.mu.hook
}

<<<<<<< HEAD
// DDL error codes.
const (
codeInvalidWorker terror.ErrCode = 1
Expand Down Expand Up @@ -864,4 +896,45 @@ func init() {
mysql.ErrFKIncompatibleColumns: mysql.ErrFKIncompatibleColumns,
}
terror.ErrClassToMySQLCodes[terror.ClassDDL] = ddlMySQLErrCodes
=======
// startCleanDeadTableLock is the background loop that releases table locks
// left behind by tidb-servers that are no longer alive.
//
// Every 10 seconds, and only while this node is the DDL owner, it asks
// d.tableLockCkr for the dead table locks found in the current schemas and
// submits one unlock job per session through CleanDeadTableLock. Failures are
// logged and the loop carries on with the next tick. The loop exits when
// d.ctx is done.
//
// It calls d.wg.Done() on exit, so the caller is expected to have called
// d.wg.Add(1) before starting it.
func (d *ddl) startCleanDeadTableLock() {
	// Deferred calls run in LIFO order: the panic handler below runs first,
	// then the wait group is released.
	defer d.wg.Done()
	// The recover helper has to be deferred directly. The built-in recover()
	// only stops a panic when it is called by the deferred function itself,
	// so wrapping this call in another closure would leave panics unrecovered.
	defer goutil.Recover(metrics.LabelDDL, "startCleanDeadTableLock", nil, false)

	ticker := time.NewTicker(time.Second * 10)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// Only the DDL owner cleans dead locks.
			if !d.ownerManager.IsOwner() {
				continue
			}
			deadLockTables, err := d.tableLockCkr.GetDeadLockedTables(d.ctx, d.infoHandle.Get().AllSchemas())
			if err != nil {
				logutil.BgLogger().Info("[ddl] get dead table lock failed.", zap.Error(err))
				continue
			}
			// One unlock job per dead session; a failure for one session
			// does not stop the others from being cleaned.
			for se, tables := range deadLockTables {
				if err := d.CleanDeadTableLock(tables, se); err != nil {
					logutil.BgLogger().Info("[ddl] clean dead table lock failed.", zap.Error(err))
				}
			}
		case <-d.ctx.Done():
			return
		}
	}
}

// RecoverInfo contains information needed by DDL.RecoverTable.
// NOTE(review): the field descriptions below are inferred from the field names
// only; confirm them against DDL.RecoverTable before relying on them.
type RecoverInfo struct {
// SchemaID is presumably the ID of the schema the table belongs to.
SchemaID int64
// TableInfo is presumably the metadata of the table to recover.
TableInfo *model.TableInfo
// DropJobID is presumably the ID of the DDL job that dropped the table.
DropJobID int64
// SnapshotTS is presumably the snapshot timestamp to recover from.
SnapshotTS uint64
// CurAutoIncID is presumably the current auto-increment ID of the table.
CurAutoIncID int64
// CurAutoRandID is presumably the current auto-random ID of the table.
CurAutoRandID int64
>>>>>>> 8ae5b1c... ddl: fix panic tidb-server doesn't release table lock (#19586)
}
27 changes: 27 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3743,6 +3743,33 @@ func (d *ddl) UnlockTables(ctx sessionctx.Context, unlockTables []model.TableLoc
return errors.Trace(err)
}

// CleanDeadTableLock is used to clean dead table locks.
//
// It builds an ActionUnlockTable job that releases the locks listed in
// unlockTables on behalf of session se, runs it with a session taken from
// d.sessPool, and passes the outcome through callHookOnChanged. The job's
// SchemaID and TableID are taken from the first entry of unlockTables.
// It returns nil without doing anything when unlockTables is empty.
func (d *ddl) CleanDeadTableLock(unlockTables []model.TableLockTpInfo, se model.SessionInfo) error {
	if len(unlockTables) == 0 {
		return nil
	}

	first := unlockTables[0]
	job := &model.Job{
		SchemaID:   first.SchemaID,
		TableID:    first.TableID,
		Type:       model.ActionUnlockTable,
		BinlogInfo: &model.HistoryInfo{},
		Args: []interface{}{
			&lockTablesArg{
				UnlockTables: unlockTables,
				SessionInfo:  se,
			},
		},
	}

	sctx, err := d.sessPool.get()
	if err != nil {
		return err
	}
	// Hand the session back to the pool once the job has finished.
	defer d.sessPool.put(sctx)

	jobErr := d.doDDLJob(sctx, job)
	return errors.Trace(d.callHookOnChanged(jobErr))
}

func throwErrIfInMemOrSysDB(ctx sessionctx.Context, dbLowerName string) error {
if util.IsMemOrSysDB(dbLowerName) {
if ctx.GetSessionVars().User != nil {
Expand Down
106 changes: 106 additions & 0 deletions ddl/util/dead_table_lock_checker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"context"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/util/logutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

// Retry and timeout settings used by getAliveServers when reading from etcd.
const (
// defaultRetryCnt is the maximum number of etcd Get attempts.
defaultRetryCnt = 5
// defaultRetryInterval is how long to wait after a failed attempt before retrying.
defaultRetryInterval = time.Millisecond * 200
// defaultTimeout bounds each individual etcd Get call.
defaultTimeout = time.Second
)

// DeadTableLockChecker is used to check for dead table locks.
// If a tidb-server panics or is killed, the table locks held by that
// tidb-server may never be released.
type DeadTableLockChecker struct {
// etcdCli is the etcd client used to list the registered servers.
// When it is nil, GetDeadLockedTables reports no dead locks.
etcdCli *clientv3.Client
}

// NewDeadTableLockChecker creates a new DeadTableLockChecker that uses
// etcdCli to look up the registered servers.
func NewDeadTableLockChecker(etcdCli *clientv3.Client) DeadTableLockChecker {
	return DeadTableLockChecker{etcdCli: etcdCli}
}

// getAliveServers returns the set of server IDs that currently have a key
// under the DDLAllSchemaVersions prefix in etcd.
//
// The etcd read is attempted up to defaultRetryCnt times, each attempt
// bounded by defaultTimeout, with a defaultRetryInterval pause between
// attempts. It returns ctx.Err() as soon as ctx is cancelled, including
// while waiting between attempts. If every attempt fails, it returns the
// last etcd error.
func (d *DeadTableLockChecker) getAliveServers(ctx context.Context) (map[string]struct{}, error) {
	var err error
	var resp *clientv3.GetResponse
	for i := 0; i < defaultRetryCnt; i++ {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		default:
		}

		childCtx, cancel := context.WithTimeout(ctx, defaultTimeout)
		resp, err = d.etcdCli.Get(childCtx, DDLAllSchemaVersions, clientv3.WithPrefix())
		cancel()
		if err == nil {
			allInfos := make(map[string]struct{}, len(resp.Kvs))
			for _, kv := range resp.Kvs {
				// The server ID is the key with the "<prefix>/" part removed.
				serverID := strings.TrimPrefix(string(kv.Key), DDLAllSchemaVersions+"/")
				allInfos[serverID] = struct{}{}
			}
			return allInfos, nil
		}
		logutil.BgLogger().Info("[ddl] clean dead table lock get alive servers failed.", zap.Error(err))

		// No point in waiting after the final attempt.
		if i == defaultRetryCnt-1 {
			break
		}
		// Back off before retrying, but stop immediately if ctx is cancelled
		// so that shutdown is not delayed by the retry interval.
		timer := time.NewTimer(defaultRetryInterval)
		select {
		case <-ctx.Done():
			timer.Stop()
			return nil, ctx.Err()
		case <-timer.C:
		}
	}
	return nil, errors.Trace(err)
}

// GetDeadLockedTables collects the table locks held by sessions whose server
// is no longer alive.
//
// It walks every table of every schema in schemas and, for each session
// holding a lock on the table, checks whether the session's ServerID is in
// the set returned by getAliveServers. Locks held by sessions that are not in
// that set are grouped by session in the returned map.
//
// It returns (nil, nil) when the checker has no etcd client. It returns the
// error from getAliveServers if the lookup fails, and ctx.Err() if ctx is
// cancelled while scanning.
func (d *DeadTableLockChecker) GetDeadLockedTables(ctx context.Context, schemas []*model.DBInfo) (map[model.SessionInfo][]model.TableLockTpInfo, error) {
	if d.etcdCli == nil {
		return nil, nil
	}

	alive, err := d.getAliveServers(ctx)
	if err != nil {
		return nil, err
	}

	result := make(map[model.SessionInfo][]model.TableLockTpInfo)
	for _, db := range schemas {
		// Check for cancellation once per schema.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return nil, ctxErr
		}
		for _, tblInfo := range db.Tables {
			lock := tblInfo.Lock
			if lock == nil {
				continue
			}
			for _, sess := range lock.Sessions {
				if _, found := alive[sess.ServerID]; found {
					continue
				}
				result[sess] = append(result[sess], model.TableLockTpInfo{
					SchemaID: db.ID,
					TableID:  tblInfo.ID,
					Tp:       lock.Tp,
				})
			}
		}
	}
	return result, nil
}

0 comments on commit 584bc2e

Please sign in to comment.