Skip to content

Commit 34eb8fe

Browse files
committed
[FAB-6012] Custom channel membership filtering
This commit adds an ability to query gossip for channel members and pass a custom filter that filters out peers according to their identities and signatures. Change-Id: Ibad6b2ef257864b2d980cc1338f4a41be4b33eb9 Signed-off-by: yacovm <yacovm@il.ibm.com>
1 parent 93f3c9b commit 34eb8fe

File tree

10 files changed

+121
-6
lines changed

10 files changed

+121
-6
lines changed

gossip/api/crypto.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,11 @@ type PeerSuspector func(identity PeerIdentityType) bool
5959
// PeerSecureDialOpts returns the gRPC DialOptions to use for connection level
6060
// security when communicating with remote peer endpoints
6161
type PeerSecureDialOpts func() []grpc.DialOption
62+
63+
// PeerSignature defines a signature of a peer
64+
// on a given message
65+
type PeerSignature struct {
66+
Signature []byte
67+
Message []byte
68+
PeerIdentity PeerIdentityType
69+
}

gossip/api/subchannel.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,14 @@ import "github.com/hyperledger/fabric/gossip/common"
1212
// or which peers are eligible of receiving a certain message
1313
type RoutingFilter func(peerIdentity PeerIdentityType) bool
1414

15-
// CollectionCriteria describes a certain sub-channel, or a part of it
16-
type CollectionCriteria []byte
15+
// CollectionCriteria describes a way of selecting peers from a sub-channel
16+
// given their signatures
17+
type SubChannelSelectionCriteria func(signature PeerSignature) bool
1718

1819
// RoutingFilterFactory defines an object that given a CollectionCriteria and a channel,
1920
// it can ascertain which peers should be aware of the data related to the
2021
// CollectionCriteria.
2122
type RoutingFilterFactory interface {
2223
// Peers returns a RoutingFilter for given chainID and CollectionCriteria
23-
Peers(common.ChainID, CollectionCriteria) RoutingFilter
24+
Peers(common.ChainID, SubChannelSelectionCriteria) RoutingFilter
2425
}

gossip/filter/filter.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,3 +65,13 @@ func SelectPeers(k int, peerPool []discovery.NetworkMember, filter RoutingFilter
6565

6666
return remotePeers
6767
}
68+
69+
// First returns the first peer that matches the given filter
70+
func First(peerPool []discovery.NetworkMember, filter RoutingFilter) *comm.RemotePeer {
71+
for _, p := range peerPool {
72+
if filter(p) {
73+
return &comm.RemotePeer{PKIID: p.PKIid, Endpoint: p.PreferredEndpoint()}
74+
}
75+
}
76+
return nil
77+
}

