Skip to content

Commit

Permalink
ddl: fix panic tidb-server doesn't release table lock (#19586) (#20021)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Sep 17, 2020
1 parent 5ca0957 commit 11bae21
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 0 deletions.
45 changes: 45 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
Expand All @@ -42,6 +43,7 @@ import (
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
goutil "github.com/pingcap/tidb/util"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"go.etcd.io/etcd/clientv3"
Expand Down Expand Up @@ -312,6 +314,7 @@ type ddlCtx struct {
lease time.Duration // lease is schema lease.
binlogCli *pumpcli.PumpsClient // binlogCli is used for Binlog.
infoHandle *infoschema.Handle
tableLockCkr util.DeadTableLockChecker

// hook may be modified.
mu struct {
Expand Down Expand Up @@ -375,6 +378,7 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
ctx, cancelFunc := context.WithCancel(ctx)
var manager owner.Manager
var syncer util.SchemaSyncer
var deadLockCkr util.DeadTableLockChecker
if etcdCli == nil {
// The etcdCli is nil if the store is localstore which is only used for testing.
// So we use mockOwnerManager and MockSchemaSyncer.
Expand All @@ -383,6 +387,7 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
} else {
manager = owner.NewOwnerManager(etcdCli, ddlPrompt, id, DDLOwnerKey, cancelFunc)
syncer = util.NewSchemaSyncer(etcdCli, id, manager)
deadLockCkr = util.NewDeadTableLockChecker(etcdCli)
}

ddlCtx := &ddlCtx{
Expand All @@ -394,6 +399,7 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
schemaSyncer: syncer,
binlogCli: binloginfo.GetPumpsClient(),
infoHandle: infoHandle,
tableLockCkr: deadLockCkr,
}
ddlCtx.mu.hook = hook
ddlCtx.mu.interceptor = &BaseInterceptor{}
Expand Down Expand Up @@ -491,6 +497,11 @@ func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) {
}
})
metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s", metrics.StartCleanWork)).Inc()
if config.TableLockEnabled() {
d.wg.Add(1)
go d.startCleanDeadTableLock()
}
metrics.DDLCounter.WithLabelValues(metrics.StartCleanWork).Inc()
}
}

Expand Down Expand Up @@ -681,6 +692,40 @@ func (d *ddl) GetHook() Callback {
return d.mu.hook
}

func (d *ddl) startCleanDeadTableLock() {
defer func() {
goutil.Recover(metrics.LabelDDL, "startCleanDeadTableLock", nil, false)
d.wg.Done()
}()

ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if !d.ownerManager.IsOwner() {
continue
}
if d.infoHandle == nil || !d.infoHandle.IsValid() {
continue
}
deadLockTables, err := d.tableLockCkr.GetDeadLockedTables(d.quitCh, d.infoHandle.Get().AllSchemas())
if err != nil {
logutil.Logger(ddlLogCtx).Info("[ddl] get dead table lock failed.", zap.Error(err))
continue
}
for se, tables := range deadLockTables {
err := d.CleanDeadTableLock(tables, se)
if err != nil {
logutil.Logger(ddlLogCtx).Info("[ddl] clean dead table lock failed.", zap.Error(err))
}
}
case <-d.quitCh:
return
}
}
}

