@@ -11,6 +11,7 @@ import (
11
11
12
12
"github.com/golang/protobuf/proto"
13
13
"github.com/hyperledger/fabric/protos/ledger/rwset"
14
+ "github.com/hyperledger/fabric/protos/transientstore"
14
15
15
16
"github.com/hyperledger/fabric/common/flogging"
16
17
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
@@ -23,6 +24,7 @@ import (
23
24
var logger = flogging .MustGetLogger ("transientstore" )
24
25
25
26
var emptyValue = []byte {}
27
+ var nilByte = byte ('\x00' )
26
28
27
29
// ErrStoreEmpty is used to indicate that there are no entries in transient store
28
30
var ErrStoreEmpty = errors .New ("Transient store is empty" )
@@ -43,6 +45,12 @@ type RWSetScanner interface {
43
45
// It may return nil, nil when it has no further data, and also may return an error
44
46
// on failure
45
47
Next () (* EndorserPvtSimulationResults , error )
48
+ // NextWithConfig returns the next EndorserPvtSimulationResultsWithConfig from the RWSetScanner.
49
+ // It may return nil, nil when it has no further data, and also may return an error
50
+ // on failure
51
+ // TODO: Once the related gossip changes are made as per FAB-5096, remove the above function
52
+ // and rename the below function to Next form NextWithConfig.
53
+ NextWithConfig () (* EndorserPvtSimulationResultsWithConfig , error )
46
54
// Close frees the resources associated with this RWSetScanner
47
55
Close ()
48
56
}
@@ -54,6 +62,11 @@ type Store interface {
54
62
// Persist stores the private write set of a transaction in the transient store
55
63
// based on txid and the block height the private data was received at
56
64
Persist (txid string , blockHeight uint64 , privateSimulationResults * rwset.TxPvtReadWriteSet ) error
65
+ // TODO: Once the related gossip changes are made as per FAB-5096, remove the above function
66
+ // and rename the below function to Persist form PersistWithConfig.
67
+ // PersistWithConfig stores the private write set of a transaction along with the collection config
68
+ // in the transient store based on txid and the block height the private data was received at
69
+ PersistWithConfig (txid string , blockHeight uint64 , privateSimulationResultsWithConfig * transientstore.TxPvtReadWriteSetWithConfigInfo ) error
57
70
// GetTxPvtRWSetByTxid returns an iterator due to the fact that the txid may have multiple private
58
71
// write sets persisted from different endorsers (via Gossip)
59
72
GetTxPvtRWSetByTxid (txid string , filter ledger.PvtNsCollFilter ) (RWSetScanner , error )
@@ -73,11 +86,18 @@ type Store interface {
73
86
}
74
87
75
88
// EndorserPvtSimulationResults captures the details of the simulation results specific to an endorser
89
+ // TODO: Once the related gossip changes are made as per FAB-5096, remove this struct
76
90
type EndorserPvtSimulationResults struct {
77
91
ReceivedAtBlockHeight uint64
78
92
PvtSimulationResults * rwset.TxPvtReadWriteSet
79
93
}
80
94
95
+ // EndorserPvtSimulationResultsWithConfig captures the details of the simulation results specific to an endorser
96
+ type EndorserPvtSimulationResultsWithConfig struct {
97
+ ReceivedAtBlockHeight uint64
98
+ PvtSimulationResultsWithConfig * transientstore.TxPvtReadWriteSetWithConfigInfo
99
+ }
100
+
81
101
//////////////////////////////////////////////
82
102
// Implementation
83
103
/////////////////////////////////////////////
@@ -120,6 +140,7 @@ func (provider *storeProvider) Close() {
120
140
121
141
// Persist stores the private write set of a transaction in the transient store
122
142
// based on txid and the block height the private data was received at
143
+ // TODO: Once the related gossip changes are made as per FAB-5096, remove this function.
123
144
func (s * store ) Persist (txid string , blockHeight uint64 ,
124
145
privateSimulationResults * rwset.TxPvtReadWriteSet ) error {
125
146
@@ -141,17 +162,72 @@ func (s *store) Persist(txid string, blockHeight uint64,
141
162
// Create two index: (i) by txid, and (ii) by height
142
163
143
164
// Create compositeKey for purge index by height with appropriate prefix, blockHeight,
144
- // txid, uuid and store the compositeKey (purge index) with a null byte as value. Note that
165
+ // txid, uuid and store the compositeKey (purge index) with a nil byte as value. Note that
145
166
// the purge index is used to remove orphan entries in the transient store (which are not removed
146
167
// by PurgeTxids()) using BTL policy by PurgeByHeight(). Note that orphan entries are due to transaction
147
168
// that gets endorsed but not submitted by the client for commit)
148
169
compositeKeyPurgeIndexByHeight := createCompositeKeyForPurgeIndexByHeight (blockHeight , txid , uuid )
149
170
dbBatch .Put (compositeKeyPurgeIndexByHeight , emptyValue )
150
171
151
172
// Create compositeKey for purge index by txid with appropriate prefix, txid, uuid,
152
- // blockHeight and store the compositeKey (purge index) with a null byte as value.
173
+ // blockHeight and store the compositeKey (purge index) with a nil byte as value.
153
174
// Though compositeKeyPvtRWSet itself can be used to purge private write set by txid,
154
- // we create a separate composite key with a null byte as value. The reason is that
175
+ // we create a separate composite key with a nil byte as value. The reason is that
176
+ // if we use compositeKeyPvtRWSet, we unnecessarily read (potentially large) private write
177
+ // set associated with the key from db. Note that this purge index is used to remove non-orphan
178
+ // entries in the transient store and is used by PurgeTxids()
179
+ // Note: We can create compositeKeyPurgeIndexByTxid by just replacing the prefix of compositeKeyPvtRWSet
180
+ // with purgeIndexByTxidPrefix. For code readability and to be expressive, we use a
181
+ // createCompositeKeyForPurgeIndexByTxid() instead.
182
+ compositeKeyPurgeIndexByTxid := createCompositeKeyForPurgeIndexByTxid (txid , uuid , blockHeight )
183
+ dbBatch .Put (compositeKeyPurgeIndexByTxid , emptyValue )
184
+
185
+ return s .db .WriteBatch (dbBatch , true )
186
+ }
187
+
188
+ // PersistWithConfig stores the private write set of a transaction along with the collection config
189
+ // in the transient store based on txid and the block height the private data was received at
190
+ // TODO: Once the related gossip changes are made as per FAB-5096, rename this function to Persist
191
+ // form PersistWithConfig.
192
+ func (s * store ) PersistWithConfig (txid string , blockHeight uint64 ,
193
+ privateSimulationResultsWithConfig * transientstore.TxPvtReadWriteSetWithConfigInfo ) error {
194
+
195
+ logger .Debugf ("Persisting private data to transient store for txid = %s" , txid )
196
+
197
+ dbBatch := leveldbhelper .NewUpdateBatch ()
198
+
199
+ // Create compositeKey with appropriate prefix, txid, uuid and blockHeight
200
+ // Due to the fact that the txid may have multiple private write sets persisted from different
201
+ // endorsers (via Gossip), we postfix an uuid with the txid to avoid collision.
202
+ uuid := util .GenerateUUID ()
203
+ compositeKeyPvtRWSet := createCompositeKeyForPvtRWSet (txid , uuid , blockHeight )
204
+ privateSimulationResultsWithConfigBytes , err := proto .Marshal (privateSimulationResultsWithConfig )
205
+ if err != nil {
206
+ return err
207
+ }
208
+
209
+ // Note that some rwset.TxPvtReadWriteSet may exist in the transient store immediately after
210
+ // upgrading the peer to v1.2. In order to differentiate between new proto and old proto while
211
+ // retrieving, a nil byte is prepended to the new proto, i.e., privateSimulationResultsWithConfigBytes,
212
+ // as a marshaled message can never start with a nil byte. In v1.3, we can avoid prepending the
213
+ // nil byte.
214
+ value := append ([]byte {nilByte }, privateSimulationResultsWithConfigBytes ... )
215
+ dbBatch .Put (compositeKeyPvtRWSet , value )
216
+
217
+ // Create two index: (i) by txid, and (ii) by height
218
+
219
+ // Create compositeKey for purge index by height with appropriate prefix, blockHeight,
220
+ // txid, uuid and store the compositeKey (purge index) with a nil byte as value. Note that
221
+ // the purge index is used to remove orphan entries in the transient store (which are not removed
222
+ // by PurgeTxids()) using BTL policy by PurgeByHeight(). Note that orphan entries are due to transaction
223
+ // that gets endorsed but not submitted by the client for commit)
224
+ compositeKeyPurgeIndexByHeight := createCompositeKeyForPurgeIndexByHeight (blockHeight , txid , uuid )
225
+ dbBatch .Put (compositeKeyPurgeIndexByHeight , emptyValue )
226
+
227
+ // Create compositeKey for purge index by txid with appropriate prefix, txid, uuid,
228
+ // blockHeight and store the compositeKey (purge index) with a nil byte as value.
229
+ // Though compositeKeyPvtRWSet itself can be used to purge private write set by txid,
230
+ // we create a separate composite key with a nil byte as value. The reason is that
155
231
// if we use compositeKeyPvtRWSet, we unnecessarily read (potentially large) private write
156
232
// set associated with the key from db. Note that this purge index is used to remove non-orphan
157
233
// entries in the transient store and is used by PurgeTxids()
@@ -288,6 +364,7 @@ func (s *store) Shutdown() {
288
364
289
365
// Next moves the iterator to the next key/value pair.
290
366
// It returns whether the iterator is exhausted.
367
+ // TODO: Once the related gossip changes are made as per FAB-5096, remove this function
291
368
func (scanner * RwsetScanner ) Next () (* EndorserPvtSimulationResults , error ) {
292
369
if ! scanner .dbItr .Next () {
293
370
return nil , nil
@@ -308,6 +385,43 @@ func (scanner *RwsetScanner) Next() (*EndorserPvtSimulationResults, error) {
308
385
}, nil
309
386
}
310
387
388
+ // Next moves the iterator to the next key/value pair.
389
+ // It returns whether the iterator is exhausted.
390
+ // TODO: Once the related gossip changes are made as per FAB-5096, rename this function to Next
391
+ func (scanner * RwsetScanner ) NextWithConfig () (* EndorserPvtSimulationResultsWithConfig , error ) {
392
+ if ! scanner .dbItr .Next () {
393
+ return nil , nil
394
+ }
395
+ dbKey := scanner .dbItr .Key ()
396
+ dbVal := scanner .dbItr .Value ()
397
+ _ , blockHeight := splitCompositeKeyOfPvtRWSet (dbKey )
398
+
399
+ txPvtRWSet := & rwset.TxPvtReadWriteSet {}
400
+ filteredTxPvtRWSet := & rwset.TxPvtReadWriteSet {}
401
+ txPvtRWSetWithConfig := & transientstore.TxPvtReadWriteSetWithConfigInfo {}
402
+
403
+ if dbVal [0 ] == nilByte {
404
+ // new proto, i.e., TxPvtReadWriteSetWithConfigInfo
405
+ if err := proto .Unmarshal (dbVal [1 :], txPvtRWSetWithConfig ); err != nil {
406
+ return nil , err
407
+ }
408
+ filteredTxPvtRWSet = pvtdatastorage .TrimPvtWSet (txPvtRWSetWithConfig .GetPvtRwset (), scanner .filter )
409
+ } else {
410
+ // old proto, i.e., TxPvtReadWriteSet
411
+ if err := proto .Unmarshal (dbVal , txPvtRWSet ); err != nil {
412
+ return nil , err
413
+ }
414
+ filteredTxPvtRWSet = pvtdatastorage .TrimPvtWSet (txPvtRWSet , scanner .filter )
415
+ }
416
+
417
+ txPvtRWSetWithConfig .PvtRwset = filteredTxPvtRWSet
418
+
419
+ return & EndorserPvtSimulationResultsWithConfig {
420
+ ReceivedAtBlockHeight : blockHeight ,
421
+ PvtSimulationResultsWithConfig : txPvtRWSetWithConfig ,
422
+ }, nil
423
+ }
424
+
311
425
// Close releases resource held by the iterator
312
426
func (scanner * RwsetScanner ) Close () {
313
427
scanner .dbItr .Release ()
0 commit comments