Skip to content

Commit 8e1b0ca

Browse files
author
Anthony Romano
committed
integration: add corruption test
1 parent 115ae3a commit 8e1b0ca

File tree

2 files changed

+206
-110
lines changed

2 files changed

+206
-110
lines changed

integration/v3_alarm_test.go

+206
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
// Copyright 2017 The etcd Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package integration
16+
17+
import (
18+
"os"
19+
"sync"
20+
"testing"
21+
"time"
22+
23+
"github.com/coreos/etcd/etcdserver"
24+
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
25+
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
26+
"github.com/coreos/etcd/mvcc"
27+
"github.com/coreos/etcd/mvcc/backend"
28+
"github.com/coreos/etcd/pkg/testutil"
29+
"golang.org/x/net/context"
30+
)
31+
32+
// TestV3StorageQuotaApply tests the V3 server respects quotas during apply
33+
func TestV3StorageQuotaApply(t *testing.T) {
34+
testutil.AfterTest(t)
35+
quotasize := int64(16 * os.Getpagesize())
36+
37+
clus := NewClusterV3(t, &ClusterConfig{Size: 2})
38+
defer clus.Terminate(t)
39+
kvc0 := toGRPC(clus.Client(0)).KV
40+
kvc1 := toGRPC(clus.Client(1)).KV
41+
42+
// Set a quota on one node
43+
clus.Members[0].QuotaBackendBytes = quotasize
44+
clus.Members[0].Stop(t)
45+
clus.Members[0].Restart(t)
46+
clus.waitLeader(t, clus.Members)
47+
waitForRestart(t, kvc0)
48+
49+
key := []byte("abc")
50+
51+
// test small put still works
52+
smallbuf := make([]byte, 1024)
53+
_, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
54+
if serr != nil {
55+
t.Fatal(serr)
56+
}
57+
58+
// test big put
59+
bigbuf := make([]byte, quotasize)
60+
_, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
61+
if err != nil {
62+
t.Fatal(err)
63+
}
64+
65+
// quorum get should work regardless of whether alarm is raised
66+
_, err = kvc0.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
67+
if err != nil {
68+
t.Fatal(err)
69+
}
70+
71+
// wait until alarm is raised for sure-- poll the alarms
72+
stopc := time.After(5 * time.Second)
73+
for {
74+
req := &pb.AlarmRequest{Action: pb.AlarmRequest_GET}
75+
resp, aerr := clus.Members[0].s.Alarm(context.TODO(), req)
76+
if aerr != nil {
77+
t.Fatal(aerr)
78+
}
79+
if len(resp.Alarms) != 0 {
80+
break
81+
}
82+
select {
83+
case <-stopc:
84+
t.Fatalf("timed out waiting for alarm")
85+
case <-time.After(10 * time.Millisecond):
86+
}
87+
}
88+
89+
// small quota machine should reject put
90+
if _, err := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
91+
t.Fatalf("past-quota instance should reject put")
92+
}
93+
94+
// large quota machine should reject put
95+
if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
96+
t.Fatalf("past-quota instance should reject put")
97+
}
98+
99+
// reset large quota node to ensure alarm persisted
100+
clus.Members[1].Stop(t)
101+
clus.Members[1].Restart(t)
102+
clus.waitLeader(t, clus.Members)
103+
104+
if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
105+
t.Fatalf("alarmed instance should reject put after reset")
106+
}
107+
}
108+
109+
// TestV3AlarmDeactivate ensures that space alarms can be deactivated so puts go through.
110+
func TestV3AlarmDeactivate(t *testing.T) {
111+
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
112+
defer clus.Terminate(t)
113+
kvc := toGRPC(clus.RandClient()).KV
114+
mt := toGRPC(clus.RandClient()).Maintenance
115+
116+
alarmReq := &pb.AlarmRequest{
117+
MemberID: 123,
118+
Action: pb.AlarmRequest_ACTIVATE,
119+
Alarm: pb.AlarmType_NOSPACE,
120+
}
121+
if _, err := mt.Alarm(context.TODO(), alarmReq); err != nil {
122+
t.Fatal(err)
123+
}
124+
125+
key := []byte("abc")
126+
smallbuf := make([]byte, 512)
127+
_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
128+
if err == nil && !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
129+
t.Fatalf("put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
130+
}
131+
132+
alarmReq.Action = pb.AlarmRequest_DEACTIVATE
133+
if _, err = mt.Alarm(context.TODO(), alarmReq); err != nil {
134+
t.Fatal(err)
135+
}
136+
137+
if _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
138+
t.Fatal(err)
139+
}
140+
}
141+
142+
type fakeConsistentIndex struct{ rev uint64 }
143+
144+
func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.rev }
145+
146+
func TestV3CorruptAlarm(t *testing.T) {
147+
oldInvl := etcdserver.CorruptCheckInterval
148+
defer func() { etcdserver.CorruptCheckInterval = oldInvl }()
149+
etcdserver.CorruptCheckInterval = 2 * time.Second
150+
151+
defer testutil.AfterTest(t)
152+
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
153+
defer clus.Terminate(t)
154+
155+
var wg sync.WaitGroup
156+
wg.Add(10)
157+
for i := 0; i < 10; i++ {
158+
go func() {
159+
defer wg.Done()
160+
if _, err := clus.Client(i%3).Put(context.TODO(), "k", "v"); err != nil {
161+
t.Fatal(err)
162+
}
163+
}()
164+
}
165+
wg.Wait()
166+
167+
// corrupt member 0
168+
clus.Members[0].Stop(t)
169+
be := backend.NewDefaultBackend(clus.Members[0].DataDir + "/member/snap/db")
170+
s := mvcc.NewStore(be, nil, &fakeConsistentIndex{13})
171+
if r, err := s.Range([]byte("k"), nil, mvcc.RangeOptions{}); err != nil || len(r.KVs) == 0 {
172+
t.Fatalf("key 'k' does not exist (%v)", err)
173+
}
174+
s.Put([]byte("abc"), []byte("def"), 0)
175+
s.Put([]byte("xyz"), []byte("123"), 0)
176+
s.Compact(5)
177+
s.Commit()
178+
s.Close()
179+
be.Close()
180+
181+
clus.Client(1).Put(context.TODO(), "xyz", "321")
182+
clus.Client(1).Put(context.TODO(), "abc", "fed")
183+
184+
// member 0 restarts into split brain
185+
clus.Members[0].Restart(t)
186+
187+
resp0, err0 := clus.Client(0).Get(context.TODO(), "abc")
188+
if err0 != nil {
189+
t.Fatal(err0)
190+
}
191+
resp1, err1 := clus.Client(1).Get(context.TODO(), "abc")
192+
if err1 != nil {
193+
t.Fatal(err1)
194+
}
195+
196+
if resp0.Kvs[0].ModRevision == resp1.Kvs[0].ModRevision {
197+
t.Fatalf("matching ModRevision values")
198+
}
199+
200+
time.Sleep(3 * time.Second)
201+
202+
presp, perr := clus.Client(0).Put(context.TODO(), "abc", "aaa")
203+
if perr == nil || !eqErrGRPC(perr, rpctypes.ErrCorrupt) {
204+
t.Fatalf("expected %v, got %+v (%v)", rpctypes.ErrCorrupt, presp, perr)
205+
}
206+
}

