Skip to content

Commit d789375

Browse files
cendhudenyeart
authored and committed
recon: stateDB recovery after a peer crash
Once the pvtData of old blocks is committed to the pvtStore, we need to check whether the stateDB needs to be updated. If so, we should commit the pvtData to the stateDB. As a peer can fail after committing the pvtData of old blocks to the pvtStore but before updating the stateDB, we might end up with a stateDB that is out of sync with the pvtStore. Hence, this CR implements a recovery mechanism. FAB-11806 #done Change-Id: If6f8aa608080d7e02fb6c4236079814e3ed5c11f Signed-off-by: senthil <cendhu@gmail.com> Signed-off-by: manish <manish.sethi@gmail.com>
1 parent 6a9a783 commit d789375

File tree

8 files changed

+163
-19
lines changed

8 files changed

+163
-19
lines changed

core/ledger/kvledger/kv_ledger.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,16 @@ func (l *kvLedger) initBlockStore(btlPolicy pvtdatapolicy.BTLPolicy) {
9494
//by recommitting last valid blocks
9595
func (l *kvLedger) recoverDBs() error {
9696
logger.Debugf("Entering recoverDB()")
97+
if err := l.syncStateAndHistoryDBWithBlockstore(); err != nil {
98+
return err
99+
}
100+
if err := l.syncStateDBWithPvtdatastore(); err != nil {
101+
return err
102+
}
103+
return nil
104+
}
105+
106+
func (l *kvLedger) syncStateAndHistoryDBWithBlockstore() error {
97107
//If there is no block in blockstorage, nothing to recover.
98108
info, _ := l.blockStore.GetBlockchainInfo()
99109
if info.Height == 0 {
@@ -136,6 +146,29 @@ func (l *kvLedger) recoverDBs() error {
136146
recoverers[0].recoverable, recoverers[1].recoverable)
137147
}
138148

149+
func (l *kvLedger) syncStateDBWithPvtdatastore() error {
150+
// TODO: So far, the design philosophy was that the scope of block storage is
151+
// limited to storing and retrieving blocks data with certain guarantees and statedb is
152+
// for the state management. The higher layer, 'kvledger', coordinates the acts between
153+
// the two. However, maintaining the state of the consumption of blocks (i.e.,
154+
// lastUpdatedOldBlockList for pvtstore reconciliation) within private data block storage
155+
// breaks that assumption. The knowledge of what blocks have been consumed for the purpose
156+
// of state update should not lie with the source (i.e., pvtdatastorage). A potential fix
157+
// is mentioned in FAB-12731
158+
blocksPvtData, err := l.blockStore.GetLastUpdatedOldBlocksPvtData()
159+
if err != nil {
160+
return err
161+
}
162+
if err := l.txtmgmt.RemoveStaleAndCommitPvtDataOfOldBlocks(blocksPvtData); err != nil {
163+
return err
164+
}
165+
if err := l.blockStore.ResetLastUpdatedOldBlocksList(); err != nil {
166+
return err
167+
}
168+
169+
return nil
170+
}
171+
139172
//recommitLostBlocks retrieves blocks in specified range and commit the write set to either
140173
//state DB or history DB or both
141174
func (l *kvLedger) recommitLostBlocks(firstBlockNum uint64, lastBlockNum uint64, recoverables ...recoverable) error {

core/ledger/kvledger/kv_ledger_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/hyperledger/fabric/common/ledger/testutil"
1616
"github.com/hyperledger/fabric/common/util"
1717
lgr "github.com/hyperledger/fabric/core/ledger"
18+
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr"
1819
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
1920
ledgertestutil "github.com/hyperledger/fabric/core/ledger/testutil"
2021
"github.com/hyperledger/fabric/protos/common"
@@ -185,6 +186,11 @@ func TestKVLedgerBlockStorageWithPvtdata(t *testing.T) {
185186
}
186187

187188
func TestKVLedgerDBRecovery(t *testing.T) {
189+
testSyncStateAndHistoryDBWithBlockstore(t)
190+
testSyncStateDBWithPvtdatastore(t)
191+
}
192+
193+
func testSyncStateAndHistoryDBWithBlockstore(t *testing.T) {
188194
env := newTestEnv(t)
189195
defer env.cleanup()
190196
provider := testutilNewProviderWithCollectionConfig(t,
@@ -366,6 +372,67 @@ func TestKVLedgerDBRecovery(t *testing.T) {
366372
)
367373
}
368374

375+
func testSyncStateDBWithPvtdatastore(t *testing.T) {
376+
env := newTestEnv(t)
377+
defer env.cleanup()
378+
provider := testutilNewProviderWithCollectionConfig(t,
379+
"ns", map[string]uint64{"coll": 0},
380+
)
381+
defer provider.Close()
382+
testLedgerid := "testLedger"
383+
bg, gb := testutil.NewBlockGenerator(t, testLedgerid, false)
384+
ledger, _ := provider.Create(gb)
385+
defer ledger.Close()
386+
387+
// create and commit two data blocks (both with missing pvtdata)
388+
blockAndPvtdata1, pvtdata1 := prepareNextBlockWithMissingPvtDataForTest(t, ledger, bg, "SimulateForBlk1",
389+
map[string]string{"key1": "value1.1", "key2": "value2.1", "key3": "value3.1"},
390+
map[string]string{"key1": "pvtValue1.1", "key2": "pvtValue2.1", "key3": "pvtValue3.1"})
391+
392+
assert.NoError(t, ledger.CommitWithPvtData(blockAndPvtdata1))
393+
394+
blockAndPvtdata2, pvtdata2 := prepareNextBlockWithMissingPvtDataForTest(t, ledger, bg, "SimulateForBlk2",
395+
map[string]string{"key1": "value1.2", "key2": "value2.2", "key3": "value3.2"},
396+
map[string]string{"key1": "pvtValue1.2", "key2": "pvtValue2.2", "key3": "pvtValue3.2"})
397+
398+
assert.NoError(t, ledger.CommitWithPvtData(blockAndPvtdata2))
399+
400+
txSim, err := ledger.NewTxSimulator("test")
401+
assert.NoError(t, err)
402+
value, err := txSim.GetPrivateData("ns", "coll", "key1")
403+
_, ok := err.(*txmgr.ErrPvtdataNotAvailable)
404+
assert.True(t, ok)
405+
assert.Nil(t, value)
406+
407+
blocksPvtData := map[uint64][]*lgr.TxPvtData{
408+
1: {
409+
pvtdata1,
410+
},
411+
2: {
412+
pvtdata2,
413+
},
414+
}
415+
416+
assert.NoError(t, ledger.(*kvLedger).blockStore.CommitPvtDataOfOldBlocks(blocksPvtData))
417+
418+
// Now, assume that peer fails here before committing the pvtData to stateDB
419+
ledger.Close()
420+
provider.Close()
421+
422+
// Here the peer comes online and calls NewKVLedger to get a handler for the ledger
423+
// StateDB and HistoryDB should be recovered before returning from NewKVLedger call
424+
provider = testutilNewProviderWithCollectionConfig(t,
425+
"ns", map[string]uint64{"coll": 0},
426+
)
427+
ledger, _ = provider.Open(testLedgerid)
428+
429+
txSim, err = ledger.NewTxSimulator("test")
430+
assert.NoError(t, err)
431+
value, err = txSim.GetPrivateData("ns", "coll", "key1")
432+
assert.NoError(t, err)
433+
assert.Equal(t, value, []byte("pvtValue1.2"))
434+
}
435+
369436
func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
370437

371438
//call a helper method to load the core.yaml
@@ -482,6 +549,21 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
482549
}
483550
}
484551

552+
func prepareNextBlockWithMissingPvtDataForTest(t *testing.T, l lgr.PeerLedger, bg *testutil.BlockGenerator,
553+
txid string, pubKVs map[string]string, pvtKVs map[string]string) (*lgr.BlockAndPvtData, *lgr.TxPvtData) {
554+
555+
blockAndPvtData := prepareNextBlockForTest(t, l, bg, txid, pubKVs, pvtKVs)
556+
557+
blkMissingDataInfo := make(lgr.TxMissingPvtDataMap)
558+
blkMissingDataInfo.Add(0, "ns", "coll", true)
559+
blockAndPvtData.MissingPvtData = blkMissingDataInfo
560+
561+
pvtData := blockAndPvtData.PvtData[0]
562+
delete(blockAndPvtData.PvtData, 0)
563+
564+
return blockAndPvtData, pvtData
565+
}
566+
485567
func prepareNextBlockForTest(t *testing.T, l lgr.PeerLedger, bg *testutil.BlockGenerator,
486568
txid string, pubKVs map[string]string, pvtKVs map[string]string) *lgr.BlockAndPvtData {
487569
simulator, _ := l.NewTxSimulator(txid)

core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ func (txmgr *LockBasedTxMgr) RemoveStaleAndCommitPvtDataOfOldBlocks(blocksPvtDat
138138
// a given <ns, coll, key>, we need to find duplicate tuples with different
139139
// versions and use the one with the higher version
140140
uniquePvtData, err := constructUniquePvtData(blocksPvtData)
141-
if err != nil {
141+
if len(uniquePvtData) == 0 || err != nil {
142142
return err
143143
}
144144

core/ledger/ledger_interface.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ type BlockAndPvtData struct {
253253
// BlockPvtData contains the private data for a block
254254
type BlockPvtData struct {
255255
BlockNum uint64
256-
WriteSets map[uint64]*TxPvtData
256+
WriteSets TxPvtDataMap
257257
}
258258

259259
// Add adds a given missing private data in the MissingPrivateDataList

core/ledger/ledgerstorage/store.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -231,9 +231,9 @@ func (s *Store) ProcessCollsEligibilityEnabled(committingBlk uint64, nsCollMap m
231231
return s.pvtdataStore.ProcessCollsEligibilityEnabled(committingBlk, nsCollMap)
232232
}
233233

234-
// GetLastUpdatedOldBlocksList invokes the function on underlying pvtdata store
235-
func (s *Store) GetLastUpdatedOldBlocksList() ([]uint64, error) {
236-
return s.pvtdataStore.GetLastUpdatedOldBlocksList()
234+
// GetLastUpdatedOldBlocksPvtData invokes the function on underlying pvtdata store
235+
func (s *Store) GetLastUpdatedOldBlocksPvtData() (map[uint64][]*ledger.TxPvtData, error) {
236+
return s.pvtdataStore.GetLastUpdatedOldBlocksPvtData()
237237
}
238238

239239
// ResetLastUpdatedOldBlocksList invokes the function on underlying pvtdata store

core/ledger/pvtdatastorage/store.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ type Store interface {
7272
// these pvtData, the `lastUpdatedOldBlocksList` must be removed. During the peer startup,
7373
// if the `lastUpdatedOldBlocksList` exists, stateDB needs to be updated with the appropriate pvtData.
7474
CommitPvtDataOfOldBlocks(blocksPvtData map[uint64][]*ledger.TxPvtData) error
75-
// GetLastUpdatedOldBlocksList returns the value of `lastUpdatedOldBlocksList`
76-
GetLastUpdatedOldBlocksList() ([]uint64, error)
75+
// GetLastUpdatedOldBlocksPvtData returns the pvtdata of blocks listed in `lastUpdatedOldBlocksList`
76+
GetLastUpdatedOldBlocksPvtData() (map[uint64][]*ledger.TxPvtData, error)
7777
// ResetLastUpdatedOldBlocksList removes the `lastUpdatedOldBlocksList` entry from the store
7878
ResetLastUpdatedOldBlocksList() error
7979
// IsEmpty returns true if the store does not have any block committed yet

core/ledger/pvtdatastorage/store_impl.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func (s *store) initState() error {
147147
if s.batchPending, err = s.hasPendingCommit(); err != nil {
148148
return err
149149
}
150-
if blist, err = s.GetLastUpdatedOldBlocksList(); err != nil {
150+
if blist, err = s.getLastUpdatedOldBlocksList(); err != nil {
151151
return err
152152
}
153153
if len(blist) > 0 {
@@ -521,7 +521,27 @@ func (s *store) commitBatch(batch *leveldbhelper.UpdateBatch) error {
521521
return nil
522522
}
523523

524-
func (s *store) GetLastUpdatedOldBlocksList() ([]uint64, error) {
524+
// GetLastUpdatedOldBlocksPvtData implements the function in the interface `Store`
525+
func (s *store) GetLastUpdatedOldBlocksPvtData() (map[uint64][]*ledger.TxPvtData, error) {
526+
if !s.isLastUpdatedOldBlocksSet {
527+
return nil, nil
528+
}
529+
530+
updatedBlksList, err := s.getLastUpdatedOldBlocksList()
531+
if err != nil {
532+
return nil, err
533+
}
534+
535+
blksPvtData := make(map[uint64][]*ledger.TxPvtData)
536+
for _, blkNum := range updatedBlksList {
537+
if blksPvtData[blkNum], err = s.GetPvtDataByBlockNum(blkNum, nil); err != nil {
538+
return nil, err
539+
}
540+
}
541+
return blksPvtData, nil
542+
}
543+
544+
func (s *store) getLastUpdatedOldBlocksList() ([]uint64, error) {
525545
var v []byte
526546
var err error
527547
if v, err = s.db.Get(lastUpdatedOldBlocksKey); err != nil {
@@ -547,10 +567,8 @@ func (s *store) GetLastUpdatedOldBlocksList() ([]uint64, error) {
547567
return updatedBlksList, nil
548568
}
549569

570+
// ResetLastUpdatedOldBlocksList implements the function in the interface `Store`
550571
func (s *store) ResetLastUpdatedOldBlocksList() error {
551-
if !s.isLastUpdatedOldBlocksSet {
552-
return &ErrIllegalCall{"No updated old block list"}
553-
}
554572
batch := leveldbhelper.NewUpdateBatch()
555573
batch.Delete(lastUpdatedOldBlocksKey)
556574
if err := s.db.WriteBatch(batch, true); err != nil {
@@ -712,6 +730,7 @@ func (s *store) GetMissingPvtDataInfoForMostRecentBlocks(maxBlock int) (ledger.M
712730
return missingPvtDataInfo, nil
713731
}
714732

733+
// ProcessCollsEligibilityEnabled implements the function in the interface `Store`
715734
func (s *store) ProcessCollsEligibilityEnabled(committingBlk uint64, nsCollMap map[string][]string) error {
716735
key := encodeCollElgKey(committingBlk)
717736
m := newCollElgInfo(nsCollMap)

core/ledger/pvtdatastorage/store_impl_test.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ func TestCommitPvtDataOfOldBlocks(t *testing.T) {
246246
assert.NoError(err)
247247
assert.Equal(expectedMissingPvtDataInfo, missingPvtDataInfo)
248248

249-
// COMMIT THE MISSINGDATA IN BLOCK 1
249+
// COMMIT THE MISSINGDATA IN BLOCK 1 AND BLOCK 2
250250
oldBlocksPvtData := make(map[uint64][]*ledger.TxPvtData)
251251
oldBlocksPvtData[1] = []*ledger.TxPvtData{
252252
produceSamplePvtdata(t, 1, []string{"ns-1:coll-1", "ns-2:coll-1"}),
@@ -298,18 +298,28 @@ func TestCommitPvtDataOfOldBlocks(t *testing.T) {
298298
assert.NoError(err)
299299
assert.Equal(expectedMissingPvtDataInfo, missingPvtDataInfo)
300300

301-
expectedBlockList := []uint64{1, 2}
302-
blockList, err := store.GetLastUpdatedOldBlocksList()
301+
// blksPvtData holds all the pvt data of every block for which any pvtdata has been submitted
302+
// using CommitPvtDataOfOldBlocks
303+
blksPvtData, err := store.GetLastUpdatedOldBlocksPvtData()
303304
assert.NoError(err)
304-
assert.Equal(expectedBlockList, blockList)
305+
306+
expectedLastupdatedPvtdata := make(map[uint64][]*ledger.TxPvtData)
307+
expectedLastupdatedPvtdata[1] = []*ledger.TxPvtData{
308+
produceSamplePvtdata(t, 1, []string{"ns-1:coll-1", "ns-2:coll-1"}),
309+
produceSamplePvtdata(t, 2, []string{"ns-1:coll-1", "ns-2:coll-1", "ns-2:coll-2", "ns-3:coll-1"}),
310+
produceSamplePvtdata(t, 4, []string{"ns-1:coll-1", "ns-1:coll-2", "ns-2:coll-1", "ns-2:coll-2"}),
311+
}
312+
expectedLastupdatedPvtdata[2] = []*ledger.TxPvtData{
313+
produceSamplePvtdata(t, 3, []string{"ns-1:coll-1"}),
314+
}
315+
assert.Equal(expectedLastupdatedPvtdata, blksPvtData)
305316

306317
err = store.ResetLastUpdatedOldBlocksList()
307318
assert.NoError(err)
308319

309-
expectedBlockList = []uint64(nil)
310-
blockList, err = store.GetLastUpdatedOldBlocksList()
320+
blksPvtData, err = store.GetLastUpdatedOldBlocksPvtData()
311321
assert.NoError(err)
312-
assert.Nil(blockList)
322+
assert.Nil(blksPvtData)
313323

314324
// COMMIT BLOCK 3 WITH NO PVTDATA
315325
assert.NoError(store.Prepare(3, nil, nil))

0 commit comments

Comments
 (0)