Skip to content

Commit b1c90bb

Browse files
committed
[FAB-5752] Gossip identity expiration III
This commit: 1) Makes gossip's identity package use the new method Expiration() that was added to the MessageCryptoService in the previous commit. Now, when an identity is added to the identity mapper - a timer is set and the identity is automatically purged after the timer expires. The timer is set to the time left until the identity is expired. 2) Adds some tests to the identity package, and makes its code-coverage reach 100%. 3) Moves the logic that periodically purges un-used identities from the gossip package to the identity package, to improve code cohesion. Change-Id: I684197ff0f20031e80e9e9341042a5593eaca07a Signed-off-by: yacovm <yacovm@il.ibm.com>
1 parent e16c485 commit b1c90bb

File tree

6 files changed

+257
-91
lines changed

6 files changed

+257
-91
lines changed

gossip/gossip/certstore.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -133,14 +133,11 @@ func (cs *certStore) createIdentityMessage() (*proto.SignedGossipMessage, error)
133133
return sMsg, errors.WithStack(err)
134134
}
135135

136-
func (cs *certStore) listRevokedPeers(isSuspected api.PeerSuspector) []common.PKIidType {
137-
revokedPeers := cs.idMapper.ListInvalidIdentities(isSuspected)
138-
for _, pkiID := range revokedPeers {
139-
cs.pull.Remove(string(pkiID))
140-
}
141-
return revokedPeers
136+
func (cs *certStore) suspectPeers(isSuspected api.PeerSuspector) {
137+
cs.idMapper.SuspectPeers(isSuspected)
142138
}
143139

144140
func (cs *certStore) stop() {
145141
cs.pull.Stop()
142+
cs.idMapper.Stop()
146143
}

gossip/gossip/certstore_test.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ func TestCertStoreBadSignature(t *testing.T) {
8989
}
9090
pm, cs, _ := createObjects(badSignature, nil)
9191
defer pm.Stop()
92+
defer cs.stop()
9293
testCertificateUpdate(t, false, cs)
9394
}
9495

@@ -99,6 +100,7 @@ func TestCertStoreMismatchedIdentity(t *testing.T) {
99100

100101
pm, cs, _ := createObjects(mismatchedIdentity, nil)
101102
defer pm.Stop()
103+
defer cs.stop()
102104
testCertificateUpdate(t, false, cs)
103105
}
104106

@@ -109,18 +111,15 @@ func TestCertStoreShouldSucceed(t *testing.T) {
109111

110112
pm, cs, _ := createObjects(totallyFineIdentity, nil)
111113
defer pm.Stop()
114+
defer cs.stop()
112115
testCertificateUpdate(t, true, cs)
113116
}
114117

