Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: make sure put key into ETCD monotonously #52381

Merged
merged 6 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ddl/syncer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ go_test(
"syncer_test.go",
],
flaky = True,
shard_count = 3,
deps = [
":syncer",
"//pkg/ddl",
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (s *schemaVersionSyncer) UpdateSelfVersion(ctx context.Context, jobID int64
var path string
if variable.EnableMDL.Load() {
path = fmt.Sprintf("%s/%d/%s", util.DDLAllSchemaVersionsByJob, jobID, s.ddlID)
err = util.PutKVToEtcd(ctx, s.etcdCli, keyOpDefaultRetryCnt, path, ver)
err = util.PutKVToEtcdMono(ctx, s.etcdCli, keyOpDefaultRetryCnt, path, ver)
} else {
path = s.selfSchemaVerPath
err = util.PutKVToEtcd(ctx, s.etcdCli, putKeyNoRetry, path, ver,
Expand Down
42 changes: 42 additions & 0 deletions pkg/ddl/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"runtime"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -182,3 +183,44 @@ func checkRespKV(t *testing.T, kvCount int, key, val string, kvs ...*mvccpb.KeyV
require.Equal(t, key, string(kv.Key))
require.Equal(t, val, string(kv.Value))
}

func TestPutKVToEtcdMono(t *testing.T) {
integration.BeforeTestExternal(t)

cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
cli := cluster.RandClient()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err := util2.PutKVToEtcdMono(ctx, cli, 3, "testKey", strconv.Itoa(1))
require.NoError(t, err)

err = util2.PutKVToEtcdMono(ctx, cli, 3, "testKey", strconv.Itoa(2))
require.NoError(t, err)

err = util2.PutKVToEtcdMono(ctx, cli, 3, "testKey", strconv.Itoa(3))
require.NoError(t, err)

eg := util.NewErrorGroupWithRecover()
for i := 0; i < 30; i++ {
eg.Go(func() error {
err := util2.PutKVToEtcdMono(ctx, cli, 1, "testKey", strconv.Itoa(5))
return err
})
}
// PutKVToEtcdMono should be conflicted and get errors.
require.Error(t, eg.Wait())

eg = util.NewErrorGroupWithRecover()
for i := 0; i < 30; i++ {
eg.Go(func() error {
err := util2.PutKVToEtcd(ctx, cli, 1, "testKey", strconv.Itoa(5))
return err
})
}
require.NoError(t, eg.Wait())

err = util2.PutKVToEtcdMono(ctx, cli, 3, "testKey", strconv.Itoa(1))
require.NoError(t, err)
}
48 changes: 48 additions & 0 deletions pkg/ddl/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,54 @@ func DeleteKeyFromEtcd(key string, etcdCli *clientv3.Client, retryCnt int, timeo
return errors.Trace(err)
}

// PutKVToEtcdMono puts key value to etcd monotonously.
// etcdCli is client of etcd.
// retryCnt is retry time when an error occurs.
// opts are configures of etcd Operations.
func PutKVToEtcdMono(ctx context.Context, etcdCli *clientv3.Client, retryCnt int, key, val string,
opts ...clientv3.OpOption) error {
var err error
for i := 0; i < retryCnt; i++ {
if err = ctx.Err(); err != nil {
return errors.Trace(err)
}

childCtx, cancel := context.WithTimeout(ctx, KeyOpDefaultTimeout)
var resp *clientv3.GetResponse
resp, err = etcdCli.Get(childCtx, key)
if err != nil {
cancel()
logutil.BgLogger().Warn("etcd-cli put kv failed", zap.String("category", "ddl"), zap.String("key", key), zap.String("value", val), zap.Error(err), zap.Int("retryCnt", i))
time.Sleep(KeyOpRetryInterval)
continue
}
prevRevision := int64(0)
if len(resp.Kvs) > 0 {
prevRevision = resp.Kvs[0].ModRevision
}

var txnResp *clientv3.TxnResponse
txnResp, err = etcdCli.Txn(childCtx).
If(clientv3.Compare(clientv3.ModRevision(key), "=", prevRevision)).
Then(clientv3.OpPut(key, val, opts...)).
Commit()

cancel()

if err == nil && txnResp.Succeeded {
return nil
}

if err == nil {
err = errors.New("performing compare-and-swap during PutKVToEtcd failed")
}

logutil.BgLogger().Warn("etcd-cli put kv failed", zap.String("category", "ddl"), zap.String("key", key), zap.String("value", val), zap.Error(err), zap.Int("retryCnt", i))
time.Sleep(KeyOpRetryInterval)
}
return errors.Trace(err)
}

// PutKVToEtcd puts key value to etcd.
// etcdCli is client of etcd.
// retryCnt is retry time when an error occurs.
Expand Down