Skip to content

Commit

Permalink
store: fix potential panic in GC worker (#14403) (#14439)
Browse files Browse the repository at this point in the history
  • Loading branch information
jackysp authored Jan 16, 2020
1 parent 1c279e9 commit 50963e7
Showing 1 changed file with 13 additions and 22 deletions.
35 changes: 13 additions & 22 deletions store/tikv/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/terror"
"github.com/pingcap/pd/client"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/kv"
Expand All @@ -56,8 +56,6 @@ type GCWorker struct {
lastFinish time.Time
cancel context.CancelFunc
done chan error

session session.Session
}

// NewGCWorker creates a GCWorker instance.
Expand Down Expand Up @@ -157,8 +155,6 @@ func (w *GCWorker) start(ctx context.Context, wg *sync.WaitGroup) {
logutil.Logger(ctx).Info("[gc worker] start",
zap.String("uuid", w.uuid))

w.session = createSession(w.store)

w.tick(ctx) // Immediately tick once to initialize configs.
wg.Done()

Expand Down Expand Up @@ -293,19 +289,20 @@ func (w *GCWorker) prepare() (bool, uint64, error) {
// 4. GC update `tikv_gc_safe_point` value to t2, continue do GC in this round.
// Then the data record that has been dropped between time t1 and t2, will be cleaned by GC, but the user thinks the data after t1 won't be clean by GC.
ctx := context.Background()
_, err := w.session.Execute(ctx, "BEGIN")
se := createSession(w.store)
defer se.Close()
_, err := se.Execute(ctx, "BEGIN")
if err != nil {
return false, 0, errors.Trace(err)
}
doGC, safePoint, err := w.checkPrepare(ctx)
if doGC {
_, err = w.session.Execute(ctx, "COMMIT")
err = se.CommitTxn(ctx)
if err != nil {
return false, 0, errors.Trace(err)
}
} else {
_, err1 := w.session.Execute(ctx, "ROLLBACK")
terror.Log(errors.Trace(err1))
se.RollbackTxn(ctx)
}
return doGC, safePoint, errors.Trace(err)
}
Expand Down Expand Up @@ -1146,7 +1143,6 @@ func (w *GCWorker) checkLeader() (bool, error) {
if err != nil {
return false, errors.Trace(err)
}
w.session = se
leader, err := w.loadValueFromSysTable(gcLeaderUUIDKey)
if err != nil {
_, err1 := se.Execute(ctx, "ROLLBACK")
Expand Down Expand Up @@ -1177,6 +1173,7 @@ func (w *GCWorker) checkLeader() (bool, error) {
}
lease, err := w.loadTime(gcLeaderLeaseKey)
if err != nil {
se.RollbackTxn(ctx)
return false, errors.Trace(err)
}
if lease == nil || lease.Before(time.Now()) {
Expand Down Expand Up @@ -1280,8 +1277,10 @@ func (w *GCWorker) loadDurationWithDefault(key string, def time.Duration) (*time

func (w *GCWorker) loadValueFromSysTable(key string) (string, error) {
ctx := context.Background()
se := createSession(w.store)
defer se.Close()
stmt := fmt.Sprintf(`SELECT HIGH_PRIORITY (variable_value) FROM mysql.tidb WHERE variable_name='%s' FOR UPDATE`, key)
rs, err := w.session.Execute(ctx, stmt)
rs, err := se.Execute(ctx, stmt)
if len(rs) > 0 {
defer terror.Call(rs[0].Close)
}
Expand Down Expand Up @@ -1310,10 +1309,9 @@ func (w *GCWorker) saveValueToSysTable(key, value string) error {
ON DUPLICATE KEY
UPDATE variable_value = '%[2]s', comment = '%[3]s'`,
key, value, gcVariableComments[key])
if w.session == nil {
return errors.New("[saveValueToSysTable session is nil]")
}
_, err := w.session.Execute(context.Background(), stmt)
se := createSession(w.store)
defer se.Close()
_, err := se.Execute(context.Background(), stmt)
logutil.Logger(context.Background()).Debug("[gc worker] save kv",
zap.String("key", key),
zap.String("value", value),
Expand Down Expand Up @@ -1411,13 +1409,6 @@ func NewMockGCWorker(store tikv.Storage) (*MockGCWorker, error) {
lastFinish: time.Now(),
done: make(chan error),
}
worker.session, err = session.CreateSession(worker.store)
if err != nil {
logutil.Logger(context.Background()).Error("initialize MockGCWorker session fail", zap.Error(err))
return nil, errors.Trace(err)
}
privilege.BindPrivilegeManager(worker.session, nil)
worker.session.GetSessionVars().InRestrictedSQL = true
return &MockGCWorker{worker: worker}, nil
}

Expand Down

0 comments on commit 50963e7

Please sign in to comment.