Skip to content

Commit 14f0c2f

Browse files
author
Anthony Romano
committed
integration: add corruption test
1 parent 115ae3a commit 14f0c2f

File tree

2 files changed

+207
-110
lines changed

2 files changed

+207
-110
lines changed

integration/v3_alarm_test.go

+207
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
// Copyright 2017 The etcd Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package integration
16+
17+
import (
18+
"fmt"
19+
"os"
20+
"sync"
21+
"testing"
22+
"time"
23+
24+
"github.com/coreos/etcd/etcdserver"
25+
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
26+
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
27+
"github.com/coreos/etcd/mvcc"
28+
"github.com/coreos/etcd/mvcc/backend"
29+
"github.com/coreos/etcd/pkg/testutil"
30+
"golang.org/x/net/context"
31+
)
32+
33+
// TestV3StorageQuotaApply tests the V3 server respects quotas during apply
34+
func TestV3StorageQuotaApply(t *testing.T) {
35+
testutil.AfterTest(t)
36+
quotasize := int64(16 * os.Getpagesize())
37+
38+
clus := NewClusterV3(t, &ClusterConfig{Size: 2})
39+
defer clus.Terminate(t)
40+
kvc0 := toGRPC(clus.Client(0)).KV
41+
kvc1 := toGRPC(clus.Client(1)).KV
42+
43+
// Set a quota on one node
44+
clus.Members[0].QuotaBackendBytes = quotasize
45+
clus.Members[0].Stop(t)
46+
clus.Members[0].Restart(t)
47+
clus.waitLeader(t, clus.Members)
48+
waitForRestart(t, kvc0)
49+
50+
key := []byte("abc")
51+
52+
// test small put still works
53+
smallbuf := make([]byte, 1024)
54+
_, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
55+
if serr != nil {
56+
t.Fatal(serr)
57+
}
58+
59+
// test big put
60+
bigbuf := make([]byte, quotasize)
61+
_, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
62+
if err != nil {
63+
t.Fatal(err)
64+
}
65+
66+
// quorum get should work regardless of whether alarm is raised
67+
_, err = kvc0.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
68+
if err != nil {
69+
t.Fatal(err)
70+
}
71+
72+
// wait until alarm is raised for sure-- poll the alarms
73+
stopc := time.After(5 * time.Second)
74+
for {
75+
req := &pb.AlarmRequest{Action: pb.AlarmRequest_GET}
76+
resp, aerr := clus.Members[0].s.Alarm(context.TODO(), req)
77+
if aerr != nil {
78+
t.Fatal(aerr)
79+
}
80+
if len(resp.Alarms) != 0 {
81+
break
82+
}
83+
select {
84+
case <-stopc:
85+
t.Fatalf("timed out waiting for alarm")
86+
case <-time.After(10 * time.Millisecond):
87+
}
88+
}
89+
90+
// small quota machine should reject put
91+
if _, err := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
92+
t.Fatalf("past-quota instance should reject put")
93+
}
94+
95+
// large quota machine should reject put
96+
if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
97+
t.Fatalf("past-quota instance should reject put")
98+
}
99+
100+
// reset large quota node to ensure alarm persisted
101+
clus.Members[1].Stop(t)
102+
clus.Members[1].Restart(t)
103+
clus.waitLeader(t, clus.Members)
104+
105+
if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
106+
t.Fatalf("alarmed instance should reject put after reset")
107+
}
108+
}
109+
110+
// TestV3AlarmDeactivate ensures that space alarms can be deactivated so puts go through.
111+
func TestV3AlarmDeactivate(t *testing.T) {
112+
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
113+
defer clus.Terminate(t)
114+
kvc := toGRPC(clus.RandClient()).KV
115+
mt := toGRPC(clus.RandClient()).Maintenance
116+
117+
alarmReq := &pb.AlarmRequest{
118+
MemberID: 123,
119+
Action: pb.AlarmRequest_ACTIVATE,
120+
Alarm: pb.AlarmType_NOSPACE,
121+
}
122+
if _, err := mt.Alarm(context.TODO(), alarmReq); err != nil {
123+
t.Fatal(err)
124+
}
125+
126+
key := []byte("abc")
127+
smallbuf := make([]byte, 512)
128+
_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
129+
if err == nil && !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
130+
t.Fatalf("put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
131+
}
132+
133+
alarmReq.Action = pb.AlarmRequest_DEACTIVATE
134+
if _, err = mt.Alarm(context.TODO(), alarmReq); err != nil {
135+
t.Fatal(err)
136+
}
137+
138+
if _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
139+
t.Fatal(err)
140+
}
141+
}
142+
143+
// fakeConsistentIndex is a test stub whose ConsistentIndex method reports a
// fixed, caller-chosen value.
type fakeConsistentIndex struct {
	rev uint64
}

