diff --git a/server/go.mod b/server/go.mod index cb62c4d5dfa..ed8e686c61d 100644 --- a/server/go.mod +++ b/server/go.mod @@ -11,6 +11,7 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da github.com/golang/protobuf v1.5.3 github.com/google/btree v1.0.1 + github.com/google/go-cmp v0.6.0 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/grpc-ecosystem/grpc-gateway v1.16.0 diff --git a/server/go.sum b/server/go.sum index a65af1bc1a4..146f6518a6b 100644 --- a/server/go.sum +++ b/server/go.sum @@ -121,6 +121,7 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= diff --git a/server/mvcc/backend/batch_tx.go b/server/mvcc/backend/batch_tx.go index 84dbd8dc0e3..2d5d3eaf4a1 100644 --- a/server/mvcc/backend/batch_tx.go +++ b/server/mvcc/backend/batch_tx.go @@ -289,7 +289,8 @@ func (t *batchTx) commit(stop bool) { type batchTxBuffered struct { batchTx - buf txWriteBuffer + buf txWriteBuffer + pendingDeleteOperations int } func newBatchTxBuffered(backend *backend) *batchTxBuffered { @@ -310,7 +311,27 @@ func (t *batchTxBuffered) Unlock() { // gofail: var beforeWritebackBuf struct{} t.buf.writeback(&t.backend.readTx.buf) t.backend.readTx.Unlock() - if t.pending >= t.backend.batchLimit { + // We commit the transaction when the number of pending operations + // reaches the configured limit(batchLimit) to prevent it from + // becoming excessively large. + // + // But we also need to commit the transaction immediately if there + // is any pending deleting operation, otherwise etcd might run into + // a situation that it haven't finished committing the data into backend + // storage (note: etcd periodically commits the bbolt transactions + // instead of on each request) when it applies next request. Accordingly, + // etcd may still read the stale data from bbolt when processing next + // request. So it breaks the linearizability. + // + // Note we don't need to commit the transaction for put requests if + // it doesn't exceed the batch limit, because there is a buffer on top + // of the bbolt. Each time when etcd reads data from backend storage, + // it will read data from both bbolt and the buffer. But there is no + // such a buffer for delete requests. + // + // Please also refer to + // https://github.com/etcd-io/etcd/pull/17119#issuecomment-1857547158 + if t.pending >= t.backend.batchLimit || t.pendingDeleteOperations > 0 { t.commit(false) } } @@ -353,6 +374,7 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) { } t.batchTx.commit(stop) + t.pendingDeleteOperations = 0 if !stop { t.backend.readTx.tx = t.backend.begin(false) @@ -368,3 +390,13 @@ func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) t.batchTx.UnsafeSeqPut(bucket, key, value) t.buf.putSeq(bucket, key, value) } + +func (t *batchTxBuffered) UnsafeDelete(bucketType Bucket, key []byte) { + t.batchTx.UnsafeDelete(bucketType, key) + t.pendingDeleteOperations++ +} + +func (t *batchTxBuffered) UnsafeDeleteBucket(bucket Bucket) { + t.batchTx.UnsafeDeleteBucket(bucket) + t.pendingDeleteOperations++ +} diff --git a/server/mvcc/backend/batch_tx_test.go b/server/mvcc/backend/batch_tx_test.go index 75b377fed8e..f0e224fb49e 100644 --- a/server/mvcc/backend/batch_tx_test.go +++ b/server/mvcc/backend/batch_tx_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" bolt "go.etcd.io/bbolt" "go.etcd.io/etcd/server/v3/mvcc/backend" betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" @@ -205,3 +206,89 @@ func TestBatchTxBatchLimitCommit(t *testing.T) { return nil }) } + +func TestRangeAfterDeleteBucketMatch(t *testing.T) { + b, _ := betesting.NewTmpBackend(t, time.Hour, 10000) + defer betesting.Close(t, b) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(buckets.Test) + tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar")) + tx.Unlock() + tx.Commit() + + checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo")}, [][]byte{[]byte("bar")}) + + tx.Lock() + tx.UnsafeDeleteBucket(buckets.Test) + tx.Unlock() + + checkForEach(t, b.BatchTx(), b.ReadTx(), nil, nil) +} + +func TestRangeAfterDeleteMatch(t *testing.T) { + b, _ := betesting.NewTmpBackend(t, time.Hour, 10000) + defer betesting.Close(t, b) + + tx := b.BatchTx() + + tx.Lock() + tx.UnsafeCreateBucket(buckets.Test) + tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar")) + tx.Unlock() + tx.Commit() + + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0) + checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo")}, [][]byte{[]byte("bar")}) + + tx.Lock() + tx.UnsafeDelete(buckets.Test, []byte("foo")) + tx.Unlock() + + checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0) + checkForEach(t, b.BatchTx(), b.ReadTx(), nil, nil) +} + +func checkRangeResponseMatch(t *testing.T, tx backend.BatchTx, rtx backend.ReadTx, key, endKey []byte, limit int64) { + tx.Lock() + ks1, vs1 := tx.UnsafeRange(buckets.Test, key, endKey, limit) + tx.Unlock() + + rtx.RLock() + ks2, vs2 := rtx.UnsafeRange(buckets.Test, key, endKey, limit) + rtx.RUnlock() + + if diff := cmp.Diff(ks1, ks2); diff != "" { + t.Errorf("keys on read and batch transaction doesn't match, diff: %s", diff) + } + if diff := cmp.Diff(vs1, vs2); diff != "" { + t.Errorf("values on read and batch transaction doesn't match, diff: %s", diff) + } +} + +func checkForEach(t *testing.T, tx backend.BatchTx, rtx backend.ReadTx, expectedKeys, expectedValues [][]byte) { + tx.Lock() + checkUnsafeForEach(t, tx, expectedKeys, expectedValues) + tx.Unlock() + + rtx.RLock() + checkUnsafeForEach(t, rtx, expectedKeys, expectedValues) + rtx.RUnlock() +} + +func checkUnsafeForEach(t *testing.T, tx backend.ReadTx, expectedKeys, expectedValues [][]byte) { + var ks, vs [][]byte + tx.UnsafeForEach(buckets.Test, func(k, v []byte) error { + ks = append(ks, k) + vs = append(vs, v) + return nil + }) + + if diff := cmp.Diff(ks, expectedKeys); diff != "" { + t.Errorf("keys on transaction doesn't match expected, diff: %s", diff) + } + if diff := cmp.Diff(vs, expectedValues); diff != "" { + t.Errorf("values on transaction doesn't match expected, diff: %s", diff) + } +} diff --git a/tests/integration/clientv3/user_test.go b/tests/integration/clientv3/user_test.go index 6d82227bbe5..2d81b61eb15 100644 --- a/tests/integration/clientv3/user_test.go +++ b/tests/integration/clientv3/user_test.go @@ -19,8 +19,9 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" - "go.etcd.io/etcd/client/v3" + clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/integration" "google.golang.org/grpc" ) @@ -54,6 +55,56 @@ func TestUserError(t *testing.T) { } } +func TestAddUserAfterDelete(t *testing.T) { + integration.BeforeTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + authapi := clus.RandClient() + authSetupRoot(t, authapi.Auth) + cfg := clientv3.Config{ + Endpoints: authapi.Endpoints(), + DialTimeout: 5 * time.Second, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, + } + cfg.Username, cfg.Password = "root", "123" + authed, err := integration.NewClient(t, cfg) + require.NoError(t, err) + defer authed.Close() + + // add user + _, err = authed.UserAdd(context.TODO(), "foo", "bar") + require.NoError(t, err) + _, err = authapi.Authenticate(context.TODO(), "foo", "bar") + require.NoError(t, err) + // delete user + _, err = authed.UserDelete(context.TODO(), "foo") + require.NoError(t, err) + if _, err = authed.Authenticate(context.TODO(), "foo", "bar"); err == nil { + t.Errorf("expect Authenticate error for old password") + } + // add user back + _, err = authed.UserAdd(context.TODO(), "foo", "bar") + require.NoError(t, err) + _, err = authed.Authenticate(context.TODO(), "foo", "bar") + require.NoError(t, err) + // change password + _, err = authed.UserChangePassword(context.TODO(), "foo", "bar2") + require.NoError(t, err) + _, err = authed.UserChangePassword(context.TODO(), "foo", "bar1") + require.NoError(t, err) + + if _, err = authed.Authenticate(context.TODO(), "foo", "bar"); err == nil { + t.Errorf("expect Authenticate error for old password") + } + if _, err = authed.Authenticate(context.TODO(), "foo", "bar2"); err == nil { + t.Errorf("expect Authenticate error for old password") + } + _, err = authed.Authenticate(context.TODO(), "foo", "bar1") + require.NoError(t, err) +} + func TestUserErrorAuth(t *testing.T) { integration.BeforeTest(t)