Skip to content

Commit 887da22

Browse files
committed
[FAB-6287] filter out private data in state transfer
This change set makes the coordinator filter out private data the requesting peer receives along with the block. It also removes GetBlocks() from all the APIs all the way to the committer Since the method that is used now instead is GetPvtDataAndBlockByNum Change-Id: Ie2d33abdd10f5e715db775bf3e599e8419ddddfe Signed-off-by: yacovm <yacovm@il.ibm.com>
1 parent 84e14de commit 887da22

File tree

6 files changed

+171
-113
lines changed

6 files changed

+171
-113
lines changed

core/ledger/ledger_interface.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ type TxPvtData struct {
174174
WriteSet *rwset.TxPvtReadWriteSet
175175
}
176176

177-
// BlockAndPvtData encapsultes the block and a map that contains the tuples <seqInBlock, *TxPvtData>
177+
// BlockAndPvtData encapsulates the block and a map that contains the tuples <seqInBlock, *TxPvtData>
178178
// The map is expected to contain the entries only for the transactions that has associated pvt data
179179
type BlockAndPvtData struct {
180180
Block *common.Block

gossip/privdata/coordinator.go

Lines changed: 107 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,7 @@ type Coordinator interface {
5757
// the order of private data in slice of PvtDataCollections doesn't implies the order of
5858
// transactions in the block related to these private data, to get the correct placement
5959
// need to read TxPvtData.SeqInBlock field
60-
GetPvtDataAndBlockByNum(seqNum uint64) (*common.Block, util.PvtDataCollections, error)
61-
62-
// GetBlockByNum returns block and related to the block private data
63-
GetBlockByNum(seqNum uint64) (*common.Block, error)
60+
GetPvtDataAndBlockByNum(seqNum uint64, peerAuth common.SignedData) (*common.Block, util.PvtDataCollections, error)
6461

6562
// Get recent block sequence number
6663
LedgerHeight() (uint64, error)
@@ -404,35 +401,27 @@ func (k *rwSetKey) toTxPvtReadWriteSet(rws []byte) *rwset.TxPvtReadWriteSet {
404401
}
405402
}
406403

407-
func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets map[rwSetKey][]byte) (rwSetKeysByTxIDs, error) {
408-
if block.Metadata == nil || len(block.Metadata.Metadata) <= int(common.BlockMetadataIndex_TRANSACTIONS_FILTER) {
409-
return nil, errors.New("Block.Metadata is nil or Block.Metadata lacks a Tx filter bitmap")
410-
}
411-
txsFilter := txValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
412-
if len(txsFilter) != len(block.Data.Data) {
413-
return nil, errors.Errorf("Block data size(%d) is different from Tx filter size(%d)", len(block.Data.Data), len(txsFilter))
414-
}
404+
type blockData [][]byte
415405

416-
privateRWsetsInBlock := make(map[rwSetKey]struct{})
417-
missing := make(rwSetKeysByTxIDs)
418-
for seqInBlock, envBytes := range block.Data.Data {
406+
func (data blockData) forEach(txsFilter txValidationFlags, consumer func(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet)) error {
407+
for seqInBlock, envBytes := range data {
419408
if txsFilter[seqInBlock] != uint8(peer.TxValidationCode_VALID) {
420409
logger.Debug("Skipping Tx", seqInBlock, "because it's invalid. Status is", txsFilter[seqInBlock])
421410
continue
422411
}
423412
env, err := utils.GetEnvelopeFromBlock(envBytes)
424413
if err != nil {
425-
return nil, err
414+
return err
426415
}
427416

428417
payload, err := utils.GetPayload(env)
429418
if err != nil {
430-
return nil, err
419+
return err
431420
}
432421

433422
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
434423
if err != nil {
435-
return nil, err
424+
return err
436425
}
437426

438427
if chdr.Type != int32(common.HeaderType_ENDORSER_TRANSACTION) {
@@ -450,15 +439,32 @@ func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets ma
450439
logger.Warning("Failed obtaining TxRwSet from ChaincodeAction's results", err)
451440
continue
452441
}
442+
consumer(uint64(seqInBlock), chdr, txRWSet)
443+
}
444+
return nil
445+
}
453446

447+
func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets map[rwSetKey][]byte) (rwSetKeysByTxIDs, error) {
448+
if block.Metadata == nil || len(block.Metadata.Metadata) <= int(common.BlockMetadataIndex_TRANSACTIONS_FILTER) {
449+
return nil, errors.New("Block.Metadata is nil or Block.Metadata lacks a Tx filter bitmap")
450+
}
451+
txsFilter := txValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
452+
if len(txsFilter) != len(block.Data.Data) {
453+
return nil, errors.Errorf("Block data size(%d) is different from Tx filter size(%d)", len(block.Data.Data), len(txsFilter))
454+
}
455+
456+
privateRWsetsInBlock := make(map[rwSetKey]struct{})
457+
missing := make(rwSetKeysByTxIDs)
458+
data := blockData(block.Data.Data)
459+
err := data.forEach(txsFilter, func(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet) {
454460
for _, ns := range txRWSet.NsRwSets {
455461
for _, hashed := range ns.CollHashedRwSets {
456462
if !c.isEligible(chdr, ns.NameSpace, hashed.CollectionName) {
457463
continue
458464
}
459465
key := rwSetKey{
460466
txID: chdr.TxId,
461-
seqInBlock: uint64(seqInBlock),
467+
seqInBlock: seqInBlock,
462468
hash: hex.EncodeToString(hashed.PvtRwSetHash),
463469
namespace: ns.NameSpace,
464470
collection: hashed.CollectionName,
@@ -467,14 +473,16 @@ func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets ma
467473
if _, exists := ownedRWsets[key]; !exists {
468474
txAndSeq := txAndSeqInBlock{
469475
txID: chdr.TxId,
470-
seqInBlock: uint64(seqInBlock),
476+
seqInBlock: seqInBlock,
471477
}
472478
missing[txAndSeq] = append(missing[txAndSeq], key)
473479
}
474480
} // for all hashed RW sets
475481
} // for all RW sets
476-
} // for all transactions in block
477-
482+
})
483+
if err != nil {
484+
return nil, errors.WithStack(err)
485+
}
478486
// In the end, iterate over the ownedRWsets, and if the key doesn't exist in
479487
// the privateRWsetsInBlock - delete it from the ownedRWsets
480488
for k := range ownedRWsets {
@@ -512,30 +520,93 @@ func (c *coordinator) isEligible(chdr *common.ChannelHeader, namespace string, c
512520
return eligible
513521
}
514522

523+
type seqAndDataModel struct {
524+
seq uint64
525+
dataModel rwset.TxReadWriteSet_DataModel
526+
}
527+
528+
// map from seqAndDataModel to:
529+
// maap from namespace to []*rwset.CollectionPvtReadWriteSet
530+
type aggregatedCollections map[seqAndDataModel]map[string][]*rwset.CollectionPvtReadWriteSet
531+
532+
func (ac aggregatedCollections) addCollection(seqInBlock uint64, dm rwset.TxReadWriteSet_DataModel, namespace string, col *rwset.CollectionPvtReadWriteSet) {
533+
seq := seqAndDataModel{
534+
dataModel: dm,
535+
seq: seqInBlock,
536+
}
537+
if _, exists := ac[seq]; !exists {
538+
ac[seq] = make(map[string][]*rwset.CollectionPvtReadWriteSet)
539+
}
540+
ac[seq][namespace] = append(ac[seq][namespace], col)
541+
}
542+
543+
func (ac aggregatedCollections) asPrivateData() []*ledger.TxPvtData {
544+
var data []*ledger.TxPvtData
545+
for seq, ns := range ac {
546+
txPrivateData := &ledger.TxPvtData{
547+
SeqInBlock: seq.seq,
548+
WriteSet: &rwset.TxPvtReadWriteSet{
549+
DataModel: seq.dataModel,
550+
},
551+
}
552+
for namespaceName, cols := range ns {
553+
txPrivateData.WriteSet.NsPvtRwset = append(txPrivateData.WriteSet.NsPvtRwset, &rwset.NsPvtReadWriteSet{
554+
Namespace: namespaceName,
555+
CollectionPvtRwset: cols,
556+
})
557+
}
558+
data = append(data, txPrivateData)
559+
}
560+
return data
561+
}
562+
515563
// GetPvtDataAndBlockByNum get block by number and returns also all related private data
516564
// the order of private data in slice of PvtDataCollections doesn't implies the order of
517565
// transactions in the block related to these private data, to get the correct placement
518566
// need to read TxPvtData.SeqInBlock field
519-
func (c *coordinator) GetPvtDataAndBlockByNum(seqNum uint64) (*common.Block, util.PvtDataCollections, error) {
567+
func (c *coordinator) GetPvtDataAndBlockByNum(seqNum uint64, peerAuthInfo common.SignedData) (*common.Block, util.PvtDataCollections, error) {
520568
blockAndPvtData, err := c.Committer.GetPvtDataAndBlockByNum(seqNum)
521569
if err != nil {
522570
return nil, nil, fmt.Errorf("cannot retrieve block number %d, due to %s", seqNum, err)
523571
}
524572

525-
var blockPvtData util.PvtDataCollections
573+
seqs2Namespaces := aggregatedCollections(make(map[seqAndDataModel]map[string][]*rwset.CollectionPvtReadWriteSet))
574+
data := blockData(blockAndPvtData.Block.Data.Data)
575+
err = data.forEach(make(txValidationFlags, len(data)), func(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet) {
576+
item, exists := blockAndPvtData.BlockPvtData[seqInBlock]
577+
if !exists {
578+
return
579+
}
526580

527-
for _, item := range blockAndPvtData.BlockPvtData {
528-
blockPvtData = append(blockPvtData, item)
581+
for _, ns := range item.WriteSet.NsPvtRwset {
582+
for _, col := range ns.CollectionPvtRwset {
583+
cc := rwset.CollectionCriteria{
584+
Channel: chdr.ChannelId,
585+
TxId: chdr.TxId,
586+
Namespace: ns.Namespace,
587+
Collection: col.CollectionName,
588+
}
589+
sp := c.PolicyStore.CollectionPolicy(cc)
590+
if sp == nil {
591+
logger.Warning("Failed obtaining policy for", cc)
592+
continue
593+
}
594+
isAuthorized := c.PolicyParser.Parse(sp)
595+
if isAuthorized == nil {
596+
logger.Warning("Failed obtaining filter for", cc)
597+
continue
598+
}
599+
if !isAuthorized(peerAuthInfo) {
600+
logger.Debug("Skipping", cc, "because peer isn't authorized")
601+
continue
602+
}
603+
seqs2Namespaces.addCollection(seqInBlock, item.WriteSet.DataModel, ns.Namespace, col)
604+
}
605+
}
606+
})
607+
if err != nil {
608+
return nil, nil, errors.WithStack(err)
529609
}
530610

531-
return blockAndPvtData.Block, blockPvtData, nil
532-
}
533-
534-
// GetBlockByNum returns block by sequence number
535-
func (c *coordinator) GetBlockByNum(seqNum uint64) (*common.Block, error) {
536-
blocks := c.GetBlocks([]uint64{seqNum})
537-
if len(blocks) == 0 {
538-
return nil, fmt.Errorf("cannot retrieve block number %d", seqNum)
539-
}
540-
return blocks[0], nil
611+
return blockAndPvtData.Block, seqs2Namespaces.asPrivateData(), nil
541612
}

gossip/privdata/coordinator_test.go

Lines changed: 47 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -222,19 +222,19 @@ func (f *fetcherMock) fetch(req *proto.RemotePvtDataRequest) ([]*proto.PvtDataEl
222222
return nil, args.Get(1).(error)
223223
}
224224

225-
func createPolicyStore(selfSignedData common.SignedData) *policyStore {
225+
func createPolicyStore(expectedSignedData common.SignedData) *policyStore {
226226
return &policyStore{
227-
selfSignedData: selfSignedData,
228-
policies: make(map[serializedPolicy]rwset.CollectionCriteria),
229-
store: make(map[rwset.CollectionCriteria]serializedPolicy),
227+
expectedSignedData: expectedSignedData,
228+
policies: make(map[serializedPolicy]rwset.CollectionCriteria),
229+
store: make(map[rwset.CollectionCriteria]serializedPolicy),
230230
}
231231
}
232232

233233
type policyStore struct {
234-
selfSignedData common.SignedData
235-
acceptsAll bool
236-
store map[rwset.CollectionCriteria]serializedPolicy
237-
policies map[serializedPolicy]rwset.CollectionCriteria
234+
expectedSignedData common.SignedData
235+
acceptsAll bool
236+
store map[rwset.CollectionCriteria]serializedPolicy
237+
policies map[serializedPolicy]rwset.CollectionCriteria
238238
}
239239

240240
func (ps *policyStore) thatAcceptsAll() *policyStore {
@@ -282,7 +282,7 @@ func (*policyParser) Parse(serializedPol privdata.SerializedPolicy) privdata.Fil
282282
sp := serializedPol.(*serializedPolicy)
283283
return func(sd common.SignedData) bool {
284284
that, _ := asn1.Marshal(sd)
285-
this, _ := asn1.Marshal(sp.ps.selfSignedData)
285+
this, _ := asn1.Marshal(sp.ps.expectedSignedData)
286286
if hex.EncodeToString(that) != hex.EncodeToString(this) {
287287
panic("Self signed data passed isn't equal to expected")
288288
}
@@ -810,56 +810,63 @@ func TestCoordinatorStoreBlock(t *testing.T) {
810810
}
811811

812812
func TestCoordinatorGetBlocks(t *testing.T) {
813+
sd := common.SignedData{
814+
Identity: []byte{0, 1, 2},
815+
Signature: []byte{3, 4, 5},
816+
Data: []byte{6, 7, 8},
817+
}
818+
ps := createPolicyStore(sd).thatAcceptsAll()
819+
pp := &policyParser{}
813820
committer := &committerMock{}
814821
store := &mockTransientStore{t: t}
815822
fetcher := &fetcherMock{t: t}
816823
coordinator := NewCoordinator(Support{
824+
PolicyStore: ps,
825+
PolicyParser: pp,
817826
Committer: committer,
818827
Fetcher: fetcher,
819828
TransientStore: store,
820829
Validator: &validatorMock{},
821-
}, common.SignedData{})
830+
}, sd)
822831

823-
// Bad path: block is not returned
824-
committer.On("GetBlocks", mock.Anything).Return([]*common.Block{})
825-
block, err := coordinator.GetBlockByNum(1)
826-
assert.Contains(t, err.Error(), "cannot retrieve block number 1")
827-
assert.Error(t, err)
828-
assert.Nil(t, block)
832+
hash := util2.ComputeSHA256([]byte("rws-pre-image"))
833+
bf := &blockFactory{
834+
channelID: "test",
835+
}
836+
block := bf.AddTxn("tx1", "ns1", hash, "c1", "c2").AddTxn("tx2", "ns2", hash, "c1").create()
829837

830-
// Green path: Block is returned
831-
committer.Mock = mock.Mock{}
832-
committer.On("GetBlocks", mock.Anything).Return([]*common.Block{
833-
{
834-
Header: &common.BlockHeader{
835-
Number: 1,
836-
},
837-
},
838+
// Green path - block and private data is returned, but the requester isn't eligible for all the private data,
839+
// but only to a subset of it.
840+
ps = createPolicyStore(sd).thatAccepts(rwset.CollectionCriteria{
841+
Namespace: "ns1",
842+
Collection: "c2",
843+
TxId: "tx1",
844+
Channel: "test",
838845
})
839-
block, err = coordinator.GetBlockByNum(1)
840-
assert.NoError(t, err)
841-
assert.Equal(t, uint64(1), block.Header.Number)
842-
846+
committer.Mock = mock.Mock{}
843847
committer.On("GetPvtDataAndBlockByNum", mock.Anything).Return(&ledger.BlockAndPvtData{
844-
Block: block,
845-
BlockPvtData: map[uint64]*ledger.TxPvtData{
846-
0: {
847-
SeqInBlock: 0,
848-
},
849-
},
848+
Block: block,
849+
BlockPvtData: expectedCommittedPrivateData1,
850850
}, nil)
851-
852-
// Green path - block and private data is returned
853-
block2, privData, err := coordinator.GetPvtDataAndBlockByNum(1)
851+
coordinator = NewCoordinator(Support{
852+
PolicyStore: ps,
853+
PolicyParser: pp,
854+
Committer: committer,
855+
Fetcher: fetcher,
856+
TransientStore: store,
857+
Validator: &validatorMock{},
858+
}, sd)
859+
expectedPrivData := (&pvtDataFactory{}).addRWSet().addNSRWSet("ns1", "c2").create()
860+
block2, returnedPrivateData, err := coordinator.GetPvtDataAndBlockByNum(1, sd)
854861
assert.NoError(t, err)
855862
assert.Equal(t, block, block2)
856-
assert.Equal(t, util.PvtDataCollections{{SeqInBlock: 0}}, privData)
863+
assert.Equal(t, expectedPrivData, []*ledger.TxPvtData(returnedPrivateData))
857864

858865
// Bad path - error occurs when trying to retrieve the block and private data
859866
committer.Mock = mock.Mock{}
860867
committer.On("GetPvtDataAndBlockByNum", mock.Anything).Return(nil, errors.New("uh oh"))
861-
block2, privData, err = coordinator.GetPvtDataAndBlockByNum(1)
868+
block2, returnedPrivateData, err = coordinator.GetPvtDataAndBlockByNum(1, sd)
862869
assert.Nil(t, block2)
863-
assert.Empty(t, privData)
870+
assert.Empty(t, returnedPrivateData)
864871
assert.Error(t, err)
865872
}

gossip/service/gossip_service.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,6 @@ type GossipService interface {
5050
NewConfigEventer() ConfigProcessor
5151
// InitializeChannel allocates the state provider and should be invoked once per channel per execution
5252
InitializeChannel(chainID string, endpoints []string, support Support)
53-
// GetBlock returns block for given chain
54-
GetBlock(chainID string, index uint64) *common.Block
5553
// AddPayload appends message payload to for given chain
5654
AddPayload(chainID string, payload *gproto.Payload) error
5755
}
@@ -308,13 +306,6 @@ func (g *gossipServiceImpl) configUpdated(config Config) {
308306
g.JoinChan(jcm, gossipCommon.ChainID(config.ChainID()))
309307
}
310308

311-
// GetBlock returns block for given chain
312-
func (g *gossipServiceImpl) GetBlock(chainID string, index uint64) *common.Block {
313-
g.lock.RLock()
314-
defer g.lock.RUnlock()
315-
return g.chains[chainID].GetBlock(index)
316-
}
317-
318309
// AddPayload appends message payload to for given chain
319310
func (g *gossipServiceImpl) AddPayload(chainID string, payload *gproto.Payload) error {
320311
g.lock.RLock()

0 commit comments

Comments
 (0)