Skip to content

Commit 29caeb5

Browse files
committed
[FAB-5083] Dissemination to internal/external peers
This commit makes the distributor to distribute private data to peers both internal and external of the organization. The minimum and maximum number of peers is determined by core.yaml and in the future would be set via policies. Change-Id: If3353f82ed333e63748b4a532d702f0ac6497389 Signed-off-by: yacovm <yacovm@il.ibm.com>
1 parent f2445fc commit 29caeb5

File tree

11 files changed

+288
-67
lines changed

11 files changed

+288
-67
lines changed

core/common/privdata/nopcollection.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package privdata
88

99
import (
1010
"github.com/hyperledger/fabric/protos/common"
11+
"github.com/spf13/viper"
1112
)
1213

1314
// NopCollection implements an allow-all collection which all orgs are a member of
@@ -27,11 +28,11 @@ func (nc *NopCollection) GetMemberOrgs() []string {
2728
}
2829

2930
func (nc *NopCollection) RequiredExternalPeerCount() int {
30-
return 0
31+
return viper.GetInt("peer.gossip.pvtData.minExternalPeers")
3132
}
3233

3334
func (nc *NopCollection) RequiredInternalPeerCount() int {
34-
return 0
35+
return viper.GetInt("peer.gossip.pvtData.minInternalPeers")
3536
}
3637

3738
func (nc *NopCollection) GetAccessFilter() Filter {

gossip/api/subchannel.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ import "github.com/hyperledger/fabric/gossip/common"
1313
type RoutingFilter func(peerIdentity PeerIdentityType) bool
1414

1515
// CollectionCriteria describes a way of selecting peers from a sub-channel
16-
// given their signatures
17-
type SubChannelSelectionCriteria func(signature PeerSignature) bool
16+
// given their signatures and whether they are from our organization
17+
type SubChannelSelectionCriteria func(signature PeerSignature, isFromOurOrg bool) bool
1818

1919
// RoutingFilterFactory defines an object that given a CollectionCriteria and a channel,
2020
// it can ascertain which peers should be aware of the data related to the

gossip/gossip/channel/channel.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,11 +413,13 @@ func (gc *gossipChannel) PeerFilter(messagePredicate api.SubChannelSelectionCrit
413413
return false
414414
}
415415

416+
sameOrg := bytes.Equal(gc.selfOrg, gc.GetOrgOfPeer(member.PKIid))
417+
416418
return messagePredicate(api.PeerSignature{
417419
Message: msg.Payload,
418420
Signature: msg.Signature,
419421
PeerIdentity: peerIdentity,
420-
})
422+
}, sameOrg)
421423
}
422424
}
423425

gossip/gossip/channel/channel_test.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1550,23 +1550,28 @@ func TestGossipChannelEligibility(t *testing.T) {
15501550
assert.False(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDinOrg3}))
15511551

