@@ -202,7 +202,7 @@ func (p *puller) waitForMembership() []discovery.NetworkMember {
202
202
203
203
func (p * puller ) fetch (dig2src dig2sources ) ([]* proto.PvtDataElement , error ) {
204
204
// computeFilters returns a map from a digest to a routing filter
205
- dig2Filter , err := p .computeFilters (dig2src . keys () )
205
+ dig2Filter , err := p .computeFilters (dig2src )
206
206
if err != nil {
207
207
return nil , errors .WithStack (err )
208
208
}
@@ -216,6 +216,7 @@ func (p *puller) fetch(dig2src dig2sources) ([]*proto.PvtDataElement, error) {
216
216
logger .Warning ("Do not know any peer in the channel(" , p .channel , ") that matches the policies , aborting" )
217
217
return nil , errors .New ("Empty membership" )
218
218
}
219
+ members = randomizeMemberList (members )
219
220
var res []* proto.PvtDataElement
220
221
// Distribute requests to peers, and obtain subscriptions for all their messages
221
222
// matchDigestToPeer returns a map from a peer to the digests which we would ask it for
@@ -308,10 +309,14 @@ func (p *puller) assignDigestsToPeers(members []discovery.NetworkMember, dig2Fil
308
309
}
309
310
res := make (map [remotePeer ][]proto.PvtDataDigest )
310
311
// Create a mapping between peer and digests to ask for
311
- members = randomizeMemberList (members )
312
- for dig , filt := range dig2Filter {
313
- // Find the first peer that matches the filter
314
- selectedPeer := filter .First (members , filt )
312
+ for dig , collectionFilter := range dig2Filter {
313
+ // Find a peer that is an endorser
314
+ selectedPeer := filter .First (members , collectionFilter .endorser )
315
+ if selectedPeer == nil {
316
+ logger .Debug ("No endorser found for" , dig )
317
+ // Find some peer that is in the collection
318
+ selectedPeer = filter .First (members , collectionFilter .anyPeer )
319
+ }
315
320
if selectedPeer == nil {
316
321
logger .Debug ("No peer matches txID" , dig .TxId , "collection" , dig .Collection )
317
322
continue
@@ -332,12 +337,18 @@ func (p *puller) assignDigestsToPeers(members []discovery.NetworkMember, dig2Fil
332
337
return res , noneSelectedPeers
333
338
}
334
339
335
- type digestToFilterMapping map [proto.PvtDataDigest ]filter.RoutingFilter
340
+ type collectionRoutingFilter struct {
341
+ anyPeer filter.RoutingFilter
342
+ endorser filter.RoutingFilter
343
+ }
344
+
345
+ type digestToFilterMapping map [proto.PvtDataDigest ]collectionRoutingFilter
336
346
337
347
func (dig2f digestToFilterMapping ) flattenFilterValues () []filter.RoutingFilter {
338
348
var filters []filter.RoutingFilter
339
349
for _ , f := range dig2f {
340
- filters = append (filters , f )
350
+ filters = append (filters , f .endorser )
351
+ filters = append (filters , f .anyPeer )
341
352
}
342
353
return filters
343
354
}
@@ -355,9 +366,9 @@ func (dig2Filter digestToFilterMapping) String() string {
355
366
return buffer .String ()
356
367
}
357
368
358
- func (p * puller ) computeFilters (digests [] * proto. PvtDataDigest ) (digestToFilterMapping , error ) {
359
- filters := make (map [proto.PvtDataDigest ]filter. RoutingFilter )
360
- for _ , digest := range digests {
369
+ func (p * puller ) computeFilters (dig2src dig2sources ) (digestToFilterMapping , error ) {
370
+ filters := make (map [proto.PvtDataDigest ]collectionRoutingFilter )
371
+ for digest , sources := range dig2src {
361
372
collection := p .cs .RetrieveCollectionAccessPolicy (fcommon.CollectionCriteria {
362
373
Channel : p .channel ,
363
374
TxId : digest .TxId ,
@@ -371,17 +382,35 @@ func (p *puller) computeFilters(digests []*proto.PvtDataDigest) (digestToFilterM
371
382
if f == nil {
372
383
return nil , errors .Errorf ("Failed obtaining collection filter for channel %s, txID %s, collection %s" , p .channel , digest .TxId , digest .Collection )
373
384
}
374
- rf , err := p .PeerFilter (common .ChainID (p .channel ), func (peerSignature api.PeerSignature , _ bool ) bool {
385
+ anyPeerInCollection , err := p .PeerFilter (common .ChainID (p .channel ), func (peerSignature api.PeerSignature , _ bool ) bool {
375
386
return f (fcommon.SignedData {
376
387
Signature : peerSignature .Signature ,
377
388
Identity : peerSignature .PeerIdentity ,
378
389
Data : peerSignature .Message ,
379
390
})
380
391
})
392
+
393
+ if err != nil {
394
+ return nil , errors .WithStack (err )
395
+ }
396
+ sources := sources
397
+ endorserPeer , err := p .PeerFilter (common .ChainID (p .channel ), func (peerSignature api.PeerSignature , _ bool ) bool {
398
+ for _ , endorsement := range sources {
399
+ if bytes .Equal (endorsement .Endorser , []byte (peerSignature .PeerIdentity )) {
400
+ return true
401
+ }
402
+ }
403
+ return false
404
+ })
405
+
381
406
if err != nil {
382
407
return nil , errors .WithStack (err )
383
408
}
384
- filters [* digest ] = rf
409
+
410
+ filters [* digest ] = collectionRoutingFilter {
411
+ anyPeer : anyPeerInCollection ,
412
+ endorser : endorserPeer ,
413
+ }
385
414
}
386
415
return filters , nil
387
416
}
0 commit comments