Skip to content

Commit

Permalink
[FAB-5083] Dissemination to internal/external peers
Browse files Browse the repository at this point in the history
This commit makes the distributor to distribute private data
to peers both internal and external of the organization.

The minimum and maximum number of peers is determined by core.yaml
and in the future would be set via policies.

Change-Id: If3353f82ed333e63748b4a532d702f0ac6497389
Signed-off-by: yacovm <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Oct 1, 2017
1 parent f2445fc commit 29caeb5
Show file tree
Hide file tree
Showing 11 changed files with 288 additions and 67 deletions.
5 changes: 3 additions & 2 deletions core/common/privdata/nopcollection.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package privdata

import (
"github.com/hyperledger/fabric/protos/common"
"github.com/spf13/viper"
)

// NopCollection implements an allow-all collection which all orgs are a member of
Expand All @@ -27,11 +28,11 @@ func (nc *NopCollection) GetMemberOrgs() []string {
}

func (nc *NopCollection) RequiredExternalPeerCount() int {
return 0
return viper.GetInt("peer.gossip.pvtData.minExternalPeers")
}

func (nc *NopCollection) RequiredInternalPeerCount() int {
return 0
return viper.GetInt("peer.gossip.pvtData.minInternalPeers")
}

func (nc *NopCollection) GetAccessFilter() Filter {
Expand Down
4 changes: 2 additions & 2 deletions gossip/api/subchannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import "github.com/hyperledger/fabric/gossip/common"
type RoutingFilter func(peerIdentity PeerIdentityType) bool

// CollectionCriteria describes a way of selecting peers from a sub-channel
// given their signatures
type SubChannelSelectionCriteria func(signature PeerSignature) bool
// given their signatures and whether they are from our organization
type SubChannelSelectionCriteria func(signature PeerSignature, isFromOurOrg bool) bool

// RoutingFilterFactory defines an object that given a CollectionCriteria and a channel,
// it can ascertain which peers should be aware of the data related to the
Expand Down
4 changes: 3 additions & 1 deletion gossip/gossip/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,11 +413,13 @@ func (gc *gossipChannel) PeerFilter(messagePredicate api.SubChannelSelectionCrit
return false
}

sameOrg := bytes.Equal(gc.selfOrg, gc.GetOrgOfPeer(member.PKIid))

return messagePredicate(api.PeerSignature{
Message: msg.Payload,
Signature: msg.Signature,
PeerIdentity: peerIdentity,
})
}, sameOrg)
}
}

Expand Down
15 changes: 10 additions & 5 deletions gossip/gossip/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1550,23 +1550,28 @@ func TestGossipChannelEligibility(t *testing.T) {
assert.False(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDinOrg3}))