gossip/filter/filter_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ package filter
99
import (
1010
"testing"
1111

12+
"github.com/hyperledger/fabric/gossip/comm"
1213
"github.com/hyperledger/fabric/gossip/common"
1314
"github.com/hyperledger/fabric/gossip/discovery"
1415
"github.com/stretchr/testify/assert"
@@ -36,6 +37,27 @@ func TestCombineRoutingFilters(t *testing.T) {
3637
assert.False(t, CombineRoutingFilters(a, b)(discovery.NetworkMember{InternalEndpoint: "b"}))
3738
}
3839

40+
func TestFirst(t *testing.T) {
41+
peerA := discovery.NetworkMember{Endpoint: "a"}
42+
peerB := discovery.NetworkMember{Endpoint: "b"}
43+
peers := []discovery.NetworkMember{peerA, peerB}
44+
assert.Equal(t, &comm.RemotePeer{Endpoint: "a"}, First(peers, func(discovery.NetworkMember) bool {
45+
return true
46+
}))
47+
48+
assert.Equal(t, &comm.RemotePeer{Endpoint: "b"}, First(peers, func(nm discovery.NetworkMember) bool {
49+
return nm.PreferredEndpoint() == "b"
50+
}))
51+
52+
peerAA := discovery.NetworkMember{Endpoint: "aa"}
53+
peerAB := discovery.NetworkMember{Endpoint: "ab"}
54+
peers = append(peers, peerAA)
55+
peers = append(peers, peerAB)
56+
assert.Equal(t, &comm.RemotePeer{Endpoint: "aa"}, First(peers, func(nm discovery.NetworkMember) bool {
57+
return len(nm.PreferredEndpoint()) > 1
58+
}))
59+
}
60+
3961
func TestSelectPeers(t *testing.T) {
4062
a := func(nm discovery.NetworkMember) bool {
4163
return nm.Endpoint == "a"

gossip/gossip/channel/channel.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ type GossipChannel interface {
4848
// GetPeers returns a list of peers with metadata as published by them
4949
GetPeers() []discovery.NetworkMember
5050

51+
// PeerFilter receives a SubChannelSelectionCriteria and returns a RoutingFilter that selects
52+
// only peer identities that match the given criteria
53+
PeerFilter(api.SubChannelSelectionCriteria) filter.RoutingFilter
54+
5155
// IsMemberInChan checks whether the given member is eligible to be in the channel
5256
IsMemberInChan(member discovery.NetworkMember) bool
5357

@@ -372,6 +376,27 @@ func (gc *gossipChannel) IsMemberInChan(member discovery.NetworkMember) bool {
372376
return gc.IsOrgInChannel(org)
373377
}
374378

379+
// PeerFilter receives a SubChannelSelectionCriteria and returns a RoutingFilter that selects
380+
// only peer identities that match the given criteria
381+
func (gc *gossipChannel) PeerFilter(messagePredicate api.SubChannelSelectionCriteria) filter.RoutingFilter {
382+
return func(member discovery.NetworkMember) bool {
383+
peerIdentity := gc.GetIdentityByPKIID(member.PKIid)
384+
if len(peerIdentity) == 0 {
385+
return false
386+
}
387+
msg := gc.stateInfoMsgStore.MembershipStore.MsgByID(member.PKIid)
388+
if msg == nil {
389+
return false
390+
}
391+
392+
return messagePredicate(api.PeerSignature{
393+
Message: msg.Payload,
394+
Signature: msg.Signature,
395+
PeerIdentity: peerIdentity,
396+
})
397+
}
398+
}
399+
375400
// IsOrgInChannel returns whether the given organization is in the channel
376401
func (gc *gossipChannel) IsOrgInChannel(membersOrg api.OrgIdentityType) bool {
377402
gc.RLock()
@@ -722,6 +747,7 @@ func (gc *gossipChannel) UpdateStateInfo(msg *proto.SignedGossipMessage) {
722747
if !msg.IsStateInfoMsg() {
723748
return
724749
}
750+
725751
gc.Lock()
726752
defer gc.Unlock()
727753

gossip/gossip/channel/channel_test.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
77
package channel
88

99
import (
10+
"bytes"
1011
"errors"
1112
"fmt"
1213
"sync"
@@ -1427,6 +1428,7 @@ func TestGossipChannelEligibility(t *testing.T) {
14271428
{PKIid: pkiIDInOrg1},
14281429
{PKIid: pkiIDInOrg1ButNotEligible},
14291430
{PKIid: pkiIDinOrg2},
1431+
{PKIid: pkiIDinOrg3},
14301432
}
14311433
adapter.On("GetMembership").Return(members)
14321434
adapter.On("Gossip", mock.Anything)
@@ -1453,15 +1455,36 @@ func TestGossipChannelEligibility(t *testing.T) {
14531455
})
14541456
// Every peer sends a StateInfo message
14551457
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDInOrg1, channelA)})
1456-
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDinOrg2, channelA)})
1457-
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDInOrg1ButNotEligible, channelA)})
1458-
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDinOrg3, channelA)})
1458+
gc.HandleMessage(&receivedMsg{PKIID: pkiIDinOrg2, msg: createStateInfoMsg(1, pkiIDinOrg2, channelA)})
1459+
gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1ButNotEligible, msg: createStateInfoMsg(1, pkiIDInOrg1ButNotEligible, channelA)})
1460+
gc.HandleMessage(&receivedMsg{PKIID: pkiIDinOrg3, msg: createStateInfoMsg(1, pkiIDinOrg3, channelA)})
14591461

14601462
assert.True(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDInOrg1}))
14611463
assert.True(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDinOrg2}))
14621464
assert.True(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDInOrg1ButNotEligible}))
14631465
assert.False(t, gc.EligibleForChannel(discovery.NetworkMember{PKIid: pkiIDinOrg3}))
14641466