115118
func TestCertRevocation(t *testing.T) {
116-
identityExpCheckInterval := identityExpirationCheckInterval
117119
defer func() {
118-
identityExpirationCheckInterval = identityExpCheckInterval
119120
cs.revokedPkiIDS = map[string]struct{}{}
120121
}()
121122

122-
identityExpirationCheckInterval = time.Second
123-
124123
totallyFineIdentity := func(nonce uint64) proto.ReceivedMessage {
125124
return createUpdateMessage(nonce, createValidUpdateMessage())
126125
}
@@ -130,6 +129,7 @@ func TestCertRevocation(t *testing.T) {
130129
pm, cStore, sender := createObjects(totallyFineIdentity, func(message *proto.SignedGossipMessage) {
131130
askedForIdentity <- struct{}{}
132131
})
132+
defer cStore.stop()
133133
defer pm.Stop()
134134
testCertificateUpdate(t, true, cStore)
135135
// Should have asked for an identity for the first time
@@ -172,12 +172,12 @@ func TestCertRevocation(t *testing.T) {
172172
select {
173173
case <-time.After(time.Second * 5):
174174
case <-askedForIdentity:
175-
assert.Fail(t, "Shouldn't have asked for an identity, becase we already have it")
175+
assert.Fail(t, "Shouldn't have asked for an identity, because we already have it")
176176
}
177177
assert.Len(t, askedForIdentity, 0)
178178
// Revoke the identity
179179
cs.revoke(common.PKIidType("B"))
180-
cStore.listRevokedPeers(func(id api.PeerIdentityType) bool {
180+
cStore.suspectPeers(func(id api.PeerIdentityType) bool {
181181
return string(id) == "B"
182182
})
183183

@@ -215,11 +215,11 @@ func TestCertExpiration(t *testing.T) {
215215
defer identity.SetIdentityUsageThreshold(idUsageThreshold)
216216

217217
// Backup original identityInactivityCheckInterval value
218-
inactivityCheckInterval := identityInactivityCheckInterval
219-
identityInactivityCheckInterval = time.Second * 1
218+
usageThreshold := identity.GetIdentityUsageThreshold()
219+
identity.SetIdentityUsageThreshold(time.Second)
220220
// Restore original identityInactivityCheckInterval value
221221
defer func() {
222-
identityInactivityCheckInterval = inactivityCheckInterval
222+
identity.SetIdentityUsageThreshold(usageThreshold)
223223
}()
224224

225225
g1 := newGossipInstance(4321, 0, 0, 1)
@@ -430,7 +430,9 @@ func createObjects(updateFactory func(uint64) proto.ReceivedMessage, msgCons pro
430430
selfIdentity := api.PeerIdentityType("SELF")
431431
certStore = newCertStore(&pullerMock{
432432
Mediator: pullMediator,
433-
}, identity.NewIdentityMapper(cs, selfIdentity, func(_ common.PKIidType, _ api.PeerIdentityType) {}), selfIdentity, cs)
433+
}, identity.NewIdentityMapper(cs, selfIdentity, func(pkiID common.PKIidType, _ api.PeerIdentityType) {
434+
pullMediator.Remove(string(pkiID))
435+
}), selfIdentity, cs)
434436

435437
wg := sync.WaitGroup{}
436438
wg.Add(1)

gossip/gossip/gossip_impl.go

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,6 @@ const (
3636
acceptChanSize = 100
3737
)
3838

39-
var (
40-
identityExpirationCheckInterval = time.Hour * 24
41-
identityInactivityCheckInterval = time.Minute * 10
42-
)
43-
4439
type channelRoutingFilterFactory func(channel.GossipChannel) filter.RoutingFilter
4540

4641
type gossipServiceImpl struct {
@@ -95,6 +90,7 @@ func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvis
9590
g.stateInfoMsgStore = g.newStateInfoMsgStore()
9691

9792
g.idMapper = identity.NewIdentityMapper(mcs, selfIdentity, func(pkiID common.PKIidType, identity api.PeerIdentityType) {
93+
g.comm.CloseConn(&comm.RemotePeer{PKIID: pkiID})
9894
g.certPuller.Remove(string(pkiID))
9995
})
10096

@@ -127,7 +123,6 @@ func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvis
127123
}
128124

129125
go g.start()
130-
go g.periodicalIdentityValidationAndExpiration()
131126
go g.connect2BootstrapPeers()
132127

133128
return g
@@ -196,24 +191,7 @@ func (g *gossipServiceImpl) JoinChan(joinMsg api.JoinChannelMessage, chainID com
196191
// SuspectPeers makes the gossip instance validate identities of suspected peers, and close
197192
// any connections to peers with identities that are found invalid
198193
func (g *gossipServiceImpl) SuspectPeers(isSuspected api.PeerSuspector) {
199-
for _, pkiID := range g.certStore.listRevokedPeers(isSuspected) {
200-
g.comm.CloseConn(&comm.RemotePeer{PKIID: pkiID})
201-
}
202-
}
203-
204-
func (g *gossipServiceImpl) periodicalIdentityValidationAndExpiration() {
205-
// We check once every identityExpirationCheckInterval for identities that have been expired
206-
go g.periodicalIdentityValidation(func(identity api.PeerIdentityType) bool {
207-
// We need to validate every identity to check if it has been expired
208-
return true
209-
}, identityExpirationCheckInterval)
210-
211-
// We check once every identityInactivityCheckInterval for identities that have not been used for a long time
212-
go g.periodicalIdentityValidation(func(identity api.PeerIdentityType) bool {
213-
// We don't validate any identity, because we just want to know whether
214-
// it has not been used for a long time
215-
return false
216-
}, identityInactivityCheckInterval)
194+
g.certStore.suspectPeers(isSuspected)
217195
}
218196

219197
func (g *gossipServiceImpl) periodicalIdentityValidation(suspectFunc api.PeerSuspector, interval time.Duration) {

gossip/gossip/gossip_test.go

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,10 @@ func init() {
6363
testWG.Add(1)
6464
}
6565
factory.InitFactories(nil)
66-
identityExpirationCheckInterval = time.Second
6766
}
6867

68+
var expirationTimes map[string]time.Time = map[string]time.Time{}
69+
6970
var orgInChannelA = api.OrgIdentityType("ORG1")
7071

7172
func acceptData(m interface{}) bool {
@@ -120,6 +121,13 @@ type naiveCryptoService struct {
120121
revokedPkiIDS map[string]struct{}
121122
}
122123

124+
func (*naiveCryptoService) Expiration(peerIdentity api.PeerIdentityType) (time.Time, error) {
125+
if exp, exists := expirationTimes[string(peerIdentity)]; exists {
126+
return exp, nil
127+
}
128+
return time.Now().Add(time.Hour), nil
129+
}
130+
123131
type orgCryptoService struct {
124132
}
125133

@@ -147,10 +155,6 @@ func (cs *naiveCryptoService) VerifyByChannel(_ common.ChainID, identity api.Pee
147155
return errors.New("Forbidden")
148156
}
149157

150-
func (*naiveCryptoService) Expiration(peerIdentity api.PeerIdentityType) (time.Time, error) {
151-
return time.Time{}, nil
152-
}
153-
154158
func (cs *naiveCryptoService) ValidateIdentity(peerIdentity api.PeerIdentityType) error {
155159
cs.RLock()
156160
defer cs.RUnlock()
@@ -1004,18 +1008,30 @@ func TestDisseminateAll2All(t *testing.T) {
10041008
func TestIdentityExpiration(t *testing.T) {
10051009
t.Parallel()
10061010
defer testWG.Done()
1007-
// Scenario: spawn 4 peers and make the MessageCryptoService revoke one of them.
1011+
// Scenario: spawn 5 peers and make the MessageCryptoService revoke one of the first 4.
1012+
// The last peer's certificate expires after a few seconds.
10081013
// Eventually, the rest of the peers should not be able to communicate with
10091014
// the revoked peer at all because its identity would seem to them as expired
10101015

1016+
// Set expiration of the last peer to 5 seconds from now
1017+
expirationTimes["localhost:7004"] = time.Now().Add(time.Second * 5)
1018+
10111019
portPrefix := 7000
10121020
g1 := newGossipInstance(portPrefix, 0, 100)
10131021
g2 := newGossipInstance(portPrefix, 1, 100, 0)
10141022
g3 := newGossipInstance(portPrefix, 2, 100, 0)
10151023
g4 := newGossipInstance(portPrefix, 3, 100, 0)
1024+
g5 := newGossipInstance(portPrefix, 4, 100, 0)
10161025

10171026
peers := []Gossip{g1, g2, g3, g4}
10181027

1028+
// Make the last peer be revoked in 5 seconds from now
1029+
time.AfterFunc(time.Second*5, func() {
1030+
for _, p := range peers {
1031+
p.(*gossipServiceImpl).mcs.(*naiveCryptoService).revoke(common.PKIidType("localhost:7004"))
1032+
}
1033+
})
1034+
10191035
seeAllNeighbors := func() bool {
10201036
for i := 0; i < 4; i++ {
10211037
neighborCount := len(peers[i].Peers())
@@ -1035,22 +1051,35 @@ func TestIdentityExpiration(t *testing.T) {
10351051
}
10361052
p.(*gossipServiceImpl).mcs.(*naiveCryptoService).revoke(revokedPkiID)
10371053
}
1054+
// Trigger a config update to the rest of the peers
1055+
for i := 0; i < 4; i++ {
1056+
if i == revokedPeerIndex {
1057+
continue
1058+
}
1059+
peers[i].SuspectPeers(func(_ api.PeerIdentityType) bool {
1060+
return true
1061+
})
1062+
}
10381063
// Ensure that no one talks to the peer that is revoked
10391064
ensureRevokedPeerIsIgnored := func() bool {
10401065
for i := 0; i < 4; i++ {
10411066
neighborCount := len(peers[i].Peers())
10421067
expectedNeighborCount := 2
1043-
if i == revokedPeerIndex {
1068+
// If it's the revoked peer, or the last peer who's certificate
1069+
// has expired
1070+
if i == revokedPeerIndex || i == 4 {
10441071
expectedNeighborCount = 0
10451072
}
10461073
if neighborCount != expectedNeighborCount {
1074+
fmt.Println("neighbor count of", i, "is", neighborCount)
10471075
return false
10481076
}
10491077
}
10501078
return true
10511079
}
10521080
waitUntilOrFail(t, ensureRevokedPeerIsIgnored)
10531081
stopPeers(peers)
1082+
g5.Stop()
10541083
}
10551084

10561085
func TestEndedGoroutines(t *testing.T) {

0 commit comments

Comments
 (0)