@@ -57,10 +57,7 @@ type Coordinator interface {
57
57
// the order of private data in slice of PvtDataCollections doesn't implies the order of
58
58
// transactions in the block related to these private data, to get the correct placement
59
59
// need to read TxPvtData.SeqInBlock field
60
- GetPvtDataAndBlockByNum (seqNum uint64 ) (* common.Block , util.PvtDataCollections , error )
61
-
62
- // GetBlockByNum returns block and related to the block private data
63
- GetBlockByNum (seqNum uint64 ) (* common.Block , error )
60
+ GetPvtDataAndBlockByNum (seqNum uint64 , peerAuth common.SignedData ) (* common.Block , util.PvtDataCollections , error )
64
61
65
62
// Get recent block sequence number
66
63
LedgerHeight () (uint64 , error )
@@ -404,35 +401,27 @@ func (k *rwSetKey) toTxPvtReadWriteSet(rws []byte) *rwset.TxPvtReadWriteSet {
404
401
}
405
402
}
406
403
407
- func (c * coordinator ) listMissingPrivateData (block * common.Block , ownedRWsets map [rwSetKey ][]byte ) (rwSetKeysByTxIDs , error ) {
408
- if block .Metadata == nil || len (block .Metadata .Metadata ) <= int (common .BlockMetadataIndex_TRANSACTIONS_FILTER ) {
409
- return nil , errors .New ("Block.Metadata is nil or Block.Metadata lacks a Tx filter bitmap" )
410
- }
411
- txsFilter := txValidationFlags (block .Metadata .Metadata [common .BlockMetadataIndex_TRANSACTIONS_FILTER ])
412
- if len (txsFilter ) != len (block .Data .Data ) {
413
- return nil , errors .Errorf ("Block data size(%d) is different from Tx filter size(%d)" , len (block .Data .Data ), len (txsFilter ))
414
- }
404
+ type blockData [][]byte
415
405
416
- privateRWsetsInBlock := make (map [rwSetKey ]struct {})
417
- missing := make (rwSetKeysByTxIDs )
418
- for seqInBlock , envBytes := range block .Data .Data {
406
+ func (data blockData ) forEach (txsFilter txValidationFlags , consumer func (seqInBlock uint64 , chdr * common.ChannelHeader , txRWSet * rwsetutil.TxRwSet )) error {
407
+ for seqInBlock , envBytes := range data {
419
408
if txsFilter [seqInBlock ] != uint8 (peer .TxValidationCode_VALID ) {
420
409
logger .Debug ("Skipping Tx" , seqInBlock , "because it's invalid. Status is" , txsFilter [seqInBlock ])
421
410
continue
422
411
}
423
412
env , err := utils .GetEnvelopeFromBlock (envBytes )
424
413
if err != nil {
425
- return nil , err
414
+ return err
426
415
}
427
416
428
417
payload , err := utils .GetPayload (env )
429
418
if err != nil {
430
- return nil , err
419
+ return err
431
420
}
432
421
433
422
chdr , err := utils .UnmarshalChannelHeader (payload .Header .ChannelHeader )
434
423
if err != nil {
435
- return nil , err
424
+ return err
436
425
}
437
426
438
427
if chdr .Type != int32 (common .HeaderType_ENDORSER_TRANSACTION ) {
@@ -450,15 +439,32 @@ func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets ma
450
439
logger .Warning ("Failed obtaining TxRwSet from ChaincodeAction's results" , err )
451
440
continue
452
441
}
442
+ consumer (uint64 (seqInBlock ), chdr , txRWSet )
443
+ }
444
+ return nil
445
+ }
453
446
447
+ func (c * coordinator ) listMissingPrivateData (block * common.Block , ownedRWsets map [rwSetKey ][]byte ) (rwSetKeysByTxIDs , error ) {
448
+ if block .Metadata == nil || len (block .Metadata .Metadata ) <= int (common .BlockMetadataIndex_TRANSACTIONS_FILTER ) {
449
+ return nil , errors .New ("Block.Metadata is nil or Block.Metadata lacks a Tx filter bitmap" )
450
+ }
451
+ txsFilter := txValidationFlags (block .Metadata .Metadata [common .BlockMetadataIndex_TRANSACTIONS_FILTER ])
452
+ if len (txsFilter ) != len (block .Data .Data ) {
453
+ return nil , errors .Errorf ("Block data size(%d) is different from Tx filter size(%d)" , len (block .Data .Data ), len (txsFilter ))
454
+ }
455
+
456
+ privateRWsetsInBlock := make (map [rwSetKey ]struct {})
457
+ missing := make (rwSetKeysByTxIDs )
458
+ data := blockData (block .Data .Data )
459
+ err := data .forEach (txsFilter , func (seqInBlock uint64 , chdr * common.ChannelHeader , txRWSet * rwsetutil.TxRwSet ) {
454
460
for _ , ns := range txRWSet .NsRwSets {
455
461
for _ , hashed := range ns .CollHashedRwSets {
456
462
if ! c .isEligible (chdr , ns .NameSpace , hashed .CollectionName ) {
457
463
continue
458
464
}
459
465
key := rwSetKey {
460
466
txID : chdr .TxId ,
461
- seqInBlock : uint64 ( seqInBlock ) ,
467
+ seqInBlock : seqInBlock ,
462
468
hash : hex .EncodeToString (hashed .PvtRwSetHash ),
463
469
namespace : ns .NameSpace ,
464
470
collection : hashed .CollectionName ,
@@ -467,14 +473,16 @@ func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets ma
467
473
if _ , exists := ownedRWsets [key ]; ! exists {
468
474
txAndSeq := txAndSeqInBlock {
469
475
txID : chdr .TxId ,
470
- seqInBlock : uint64 ( seqInBlock ) ,
476
+ seqInBlock : seqInBlock ,
471
477
}
472
478
missing [txAndSeq ] = append (missing [txAndSeq ], key )
473
479
}
474
480
} // for all hashed RW sets
475
481
} // for all RW sets
476
- } // for all transactions in block
477
-
482
+ })
483
+ if err != nil {
484
+ return nil , errors .WithStack (err )
485
+ }
478
486
// In the end, iterate over the ownedRWsets, and if the key doesn't exist in
479
487
// the privateRWsetsInBlock - delete it from the ownedRWsets
480
488
for k := range ownedRWsets {
@@ -512,30 +520,93 @@ func (c *coordinator) isEligible(chdr *common.ChannelHeader, namespace string, c
512
520
return eligible
513
521
}
514
522
523
+ type seqAndDataModel struct {
524
+ seq uint64
525
+ dataModel rwset.TxReadWriteSet_DataModel
526
+ }
527
+
528
+ // map from seqAndDataModel to:
529
+ // maap from namespace to []*rwset.CollectionPvtReadWriteSet
530
+ type aggregatedCollections map [seqAndDataModel ]map [string ][]* rwset.CollectionPvtReadWriteSet
531
+
532
+ func (ac aggregatedCollections ) addCollection (seqInBlock uint64 , dm rwset.TxReadWriteSet_DataModel , namespace string , col * rwset.CollectionPvtReadWriteSet ) {
533
+ seq := seqAndDataModel {
534
+ dataModel : dm ,
535
+ seq : seqInBlock ,
536
+ }
537
+ if _ , exists := ac [seq ]; ! exists {
538
+ ac [seq ] = make (map [string ][]* rwset.CollectionPvtReadWriteSet )
539
+ }
540
+ ac [seq ][namespace ] = append (ac [seq ][namespace ], col )
541
+ }
542
+
543
+ func (ac aggregatedCollections ) asPrivateData () []* ledger.TxPvtData {
544
+ var data []* ledger.TxPvtData
545
+ for seq , ns := range ac {
546
+ txPrivateData := & ledger.TxPvtData {
547
+ SeqInBlock : seq .seq ,
548
+ WriteSet : & rwset.TxPvtReadWriteSet {
549
+ DataModel : seq .dataModel ,
550
+ },
551
+ }
552
+ for namespaceName , cols := range ns {
553
+ txPrivateData .WriteSet .NsPvtRwset = append (txPrivateData .WriteSet .NsPvtRwset , & rwset.NsPvtReadWriteSet {
554
+ Namespace : namespaceName ,
555
+ CollectionPvtRwset : cols ,
556
+ })
557
+ }
558
+ data = append (data , txPrivateData )
559
+ }
560
+ return data
561
+ }
562
+
515
563
// GetPvtDataAndBlockByNum get block by number and returns also all related private data
516
564
// the order of private data in slice of PvtDataCollections doesn't implies the order of
517
565
// transactions in the block related to these private data, to get the correct placement
518
566
// need to read TxPvtData.SeqInBlock field
519
- func (c * coordinator ) GetPvtDataAndBlockByNum (seqNum uint64 ) (* common.Block , util.PvtDataCollections , error ) {
567
+ func (c * coordinator ) GetPvtDataAndBlockByNum (seqNum uint64 , peerAuthInfo common. SignedData ) (* common.Block , util.PvtDataCollections , error ) {
520
568
blockAndPvtData , err := c .Committer .GetPvtDataAndBlockByNum (seqNum )
521
569
if err != nil {
522
570
return nil , nil , fmt .Errorf ("cannot retrieve block number %d, due to %s" , seqNum , err )
523
571
}
524
572
525
- var blockPvtData util.PvtDataCollections
573
+ seqs2Namespaces := aggregatedCollections (make (map [seqAndDataModel ]map [string ][]* rwset.CollectionPvtReadWriteSet ))
574
+ data := blockData (blockAndPvtData .Block .Data .Data )
575
+ err = data .forEach (make (txValidationFlags , len (data )), func (seqInBlock uint64 , chdr * common.ChannelHeader , txRWSet * rwsetutil.TxRwSet ) {
576
+ item , exists := blockAndPvtData .BlockPvtData [seqInBlock ]
577
+ if ! exists {
578
+ return
579
+ }
526
580
527
- for _ , item := range blockAndPvtData .BlockPvtData {
528
- blockPvtData = append (blockPvtData , item )
581
+ for _ , ns := range item .WriteSet .NsPvtRwset {
582
+ for _ , col := range ns .CollectionPvtRwset {
583
+ cc := rwset.CollectionCriteria {
584
+ Channel : chdr .ChannelId ,
585
+ TxId : chdr .TxId ,
586
+ Namespace : ns .Namespace ,
587
+ Collection : col .CollectionName ,
588
+ }
589
+ sp := c .PolicyStore .CollectionPolicy (cc )
590
+ if sp == nil {
591
+ logger .Warning ("Failed obtaining policy for" , cc )
592
+ continue
593
+ }
594
+ isAuthorized := c .PolicyParser .Parse (sp )
595
+ if isAuthorized == nil {
596
+ logger .Warning ("Failed obtaining filter for" , cc )
597
+ continue
598
+ }
599
+ if ! isAuthorized (peerAuthInfo ) {
600
+ logger .Debug ("Skipping" , cc , "because peer isn't authorized" )
601
+ continue
602
+ }
603
+ seqs2Namespaces .addCollection (seqInBlock , item .WriteSet .DataModel , ns .Namespace , col )
604
+ }
605
+ }
606
+ })
607
+ if err != nil {
608
+ return nil , nil , errors .WithStack (err )
529
609
}
530
610
531
- return blockAndPvtData .Block , blockPvtData , nil
532
- }
533
-
534
- // GetBlockByNum returns block by sequence number
535
- func (c * coordinator ) GetBlockByNum (seqNum uint64 ) (* common.Block , error ) {
536
- blocks := c .GetBlocks ([]uint64 {seqNum })
537
- if len (blocks ) == 0 {
538
- return nil , fmt .Errorf ("cannot retrieve block number %d" , seqNum )
539
- }
540
- return blocks [0 ], nil
611
+ return blockAndPvtData .Block , seqs2Namespaces .asPrivateData (), nil
541
612
}
0 commit comments