1467+
// Ensure peers from the channel are returned
1468+
assert.True(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
1469+
return true
1470+
})(discovery.NetworkMember{PKIid: pkiIDInOrg1}))
1471+
assert.True(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
1472+
return true
1473+
})(discovery.NetworkMember{PKIid: pkiIDinOrg2}))
1474+
// But not peers which aren't in the channel
1475+
assert.False(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
1476+
return true
1477+
})(discovery.NetworkMember{PKIid: pkiIDinOrg3}))
1478+
1479+
// Ensure the given predicate is considered
1480+
assert.True(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
1481+
return bytes.Equal(signature.PeerIdentity, []byte("pkiIDinOrg2"))
1482+
})(discovery.NetworkMember{PKIid: pkiIDinOrg2}))
1483+
1484+
assert.False(t, gc.PeerFilter(func(signature api.PeerSignature) bool {
1485+
return bytes.Equal(signature.PeerIdentity, []byte("pkiIDinOrg2"))
1486+
})(discovery.NetworkMember{PKIid: pkiIDInOrg1}))
1487+
14651488
// Remove org2 from the channel
14661489
gc.ConfigureChannel(&joinChanMsg{
14671490
members2AnchorPeers: map[string][]api.AnchorPeer{

gossip/gossip/gossip.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/hyperledger/fabric/gossip/comm"
1616
"github.com/hyperledger/fabric/gossip/common"
1717
"github.com/hyperledger/fabric/gossip/discovery"
18+
"github.com/hyperledger/fabric/gossip/filter"
1819
proto "github.com/hyperledger/fabric/protos/gossip"
1920
)
2021

@@ -42,6 +43,10 @@ type Gossip interface {
4243
// Gossip sends a message to other peers to the network
4344
Gossip(msg *proto.GossipMessage)
4445

46+
// PeerFilter receives a SubChannelSelectionCriteria and returns a RoutingFilter that selects
47+
// only peer identities that match the given criteria, and that they published their channel participation
48+
PeerFilter(channel common.ChainID, messagePredicate api.SubChannelSelectionCriteria) (filter.RoutingFilter, error)
49+
4550
// Accept returns a dedicated read-only channel for messages sent by other nodes that match a certain predicate.
4651
// If passThrough is false, the messages are processed by the gossip layer beforehand.
4752
// If passThrough is true, the gossip layer doesn't intervene and the messages

gossip/gossip/gossip_impl.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -662,6 +662,16 @@ func (g *gossipServiceImpl) PeersOfChannel(channel common.ChainID) []discovery.N
662662
return gc.GetPeers()
663663
}
664664

665+
// PeerFilter receives a SubChannelSelectionCriteria and returns a RoutingFilter that selects
666+
// only peer identities that match the given criteria, and that they published their channel participation
667+
func (g *gossipServiceImpl) PeerFilter(channel common.ChainID, messagePredicate api.SubChannelSelectionCriteria) (filter.RoutingFilter, error) {
668+
gc := g.chanState.getGossipChannelByChainID(channel)
669+
if gc == nil {
670+
return nil, errors.Errorf("Channel %s doesn't exist", string(channel))
671+
}
672+
return gc.PeerFilter(messagePredicate), nil
673+
}
674+
665675
// Stop stops the gossip component
666676
func (g *gossipServiceImpl) Stop() {
667677
if g.toDie() {

gossip/service/join_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/hyperledger/fabric/gossip/comm"
1717
"github.com/hyperledger/fabric/gossip/common"
1818
"github.com/hyperledger/fabric/gossip/discovery"
19+
"github.com/hyperledger/fabric/gossip/filter"
1920
"github.com/hyperledger/fabric/gossip/util"
2021
proto "github.com/hyperledger/fabric/protos/gossip"
2122
"github.com/hyperledger/fabric/protos/peer"
@@ -38,6 +39,10 @@ type gossipMock struct {
3839
mock.Mock
3940
}
4041

42+
func (*gossipMock) PeerFilter(channel common.ChainID, messagePredicate api.SubChannelSelectionCriteria) (filter.RoutingFilter, error) {
43+
panic("implement me")
44+
}
45+
4146
func (*gossipMock) SuspectPeers(s api.PeerSuspector) {
4247
panic("implement me")
4348
}

gossip/state/mocks/gossip.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/hyperledger/fabric/gossip/comm"
1212
"github.com/hyperledger/fabric/gossip/common"
1313
"github.com/hyperledger/fabric/gossip/discovery"
14+
"github.com/hyperledger/fabric/gossip/filter"
1415
proto "github.com/hyperledger/fabric/protos/gossip"
1516
"github.com/stretchr/testify/mock"
1617
)
@@ -19,6 +20,10 @@ type GossipMock struct {
1920
mock.Mock
2021
}
2122

23+
func (*GossipMock) PeerFilter(channel common.ChainID, messagePredicate api.SubChannelSelectionCriteria) (filter.RoutingFilter, error) {
24+
panic("implement me")
25+
}
26+
2227
func (g *GossipMock) SuspectPeers(s api.PeerSuspector) {
2328
g.Called(s)
2429

0 commit comments

Comments
 (0)