// DDL error codes.
const (
codeInvalidWorker terror.ErrCode = 1
Expand Down
27 changes: 27 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3756,6 +3756,33 @@ func (d *ddl) UnlockTables(ctx sessionctx.Context, unlockTables []model.TableLoc
return errors.Trace(err)
}

// CleanDeadTableLock uses to clean dead table locks.
func (d *ddl) CleanDeadTableLock(unlockTables []model.TableLockTpInfo, se model.SessionInfo) error {
if len(unlockTables) == 0 {
return nil
}
arg := &lockTablesArg{
UnlockTables: unlockTables,
SessionInfo: se,
}
job := &model.Job{
SchemaID: unlockTables[0].SchemaID,
TableID: unlockTables[0].TableID,
Type: model.ActionUnlockTable,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{arg},
}

ctx, err := d.sessPool.get()
if err != nil {
return err
}
defer d.sessPool.put(ctx)
err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

func throwErrIfInMemOrSysDB(ctx sessionctx.Context, dbLowerName string) error {
if util.IsMemOrSysDB(dbLowerName) {
if ctx.GetSessionVars().User != nil {
Expand Down
106 changes: 106 additions & 0 deletions ddl/util/dead_table_lock_checker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"context"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/util/logutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

const (
defaultRetryCnt = 5
defaultRetryInterval = time.Millisecond * 200
defaultTimeout = time.Second
)

// DeadTableLockChecker uses to check dead table locks.
// If tidb-server panic or killed by others, the table locks hold by the killed tidb-server maybe doesn't released.
type DeadTableLockChecker struct {
etcdCli *clientv3.Client
}

// NewDeadTableLockChecker creates new DeadLockChecker.
func NewDeadTableLockChecker(etcdCli *clientv3.Client) DeadTableLockChecker {
return DeadTableLockChecker{
etcdCli: etcdCli,
}
}

func (d *DeadTableLockChecker) getAliveServers(ctx context.Context) (map[string]struct{}, error) {
var err error
var resp *clientv3.GetResponse
allInfos := make(map[string]struct{})
for i := 0; i < defaultRetryCnt; i++ {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
childCtx, cancel := context.WithTimeout(ctx, defaultTimeout)
resp, err = d.etcdCli.Get(childCtx, DDLAllSchemaVersions, clientv3.WithPrefix())
cancel()
if err != nil {
logutil.Logger(ddlLogCtx).Info("[ddl] clean dead table lock get alive servers failed.", zap.Error(err))
time.Sleep(defaultRetryInterval)
continue
}
for _, kv := range resp.Kvs {
serverID := strings.TrimPrefix(string(kv.Key), DDLAllSchemaVersions+"/")
allInfos[serverID] = struct{}{}
}
return allInfos, nil
}
return nil, errors.Trace(err)
}

// GetDeadLockedTables gets dead locked tables.
func (d *DeadTableLockChecker) GetDeadLockedTables(quitCh chan struct{}, schemas []*model.DBInfo) (map[model.SessionInfo][]model.TableLockTpInfo, error) {
if d.etcdCli == nil {
return nil, nil
}
aliveServers, err := d.getAliveServers(context.Background())
if err != nil {
return nil, err
}
deadLockTables := make(map[model.SessionInfo][]model.TableLockTpInfo)
for _, schema := range schemas {
select {
case <-quitCh:
return nil, nil
default:
}
for _, tbl := range schema.Tables {
if tbl.Lock == nil {
continue
}
for _, se := range tbl.Lock.Sessions {
if _, ok := aliveServers[se.ServerID]; !ok {
deadLockTables[se] = append(deadLockTables[se], model.TableLockTpInfo{
SchemaID: schema.ID,
TableID: tbl.ID,
Tp: tbl.Lock.Tp,
})
}
}
}
}
return deadLockTables, nil
}
30 changes: 30 additions & 0 deletions util/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"crypto/x509"
"crypto/x509/pkix"
"fmt"
"github.com/pingcap/tidb/metrics"
"io/ioutil"
"net/http"
"os"
"runtime"
"strconv"
"strings"
Expand Down Expand Up @@ -92,6 +94,34 @@ func WithRecovery(exec func(), recoverFn func(r interface{})) {
exec()
}

// Recover includes operations such as recovering, clearing,and printing information.
// It will dump current goroutine stack into log if catch any recover result.
// metricsLabel: The label of PanicCounter metrics.
// funcInfo: Some information for the panic function.
// recoverFn: Handler will be called after recover and before dump stack, passing `nil` means noop.
// quit: If this value is true, the current program exits after recovery.
func Recover(metricsLabel, funcInfo string, recoverFn func(), quit bool) {
r := recover()
if r == nil {
return
}

if recoverFn != nil {
recoverFn()
}
logutil.Logger(context.Background()).Error("panic in the recoverable goroutine",
zap.String("label", metricsLabel),
zap.String("funcInfo", funcInfo),
zap.Reflect("r", r),
zap.String("stack", string(GetStack())))
metrics.PanicCounter.WithLabelValues(metricsLabel).Inc()
if quit {
// Wait for metrics to be pushed.
time.Sleep(time.Second * 15)
os.Exit(1)
}
}

// CompatibleParseGCTime parses a string with `GCTimeFormat` and returns a time.Time. If `value` can't be parsed as that
// format, truncate to last space and try again. This function is only useful when loading times that saved by
// gc_worker. We have changed the format that gc_worker saves time (removed the last field), but when loading times it
Expand Down

0 comments on commit 11bae21

Please sign in to comment.