15521552
// Ensure peers from the channel are returned
1553-
assert.True(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
1553+
assert.True(t, gc.PeerFilter(func(signature api.PeerSignature, sameOrg bool) bool {
1554+
assert.True(t, sameOrg)
15541555
return true
15551556
})(discovery.NetworkMember{PKIid: pkiIDInOrg1}))
1556-
assert.True(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
1557+
assert.True(t, gc.PeerFilter(func(signature api.PeerSignature, sameOrg bool) bool {
1558+
assert.False(t, sameOrg)
15571559
return true
15581560
})(discovery.NetworkMember{PKIid: pkiIDinOrg2}))
15591561
// But not peers which aren't in the channel
1560-
assert.False(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
1562+
assert.False(t, gc.PeerFilter(func(signature api.PeerSignature, sameOrg bool) bool {
1563+
assert.False(t, sameOrg)
15611564
return true
15621565
})(discovery.NetworkMember{PKIid: pkiIDinOrg3}))
15631566

15641567
// Ensure the given predicate is considered
1565-
assert.True(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
1568+
assert.True(t, gc.PeerFilter(func(signature api.PeerSignature, sameOrg bool) bool {
1569+
assert.False(t, sameOrg)
15661570
return bytes.Equal(signature.PeerIdentity, []byte("pkiIDinOrg2"))
15671571
})(discovery.NetworkMember{PKIid: pkiIDinOrg2}))
15681572

1569-
assert.False(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
1573+
assert.False(t, gc.PeerFilter(func(signature api.PeerSignature, sameOrg bool) bool {
1574+
assert.True(t, sameOrg)
15701575
return bytes.Equal(signature.PeerIdentity, []byte("pkiIDinOrg2"))
15711576
})(discovery.NetworkMember{PKIid: pkiIDInOrg1}))
15721577

gossip/gossip/gossip.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package gossip
88

99
import (
1010
"crypto/tls"
11+
"fmt"
1112
"time"
1213

1314
"github.com/hyperledger/fabric/gossip/api"
@@ -82,6 +83,11 @@ type SendCriteria struct {
8283
// Only peers that joined the channel would receive this message
8384
}
8485

86+
// String returns a string representation of this SendCriteria
87+
func (sc SendCriteria) String() string {
88+
return fmt.Sprintf("channel: %s, tout: %v, minAck: %d, maxPeers: %d", sc.Channel, sc.Timeout, sc.MinAck, sc.MaxPeers)
89+
}
90+
8591
// Config is the configuration of the gossip component
8692
type Config struct {
8793
BindPort int // Port we bind to, used only for tests

gossip/privdata/coordinator_test.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/hyperledger/fabric/protos/common"
2323
proto "github.com/hyperledger/fabric/protos/gossip"
2424
"github.com/hyperledger/fabric/protos/ledger/rwset"
25+
"github.com/spf13/viper"
2526
"github.com/stretchr/testify/assert"
2627
"github.com/stretchr/testify/mock"
2728
)
@@ -271,26 +272,26 @@ type collectionAccessPolicy struct {
271272
n uint64
272273
}
273274

275+
func (cap *collectionAccessPolicy) RequiredInternalPeerCount() int {
276+
return viper.GetInt("peer.gossip.pvtData.minInternalPeers")
277+
}
278+
279+
func (cap *collectionAccessPolicy) RequiredExternalPeerCount() int {
280+
return viper.GetInt("peer.gossip.pvtData.minExternalPeers")
281+
}
282+
274283
func (cap *collectionAccessPolicy) GetAccessFilter() privdata.Filter {
275284
return func(sd common.SignedData) bool {
276285
that, _ := asn1.Marshal(sd)
277286
this, _ := asn1.Marshal(cap.cs.expectedSignedData)
278287
if hex.EncodeToString(that) != hex.EncodeToString(this) {
279-
panic("Self signed data passed isn't equal to expected")
288+
panic(fmt.Errorf("self signed data passed isn't equal to expected:%v, %v", sd, cap.cs.expectedSignedData))
280289
}
281290
_, exists := cap.cs.policies[*cap]
282291
return exists || cap.cs.acceptsAll
283292
}
284293
}
285294

286-
func (cap *collectionAccessPolicy) RequiredExternalPeerCount() int {
287-
return 0
288-
}
289-
290-
func (cap *collectionAccessPolicy) RequiredInternalPeerCount() int {
291-
return 0
292-
}
293-
294295
func TestPvtDataCollections_FailOnEmptyPayload(t *testing.T) {
295296
collection := &util.PvtDataCollections{
296297
&ledger.TxPvtData{

gossip/privdata/distributor.go

Lines changed: 74 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,7 @@ type PvtDataDistributor interface {
4343

4444
// distributorImpl the implementation of the private data distributor interface
4545
type distributorImpl struct {
46-
chainID string
47-
minAck int
48-
maxPeers int
46+
chainID string
4947
gossipAdapter
5048
}
5149

@@ -55,8 +53,6 @@ func NewDistributor(chainID string, gossip gossipAdapter) PvtDataDistributor {
5553
return &distributorImpl{
5654
chainID: chainID,
5755
gossipAdapter: gossip,
58-
minAck: viper.GetInt("peer.gossip.pvtData.minAck"),
59-
maxPeers: viper.GetInt("peer.gossip.pvtData.maxPeers"),
6056
}
6157
}
6258

@@ -98,54 +94,64 @@ func (d *distributorImpl) computeDisseminationPlan(txID string, privData *rwset.
9894
return nil, errors.Errorf("No collection access policy filter computed for %v", cc)
9995
}
10096

101-
routingFilter, err := d.gossipAdapter.PeerFilter(gossipCommon.ChainID(d.chainID), func(signature api.PeerSignature) bool {
102-
return colFilter(common.SignedData{
103-
Data: signature.Message,
104-
Signature: signature.Signature,
105-
Identity: []byte(signature.PeerIdentity),
106-
})
107-
})
108-
97+
pvtDataMsg, err := d.createPrivateDataMessage(txID, namespace, collection.CollectionName, collection.Rwset)
10998
if err != nil {
110-
logger.Error("Failed to retrieve peer routing filter for", cc, " due to", err)
111-
return nil, err
99+
return nil, errors.WithStack(err)
112100
}
113101

114-
msg := &proto.GossipMessage{
115-
Channel: []byte(d.chainID),
116-
Nonce: util.RandomUInt64(),
117-
Tag: proto.GossipMessage_CHAN_ONLY,
118-
Content: &proto.GossipMessage_PrivateData{
119-
PrivateData: &proto.PrivateDataMessage{
120-
Payload: &proto.PrivatePayload{
121-
Namespace: namespace,
122-
CollectionName: collectionName,
123-
TxId: txID,
124-
PrivateRwset: collection.Rwset,
125-
},
126-
},
127-
},
128-
}
129-
130-
pvtDataMsg, err := msg.NoopSign()
102+
dPlan, err := d.disseminationPlanForMsg(colAP, colFilter, pvtDataMsg)
131103
if err != nil {
132-
return nil, err
104+
return nil, errors.WithStack(err)
133105
}
106+
disseminationPlan = append(disseminationPlan, dPlan...)
107+
}
108+
}
109+
return disseminationPlan, nil
110+
}
134111

135-
sc := gossip2.SendCriteria{
136-
Timeout: time.Second,
137-
Channel: gossipCommon.ChainID(d.chainID),
138-
MaxPeers: d.maxPeers,
139-
MinAck: d.minAck,
140-
IsEligible: func(member discovery.NetworkMember) bool {
141-
return routingFilter(member)
142-
},
112+
func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAccessPolicy, colFilter privdata.Filter, pvtDataMsg *proto.SignedGossipMessage) ([]*dissemination, error) {
113+
var disseminationPlan []*dissemination
114+
for _, expectedToBeInOurOrg := range []bool{true, false} {
115+
var expectedToBeInOurOrg bool = expectedToBeInOurOrg
116+
minAck := colAP.RequiredExternalPeerCount()
117+
if expectedToBeInOurOrg {
118+
minAck = colAP.RequiredInternalPeerCount()
119+
}
120+
121+
routingFilter, err := d.gossipAdapter.PeerFilter(gossipCommon.ChainID(d.chainID), func(signature api.PeerSignature, isOurOrg bool) bool {
122+
if isOurOrg != expectedToBeInOurOrg {
123+
return false
143124
}
144-
disseminationPlan = append(disseminationPlan, &dissemination{
145-
criteria: sc,
146-
msg: pvtDataMsg,
125+
return colFilter(common.SignedData{
126+
Data: signature.Message,
127+
Signature: signature.Signature,
128+
Identity: []byte(signature.PeerIdentity),
147129
})
130+
})
131+
132+
if err != nil {
133+
logger.Error("Failed to retrieve peer routing filter for channel", d.chainID, ":", err)
134+
return nil, err
135+
}
136+
137+
maxPeers := viper.GetInt("peer.gossip.pvtData.maxExternalPeers")
138+
if expectedToBeInOurOrg {
139+
maxPeers = viper.GetInt("peer.gossip.pvtData.maxInternalPeers")
148140
}
141+
142+
sc := gossip2.SendCriteria{
143+
Timeout: time.Second,
144+
Channel: gossipCommon.ChainID(d.chainID),
145+
MaxPeers: maxPeers,
146+
MinAck: minAck,
147+
IsEligible: func(member discovery.NetworkMember) bool {
148+
return routingFilter(member)
149+
},
150+
}
151+
disseminationPlan = append(disseminationPlan, &dissemination{
152+
criteria: sc,
153+
msg: pvtDataMsg,
154+
})
149155
}
150156
return disseminationPlan, nil
151157
}
@@ -172,3 +178,27 @@ func (d *distributorImpl) disseminate(disseminationPlan []*dissemination) error
172178
}
173179
return nil
174180
}
181+
182+
func (d *distributorImpl) createPrivateDataMessage(txID, namespace, collectionName string, rwset []byte) (*proto.SignedGossipMessage, error) {
183+
msg := &proto.GossipMessage{
184+
Channel: []byte(d.chainID),
185+
Nonce: util.RandomUInt64(),
186+
Tag: proto.GossipMessage_CHAN_ONLY,
187+
Content: &proto.GossipMessage_PrivateData{
188+
PrivateData: &proto.PrivateDataMessage{
189+
Payload: &proto.PrivatePayload{
190+
Namespace: namespace,
191+
CollectionName: collectionName,
192+
TxId: txID,
193+
PrivateRwset: rwset,
194+
},
195+
},
196+
},
197+
}
198+
199+
pvtDataMsg, err := msg.NoopSign()
200+
if err != nil {
201+
return nil, err
202+
}
203+
return pvtDataMsg, nil
204+
}

0 commit comments

Comments
 (0)