// ConsistentIndex returns the value stored in the rev field.
func (idx *fakeConsistentIndex) ConsistentIndex() uint64 {
	return idx.rev
}
146+
147+
func TestV3CorruptAlarm(t *testing.T) {
148+
oldInvl := etcdserver.CorruptCheckInterval
149+
defer func() { etcdserver.CorruptCheckInterval = oldInvl }()
150+
etcdserver.CorruptCheckInterval = 2 * time.Second
151+
152+
defer testutil.AfterTest(t)
153+
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
154+
defer clus.Terminate(t)
155+
156+
var wg sync.WaitGroup
157+
wg.Add(10)
158+
for i := 0; i < 10; i++ {
159+
go func() {
160+
defer wg.Done()
161+
if _, err := clus.Client(i%3).Put(context.TODO(), "k", "v"); err != nil {
162+
t.Fatal(err)
163+
}
164+
}()
165+
}
166+
wg.Wait()
167+
168+
// corrupt member 0
169+
clus.Members[0].Stop(t)
170+
be := backend.NewDefaultBackend(clus.Members[0].DataDir + "/member/snap/db")
171+
s := mvcc.NewStore(be, nil, &fakeConsistentIndex{13})
172+
if r, err := s.Range([]byte("k"), nil, mvcc.RangeOptions{}); err != nil || len(r.KVs) == 0 {
173+
t.Fatalf("key 'k' does not exist (%v)", err)
174+
}
175+
s.Put([]byte("abc"), []byte("def"), 0)
176+
s.Put([]byte("xyz"), []byte("123"), 0)
177+
s.Compact(5)
178+
s.Commit()
179+
s.Close()
180+
be.Close()
181+
182+
clus.Client(1).Put(context.TODO(), "xyz", "321")
183+
clus.Client(1).Put(context.TODO(), "abc", "fed")
184+
185+
// member 0 restarts into split brain
186+
clus.Members[0].Restart(t)
187+
188+
resp0, err0 := clus.Client(0).Get(context.TODO(), "abc")
189+
if err0 != nil {
190+
t.Fatal(err0)
191+
}
192+
resp1, err1 := clus.Client(1).Get(context.TODO(), "abc")
193+
if err1 != nil {
194+
t.Fatal(err1)
195+
}
196+
197+
if resp0.Kvs[0].ModRevision == resp1.Kvs[0].ModRevision {
198+
t.Fatalf("matching ModRevision values")
199+
}
200+
201+
time.Sleep(3 * time.Second)
202+
203+
presp, perr := clus.Client(0).Put(context.TODO(), "abc", "aaa")
204+
if perr == nil || !eqErrGRPC(perr, rpctypes.ErrCorrupt) {
205+
t.Fatalf("expected %v, got %+v (%v)", rpctypes.ErrCorrupt, presp, perr)
206+
}
207+
}

integration/v3_grpc_test.go

-110
Original file line numberDiff line numberDiff line change
@@ -1277,116 +1277,6 @@ func TestV3StorageQuotaAPI(t *testing.T) {
12771277
}
12781278
}
12791279