// Ensure peers from the channel are returned
assert.True(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
assert.True(t, gc.PeerFilter(func(signature api.PeerSignature, sameOrg bool) bool {
assert.True(t, sameOrg)
return true
})(discovery.NetworkMember{PKIid: pkiIDInOrg1}))
assert.True(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
assert.True(t, gc.PeerFilter(func(signature api.PeerSignature, sameOrg bool) bool {
assert.False(t, sameOrg)
return true
})(discovery.NetworkMember{PKIid: pkiIDinOrg2}))
// But not peers which aren't in the channel
assert.False(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
assert.False(t, gc.PeerFilter(func(signature api.PeerSignature, sameOrg bool) bool {
assert.False(t, sameOrg)
return true
})(discovery.NetworkMember{PKIid: pkiIDinOrg3}))

// Ensure the given predicate is considered
assert.True(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
assert.True(t, gc.PeerFilter(func(signature api.PeerSignature, sameOrg bool) bool {
assert.False(t, sameOrg)
return bytes.Equal(signature.PeerIdentity, []byte("pkiIDinOrg2"))
})(discovery.NetworkMember{PKIid: pkiIDinOrg2}))

assert.False(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
assert.False(t, gc.PeerFilter(func(signature api.PeerSignature, sameOrg bool) bool {
assert.True(t, sameOrg)
return bytes.Equal(signature.PeerIdentity, []byte("pkiIDinOrg2"))
})(discovery.NetworkMember{PKIid: pkiIDInOrg1}))

Expand Down
6 changes: 6 additions & 0 deletions gossip/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package gossip

import (
"crypto/tls"
"fmt"
"time"

"github.com/hyperledger/fabric/gossip/api"
Expand Down Expand Up @@ -82,6 +83,11 @@ type SendCriteria struct {
// Only peers that joined the channel would receive this message
}

// String returns a string representation of this SendCriteria
func (sc SendCriteria) String() string {
return fmt.Sprintf("channel: %s, tout: %v, minAck: %d, maxPeers: %d", sc.Channel, sc.Timeout, sc.MinAck, sc.MaxPeers)
}

// Config is the configuration of the gossip component
type Config struct {
BindPort int // Port we bind to, used only for tests
Expand Down
19 changes: 10 additions & 9 deletions gossip/privdata/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/hyperledger/fabric/protos/common"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/hyperledger/fabric/protos/ledger/rwset"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
Expand Down Expand Up @@ -271,26 +272,26 @@ type collectionAccessPolicy struct {
n uint64
}

func (cap *collectionAccessPolicy) RequiredInternalPeerCount() int {
return viper.GetInt("peer.gossip.pvtData.minInternalPeers")
}

func (cap *collectionAccessPolicy) RequiredExternalPeerCount() int {
return viper.GetInt("peer.gossip.pvtData.minExternalPeers")
}

func (cap *collectionAccessPolicy) GetAccessFilter() privdata.Filter {
return func(sd common.SignedData) bool {
that, _ := asn1.Marshal(sd)
this, _ := asn1.Marshal(cap.cs.expectedSignedData)
if hex.EncodeToString(that) != hex.EncodeToString(this) {
panic("Self signed data passed isn't equal to expected")
panic(fmt.Errorf("self signed data passed isn't equal to expected:%v, %v", sd, cap.cs.expectedSignedData))
}
_, exists := cap.cs.policies[*cap]
return exists || cap.cs.acceptsAll
}
}

func (cap *collectionAccessPolicy) RequiredExternalPeerCount() int {
return 0
}

func (cap *collectionAccessPolicy) RequiredInternalPeerCount() int {
return 0
}

func TestPvtDataCollections_FailOnEmptyPayload(t *testing.T) {
collection := &util.PvtDataCollections{
&ledger.TxPvtData{
Expand Down
118 changes: 74 additions & 44 deletions gossip/privdata/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ type PvtDataDistributor interface {

// distributorImpl the implementation of the private data distributor interface
type distributorImpl struct {
chainID string
minAck int
maxPeers int
chainID string
gossipAdapter
}

Expand All @@ -55,8 +53,6 @@ func NewDistributor(chainID string, gossip gossipAdapter) PvtDataDistributor {
return &distributorImpl{
chainID: chainID,
gossipAdapter: gossip,
minAck: viper.GetInt("peer.gossip.pvtData.minAck"),
maxPeers: viper.GetInt("peer.gossip.pvtData.maxPeers"),
}
}

Expand Down Expand Up @@ -98,54 +94,64 @@ func (d *distributorImpl) computeDisseminationPlan(txID string, privData *rwset.
return nil, errors.Errorf("No collection access policy filter computed for %v", cc)
}

routingFilter, err := d.gossipAdapter.PeerFilter(gossipCommon.ChainID(d.chainID), func(signature api.PeerSignature) bool {
return colFilter(common.SignedData{
Data: signature.Message,
Signature: signature.Signature,
Identity: []byte(signature.PeerIdentity),
})
})

pvtDataMsg, err := d.createPrivateDataMessage(txID, namespace, collection.CollectionName, collection.Rwset)
if err != nil {
logger.Error("Failed to retrieve peer routing filter for", cc, " due to", err)
return nil, err
return nil, errors.WithStack(err)
}

msg := &proto.GossipMessage{
Channel: []byte(d.chainID),
Nonce: util.RandomUInt64(),
Tag: proto.GossipMessage_CHAN_ONLY,
Content: &proto.GossipMessage_PrivateData{
PrivateData: &proto.PrivateDataMessage{
Payload: &proto.PrivatePayload{
Namespace: namespace,
CollectionName: collectionName,
TxId: txID,
PrivateRwset: collection.Rwset,
},
},
},
}

pvtDataMsg, err := msg.NoopSign()
dPlan, err := d.disseminationPlanForMsg(colAP, colFilter, pvtDataMsg)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
disseminationPlan = append(disseminationPlan, dPlan...)
}
}
return disseminationPlan, nil
}

sc := gossip2.SendCriteria{
Timeout: time.Second,
Channel: gossipCommon.ChainID(d.chainID),
MaxPeers: d.maxPeers,
MinAck: d.minAck,
IsEligible: func(member discovery.NetworkMember) bool {
return routingFilter(member)
},
func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAccessPolicy, colFilter privdata.Filter, pvtDataMsg *proto.SignedGossipMessage) ([]*dissemination, error) {
var disseminationPlan []*dissemination
for _, expectedToBeInOurOrg := range []bool{true, false} {
var expectedToBeInOurOrg bool = expectedToBeInOurOrg
minAck := colAP.RequiredExternalPeerCount()
if expectedToBeInOurOrg {
minAck = colAP.RequiredInternalPeerCount()
}

routingFilter, err := d.gossipAdapter.PeerFilter(gossipCommon.ChainID(d.chainID), func(signature api.PeerSignature, isOurOrg bool) bool {
if isOurOrg != expectedToBeInOurOrg {
return false
}
disseminationPlan = append(disseminationPlan, &dissemination{
criteria: sc,
msg: pvtDataMsg,
return colFilter(common.SignedData{
Data: signature.Message,
Signature: signature.Signature,
Identity: []byte(signature.PeerIdentity),
})
})

if err != nil {
logger.Error("Failed to retrieve peer routing filter for channel", d.chainID, ":", err)
return nil, err
}

maxPeers := viper.GetInt("peer.gossip.pvtData.maxExternalPeers")
if expectedToBeInOurOrg {
maxPeers = viper.GetInt("peer.gossip.pvtData.maxInternalPeers")
}

sc := gossip2.SendCriteria{
Timeout: time.Second,
Channel: gossipCommon.ChainID(d.chainID),
MaxPeers: maxPeers,
MinAck: minAck,
IsEligible: func(member discovery.NetworkMember) bool {
return routingFilter(member)
},
}
disseminationPlan = append(disseminationPlan, &dissemination{
criteria: sc,
msg: pvtDataMsg,
})
}
return disseminationPlan, nil
}
Expand All @@ -172,3 +178,27 @@ func (d *distributorImpl) disseminate(disseminationPlan []*dissemination) error
}
return nil
}

func (d *distributorImpl) createPrivateDataMessage(txID, namespace, collectionName string, rwset []byte) (*proto.SignedGossipMessage, error) {
msg := &proto.GossipMessage{
Channel: []byte(d.chainID),
Nonce: util.RandomUInt64(),
Tag: proto.GossipMessage_CHAN_ONLY,
Content: &proto.GossipMessage_PrivateData{
PrivateData: &proto.PrivateDataMessage{
Payload: &proto.PrivatePayload{
Namespace: namespace,
CollectionName: collectionName,
TxId: txID,
PrivateRwset: rwset,
},
},
},
}

pvtDataMsg, err := msg.NoopSign()
if err != nil {
return nil, err
}
return pvtDataMsg, nil
}
Loading

0 comments on commit 29caeb5

Please sign in to comment.