Commit 863f989

recon: at a time, only 1 func. can use the Cache
Among ValidateAndPrepare(), PrepareExpiringKeys(), and RemoveStaleAndCommitPvtDataOfOldBlocks(), we can allow only one function to execute at a time. The reason is that each function calls LoadCommittedVersions(), which would clear the existing entries in the transient buffer/cache and load new entries (such a transient buffer/cache is not applicable for the golevelDB). As a result, these three functions can interleave and nullify the optimization provided by the bulk read API. Once the ledger cache (FAB-103) is introduced and the existing LoadCommittedVersions() is refactored to return a map, we can allow these three functions to execute in parallel. This CR ensures that these functions are not executed in parallel.

Further, when we commit the old pvtData, some of it might be expiring at the next block commit. As the toPurgeList might have already been constructed by the last Commit() call, we need to update the toPurgeList with the eligible old pvtData if it is expiring. Until now, we created a new toPurgeList by calling PrepareExpiringKeys() from RemoveStaleAndCommitPvtDataOfOldBlocks(). This CR instead updates the existing toPurgeList rather than calling PrepareExpiringKeys(), because we don't want to execute these functions in parallel. It is also cheaper to update the existing expiring-keys list than to create a new one, especially when no new regular blocks are committed for a long duration while a lot of old pvtData is being committed.

FAB-13328 #done
FAB-13329 #done

Change-Id: Ia715bd1a376c6ce19caa7af6f6510e0cf6f5a1dd
Signed-off-by: senthil <cendhu@gmail.com>
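The serialization described above boils down to guarding every user of the shared committed-version cache with the same mutex. Below is a minimal, self-contained sketch of that pattern; the type txMgr, its fields, and the method bodies are simplified stand-ins (not the actual LockBasedTxMgr code). Because only one goroutine can hold oldBlockCommit at a time, neither call can clear the versions the other has just bulk-loaded.

package main

import (
	"fmt"
	"sync"
)

// txMgr is a hypothetical, stripped-down stand-in for LockBasedTxMgr.
type txMgr struct {
	oldBlockCommit sync.Mutex        // serializes the functions that share the cache
	cache          map[string]uint64 // stand-in for the transient committed-version cache
}

// loadCommittedVersions mimics the behaviour described above: every call
// clears the previous entries before bulk-loading new ones.
func (t *txMgr) loadCommittedVersions(keys []string) {
	t.cache = map[string]uint64{}
	for i, k := range keys {
		t.cache[k] = uint64(i) // pretend these versions came from one bulk read
	}
}

func (t *txMgr) validateAndPrepare(keys []string) {
	t.oldBlockCommit.Lock()
	defer t.oldBlockCommit.Unlock()
	t.loadCommittedVersions(keys)
	fmt.Println("validated block using", len(t.cache), "cached versions")
}

func (t *txMgr) removeStaleAndCommitPvtDataOfOldBlocks(keys []string) {
	t.oldBlockCommit.Lock()
	defer t.oldBlockCommit.Unlock()
	t.loadCommittedVersions(keys)
	fmt.Println("committed old pvtData using", len(t.cache), "cached versions")
}

func main() {
	t := &txMgr{}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); t.validateAndPrepare([]string{"k1", "k2"}) }()
	go func() { defer wg.Done(); t.removeStaleAndCommitPvtDataOfOldBlocks([]string{"k3"}) }()
	wg.Wait() // the two calls never interleave, so neither clears the other's bulk load mid-use
}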
1 parent cfdb44d commit 863f989

2 files changed: +94 −41 lines changed

core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/purge_mgr.go

Lines changed: 41 additions & 0 deletions
@@ -108,9 +108,50 @@ func (p *purgeMgr) UpdateBookkeepingForPvtDataOfOldBlocks(pvtUpdates *privacyena
 		toUpdate.pvtdataKeys.addAll(toAdd.pvtdataKeys)
 		updatedList = append(updatedList, toUpdate)
 	}
+
+	// As the expiring keys list might have been constructed after the last
+	// regular block commit, we need to update the list. This is because,
+	// some of the old pvtData which are being committed might get expired
+	// during the next regular block commit. As a result, the corresponding
+	// hashedKey in the expiring keys list would be missing the pvtData.
+	p.addMissingPvtDataToWorkingSet(pvtUpdateCompositeKeyMap)
+
 	return p.expKeeper.updateBookkeeping(updatedList, nil)
 }
 
+func (p *purgeMgr) addMissingPvtDataToWorkingSet(pvtKeys privacyenabledstate.PvtdataCompositeKeyMap) {
+	if p.workingset == nil || len(p.workingset.toPurge) == 0 {
+		return
+	}
+
+	for k := range pvtKeys {
+		hashedCompositeKey := privacyenabledstate.HashedCompositeKey{
+			Namespace:      k.Namespace,
+			CollectionName: k.CollectionName,
+			KeyHash:        string(util.ComputeStringHash(k.Key))}
+
+		toPurgeKey, ok := p.workingset.toPurge[hashedCompositeKey]
+		if !ok {
+			// corresponding hashedKey is not present in the
+			// expiring keys list
+			continue
+		}
+
+		// if the purgeKeyOnly is set, it means that the version of the pvtKey
+		// stored in the stateDB is older than the version of the hashedKey.
+		// As a result, only the pvtKey needs to be purged (expiring block height
+		// for the recent hashedKey would be higher). If the recent
+		// pvtKey of the corresponding hashedKey is being committed, we need to
+		// remove the purgeKeyOnly entries from the toPurgeList it is going to be
+		// updated by the commit of missing pvtData
+		if toPurgeKey.purgeKeyOnly {
+			delete(p.workingset.toPurge, hashedCompositeKey)
+		} else {
+			toPurgeKey.key = k.Key
+		}
+	}
+}
+
 // DeleteExpiredAndUpdateBookkeeping implements function in the interface 'PurgeMgr'
 func (p *purgeMgr) DeleteExpiredAndUpdateBookkeeping(
 	pvtUpdates *privacyenabledstate.PvtUpdateBatch,

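For illustration, the following is a simplified, hypothetical version of the working-set update performed by addMissingPvtDataToWorkingSet above. It uses plain Go types rather than the Fabric privacyenabledstate/bookkeeping structures, and a plain SHA-256 hex digest in place of util.ComputeStringHash: an expiring hashed key either gets its now-available private key recorded, or its purgeKeyOnly entry removed because the newly committed pvtData supersedes the stale version.

package main

import (
	"crypto/sha256"
	"fmt"
)

// hashedKey and purgeEntry are simplified stand-ins for the Fabric types.
type hashedKey struct {
	ns, coll, keyHash string
}

type purgeEntry struct {
	key          string // plain key to purge; filled in once the pvtData is known
	purgeKeyOnly bool   // true when only the stale private key (not the hash) must go
}

// updateWorkingSet mirrors the committed logic on a plain map keyed by hashed keys.
func updateWorkingSet(toPurge map[hashedKey]*purgeEntry, ns, coll, key string) {
	h := hashedKey{ns, coll, fmt.Sprintf("%x", sha256.Sum256([]byte(key)))}
	entry, ok := toPurge[h]
	if !ok {
		return // this key is not expiring at the next block commit
	}
	if entry.purgeKeyOnly {
		// the freshly committed pvtData supersedes the stale version,
		// so the purge-key-only entry is no longer needed
		delete(toPurge, h)
		return
	}
	entry.key = key // the expiring hashed key now has its pvtData available
}

func main() {
	k := hashedKey{"ns1", "coll1", fmt.Sprintf("%x", sha256.Sum256([]byte("marble1")))}
	toPurge := map[hashedKey]*purgeEntry{k: {}}
	updateWorkingSet(toPurge, "ns1", "coll1", "marble1")
	fmt.Println(toPurge[k].key) // marble1
}

Updating entries in place (and dropping purgeKeyOnly ones), instead of rebuilding the whole list via PrepareExpiringKeys(), is the trade-off the commit message describes: the existing toPurgeList stays cheap to maintain even when many old pvtData items arrive between regular block commits.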
core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go

Lines changed: 53 additions & 41 deletions
@@ -32,16 +32,15 @@ var logger = flogging.MustGetLogger("lockbasedtxmgr")
 // LockBasedTxMgr a simple implementation of interface `txmgmt.TxMgr`.
 // This implementation uses a read-write lock to prevent conflicts between transaction simulation and committing
 type LockBasedTxMgr struct {
-	ledgerid              string
-	db                    privacyenabledstate.DB
-	pvtdataPurgeMgr       *pvtdataPurgeMgr
-	validator             validator.Validator
-	stateListeners        []ledger.StateListener
-	ccInfoProvider        ledger.DeployedChaincodeInfoProvider
-	commitRWLock          sync.RWMutex
-	oldBlockCommit        sync.Mutex
-	current               *current
-	lastCommittedBlockNum uint64
+	ledgerid        string
+	db              privacyenabledstate.DB
+	pvtdataPurgeMgr *pvtdataPurgeMgr
+	validator       validator.Validator
+	stateListeners  []ledger.StateListener
+	ccInfoProvider  ledger.DeployedChaincodeInfoProvider
+	commitRWLock    sync.RWMutex
+	oldBlockCommit  sync.Mutex
+	current         *current
 }
 
 type current struct {
@@ -105,9 +104,22 @@ func (txmgr *LockBasedTxMgr) NewTxSimulator(txid string) (ledger.TxSimulator, er
 func (txmgr *LockBasedTxMgr) ValidateAndPrepare(blockAndPvtdata *ledger.BlockAndPvtData, doMVCCValidation bool) (
 	[]*txmgr.TxStatInfo, error,
 ) {
-	block := blockAndPvtdata.Block
+	// Among ValidateAndPrepare(), PrepareExpiringKeys(), and
+	// RemoveStaleAndCommitPvtDataOfOldBlocks(), we can allow only one
+	// function to execute at a time. The reason is that each function calls
+	// LoadCommittedVersions() which would clear the existing entries in the
+	// transient buffer and load new entries (such a transient buffer is not
+	// applicable for the golevelDB). As a result, these three functions can
+	// interleave and nullify the optimization provided by the bulk read API.
+	// Once the ledger cache (FAB-103) is introduced and existing
+	// LoadCommittedVersions() is refactored to return a map, we can allow
+	// these three functions to execute parallely.
 	logger.Debugf("Waiting for purge mgr to finish the background job of computing expirying keys for the block")
 	txmgr.pvtdataPurgeMgr.WaitForPrepareToFinish()
+	txmgr.oldBlockCommit.Lock()
+	defer txmgr.oldBlockCommit.Unlock()
+
+	block := blockAndPvtdata.Block
 	logger.Debugf("Validating new block with num trans = [%d]", len(block.Data.Data))
 	batch, txstatsInfo, err := txmgr.validator.ValidateAndPrepareBatch(blockAndPvtdata, doMVCCValidation)
 	if err != nil {
@@ -128,7 +140,7 @@ func (txmgr *LockBasedTxMgr) ValidateAndPrepare(blockAndPvtdata *ledger.BlockAnd
 // (2) acquire a lock on oldBlockCommit
 // (3) checks for stale pvtData by comparing [version, valueHash] and removes stale data
 // (4) creates update batch from the the non-stale pvtData
-// (5) update the BTL bookkeeping managed by the purge manager and prepare expiring keys.
+// (5) update the BTL bookkeeping managed by the purge manager and update expiring keys.
 // (6) commit the non-stale pvt data to the stateDB
 // This function assumes that the passed input contains only transactions that had been
 // marked "Valid". In the current design, this assumption holds true as we store
@@ -143,6 +155,22 @@ func (txmgr *LockBasedTxMgr) ValidateAndPrepare(blockAndPvtdata *ledger.BlockAnd
 // function might receive pvtData of both valid and invalid tx. Such a scenario is explained
 // in FAB-12924 and is related to state fork and rebuilding ledger state.
 func (txmgr *LockBasedTxMgr) RemoveStaleAndCommitPvtDataOfOldBlocks(blocksPvtData map[uint64][]*ledger.TxPvtData) error {
+	// (0) Among ValidateAndPrepare(), PrepareExpiringKeys(), and
+	// RemoveStaleAndCommitPvtDataOfOldBlocks(), we can allow only one
+	// function to execute at a time. The reason is that each function calls
+	// LoadCommittedVersions() which would clear the existing entries in the
+	// transient buffer and load new entries (such a transient buffer is not
+	// applicable for the golevelDB). As a result, these three functions can
+	// interleave and nullify the optimization provided by the bulk read API.
+	// Once the ledger cache (FAB-103) is introduced and existing
+	// LoadCommittedVersions() is refactored to return a map, we can allow
+	// these three functions to execute parallely. However, we cannot remove
+	// the lock on oldBlockCommit as it is also used to avoid interleaving
+	// between Commit() and execution of this function for the correctness.
+	txmgr.pvtdataPurgeMgr.WaitForPrepareToFinish()
+	txmgr.oldBlockCommit.Lock()
+	defer txmgr.oldBlockCommit.Unlock()
+
 	// (1) as the blocksPvtData can contain multiple versions of pvtData for
 	// a given <ns, coll, key>, we need to find duplicate tuples with different
 	// versions and use the one with the higher version
@@ -151,14 +179,6 @@ func (txmgr *LockBasedTxMgr) RemoveStaleAndCommitPvtDataOfOldBlocks(blocksPvtDat
 		return err
 	}
 
-	// (2) acquire a lock on oldBlockCommit. If the regular block commit has already
-	// acquired this lock, commit of old blocks' pvtData cannot proceed until the lock
-	// is released. This is required as the PrepareForExpiringKeys() used in step (5)
-	// of this function might affect the result of DeleteExpiredAndUpdateBookkeeping()
-	// in Commit()
-	txmgr.oldBlockCommit.Lock()
-	defer txmgr.oldBlockCommit.Unlock()
-
 	// (3) remove the pvt data which does not matches the hashed
 	// value stored in the public state
 	if err := uniquePvtData.findAndRemoveStalePvtData(txmgr.db); err != nil {
@@ -168,19 +188,16 @@ func (txmgr *LockBasedTxMgr) RemoveStaleAndCommitPvtDataOfOldBlocks(blocksPvtDat
 	// (4) create the update batch from the uniquePvtData
 	batch := uniquePvtData.transformToUpdateBatch()
 
-	// (5) update booking in the purge manager and prepare expiring keys.
-	// Though the expiring keys would have been loaded in memory during last
-	// PrepareExpiringKeys from Commit but we rerun this here because,
-	// RemoveStaleAndCommitPvtDataOfOldBlocks may have added new data which might be
-	// eligible for expiry during the next regular block commit.
+	// (5) update booking in the purge manager and update toPurgeList
+	// (i.e., the list of expiry keys). As the expiring keys would have
+	// been constructed during last PrepareExpiringKeys from commit, we need
+	// to update the list. This is because RemoveStaleAndCommitPvtDataOfOldBlocks
+	// may have added new data which might be eligible for expiry during the
+	// next regular block commit.
 	if err := txmgr.pvtdataPurgeMgr.UpdateBookkeepingForPvtDataOfOldBlocks(batch.PvtUpdates); err != nil {
 		return err
 	}
 
-	if txmgr.lastCommittedBlockNum > 0 {
-		txmgr.pvtdataPurgeMgr.PrepareForExpiringKeys(txmgr.lastCommittedBlockNum + 1)
-	}
-
 	// (6) commit the pvt data to the stateDB
 	if err := txmgr.db.ApplyPrivacyAwareUpdates(batch, nil); err != nil {
 		return err
@@ -284,10 +301,6 @@ func (uniquePvtData uniquePvtDataMap) findAndRemoveStalePvtData(db privacyenable
 }
 
 func (uniquePvtData uniquePvtDataMap) loadCommittedVersionIntoCache(db privacyenabledstate.DB) error {
-	// the regular block validation might have populate the cache already. In that scenario,
-	// this call would be adding more entries to the existing cache. However, it does not affect
-	// the correctness of regular block validation. If an entry already exist for a given
-	// hashedCompositeKey, cache would not be updated.
 	// Note that ClearCachedVersions would not be called till we validate and commit these
 	// pvt data of old blocks. This is because only during the exclusive lock duration, we
 	// clear the cache and we have already acquired one before reaching here.
@@ -418,13 +431,13 @@ func (txmgr *LockBasedTxMgr) Shutdown() {
 
 // Commit implements method in interface `txmgmt.TxMgr`
 func (txmgr *LockBasedTxMgr) Commit() error {
-	// we need to acquire a lock on oldBlockCommit. This is required because
-	// the DeleteExpiredAndUpdateBookkeeping() would perform incorrect operation if
-	// PrepareForExpiringKeys() in RemoveStaleAndCommitPvtDataOfOldBlocks() is allowed to
-	// execute parallely. RemoveStaleAndCommitPvtDataOfOldBlocks computes the update
-	// batch based on the current state and if we allow regular block commits at the
-	// same time, the former may overwrite the newer versions of the data and we may
-	// end up with an incorrect update batch.
+	// we need to acquire a lock on oldBlockCommit. The following are the two reasons:
+	// (1) the DeleteExpiredAndUpdateBookkeeping() would perform incorrect operation if
+	// toPurgeList is updated by RemoveStaleAndCommitPvtDataOfOldBlocks().
+	// (2) RemoveStaleAndCommitPvtDataOfOldBlocks computes the update
+	// batch based on the current state and if we allow regular block commits at the
+	// same time, the former may overwrite the newer versions of the data and we may
+	// end up with an incorrect update batch.
 	txmgr.oldBlockCommit.Lock()
 	defer txmgr.oldBlockCommit.Unlock()
 
@@ -472,7 +485,6 @@ func (txmgr *LockBasedTxMgr) Commit() error {
 	// In the case of error state listeners will not recieve this call - instead a peer panic is caused by the ledger upon receiveing
 	// an error from this function
 	txmgr.updateStateListeners()
-	txmgr.lastCommittedBlockNum = txmgr.current.blockNum()
 	return nil
 }
 