@@ -32,16 +32,15 @@ var logger = flogging.MustGetLogger("lockbasedtxmgr")
 // LockBasedTxMgr is a simple implementation of interface `txmgmt.TxMgr`.
 // This implementation uses a read-write lock to prevent conflicts between transaction simulation and committing
 type LockBasedTxMgr struct {
-	ledgerid              string
-	db                    privacyenabledstate.DB
-	pvtdataPurgeMgr       *pvtdataPurgeMgr
-	validator             validator.Validator
-	stateListeners        []ledger.StateListener
-	ccInfoProvider        ledger.DeployedChaincodeInfoProvider
-	commitRWLock          sync.RWMutex
-	oldBlockCommit        sync.Mutex
-	current               *current
-	lastCommittedBlockNum uint64
+	ledgerid        string
+	db              privacyenabledstate.DB
+	pvtdataPurgeMgr *pvtdataPurgeMgr
+	validator       validator.Validator
+	stateListeners  []ledger.StateListener
+	ccInfoProvider  ledger.DeployedChaincodeInfoProvider
+	commitRWLock    sync.RWMutex
+	oldBlockCommit  sync.Mutex
+	current         *current
 }
 
 type current struct {
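The struct's doc comment above mentions a read-write lock (commitRWLock) that keeps transaction simulation and block commit from conflicting. Below is a minimal, illustrative sketch of that reader/writer split; the package, type, and field names are invented for the example and are not the actual Fabric code.

```go
package txmgrsketch

import "sync"

// simTxMgr illustrates the reader/writer split: any number of transaction
// simulations may read state concurrently, while a block commit takes the
// write lock and excludes them until the state update is done.
type simTxMgr struct {
	commitRWLock sync.RWMutex
	state        map[string][]byte
}

// simulate reads under the shared (read) lock, so simulations run in parallel.
func (m *simTxMgr) simulate(key string) []byte {
	m.commitRWLock.RLock()
	defer m.commitRWLock.RUnlock()
	return m.state[key]
}

// commit applies a batch of writes under the exclusive (write) lock, blocking
// new simulations while the committed state changes underneath them.
func (m *simTxMgr) commit(writes map[string][]byte) {
	m.commitRWLock.Lock()
	defer m.commitRWLock.Unlock()
	for k, v := range writes {
		m.state[k] = v
	}
}
```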
@@ -105,9 +104,22 @@ func (txmgr *LockBasedTxMgr) NewTxSimulator(txid string) (ledger.TxSimulator, er
 func (txmgr *LockBasedTxMgr) ValidateAndPrepare(blockAndPvtdata *ledger.BlockAndPvtData, doMVCCValidation bool) (
 	[]*txmgr.TxStatInfo, error,
 ) {
-	block := blockAndPvtdata.Block
+	// Among ValidateAndPrepare(), PrepareExpiringKeys(), and
+	// RemoveStaleAndCommitPvtDataOfOldBlocks(), we can allow only one
+	// function to execute at a time. The reason is that each function calls
+	// LoadCommittedVersions(), which would clear the existing entries in the
+	// transient buffer and load new entries (such a transient buffer is not
+	// applicable for the golevelDB). As a result, these three functions can
+	// interleave and nullify the optimization provided by the bulk read API.
+	// Once the ledger cache (FAB-103) is introduced and the existing
+	// LoadCommittedVersions() is refactored to return a map, we can allow
+	// these three functions to execute in parallel.
 	logger.Debugf("Waiting for purge mgr to finish the background job of computing expiring keys for the block")
 	txmgr.pvtdataPurgeMgr.WaitForPrepareToFinish()
+	txmgr.oldBlockCommit.Lock()
+	defer txmgr.oldBlockCommit.Unlock()
+
+	block := blockAndPvtdata.Block
 	logger.Debugf("Validating new block with num trans = [%d]", len(block.Data.Data))
 	batch, txstatsInfo, err := txmgr.validator.ValidateAndPrepareBatch(blockAndPvtdata, doMVCCValidation)
 	if err != nil {
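The comment added above argues that ValidateAndPrepare(), PrepareExpiringKeys(), and RemoveStaleAndCommitPvtDataOfOldBlocks() must be serialized because each one clears and reloads a shared committed-version buffer. The sketch below is only a rough illustration of that reasoning, assuming a plain map-based cache; the names are invented and this is not the real LoadCommittedVersions API.

```go
package txmgrsketch

import "sync"

// cacheTxMgr mimics the transient committed-version buffer described above:
// loading it clears whatever a concurrent caller had loaded, which is exactly
// why the three operations must not interleave.
type cacheTxMgr struct {
	oldBlockCommit sync.Mutex        // serializes the three cache-loading operations
	cache          map[string]uint64 // key -> committed version (the "transient buffer")
}

// loadCommittedVersions drops the previous contents and bulk-loads versions
// for the requested keys, as the comment above describes.
func (m *cacheTxMgr) loadCommittedVersions(keys []string, fetch func(string) uint64) {
	m.cache = make(map[string]uint64, len(keys)) // clearing here is what makes interleaving unsafe
	for _, k := range keys {
		m.cache[k] = fetch(k)
	}
}

// validateAndPrepare stands in for any of the three serialized operations: it
// holds the mutex across the bulk load and the use of the loaded entries, so
// another operation cannot wipe the cache mid-validation.
func (m *cacheTxMgr) validateAndPrepare(keys []string, fetch func(string) uint64) {
	m.oldBlockCommit.Lock()
	defer m.oldBlockCommit.Unlock()
	m.loadCommittedVersions(keys, fetch)
	// ... validate transactions against m.cache ...
}
```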
@@ -128,7 +140,7 @@ func (txmgr *LockBasedTxMgr) ValidateAndPrepare(blockAndPvtdata *ledger.BlockAnd
 // (2) acquire a lock on oldBlockCommit
 // (3) checks for stale pvtData by comparing [version, valueHash] and removes stale data
 // (4) creates update batch from the non-stale pvtData
-// (5) update the BTL bookkeeping managed by the purge manager and prepare expiring keys.
+// (5) update the BTL bookkeeping managed by the purge manager and update expiring keys.
 // (6) commit the non-stale pvt data to the stateDB
 // This function assumes that the passed input contains only transactions that had been
 // marked "Valid". In the current design, this assumption holds true as we store
@@ -143,6 +155,22 @@ func (txmgr *LockBasedTxMgr) ValidateAndPrepare(blockAndPvtdata *ledger.BlockAnd
 // function might receive pvtData of both valid and invalid tx. Such a scenario is explained
 // in FAB-12924 and is related to state fork and rebuilding ledger state.
 func (txmgr *LockBasedTxMgr) RemoveStaleAndCommitPvtDataOfOldBlocks(blocksPvtData map[uint64][]*ledger.TxPvtData) error {
+	// (0) Among ValidateAndPrepare(), PrepareExpiringKeys(), and
+	// RemoveStaleAndCommitPvtDataOfOldBlocks(), we can allow only one
+	// function to execute at a time. The reason is that each function calls
+	// LoadCommittedVersions(), which would clear the existing entries in the
+	// transient buffer and load new entries (such a transient buffer is not
+	// applicable for the golevelDB). As a result, these three functions can
+	// interleave and nullify the optimization provided by the bulk read API.
+	// Once the ledger cache (FAB-103) is introduced and the existing
+	// LoadCommittedVersions() is refactored to return a map, we can allow
+	// these three functions to execute in parallel. However, we cannot remove
+	// the lock on oldBlockCommit as it is also used to avoid interleaving
+	// between Commit() and the execution of this function, which is required
+	// for correctness.
+	txmgr.pvtdataPurgeMgr.WaitForPrepareToFinish()
+	txmgr.oldBlockCommit.Lock()
+	defer txmgr.oldBlockCommit.Unlock()
+
 	// (1) as the blocksPvtData can contain multiple versions of pvtData for
 	// a given <ns, coll, key>, we need to find duplicate tuples with different
 	// versions and use the one with the higher version
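Step (1) above keeps only the highest version when the same <ns, coll, key> tuple appears in several old blocks. Below is a small illustrative sketch of that deduplication, using hypothetical key and version types rather than the actual pvt data structures.

```go
package txmgrsketch

// pvtKey is a hypothetical stand-in for the <namespace, collection, key> tuple.
type pvtKey struct {
	ns, coll, key string
}

// pvtEntry pairs a value with the (block, tx) position it was written at;
// a higher block/tx number means a newer version.
type pvtEntry struct {
	blockNum, txNum uint64
	value           []byte
}

// newer reports whether a was written at a higher version than b.
func newer(a, b pvtEntry) bool {
	if a.blockNum != b.blockNum {
		return a.blockNum > b.blockNum
	}
	return a.txNum > b.txNum
}

// dedupeHighestVersion keeps, for every key, only the entry with the highest
// version, mirroring the intent of step (1) above.
func dedupeHighestVersion(entries map[pvtKey][]pvtEntry) map[pvtKey]pvtEntry {
	unique := make(map[pvtKey]pvtEntry, len(entries))
	for k, versions := range entries {
		for _, e := range versions {
			if cur, ok := unique[k]; !ok || newer(e, cur) {
				unique[k] = e
			}
		}
	}
	return unique
}
```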
@@ -151,14 +179,6 @@ func (txmgr *LockBasedTxMgr) RemoveStaleAndCommitPvtDataOfOldBlocks(blocksPvtDat
 		return err
 	}
 
-	// (2) acquire a lock on oldBlockCommit. If the regular block commit has already
-	// acquired this lock, commit of old blocks' pvtData cannot proceed until the lock
-	// is released. This is required as the PrepareForExpiringKeys() used in step (5)
-	// of this function might affect the result of DeleteExpiredAndUpdateBookkeeping()
-	// in Commit()
-	txmgr.oldBlockCommit.Lock()
-	defer txmgr.oldBlockCommit.Unlock()
-
 	// (3) remove the pvt data which does not match the hashed
 	// value stored in the public state
 	if err := uniquePvtData.findAndRemoveStalePvtData(txmgr.db); err != nil {
@@ -168,19 +188,16 @@ func (txmgr *LockBasedTxMgr) RemoveStaleAndCommitPvtDataOfOldBlocks(blocksPvtDat
 	// (4) create the update batch from the uniquePvtData
 	batch := uniquePvtData.transformToUpdateBatch()
 
-	// (5) update booking in the purge manager and prepare expiring keys.
-	// Though the expiring keys would have been loaded in memory during last
-	// PrepareExpiringKeys from Commit but we rerun this here because,
-	// RemoveStaleAndCommitPvtDataOfOldBlocks may have added new data which might be
-	// eligible for expiry during the next regular block commit.
+	// (5) update bookkeeping in the purge manager and update toPurgeList
+	// (i.e., the list of expiring keys). As the expiring keys would have
+	// been constructed during the last PrepareExpiringKeys call from Commit(),
+	// we need to update the list here because RemoveStaleAndCommitPvtDataOfOldBlocks
+	// may have added new data that might be eligible for expiry during the
+	// next regular block commit.
 	if err := txmgr.pvtdataPurgeMgr.UpdateBookkeepingForPvtDataOfOldBlocks(batch.PvtUpdates); err != nil {
 		return err
 	}
 
-	if txmgr.lastCommittedBlockNum > 0 {
-		txmgr.pvtdataPurgeMgr.PrepareForExpiringKeys(txmgr.lastCommittedBlockNum + 1)
-	}
-
 	// (6) commit the pvt data to the stateDB
 	if err := txmgr.db.ApplyPrivacyAwareUpdates(batch, nil); err != nil {
 		return err
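Step (5) above updates the purge manager's bookkeeping and tops up toPurgeList so that late-arriving pvt data is still considered at the next regular commit. The sketch below is only a rough illustration of that idea; the data layout and the expiry-height formula (commit height + BTL + 1) are assumptions made for the example, not the purge manager's actual implementation.

```go
package txmgrsketch

// purgeSketch illustrates the bookkeeping idea behind step (5): keys are
// indexed by the block height at which they expire, and the purge list
// already prepared for the next commit is topped up when old-block pvt data
// shows up late.
type purgeSketch struct {
	expiryIndex map[uint64][]string // expiry height -> keys to purge (assumed layout)
	toPurge     []string            // list already prepared for the next commit height
	nextCommit  uint64              // height of the upcoming regular block commit
}

// recordOldBlockKeys registers late-arriving keys under their expiry height
// (assumed here to be committing height + BTL + 1) and, if that height is not
// beyond the next commit, also appends them to the already-prepared purge
// list so they are not missed during the upcoming commit.
func (p *purgeSketch) recordOldBlockKeys(keys []string, committedAt, btl uint64) {
	expiry := committedAt + btl + 1
	p.expiryIndex[expiry] = append(p.expiryIndex[expiry], keys...)
	if expiry <= p.nextCommit {
		p.toPurge = append(p.toPurge, keys...)
	}
}
```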
@@ -284,10 +301,6 @@ func (uniquePvtData uniquePvtDataMap) findAndRemoveStalePvtData(db privacyenable
 }
 
 func (uniquePvtData uniquePvtDataMap) loadCommittedVersionIntoCache(db privacyenabledstate.DB) error {
-	// the regular block validation might have populate the cache already. In that scenario,
-	// this call would be adding more entries to the existing cache. However, it does not affect
-	// the correctness of regular block validation. If an entry already exist for a given
-	// hashedCompositeKey, cache would not be updated.
 	// Note that ClearCachedVersions would not be called till we validate and commit these
 	// pvt data of old blocks. This is because only during the exclusive lock duration, we
 	// clear the cache and we have already acquired one before reaching here.
@@ -418,13 +431,13 @@ func (txmgr *LockBasedTxMgr) Shutdown() {
 
 // Commit implements method in interface `txmgmt.TxMgr`
 func (txmgr *LockBasedTxMgr) Commit() error {
-	// we need to acquire a lock on oldBlockCommit. This is required because
-	// the DeleteExpiredAndUpdateBookkeeping() would perform incorrect operation if
-	// PrepareForExpiringKeys() in RemoveStaleAndCommitPvtDataOfOldBlocks() is allowed to
-	// execute parallely. RemoveStaleAndCommitPvtDataOfOldBlocks computes the update
-	// batch based on the current state and if we allow regular block commits at the
-	// same time, the former may overwrite the newer versions of the data and we may
-	// end up with an incorrect update batch.
+	// we need to acquire a lock on oldBlockCommit for two reasons:
+	// (1) DeleteExpiredAndUpdateBookkeeping() would perform an incorrect operation if
+	// toPurgeList were updated by RemoveStaleAndCommitPvtDataOfOldBlocks() in the meantime.
+	// (2) RemoveStaleAndCommitPvtDataOfOldBlocks computes the update
+	// batch based on the current state; if we allowed regular block commits at the
+	// same time, the former might overwrite newer versions of the data and we could
+	// end up with an incorrect update batch.
 	txmgr.oldBlockCommit.Lock()
 	defer txmgr.oldBlockCommit.Unlock()
 
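Reason (2) in the comment above is the classic lost-update problem: the old-block pvt data commit computes its writes from a snapshot of current state, so a regular commit interleaving between that read and the write could be silently overwritten. The sketch below is a minimal illustration of why sharing the oldBlockCommit mutex prevents this; the names and the simplified version check are invented for the example.

```go
package txmgrsketch

import "sync"

// lostUpdateSketch shows why Commit() and the old-block pvt data commit share
// one mutex: the latter derives its writes from a snapshot of current state,
// so a regular commit slipping in between the check and the write would be
// clobbered by stale data.
type lostUpdateSketch struct {
	oldBlockCommit sync.Mutex
	versions       map[string]uint64
}

// commitOldBlockData writes a value only if it is not older than what is
// currently stored; the check and the write must not be separated by a
// regular commit, hence the surrounding lock.
func (s *lostUpdateSketch) commitOldBlockData(key string, ver uint64) {
	s.oldBlockCommit.Lock()
	defer s.oldBlockCommit.Unlock()
	if cur, ok := s.versions[key]; ok && cur > ver {
		return // state already moved past this old-block version; drop it
	}
	s.versions[key] = ver
}

// commitBlock advances the state under the same lock, so it can never land
// between commitOldBlockData's staleness check and its write.
func (s *lostUpdateSketch) commitBlock(key string, ver uint64) {
	s.oldBlockCommit.Lock()
	defer s.oldBlockCommit.Unlock()
	s.versions[key] = ver
}
```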
@@ -472,7 +485,6 @@ func (txmgr *LockBasedTxMgr) Commit() error {
 	// In the case of error, state listeners will not receive this call - instead a peer panic is caused by the ledger upon receiving
 	// an error from this function
 	txmgr.updateStateListeners()
-	txmgr.lastCommittedBlockNum = txmgr.current.blockNum()
 	return nil
 }
 