1280-
// TestV3StorageQuotaApply tests the V3 server respects quotas during apply
1281-
func TestV3StorageQuotaApply(t *testing.T) {
1282-
testutil.AfterTest(t)
1283-
quotasize := int64(16 * os.Getpagesize())
1284-
1285-
clus := NewClusterV3(t, &ClusterConfig{Size: 2})
1286-
defer clus.Terminate(t)
1287-
kvc0 := toGRPC(clus.Client(0)).KV
1288-
kvc1 := toGRPC(clus.Client(1)).KV
1289-
1290-
// Set a quota on one node
1291-
clus.Members[0].QuotaBackendBytes = quotasize
1292-
clus.Members[0].Stop(t)
1293-
clus.Members[0].Restart(t)
1294-
clus.waitLeader(t, clus.Members)
1295-
waitForRestart(t, kvc0)
1296-
1297-
key := []byte("abc")
1298-
1299-
// test small put still works
1300-
smallbuf := make([]byte, 1024)
1301-
_, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
1302-
if serr != nil {
1303-
t.Fatal(serr)
1304-
}
1305-
1306-
// test big put
1307-
bigbuf := make([]byte, quotasize)
1308-
_, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
1309-
if err != nil {
1310-
t.Fatal(err)
1311-
}
1312-
1313-
// quorum get should work regardless of whether alarm is raised
1314-
_, err = kvc0.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
1315-
if err != nil {
1316-
t.Fatal(err)
1317-
}
1318-
1319-
// wait until alarm is raised for sure-- poll the alarms
1320-
stopc := time.After(5 * time.Second)
1321-
for {
1322-
req := &pb.AlarmRequest{Action: pb.AlarmRequest_GET}
1323-
resp, aerr := clus.Members[0].s.Alarm(context.TODO(), req)
1324-
if aerr != nil {
1325-
t.Fatal(aerr)
1326-
}
1327-
if len(resp.Alarms) != 0 {
1328-
break
1329-
}
1330-
select {
1331-
case <-stopc:
1332-
t.Fatalf("timed out waiting for alarm")
1333-
case <-time.After(10 * time.Millisecond):
1334-
}
1335-
}
1336-
1337-
// small quota machine should reject put
1338-
if _, err := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
1339-
t.Fatalf("past-quota instance should reject put")
1340-
}
1341-
1342-
// large quota machine should reject put
1343-
if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
1344-
t.Fatalf("past-quota instance should reject put")
1345-
}
1346-
1347-
// reset large quota node to ensure alarm persisted
1348-
clus.Members[1].Stop(t)
1349-
clus.Members[1].Restart(t)
1350-
clus.waitLeader(t, clus.Members)
1351-
1352-
if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
1353-
t.Fatalf("alarmed instance should reject put after reset")
1354-
}
1355-
}
1356-
1357-
// TestV3AlarmDeactivate ensures that space alarms can be deactivated so puts go through.
1358-
func TestV3AlarmDeactivate(t *testing.T) {
1359-
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
1360-
defer clus.Terminate(t)
1361-
kvc := toGRPC(clus.RandClient()).KV
1362-
mt := toGRPC(clus.RandClient()).Maintenance
1363-
1364-
alarmReq := &pb.AlarmRequest{
1365-
MemberID: 123,
1366-
Action: pb.AlarmRequest_ACTIVATE,
1367-
Alarm: pb.AlarmType_NOSPACE,
1368-
}
1369-
if _, err := mt.Alarm(context.TODO(), alarmReq); err != nil {
1370-
t.Fatal(err)
1371-
}
1372-
1373-
key := []byte("abc")
1374-
smallbuf := make([]byte, 512)
1375-
_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
1376-
if err == nil && !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
1377-
t.Fatalf("put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
1378-
}
1379-
1380-
alarmReq.Action = pb.AlarmRequest_DEACTIVATE
1381-
if _, err = mt.Alarm(context.TODO(), alarmReq); err != nil {
1382-
t.Fatal(err)
1383-
}
1384-
1385-
if _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
1386-
t.Fatal(err)
1387-
}
1388-
}
1389-
13901280
func TestV3RangeRequest(t *testing.T) {
13911281
defer testutil.AfterTest(t)
13921282
tests := []struct {

0 commit comments

Comments
 (0)