Skip to content

Commit a21d89d

Browse files
committed
[FAB-6379] Prioritize pulling from endorsers
This change set makes the pull mechanism of private data to try select endorsers of a certain private RWSet over others. Change-Id: If5f2e0a0031a208fee06c749216f708b81342847 Signed-off-by: yacovm <yacovm@il.ibm.com>
1 parent ff714cd commit a21d89d

File tree

3 files changed

+98
-20
lines changed

3 files changed

+98
-20
lines changed

gossip/privdata/pull.go

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ func (p *puller) waitForMembership() []discovery.NetworkMember {
202202

203203
func (p *puller) fetch(dig2src dig2sources) ([]*proto.PvtDataElement, error) {
204204
// computeFilters returns a map from a digest to a routing filter
205-
dig2Filter, err := p.computeFilters(dig2src.keys())
205+
dig2Filter, err := p.computeFilters(dig2src)
206206
if err != nil {
207207
return nil, errors.WithStack(err)
208208
}
@@ -216,6 +216,7 @@ func (p *puller) fetch(dig2src dig2sources) ([]*proto.PvtDataElement, error) {
216216
logger.Warning("Do not know any peer in the channel(", p.channel, ") that matches the policies , aborting")
217217
return nil, errors.New("Empty membership")
218218
}
219+
members = randomizeMemberList(members)
219220
var res []*proto.PvtDataElement
220221
// Distribute requests to peers, and obtain subscriptions for all their messages
221222
// matchDigestToPeer returns a map from a peer to the digests which we would ask it for
@@ -308,10 +309,14 @@ func (p *puller) assignDigestsToPeers(members []discovery.NetworkMember, dig2Fil
308309
}
309310
res := make(map[remotePeer][]proto.PvtDataDigest)
310311
// Create a mapping between peer and digests to ask for
311-
members = randomizeMemberList(members)
312-
for dig, filt := range dig2Filter {
313-
// Find the first peer that matches the filter
314-
selectedPeer := filter.First(members, filt)
312+
for dig, collectionFilter := range dig2Filter {
313+
// Find a peer that is an endorser
314+
selectedPeer := filter.First(members, collectionFilter.endorser)
315+
if selectedPeer == nil {
316+
logger.Debug("No endorser found for", dig)
317+
// Find some peer that is in the collection
318+
selectedPeer = filter.First(members, collectionFilter.anyPeer)
319+
}
315320
if selectedPeer == nil {
316321
logger.Debug("No peer matches txID", dig.TxId, "collection", dig.Collection)
317322
continue
@@ -332,12 +337,18 @@ func (p *puller) assignDigestsToPeers(members []discovery.NetworkMember, dig2Fil
332337
return res, noneSelectedPeers
333338
}
334339

335-
type digestToFilterMapping map[proto.PvtDataDigest]filter.RoutingFilter
340+
type collectionRoutingFilter struct {
341+
anyPeer filter.RoutingFilter
342+
endorser filter.RoutingFilter
343+
}
344+
345+
type digestToFilterMapping map[proto.PvtDataDigest]collectionRoutingFilter
336346

337347
func (dig2f digestToFilterMapping) flattenFilterValues() []filter.RoutingFilter {
338348
var filters []filter.RoutingFilter
339349
for _, f := range dig2f {
340-
filters = append(filters, f)
350+
filters = append(filters, f.endorser)
351+
filters = append(filters, f.anyPeer)
341352
}
342353
return filters
343354
}
@@ -355,9 +366,9 @@ func (dig2Filter digestToFilterMapping) String() string {
355366
return buffer.String()
356367
}
357368

358-
func (p *puller) computeFilters(digests []*proto.PvtDataDigest) (digestToFilterMapping, error) {
359-
filters := make(map[proto.PvtDataDigest]filter.RoutingFilter)
360-
for _, digest := range digests {
369+
func (p *puller) computeFilters(dig2src dig2sources) (digestToFilterMapping, error) {
370+
filters := make(map[proto.PvtDataDigest]collectionRoutingFilter)
371+
for digest, sources := range dig2src {
361372
collection := p.cs.RetrieveCollectionAccessPolicy(fcommon.CollectionCriteria{
362373
Channel: p.channel,
363374
TxId: digest.TxId,
@@ -371,17 +382,35 @@ func (p *puller) computeFilters(digests []*proto.PvtDataDigest) (digestToFilterM
371382
if f == nil {
372383
return nil, errors.Errorf("Failed obtaining collection filter for channel %s, txID %s, collection %s", p.channel, digest.TxId, digest.Collection)
373384
}
374-
rf, err := p.PeerFilter(common.ChainID(p.channel), func(peerSignature api.PeerSignature, _ bool) bool {
385+
anyPeerInCollection, err := p.PeerFilter(common.ChainID(p.channel), func(peerSignature api.PeerSignature, _ bool) bool {
375386
return f(fcommon.SignedData{
376387
Signature: peerSignature.Signature,
377388
Identity: peerSignature.PeerIdentity,
378389
Data: peerSignature.Message,
379390
})
380391
})
392+
393+
if err != nil {
394+
return nil, errors.WithStack(err)
395+
}
396+
sources := sources
397+
endorserPeer, err := p.PeerFilter(common.ChainID(p.channel), func(peerSignature api.PeerSignature, _ bool) bool {
398+
for _, endorsement := range sources {
399+
if bytes.Equal(endorsement.Endorser, []byte(peerSignature.PeerIdentity)) {
400+
return true
401+
}
402+
}
403+
return false
404+
})
405+
381406
if err != nil {
382407
return nil, errors.WithStack(err)
383408
}
384-
filters[*digest] = rf
409+
410+
filters[*digest] = collectionRoutingFilter{
411+
anyPeer: anyPeerInCollection,
412+
endorser: endorserPeer,
413+
}
385414
}
386415
return filters, nil
387416
}

gossip/privdata/pull_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,3 +435,57 @@ func TestPullerRetries(t *testing.T) {
435435
assert.NoError(t, err)
436436
assert.Equal(t, transientStore, fetched)
437437
}
438+
439+
func TestPullerPreferEndorsers(t *testing.T) {
440+
t.Parallel()
441+
// Scenario: p1 pulls from p2, p3, p4, p5
442+
// and the only endorser for col1 is p3, so it should be selected
443+
// at the top priority for col1.
444+
// for col2, only p2 should have the data, but its not an endorser of the data.
445+
gn := &gossipNetwork{}
446+
447+
policyStore := newCollectionStore().withPolicy("col1").thatMapsTo("p1", "p2", "p3", "p4", "p5").withPolicy("col2").thatMapsTo("p1", "p2")
448+
p1 := gn.newPuller("p1", policyStore, "p2", "p3", "p4", "p5")
449+
450+
p3TransientStore := newPRWSet()
451+
p2TransientStore := newPRWSet()
452+
453+
p2 := gn.newPuller("p2", policyStore)
454+
p3 := gn.newPuller("p3", policyStore)
455+
gn.newPuller("p4", policyStore)
456+
gn.newPuller("p5", policyStore)
457+
458+
dig1 := &proto.PvtDataDigest{
459+
TxId: "txID1",
460+
Collection: "col1",
461+
Namespace: "ns1",
462+
}
463+
464+
dig2 := &proto.PvtDataDigest{
465+
TxId: "txID1",
466+
Collection: "col2",
467+
Namespace: "ns1",
468+
}
469+
470+
// We only define an action for dig2 on p2, and the test would fail with panic if any other peer is asked for
471+
// a private RWSet on dig2
472+
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig2).Return(p2TransientStore)
473+
474+
// We only define an action for dig1 on p3, and the test would fail with panic if any other peer is asked for
475+
// a private RWSet on dig1
476+
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig1).Return(p3TransientStore)
477+
478+
dasf := &digestsAndSourceFactory{}
479+
d2s := dasf.mapDigest(dig1).toSources("p3").mapDigest(dig2).toSources().create()
480+
fetchedMessages, err := p1.fetch(d2s)
481+
assert.NoError(t, err)
482+
rws1 := util.PrivateRWSet(fetchedMessages[0].Payload[0])
483+
rws2 := util.PrivateRWSet(fetchedMessages[0].Payload[1])
484+
rws3 := util.PrivateRWSet(fetchedMessages[1].Payload[0])
485+
rws4 := util.PrivateRWSet(fetchedMessages[1].Payload[1])
486+
fetched := []util.PrivateRWSet{rws1, rws2, rws3, rws4}
487+
assert.Contains(t, fetched, p3TransientStore[0])
488+
assert.Contains(t, fetched, p3TransientStore[1])
489+
assert.Contains(t, fetched, p2TransientStore[0])
490+
assert.Contains(t, fetched, p2TransientStore[1])
491+
}

gossip/privdata/util.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -250,19 +250,14 @@ func (f *digestsAndSourceFactory) mapDigest(dig *gossip2.PvtDataDigest) *digests
250250
return f
251251
}
252252

253-
func (f *digestsAndSourceFactory) toSources(orgs ...string) *digestsAndSourceFactory {
253+
func (f *digestsAndSourceFactory) toSources(peers ...string) *digestsAndSourceFactory {
254254
if f.d2s == nil {
255255
f.d2s = make(dig2sources)
256256
}
257257
var endorsements []*peer.Endorsement
258-
for i, org := range orgs {
259-
sId := &msp.SerializedIdentity{
260-
Mspid: org,
261-
IdBytes: []byte(fmt.Sprintf("p%d.%s", i, org)),
262-
}
263-
b, _ := proto.Marshal(sId)
258+
for _, p := range peers {
264259
endorsements = append(endorsements, &peer.Endorsement{
265-
Endorser: b,
260+
Endorser: []byte(p),
266261
})
267262
}
268263
f.d2s[f.lastDig] = endorsements

0 commit comments

Comments
 (0)