integration/v3_grpc_test.go

-110
Original file line numberDiff line numberDiff line change
@@ -1277,116 +1277,6 @@ func TestV3StorageQuotaAPI(t *testing.T) {
12771277
}
12781278
}
12791279

1280-
// TestV3StorageQuotaApply tests the V3 server respects quotas during apply
1281-
func TestV3StorageQuotaApply(t *testing.T) {
1282-
testutil.AfterTest(t)
1283-
quotasize := int64(16 * os.Getpagesize())
1284-
1285-
clus := NewClusterV3(t, &ClusterConfig{Size: 2})
1286-
defer clus.Terminate(t)
1287-
kvc0 := toGRPC(clus.Client(0)).KV
1288-
kvc1 := toGRPC(clus.Client(1)).KV
1289-
1290-
// Set a quota on one node
1291-
clus.Members[0].QuotaBackendBytes = quotasize
1292-
clus.Members[0].Stop(t)
1293-
clus.Members[0].Restart(t)
1294-
clus.waitLeader(t, clus.Members)
1295-
waitForRestart(t, kvc0)
1296-
1297-
key := []byte("abc")
1298-
1299-
// test small put still works
1300-
smallbuf := make([]byte, 1024)
1301-
_, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
1302-
if serr != nil {
1303-
t.Fatal(serr)
1304-
}
1305-
1306-
// test big put
1307-
bigbuf := make([]byte, quotasize)
1308-
_, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
1309-
if err != nil {
1310-
t.Fatal(err)
1311-
}
1312-
1313-
// quorum get should work regardless of whether alarm is raised
1314-
_, err = kvc0.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
1315-
if err != nil {
1316-
t.Fatal(err)
1317-
}
1318-
1319-
// wait until alarm is raised for sure-- poll the alarms
1320-
stopc := time.After(5 * time.Second)
1321-
for {
1322-
req := &pb.AlarmRequest{Action: pb.AlarmRequest_GET}
1323-
resp, aerr := clus.Members[0].s.Alarm(context.TODO(), req)
1324-
if aerr != nil {
1325-
t.Fatal(aerr)
1326-
}
1327-
if len(resp.Alarms) != 0 {
1328-
break
1329-
}
1330-
select {
1331-
case <-stopc:
1332-
t.Fatalf("timed out waiting for alarm")
1333-
case <-time.After(10 * time.Millisecond):
1334-
}
1335-
}
1336-
1337-
// small quota machine should reject put
1338-
if _, err := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
1339-
t.Fatalf("past-quota instance should reject put")
1340-
}
1341-
1342-
// large quota machine should reject put
1343-
if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
1344-
t.Fatalf("past-quota instance should reject put")
1345-
}
1346-
1347-
// reset large quota node to ensure alarm persisted
1348-
clus.Members[1].Stop(t)
1349-
clus.Members[1].Restart(t)
1350-
clus.waitLeader(t, clus.Members)
1351-
1352-
if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
1353-
t.Fatalf("alarmed instance should reject put after reset")
1354-
}
1355-
}
1356-
1357-
// TestV3AlarmDeactivate ensures that space alarms can be deactivated so puts go through.
1358-
func TestV3AlarmDeactivate(t *testing.T) {
1359-
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
1360-
defer clus.Terminate(t)
1361-
kvc := toGRPC(clus.RandClient()).KV
1362-
mt := toGRPC(clus.RandClient()).Maintenance
1363-
1364-
alarmReq := &pb.AlarmRequest{
1365-
MemberID: 123,
1366-
Action: pb.AlarmRequest_ACTIVATE,
1367-
Alarm: pb.AlarmType_NOSPACE,
1368-
}
1369-
if _, err := mt.Alarm(context.TODO(), alarmReq); err != nil {
1370-
t.Fatal(err)
1371-
}
1372-
1373-
key := []byte("abc")
1374-
smallbuf := make([]byte, 512)
1375-
_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
1376-
if err == nil && !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
1377-
t.Fatalf("put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
1378-
}
1379-
1380-
alarmReq.Action = pb.AlarmRequest_DEACTIVATE
1381-
if _, err = mt.Alarm(context.TODO(), alarmReq); err != nil {
1382-
t.Fatal(err)
1383-
}
1384-
1385-
if _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
1386-
t.Fatal(err)
1387-
}
1388-
}
1389-
13901280
func TestV3RangeRequest(t *testing.T) {
13911281
defer testutil.AfterTest(t)
13921282
tests := []struct {

0 commit comments

Comments
 (0)