Skip to content

Commit bce2b2b

Browse files
committed
[FAB-10576]: Check if puller got valid pvt rwset
While puller retrieves missing pvt data, whenever it's from ledger or from transient store there is might be case it has failed due to some error. In that case we need to add proper log message to the body of the puller createResponse function and skip to the next digest. This commit add such check for rwSets is not nil to prevent peer from crashing with panic. Change-Id: Ie6bb6e85520eac6992872da862d24c30dc2ffe34 Signed-off-by: Artem Barger <bartem@il.ibm.com>
1 parent f5c552b commit bce2b2b

File tree

4 files changed

+61
-52
lines changed

4 files changed

+61
-52
lines changed

gossip/privdata/dataretriever.go

Lines changed: 25 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ SPDX-License-Identifier: Apache-2.0
77
package privdata
88

99
import (
10+
"errors"
11+
"fmt"
12+
1013
"github.com/hyperledger/fabric/core/ledger"
1114
"github.com/hyperledger/fabric/core/transientstore"
1215
"github.com/hyperledger/fabric/gossip/util"
@@ -19,7 +22,7 @@ import (
1922
type StorageDataRetriever interface {
2023
// CollectionRWSet retrieves for give digest relevant private data if
2124
// available otherwise returns nil
22-
CollectionRWSet(dig *gossip2.PvtDataDigest) *util.PrivateRWSetWithConfig
25+
CollectionRWSet(dig *gossip2.PvtDataDigest) (*util.PrivateRWSetWithConfig, error)
2326
}
2427

2528
// DataStore defines set of APIs need to get private data
@@ -53,7 +56,7 @@ func NewDataRetriever(store DataStore) StorageDataRetriever {
5356

5457
// CollectionRWSet retrieves for give digest relevant private data if
5558
// available otherwise returns nil
56-
func (dr *dataRetriever) CollectionRWSet(dig *gossip2.PvtDataDigest) *util.PrivateRWSetWithConfig {
59+
func (dr *dataRetriever) CollectionRWSet(dig *gossip2.PvtDataDigest) (*util.PrivateRWSetWithConfig, error) {
5760
filter := map[string]ledger.PvtCollFilter{
5861
dig.Namespace: map[string]bool{
5962
dig.Collection: true,
@@ -63,9 +66,8 @@ func (dr *dataRetriever) CollectionRWSet(dig *gossip2.PvtDataDigest) *util.Priva
6366
height, err := dr.store.LedgerHeight()
6467
if err != nil {
6568
// if there is an error getting info from the ledger, we need to try to read from transient store
66-
logger.Error("Wasn't able to read ledger height, due to", err, "trying to lookup "+
67-
"private data from transient store, namespace", dig.Namespace, "collection name", dig.Collection, "txID", dig.TxId)
68-
return nil
69+
return nil, errors.New(fmt.Sprint("Wasn't able to read ledger height, due to", err, "trying to lookup "+
70+
"private data from transient store, namespace", dig.Namespace, "collection name", dig.Collection, "txID", dig.TxId))
6971
}
7072
if height <= dig.BlockSeq {
7173
logger.Debug("Current ledger height ", height, "is below requested block sequence number",
@@ -79,13 +81,12 @@ func (dr *dataRetriever) CollectionRWSet(dig *gossip2.PvtDataDigest) *util.Priva
7981
return dr.fromLedger(dig, filter)
8082
}
8183

82-
func (dr *dataRetriever) fromLedger(dig *gossip2.PvtDataDigest, filter map[string]ledger.PvtCollFilter) *util.PrivateRWSetWithConfig {
84+
func (dr *dataRetriever) fromLedger(dig *gossip2.PvtDataDigest, filter map[string]ledger.PvtCollFilter) (*util.PrivateRWSetWithConfig, error) {
8385
results := &util.PrivateRWSetWithConfig{}
8486
pvtData, err := dr.store.GetPvtDataByNum(dig.BlockSeq, filter)
8587
if err != nil {
86-
logger.Error("Wasn't able to obtain private data for collection", dig.Collection,
87-
"txID", dig.TxId, "block sequence number", dig.BlockSeq, "due to", err)
88-
return nil
88+
return nil, errors.New(fmt.Sprint("wasn't able to obtain private data for collection", dig.Collection,
89+
"txID", dig.TxId, "block sequence number", dig.BlockSeq, "due to", err))
8990
}
9091

9192
for _, data := range pvtData {
@@ -99,53 +100,47 @@ func (dr *dataRetriever) fromLedger(dig *gossip2.PvtDataDigest, filter map[strin
99100

100101
confHistoryRetriever, err := dr.store.GetConfigHistoryRetriever()
101102
if err != nil {
102-
logger.Error("Cannot obtain configuration history retriever, for collection,", dig.Collection,
103-
"txID", dig.TxId, "block sequence number", dig.BlockSeq, "due to", err)
104-
return nil
103+
return nil, errors.New(fmt.Sprint("cannot obtain configuration history retriever, for collection,", dig.Collection,
104+
"txID", dig.TxId, "block sequence number", dig.BlockSeq, "due to", err))
105105
}
106106

107107
configInfo, err := confHistoryRetriever.MostRecentCollectionConfigBelow(dig.BlockSeq, dig.Namespace)
108108
if err != nil {
109-
logger.Error("Cannot find recent collection config update below block sequence = ", dig.BlockSeq,
110-
"collection name =", dig.Collection, "for chaincode", dig.Namespace)
111-
return nil
109+
return nil, errors.New(fmt.Sprint("cannot find recent collection config update below block sequence = ", dig.BlockSeq,
110+
"collection name =", dig.Collection, "for chaincode", dig.Namespace))
112111
}
113112

114113
if configInfo == nil {
115-
logger.Error("No collection config update below block sequence = ", dig.BlockSeq,
116-
"collection name =", dig.Collection, "for chaincode", dig.Namespace, "is available")
117-
return nil
114+
return nil, errors.New(fmt.Sprint("no collection config update below block sequence = ", dig.BlockSeq,
115+
"collection name =", dig.Collection, "for chaincode", dig.Namespace, "is available"))
118116
}
119117
configs := dr.extractCollectionConfigs(configInfo.CollectionConfig, dig)
120118
if configs == nil {
121-
logger.Error("No collection config was found for collection", dig.Collection,
122-
"namespace", dig.Namespace, "txID", dig.TxId)
123-
return nil
119+
return nil, errors.New(fmt.Sprint("no collection config was found for collection", dig.Collection,
120+
"namespace", dig.Namespace, "txID", dig.TxId))
124121
}
125122
results.CollectionConfig = configs
126-
return results
123+
return results, nil
127124
}
128125

129-
func (dr *dataRetriever) fromTransientStore(dig *gossip2.PvtDataDigest, filter map[string]ledger.PvtCollFilter) *util.PrivateRWSetWithConfig {
126+
func (dr *dataRetriever) fromTransientStore(dig *gossip2.PvtDataDigest, filter map[string]ledger.PvtCollFilter) (*util.PrivateRWSetWithConfig, error) {
130127
results := &util.PrivateRWSetWithConfig{}
131128
it, err := dr.store.GetTxPvtRWSetByTxid(dig.TxId, filter)
132129
if err != nil {
133-
logger.Error("Was not able to retrieve private data from transient store, namespace", dig.Namespace,
134-
", collection name", dig.Collection, ", txID", dig.TxId, ", due to", err)
135-
return nil
130+
return nil, errors.New(fmt.Sprint("was not able to retrieve private data from transient store, namespace", dig.Namespace,
131+
", collection name", dig.Collection, ", txID", dig.TxId, ", due to", err))
136132
}
137133
defer it.Close()
138134

139135
maxEndorsedAt := uint64(0)
140136
for {
141137
res, err := it.NextWithConfig()
142138
if err != nil {
143-
logger.Error("Error getting next element out of private data iterator, namespace", dig.Namespace,
144-
", collection name", dig.Collection, ", txID", dig.TxId, ", due to", err)
145-
return nil
139+
return nil, errors.New(fmt.Sprint("error getting next element out of private data iterator, namespace", dig.Namespace,
140+
", collection name", dig.Collection, ", txID", dig.TxId, ", due to", err))
146141
}
147142
if res == nil {
148-
return results
143+
return results, nil
149144
}
150145
rws := res.PvtSimulationResultsWithConfig
151146
if rws == nil {

gossip/privdata/dataretriever_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ func TestNewDataRetriever_GetDataFromTransientStore(t *testing.T) {
152152

153153
// Request digest for private data which is greater than current ledger height
154154
// to make it query transient store for missed private data
155-
rwSets := retriever.CollectionRWSet(&gossip2.PvtDataDigest{
155+
rwSets, err := retriever.CollectionRWSet(&gossip2.PvtDataDigest{
156156
Namespace: namespace,
157157
Collection: collectionName,
158158
BlockSeq: 2,
@@ -161,6 +161,7 @@ func TestNewDataRetriever_GetDataFromTransientStore(t *testing.T) {
161161
})
162162

163163
assertion := assert.New(t)
164+
assertion.NoError(err)
164165
assertion.NotNil(rwSets)
165166
assertion.NotEmpty(rwSets.RWSet)
166167
assertion.Equal(2, len(rwSets.RWSet))
@@ -231,7 +232,7 @@ func TestNewDataRetriever_GetDataFromLedger(t *testing.T) {
231232

232233
// Request digest for private data which is greater than current ledger height
233234
// to make it query ledger for missed private data
234-
rwSets := retriever.CollectionRWSet(&gossip2.PvtDataDigest{
235+
rwSets, err := retriever.CollectionRWSet(&gossip2.PvtDataDigest{
235236
Namespace: namespace,
236237
Collection: collectionName,
237238
BlockSeq: uint64(5),
@@ -240,6 +241,7 @@ func TestNewDataRetriever_GetDataFromLedger(t *testing.T) {
240241
})
241242

242243
assertion := assert.New(t)
244+
assertion.NoError(err)
243245
assertion.NotNil(rwSets)
244246
assertion.NotEmpty(rwSets)
245247
assertion.Equal(2, len(rwSets.RWSet))
@@ -267,7 +269,7 @@ func TestNewDataRetriever_FailGetPvtDataFromLedger(t *testing.T) {
267269

268270
// Request digest for private data which is greater than current ledger height
269271
// to make it query transient store for missed private data
270-
rwSets := retriever.CollectionRWSet(&gossip2.PvtDataDigest{
272+
rwSets, err := retriever.CollectionRWSet(&gossip2.PvtDataDigest{
271273
Namespace: namespace,
272274
Collection: collectionName,
273275
BlockSeq: uint64(5),
@@ -276,6 +278,7 @@ func TestNewDataRetriever_FailGetPvtDataFromLedger(t *testing.T) {
276278
})
277279

278280
assertion := assert.New(t)
281+
assertion.Error(err)
279282
assertion.Nil(rwSets)
280283
}
281284

@@ -345,7 +348,7 @@ func TestNewDataRetriever_GetOnlyRelevantPvtData(t *testing.T) {
345348

346349
// Request digest for private data which is greater than current ledger height
347350
// to make it query transient store for missed private data
348-
rwSets := retriever.CollectionRWSet(&gossip2.PvtDataDigest{
351+
rwSets, err := retriever.CollectionRWSet(&gossip2.PvtDataDigest{
349352
Namespace: namespace,
350353
Collection: collectionName,
351354
BlockSeq: uint64(5),
@@ -354,6 +357,7 @@ func TestNewDataRetriever_GetOnlyRelevantPvtData(t *testing.T) {
354357
})
355358

356359
assertion := assert.New(t)
360+
assertion.NoError(err)
357361
assertion.NotNil(rwSets)
358362
assertion.NotEmpty(rwSets)
359363
assertion.Equal(2, len(rwSets.RWSet))

gossip/privdata/pull.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ const (
3939
// of retrieving required private data
4040
type PrivateDataRetriever interface {
4141
// CollectionRWSet returns the bytes of CollectionPvtReadWriteSet for a given txID and collection from the transient store
42-
CollectionRWSet(dig *proto.PvtDataDigest) *util.PrivateRWSetWithConfig
42+
CollectionRWSet(dig *proto.PvtDataDigest) (*util.PrivateRWSetWithConfig, error)
4343
}
4444

4545
// gossip defines capabilities that the gossip module gives the Coordinator
@@ -140,7 +140,17 @@ func (p *puller) createResponse(message proto.ReceivedMessage) []*proto.PvtDataE
140140
}()
141141
msg := message.GetGossipMessage()
142142
for _, dig := range msg.GetPrivateReq().Digests {
143-
rwSets := p.CollectionRWSet(dig)
143+
rwSets, err := p.CollectionRWSet(dig)
144+
if err != nil {
145+
logger.Errorf("Wasn't able to get private rwset for [%s] channel, chaincode [%s], collection [%s], txID = [%s], due to [%s]",
146+
p.channel, dig.Namespace, dig.Collection, dig.TxId, err)
147+
continue
148+
}
149+
if rwSets == nil {
150+
logger.Errorf("No private rwset for [%s] channel, chaincode [%s], collection [%s], txID = [%s] is available, skipping...",
151+
p.channel, dig.Namespace, dig.Collection, dig.TxId)
152+
continue
153+
}
144154
logger.Debug("Found", len(rwSets.RWSet), "for TxID", dig.TxId, ", collection", dig.Collection, "for", message.GetConnectionInfo().Endpoint)
145155
if len(rwSets.RWSet) == 0 {
146156
continue

gossip/privdata/pull_test.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,8 @@ type dataRetrieverMock struct {
112112
mock.Mock
113113
}
114114

115-
func (dr *dataRetrieverMock) CollectionRWSet(dig *proto.PvtDataDigest) *util.PrivateRWSetWithConfig {
116-
return dr.Called(dig).Get(0).(*util.PrivateRWSetWithConfig)
115+
func (dr *dataRetrieverMock) CollectionRWSet(dig *proto.PvtDataDigest) (*util.PrivateRWSetWithConfig, error) {
116+
return dr.Called(dig).Get(0).(*util.PrivateRWSetWithConfig), dr.Called(dig).Error(1)
117117
}
118118

119119
type receivedMsg struct {
@@ -282,7 +282,7 @@ func TestPullerFromOnly1Peer(t *testing.T) {
282282
Collection: "col1",
283283
Namespace: "ns1",
284284
}
285-
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig).Return(p2TransientStore)
285+
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig).Return(p2TransientStore, nil)
286286

287287
factoryMock3 := &collectionAccessFactoryMock{}
288288
policyMock3 := &collectionAccessPolicyMock{}
@@ -327,7 +327,7 @@ func TestPullerDataNotAvailable(t *testing.T) {
327327

328328
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig).Return(&util.PrivateRWSetWithConfig{
329329
RWSet: []util.PrivateRWSet{},
330-
})
330+
}, nil)
331331

332332
p3 := gn.newPuller("p3", newCollectionStore(), factoryMock)
333333
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig).Run(func(_ mock.Arguments) {
@@ -415,7 +415,7 @@ func TestPullerPeerNotEligible(t *testing.T) {
415415
},
416416
},
417417
},
418-
})
418+
}, nil)
419419

420420
policyStore = newCollectionStore().withPolicy("col1", uint64(100)).thatMapsTo("p3")
421421
factoryMock3 := &collectionAccessFactoryMock{}
@@ -435,7 +435,7 @@ func TestPullerPeerNotEligible(t *testing.T) {
435435
},
436436
},
437437
},
438-
})
438+
}, nil)
439439
dasf := &digestsAndSourceFactory{}
440440
d2s := dasf.mapDigest(&proto.PvtDataDigest{Collection: "col1", TxId: "txID1", Namespace: "ns1"}).toSources().create()
441441
fetchedMessages, err := p1.fetch(d2s, uint64(1))
@@ -484,7 +484,7 @@ func TestPullerDifferentPeersDifferentCollections(t *testing.T) {
484484
Namespace: "ns1",
485485
}
486486

487-
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig1).Return(p2TransientStore)
487+
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig1).Return(p2TransientStore, nil)
488488

489489
p3TransientStore := &util.PrivateRWSetWithConfig{
490490
RWSet: newPRWSet(),
@@ -512,7 +512,7 @@ func TestPullerDifferentPeersDifferentCollections(t *testing.T) {
512512
Namespace: "ns1",
513513
}
514514

515-
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig2).Return(p3TransientStore)
515+
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig2).Return(p3TransientStore, nil)
516516

517517
dasf := &digestsAndSourceFactory{}
518518
fetchedMessages, err := p1.fetch(dasf.mapDigest(dig1).toSources().mapDigest(dig2).toSources().create(), uint64(1))
@@ -576,7 +576,7 @@ func TestPullerRetries(t *testing.T) {
576576
factoryMock2.On("AccessPolicy", mock.Anything, mock.Anything).Return(accessPolicyMock2, nil)
577577

578578
p2 := gn.newPuller("p2", policyStore, factoryMock2)
579-
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig).Return(transientStore)
579+
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig).Return(transientStore, nil)
580580

581581
// p3
582582
policyStore = newCollectionStore().withPolicy("col1", uint64(100)).thatMapsTo("p1")
@@ -588,7 +588,7 @@ func TestPullerRetries(t *testing.T) {
588588
factoryMock3.On("AccessPolicy", mock.Anything, mock.Anything).Return(accessPolicyMock3, nil)
589589

590590
p3 := gn.newPuller("p3", policyStore, factoryMock3)
591-
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig).Return(transientStore)
591+
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig).Return(transientStore, nil)
592592

593593
// p4
594594
policyStore = newCollectionStore().withPolicy("col1", uint64(100)).thatMapsTo("p4")
@@ -600,7 +600,7 @@ func TestPullerRetries(t *testing.T) {
600600
factoryMock4.On("AccessPolicy", mock.Anything, mock.Anything).Return(accessPolicyMock4, nil)
601601

602602
p4 := gn.newPuller("p4", policyStore, factoryMock4)
603-
p4.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig).Return(transientStore)
603+
p4.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig).Return(transientStore, nil)
604604

605605
// p5
606606
policyStore = newCollectionStore().withPolicy("col1", uint64(100)).thatMapsTo("p5")
@@ -612,7 +612,7 @@ func TestPullerRetries(t *testing.T) {
612612
factoryMock5.On("AccessPolicy", mock.Anything, mock.Anything).Return(accessPolicyMock5, nil)
613613

614614
p5 := gn.newPuller("p5", policyStore, factoryMock5)
615-
p5.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig).Return(transientStore)
615+
p5.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig).Return(transientStore, nil)
616616

617617
// Fetch from someone
618618
dasf := &digestsAndSourceFactory{}
@@ -688,11 +688,11 @@ func TestPullerPreferEndorsers(t *testing.T) {
688688

689689
// We only define an action for dig2 on p2, and the test would fail with panic if any other peer is asked for
690690
// a private RWSet on dig2
691-
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig2).Return(p2TransientStore)
691+
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig2).Return(p2TransientStore, nil)
692692

693693
// We only define an action for dig1 on p3, and the test would fail with panic if any other peer is asked for
694694
// a private RWSet on dig1
695-
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig1).Return(p3TransientStore)
695+
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig1).Return(p3TransientStore, nil)
696696

697697
dasf := &digestsAndSourceFactory{}
698698
d2s := dasf.mapDigest(dig1).toSources("p3").mapDigest(dig2).toSources().create()
@@ -752,9 +752,9 @@ func TestPullerAvoidPullingPurgedData(t *testing.T) {
752752
Namespace: "ns1",
753753
}
754754

755-
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig1).Return(transientStore)
755+
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig1).Return(transientStore, nil)
756756

757-
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig1).Return(transientStore).
757+
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", dig1).Return(transientStore, nil).
758758
Run(
759759
func(mock.Arguments) {
760760
assert.Fail(t, "we should not fetch private data from peers where it was purged")

0 commit comments

Comments
 (0)