Skip to content

Commit

Permalink
ddl: make sure put key into ETCD monotonously (#52381)
Browse files Browse the repository at this point in the history
close #47060, close #52335
  • Loading branch information
wjhuang2016 authored Apr 9, 2024
1 parent ab90c77 commit 312b69f
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 1 deletion.
1 change: 1 addition & 0 deletions pkg/ddl/syncer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ go_test(
"syncer_test.go",
],
flaky = True,
shard_count = 3,
deps = [
":syncer",
"//pkg/ddl",
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (s *schemaVersionSyncer) UpdateSelfVersion(ctx context.Context, jobID int64
var path string
if variable.EnableMDL.Load() {
path = fmt.Sprintf("%s/%d/%s", util.DDLAllSchemaVersionsByJob, jobID, s.ddlID)
err = util.PutKVToEtcd(ctx, s.etcdCli, keyOpDefaultRetryCnt, path, ver)
err = util.PutKVToEtcdMono(ctx, s.etcdCli, keyOpDefaultRetryCnt, path, ver)
} else {
path = s.selfSchemaVerPath
err = util.PutKVToEtcd(ctx, s.etcdCli, putKeyNoRetry, path, ver,
Expand Down
42 changes: 42 additions & 0 deletions pkg/ddl/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"runtime"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -182,3 +183,44 @@ func checkRespKV(t *testing.T, kvCount int, key, val string, kvs ...*mvccpb.KeyV
require.Equal(t, key, string(kv.Key))
require.Equal(t, val, string(kv.Value))
}

func TestPutKVToEtcdMono(t *testing.T) {
integration.BeforeTestExternal(t)

cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
cli := cluster.RandClient()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err := util2.PutKVToEtcdMono(ctx, cli, 3, "testKey", strconv.Itoa(1))
require.NoError(t, err)

err = util2.PutKVToEtcdMono(ctx, cli, 3, "testKey", strconv.Itoa(2))
require.NoError(t, err)

err = util2.PutKVToEtcdMono(ctx, cli, 3, "testKey", strconv.Itoa(3))
require.NoError(t, err)

eg := util.NewErrorGroupWithRecover()
for i := 0; i < 30; i++ {
eg.Go(func() error {
err := util2.PutKVToEtcdMono(ctx, cli, 1, "testKey", strconv.Itoa(5))
return err
})
}
// PutKVToEtcdMono should be conflicted and get errors.
require.Error(t, eg.Wait())

eg = util.NewErrorGroupWithRecover()
for i := 0; i < 30; i++ {
eg.Go(func() error {
err := util2.PutKVToEtcd(ctx, cli, 1, "testKey", strconv.Itoa(5))
return err
})
}
require.NoError(t, eg.Wait())

err = util2.PutKVToEtcdMono(ctx, cli, 3, "testKey", strconv.Itoa(1))
require.NoError(t, err)
}
48 changes: 48 additions & 0 deletions pkg/ddl/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,54 @@ func DeleteKeyFromEtcd(key string, etcdCli *clientv3.Client, retryCnt int, timeo
return errors.Trace(err)
}

// PutKVToEtcdMono puts key value to etcd monotonously.
// etcdCli is client of etcd.
// retryCnt is retry time when an error occurs.
// opts are configures of etcd Operations.
func PutKVToEtcdMono(ctx context.Context, etcdCli *clientv3.Client, retryCnt int, key, val string,
opts ...clientv3.OpOption) error {
var err error
for i := 0; i < retryCnt; i++ {
if err = ctx.Err(); err != nil {
return errors.Trace(err)
}

childCtx, cancel := context.WithTimeout(ctx, KeyOpDefaultTimeout)
var resp *clientv3.GetResponse
resp, err = etcdCli.Get(childCtx, key)
if err != nil {
cancel()
logutil.BgLogger().Warn("etcd-cli put kv failed", zap.String("category", "ddl"), zap.String("key", key), zap.String("value", val), zap.Error(err), zap.Int("retryCnt", i))
time.Sleep(KeyOpRetryInterval)
continue
}
prevRevision := int64(0)
if len(resp.Kvs) > 0 {
prevRevision = resp.Kvs[0].ModRevision
}

var txnResp *clientv3.TxnResponse
txnResp, err = etcdCli.Txn(childCtx).
If(clientv3.Compare(clientv3.ModRevision(key), "=", prevRevision)).
Then(clientv3.OpPut(key, val, opts...)).
Commit()

cancel()

if err == nil && txnResp.Succeeded {
return nil
}

if err == nil {
err = errors.New("performing compare-and-swap during PutKVToEtcd failed")
}

logutil.BgLogger().Warn("etcd-cli put kv failed", zap.String("category", "ddl"), zap.String("key", key), zap.String("value", val), zap.Error(err), zap.Int("retryCnt", i))
time.Sleep(KeyOpRetryInterval)
}
return errors.Trace(err)
}

// PutKVToEtcd puts key value to etcd.
// etcdCli is client of etcd.
// retryCnt is retry time when an error occurs.
Expand Down

0 comments on commit 312b69f

Please sign in to comment.