Skip to content

Commit 5e359fe

Browse files
guogeryacovm
authored andcommitted
[FAB-11919] Towards etcdraft snapshotting 4/4
Add integration test for snapshotting. Change-Id: Ia5f32194dd37b8136670378178ae8a5555b79335 Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
1 parent 5ca4428 commit 5e359fe

File tree

3 files changed

+141
-27
lines changed

3 files changed

+141
-27
lines changed

integration/e2e/cft_test.go

Lines changed: 136 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,17 @@ package e2e
99
import (
1010
"io/ioutil"
1111
"os"
12+
"path"
1213
"path/filepath"
14+
"strconv"
1315
"syscall"
1416

1517
docker "github.com/fsouza/go-dockerclient"
1618
"github.com/hyperledger/fabric/integration/nwo"
1719
"github.com/hyperledger/fabric/integration/nwo/commands"
1820
. "github.com/onsi/ginkgo"
1921
. "github.com/onsi/gomega"
22+
"github.com/onsi/gomega/gbytes"
2023
"github.com/onsi/gomega/gexec"
2124
"github.com/tedsuo/ifrit"
2225
"github.com/tedsuo/ifrit/grouper"
@@ -28,8 +31,9 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {
2831
client *docker.Client
2932
network *nwo.Network
3033
chaincode nwo.Chaincode
34+
peer *nwo.Peer
3135

32-
networkProc, o1Proc ifrit.Process
36+
peerProc, ordererProc, o1Proc ifrit.Process
3337
)
3438

3539
BeforeEach(func() {
@@ -54,22 +58,40 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {
5458
o1Proc.Signal(syscall.SIGTERM)
5559
Eventually(o1Proc.Wait(), network.EventuallyTimeout).Should(Receive())
5660
}
57-
if networkProc != nil {
58-
networkProc.Signal(syscall.SIGTERM)
59-
Eventually(networkProc.Wait(), network.EventuallyTimeout).Should(Receive())
61+
if ordererProc != nil {
62+
ordererProc.Signal(syscall.SIGTERM)
63+
Eventually(ordererProc.Wait(), network.EventuallyTimeout).Should(Receive())
64+
}
65+
if peerProc != nil {
66+
peerProc.Signal(syscall.SIGTERM)
67+
Eventually(peerProc.Wait(), network.EventuallyTimeout).Should(Receive())
6068
}
6169
if network != nil {
6270
network.Cleanup()
6371
}
6472
os.RemoveAll(testDir)
6573
})
6674

75+
fetchLatestBlock := func(targetOrderer *nwo.Orderer, blockFile string) {
76+
c := commands.ChannelFetch{
77+
ChannelID: "testchannel",
78+
Block: "newest",
79+
OutputFile: blockFile,
80+
}
81+
if targetOrderer != nil {
82+
c.Orderer = network.OrdererAddress(targetOrderer, nwo.ListenPort)
83+
}
84+
sess, err := network.PeerAdminSession(peer, c)
85+
Expect(err).NotTo(HaveOccurred())
86+
Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit(0))
87+
}
88+
6789
When("orderer stops and restarts", func() {
6890
It("keeps network up and running", func() {
6991
network = nwo.New(nwo.MultiNodeEtcdRaft(), testDir, client, 33000, components)
7092

7193
o1, o2, o3 := network.Orderer("orderer1"), network.Orderer("orderer2"), network.Orderer("orderer3")
72-
p := network.Peer("Org1", "peer1")
94+
peer = network.Peer("Org1", "peer1")
7395
blockFile1 := filepath.Join(testDir, "newest_orderer1_block.pb")
7496
blockFile2 := filepath.Join(testDir, "newest_orderer2_block.pb")
7597

@@ -84,17 +106,12 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {
84106
ordererGroup := grouper.NewParallel(syscall.SIGTERM, orderers)
85107
peerGroup := network.PeerGroupRunner()
86108

87-
networkRunner := grouper.NewOrdered(
88-
syscall.SIGTERM,
89-
grouper.Members{
90-
{Name: "orderers", Runner: ordererGroup},
91-
{Name: "peers", Runner: peerGroup},
92-
},
93-
)
94109
o1Proc = ifrit.Invoke(o1Runner)
95-
networkProc = ifrit.Invoke(networkRunner)
110+
ordererProc = ifrit.Invoke(ordererGroup)
96111
Eventually(o1Proc.Ready()).Should(BeClosed())
97-
Eventually(networkProc.Ready()).Should(BeClosed())
112+
Eventually(ordererProc.Ready()).Should(BeClosed())
113+
peerProc = ifrit.Invoke(peerGroup)
114+
Eventually(peerProc.Ready()).Should(BeClosed())
98115

99116
By("performing operation with orderer1")
100117
network.CreateAndJoinChannel(o1, "testchannel")
@@ -112,21 +129,85 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {
112129
Eventually(o1Proc.Ready()).Should(BeClosed())
113130

114131
By("executing transaction with restarted orderer")
115-
RunQueryInvokeQuery(network, o1, p, "testchannel")
116-
117-
fetchLatestBlock := func(targetOrderer *nwo.Orderer, blockFile string) {
118-
c := commands.ChannelFetch{
119-
ChannelID: "testchannel",
120-
Block: "newest",
121-
OutputFile: blockFile,
122-
}
123-
if targetOrderer != nil {
124-
c.Orderer = network.OrdererAddress(targetOrderer, nwo.ListenPort)
125-
}
126-
sess, err := network.PeerAdminSession(p, c)
132+
RunQueryInvokeQuery(network, o1, peer, "testchannel")
133+
134+
fetchLatestBlock(o1, blockFile1)
135+
fetchLatestBlock(o2, blockFile2)
136+
b1 := nwo.UnmarshalBlockFromFile(blockFile1)
137+
b2 := nwo.UnmarshalBlockFromFile(blockFile2)
138+
Expect(b1.Header.Bytes()).To(Equal(b2.Header.Bytes()))
139+
})
140+
})
141+
142+
When("an orderer is behind the latest snapshot on leader", func() {
143+
It("catches up using the block stored in snapshot", func() {
144+
network = nwo.New(nwo.MultiNodeEtcdRaft(), testDir, client, 33000, components)
145+
146+
o1, o2, o3 := network.Orderer("orderer1"), network.Orderer("orderer2"), network.Orderer("orderer3")
147+
148+
peer = network.Peer("Org1", "peer1")
149+
blockFile1 := filepath.Join(testDir, "newest_orderer1_block.pb")
150+
blockFile2 := filepath.Join(testDir, "newest_orderer2_block.pb")
151+
152+
network.GenerateConfigTree()
153+
network.Bootstrap()
154+
155+
orderers := grouper.Members{
156+
{Name: o2.ID(), Runner: network.OrdererRunner(o2)},
157+
{Name: o3.ID(), Runner: network.OrdererRunner(o3)},
158+
}
159+
ordererGroup := grouper.NewParallel(syscall.SIGTERM, orderers)
160+
peerGroup := network.PeerGroupRunner()
161+
162+
ordererProc = ifrit.Invoke(ordererGroup)
163+
Eventually(ordererProc.Ready()).Should(BeClosed())
164+
peerProc = ifrit.Invoke(peerGroup)
165+
Eventually(peerProc.Ready()).Should(BeClosed())
166+
167+
network.CreateAndJoinChannel(o2, "testchannel")
168+
nwo.DeployChaincode(network, "testchannel", o2, chaincode)
169+
170+
for i := 1; i <= 6; i++ {
171+
RunInvoke(network, o2, peer, "testchannel")
172+
RunQuery(network, o2, peer, "testchannel", 100-i*10)
173+
}
174+
175+
o2SnapDir := path.Join(network.RootDir, "orderers", o2.ID(), "etcdraft", "snapshot")
176+
Eventually(func() int {
177+
files, err := ioutil.ReadDir(path.Join(o2SnapDir, "testchannel"))
127178
Expect(err).NotTo(HaveOccurred())
128-
Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit(0))
179+
return len(files)
180+
}).Should(Equal(1))
181+
182+
ordererProc.Signal(syscall.SIGKILL)
183+
Eventually(ordererProc.Wait(), network.EventuallyTimeout).Should(Receive())
184+
185+
o1Runner := network.OrdererRunner(o1)
186+
orderers = grouper.Members{
187+
{Name: o2.ID(), Runner: network.OrdererRunner(o2)},
188+
{Name: o3.ID(), Runner: network.OrdererRunner(o3)},
129189
}
190+
ordererGroup = grouper.NewParallel(syscall.SIGTERM, orderers)
191+
ordererProc = ifrit.Invoke(ordererGroup)
192+
Eventually(ordererProc.Ready()).Should(BeClosed())
193+
194+
o1Proc = ifrit.Invoke(o1Runner)
195+
Eventually(o1Proc.Ready()).Should(BeClosed())
196+
197+
o1SnapDir := path.Join(network.RootDir, "orderers", o1.ID(), "etcdraft", "snapshot")
198+
Eventually(func() int {
199+
files, err := ioutil.ReadDir(o1SnapDir)
200+
Expect(err).NotTo(HaveOccurred())
201+
return len(files)
202+
}, network.EventuallyTimeout).Should(Equal(2))
203+
Eventually(func() int {
204+
files, err := ioutil.ReadDir(path.Join(o1SnapDir, "testchannel"))
205+
Expect(err).NotTo(HaveOccurred())
206+
return len(files)
207+
}, network.EventuallyTimeout).Should(Equal(1))
208+
209+
RunInvoke(network, o1, peer, "testchannel")
210+
RunQuery(network, o1, peer, "testchannel", 30)
130211

131212
fetchLatestBlock(o1, blockFile1)
132213
fetchLatestBlock(o2, blockFile2)
@@ -136,3 +217,31 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {
136217
})
137218
})
138219
})
220+
221+
func RunInvoke(n *nwo.Network, orderer *nwo.Orderer, peer *nwo.Peer, channel string) {
222+
sess, err := n.PeerUserSession(peer, "User1", commands.ChaincodeInvoke{
223+
ChannelID: channel,
224+
Orderer: n.OrdererAddress(orderer, nwo.ListenPort),
225+
Name: "mycc",
226+
Ctor: `{"Args":["invoke","a","b","10"]}`,
227+
PeerAddresses: []string{
228+
n.PeerAddress(n.Peer("Org1", "peer0"), nwo.ListenPort),
229+
n.PeerAddress(n.Peer("Org2", "peer1"), nwo.ListenPort),
230+
},
231+
WaitForEvent: true,
232+
})
233+
Expect(err).NotTo(HaveOccurred())
234+
Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit(0))
235+
Expect(sess.Err).To(gbytes.Say("Chaincode invoke successful. result: status:200"))
236+
}
237+
238+
func RunQuery(n *nwo.Network, orderer *nwo.Orderer, peer *nwo.Peer, channel string, expect int) {
239+
sess, err := n.PeerUserSession(peer, "User1", commands.ChaincodeQuery{
240+
ChannelID: channel,
241+
Name: "mycc",
242+
Ctor: `{"Args":["query","a"]}`,
243+
})
244+
Expect(err).NotTo(HaveOccurred())
245+
Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit(0))
246+
Expect(sess).To(gbytes.Say(strconv.Itoa(expect)))
247+
}

integration/nwo/configtx_template.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ Profiles:{{ range .Profiles }}
8989
{{- end }}
9090
{{- if eq $w.Consensus.Type "etcdraft" }}
9191
EtcdRaft:
92+
Options:
93+
SnapshotInterval: 5
9294
Consenters:{{ range .Orderers }}{{ with $w.Orderer . }}
9395
- Host: 127.0.0.1
9496
Port: {{ $w.OrdererPort . "Listen" }}

integration/nwo/orderer_template.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ General:
2525
ClientPrivateKey: {{ $w.OrdererLocalTLSDir Orderer }}/server.key
2626
DialTimeout: 5s
2727
RPCTimeout: 7s
28+
ReplicationBufferSize: 20971520
29+
ReplicationPullTimeout: 5s
30+
ReplicationRetryTimeout: 5s
2831
RootCAs:
2932
- {{ $w.OrdererLocalTLSDir Orderer }}/ca.crt
3033
Keepalive:

0 commit comments

Comments
 (0)