Skip to content

Commit bfa97ff

Browse files
committed
[FAB-5661]: Add UT to coordinator of state
Commit add a test case to check the interaction of state transfer with coordinator interface and also to simulate real replication of missing blocks between two peers. Additional logic which takes care of marshalling and unmarshallling of private collection extracted from state transfer. Change-Id: I484cf39250fdf5ebb04aedb3f4db605efcb3d7a3 Signed-off-by: Artem Barger <bartem@il.ibm.com>
1 parent 8527376 commit bfa97ff

File tree

5 files changed

+244
-57
lines changed

5 files changed

+244
-57
lines changed

gossip/state/coordinator.go

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,61 @@ package state
99
import (
1010
"fmt"
1111

12+
"github.com/golang/protobuf/proto"
1213
"github.com/hyperledger/fabric/core/committer"
1314
"github.com/hyperledger/fabric/protos/common"
1415
"github.com/hyperledger/fabric/protos/ledger/rwset"
16+
"github.com/pkg/errors"
1517
)
1618

1719
// PvtData a placeholder to represent private data
1820
type PvtData struct {
1921
Payload *rwset.TxPvtReadWriteSet
2022
}
2123

24+
// PvtDataCollections data type to encapsulate collections
25+
// of private data
26+
type PvtDataCollections []*PvtData
27+
28+
// Marshal encodes private collection into bytes array
29+
func (pvt PvtDataCollections) Marshal() ([][]byte, error) {
30+
pvtDataBytes := make([][]byte, 0)
31+
for index, each := range pvt {
32+
pvtBytes, err := proto.Marshal(each.Payload)
33+
if err != nil {
34+
errMsg := fmt.Sprintf("Could not marshal private rwset index %d, due to %s", index, err)
35+
logger.Errorf(errMsg)
36+
return nil, errors.New(errMsg)
37+
}
38+
pvtDataBytes = append(pvtDataBytes, pvtBytes)
39+
}
40+
return pvtDataBytes, nil
41+
}
42+
43+
// Unmarshal read and unmarshal collection of private data
44+
// from given bytes array
45+
func (pvt PvtDataCollections) Unmarshal(data [][]byte) error {
46+
for _, each := range data {
47+
payload := &rwset.TxPvtReadWriteSet{}
48+
if err := proto.Unmarshal(each, payload); err != nil {
49+
return err
50+
}
51+
pvt = append(pvt, &PvtData{Payload: payload})
52+
}
53+
54+
return nil
55+
}
56+
2257
// Coordinator orchestrates the flow of the new
2358
// blocks arrival and in flight transient data, responsible
2459
// to complete missing parts of transient data for given block.
2560
type Coordinator interface {
2661
// StoreBlock deliver new block with underlined private data
2762
// returns missing transaction ids
28-
StoreBlock(block *common.Block, data ...[]*PvtData) ([]string, error)
63+
StoreBlock(block *common.Block, data ...PvtDataCollections) ([]string, error)
2964

3065
// GetBlockByNum returns block and related to the block private data
31-
GetBlockByNum(seqNum uint64) (*common.Block, []*PvtData, error)
66+
GetBlockByNum(seqNum uint64) (*common.Block, PvtDataCollections, error)
3267

3368
// Get recent block sequence number
3469
LedgerHeight() (uint64, error)
@@ -46,12 +81,12 @@ func NewCoordinator(committer committer.Committer) Coordinator {
4681
return &coordinator{Committer: committer}
4782
}
4883

49-
func (c *coordinator) StoreBlock(block *common.Block, data ...[]*PvtData) ([]string, error) {
84+
func (c *coordinator) StoreBlock(block *common.Block, data ...PvtDataCollections) ([]string, error) {
5085
// Need to check whenever there are missing private rwset
5186
return nil, c.Committer.Commit(block)
5287
}
5388

54-
func (c *coordinator) GetBlockByNum(seqNum uint64) (*common.Block, []*PvtData, error) {
89+
func (c *coordinator) GetBlockByNum(seqNum uint64) (*common.Block, PvtDataCollections, error) {
5590
blocks := c.Committer.GetBlocks([]uint64{seqNum})
5691
if len(blocks) == 0 {
5792
return nil, nil, fmt.Errorf("Cannot retreive block number %d", seqNum)

gossip/state/mocks/gossip.go

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,32 +19,33 @@ type GossipMock struct {
1919
mock.Mock
2020
}
2121

22-
func (*GossipMock) SuspectPeers(s api.PeerSuspector) {
23-
panic("implement me")
24-
}
22+
func (g *GossipMock) SuspectPeers(s api.PeerSuspector) {
23+
g.Called(s)
2524

26-
func (*GossipMock) Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer) {
27-
panic("implement me")
2825
}
2926

30-
func (*GossipMock) Peers() []discovery.NetworkMember {
31-
panic("implement me")
27+
func (g *GossipMock) Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer) {
28+
g.Called(msg, peers)
3229
}
3330

34-
func (*GossipMock) PeersOfChannel(common.ChainID) []discovery.NetworkMember {
35-
return nil
31+
func (g *GossipMock) Peers() []discovery.NetworkMember {
32+
return g.Called().Get(0).([]discovery.NetworkMember)
3633
}
3734

38-
func (*GossipMock) UpdateMetadata(metadata []byte) {
39-
panic("implement me")
35+
func (g *GossipMock) PeersOfChannel(chainID common.ChainID) []discovery.NetworkMember {
36+
args := g.Called(chainID)
37+
return args.Get(0).([]discovery.NetworkMember)
4038
}
4139

42-
func (*GossipMock) UpdateChannelMetadata(metadata []byte, chainID common.ChainID) {
40+
func (g *GossipMock) UpdateMetadata(metadata []byte) {
41+
g.Called(metadata)
42+
}
4343

44+
func (g *GossipMock) UpdateChannelMetadata(metadata []byte, chainID common.ChainID) {
4445
}
4546

46-
func (*GossipMock) Gossip(msg *proto.GossipMessage) {
47-
panic("implement me")
47+
func (g *GossipMock) Gossip(msg *proto.GossipMessage) {
48+
g.Called(msg)
4849
}
4950

5051
func (g *GossipMock) Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan proto.ReceivedMessage) {
@@ -58,5 +59,6 @@ func (g *GossipMock) Accept(acceptor common.MessageAcceptor, passThrough bool) (
5859
func (g *GossipMock) JoinChan(joinMsg api.JoinChannelMessage, chainID common.ChainID) {
5960
}
6061

61-
func (*GossipMock) Stop() {
62+
func (g *GossipMock) Stop() {
63+
6264
}

gossip/state/mocks/gossip_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/hyperledger/fabric/gossip/api"
1313
"github.com/hyperledger/fabric/gossip/common"
14+
"github.com/hyperledger/fabric/gossip/discovery"
1415
proto "github.com/hyperledger/fabric/protos/gossip"
1516
"github.com/stretchr/testify/assert"
1617
"github.com/stretchr/testify/mock"
@@ -38,6 +39,7 @@ func TestGossipMock(t *testing.T) {
3839
assert.Panics(t, func() {
3940
g.Peers()
4041
})
42+
g.On("PeersOfChannel", mock.Anything).Return([]discovery.NetworkMember{})
4143
assert.Empty(t, g.PeersOfChannel(common.ChainID("A")))
4244

4345
assert.Panics(t, func() {

gossip/state/state.go

Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"github.com/hyperledger/fabric/gossip/util"
2424
"github.com/hyperledger/fabric/protos/common"
2525
proto "github.com/hyperledger/fabric/protos/gossip"
26-
"github.com/hyperledger/fabric/protos/ledger/rwset"
2726
"github.com/op/go-logging"
2827
)
2928

@@ -344,35 +343,30 @@ func (s *GossipStateProviderImpl) handleStateRequest(msg proto.ReceivedMessage)
344343

345344
blockBytes, err := pb.Marshal(block)
346345

347-
// Marshal private data
348346
if err != nil {
349347
logger.Errorf("Could not marshal block: %s", err)
350348
continue
351349
}
352350

353-
// TODO: Need to extract orgID of the requester and filter out
354-
// private data entries which doesn't belongs to collections
355-
// allowed for sender organization based on policies
356-
pvtDataBytes := make([][]byte, 0)
357-
err = nil
358-
for index, each := range pvtData {
359-
pvtBytes, err := pb.Marshal(each.Payload)
351+
var pvtBytes [][]byte
352+
if pvtData != nil {
353+
// TODO: Need to extract orgID of the requester and filter out
354+
// private data entries which doesn't belongs to collections
355+
// allowed for sender organization based on policies
356+
357+
// Marshal private data
358+
pvtBytes, err = pvtData.Marshal()
360359
if err != nil {
361-
logger.Errorf("Could not marshal private rwset index %d, due to %s", index, err)
362-
break
360+
logger.Errorf("Failed to marshal private rwset for block %d due to %s", seqNum, err)
361+
continue
363362
}
364-
pvtDataBytes = append(pvtDataBytes, pvtBytes)
365-
}
366-
if err != nil {
367-
logger.Errorf("Failed to marshal private rwset for block %d due to %s", seqNum, err)
368-
continue
369363
}
370364

371365
// Appending result to the response
372366
response.Payloads = append(response.Payloads, &proto.Payload{
373367
SeqNum: seqNum,
374368
Data: blockBytes,
375-
PrivateData: pvtDataBytes,
369+
PrivateData: pvtBytes,
376370
})
377371
}
378372
// Sending back response with missing blocks
@@ -470,15 +464,8 @@ func (s *GossipStateProviderImpl) deliverPayloads() {
470464

471465
// Read all private data into slice
472466
pvt := make([]*PvtData, 0)
473-
var err error
474-
for _, each := range payload.PrivateData {
475-
payload := &rwset.TxPvtReadWriteSet{}
476-
if err = pb.Unmarshal(each, payload); err != nil {
477-
break
478-
}
479-
pvt = append(pvt, &PvtData{Payload: payload})
480-
}
481-
467+
var p PvtDataCollections
468+
err := p.Unmarshal(payload.PrivateData)
482469
if err != nil {
483470
logger.Errorf("Wasn't able to unmarshal private data for block seqNum = %d due to (%s)...dropping block", payload.SeqNum, err)
484471
continue

0 commit comments

Comments
 (0)