Skip to content

Commit f1e0e36

Browse files
committed
[FAB-9774] transient store with the new proto msg
FAB-9204 adds a new proto for transient store entry. This CR makes introduces two new APIs -- PersistWithConfig() and NextWithConfig() to store and retrieve the new proto message, respectively. Change-Id: I5487002d133d322bbd7fbc98bf219792fe871775 Signed-off-by: senthil <cendhu@gmail.com>
1 parent 9015790 commit f1e0e36

File tree

4 files changed

+310
-80
lines changed

4 files changed

+310
-80
lines changed

core/transientstore/store.go

Lines changed: 117 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/golang/protobuf/proto"
1313
"github.com/hyperledger/fabric/protos/ledger/rwset"
14+
"github.com/hyperledger/fabric/protos/transientstore"
1415

1516
"github.com/hyperledger/fabric/common/flogging"
1617
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
@@ -23,6 +24,7 @@ import (
2324
var logger = flogging.MustGetLogger("transientstore")
2425

2526
var emptyValue = []byte{}
27+
var nilByte = byte('\x00')
2628

2729
// ErrStoreEmpty is used to indicate that there are no entries in transient store
2830
var ErrStoreEmpty = errors.New("Transient store is empty")
@@ -43,6 +45,12 @@ type RWSetScanner interface {
4345
// It may return nil, nil when it has no further data, and also may return an error
4446
// on failure
4547
Next() (*EndorserPvtSimulationResults, error)
48+
// NextWithConfig returns the next EndorserPvtSimulationResultsWithConfig from the RWSetScanner.
49+
// It may return nil, nil when it has no further data, and also may return an error
50+
// on failure
51+
// TODO: Once the related gossip changes are made as per FAB-5096, remove the above function
52+
// and rename the below function to Next form NextWithConfig.
53+
NextWithConfig() (*EndorserPvtSimulationResultsWithConfig, error)
4654
// Close frees the resources associated with this RWSetScanner
4755
Close()
4856
}
@@ -54,6 +62,11 @@ type Store interface {
5462
// Persist stores the private write set of a transaction in the transient store
5563
// based on txid and the block height the private data was received at
5664
Persist(txid string, blockHeight uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error
65+
// TODO: Once the related gossip changes are made as per FAB-5096, remove the above function
66+
// and rename the below function to Persist form PersistWithConfig.
67+
// PersistWithConfig stores the private write set of a transaction along with the collection config
68+
// in the transient store based on txid and the block height the private data was received at
69+
PersistWithConfig(txid string, blockHeight uint64, privateSimulationResultsWithConfig *transientstore.TxPvtReadWriteSetWithConfigInfo) error
5770
// GetTxPvtRWSetByTxid returns an iterator due to the fact that the txid may have multiple private
5871
// write sets persisted from different endorsers (via Gossip)
5972
GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (RWSetScanner, error)
@@ -73,11 +86,18 @@ type Store interface {
7386
}
7487

7588
// EndorserPvtSimulationResults captures the details of the simulation results specific to an endorser
89+
// TODO: Once the related gossip changes are made as per FAB-5096, remove this struct
7690
type EndorserPvtSimulationResults struct {
7791
ReceivedAtBlockHeight uint64
7892
PvtSimulationResults *rwset.TxPvtReadWriteSet
7993
}
8094

95+
// EndorserPvtSimulationResultsWithConfig captures the details of the simulation results specific to an endorser
96+
type EndorserPvtSimulationResultsWithConfig struct {
97+
ReceivedAtBlockHeight uint64
98+
PvtSimulationResultsWithConfig *transientstore.TxPvtReadWriteSetWithConfigInfo
99+
}
100+
81101
//////////////////////////////////////////////
82102
// Implementation
83103
/////////////////////////////////////////////
@@ -120,6 +140,7 @@ func (provider *storeProvider) Close() {
120140

121141
// Persist stores the private write set of a transaction in the transient store
122142
// based on txid and the block height the private data was received at
143+
// TODO: Once the related gossip changes are made as per FAB-5096, remove this function.
123144
func (s *store) Persist(txid string, blockHeight uint64,
124145
privateSimulationResults *rwset.TxPvtReadWriteSet) error {
125146

@@ -141,17 +162,72 @@ func (s *store) Persist(txid string, blockHeight uint64,
141162
// Create two index: (i) by txid, and (ii) by height
142163

143164
// Create compositeKey for purge index by height with appropriate prefix, blockHeight,
144-
// txid, uuid and store the compositeKey (purge index) with a null byte as value. Note that
165+
// txid, uuid and store the compositeKey (purge index) with a nil byte as value. Note that
145166
// the purge index is used to remove orphan entries in the transient store (which are not removed
146167
// by PurgeTxids()) using BTL policy by PurgeByHeight(). Note that orphan entries are due to transaction
147168
// that gets endorsed but not submitted by the client for commit)
148169
compositeKeyPurgeIndexByHeight := createCompositeKeyForPurgeIndexByHeight(blockHeight, txid, uuid)
149170
dbBatch.Put(compositeKeyPurgeIndexByHeight, emptyValue)
150171

151172
// Create compositeKey for purge index by txid with appropriate prefix, txid, uuid,
152-
// blockHeight and store the compositeKey (purge index) with a null byte as value.
173+
// blockHeight and store the compositeKey (purge index) with a nil byte as value.
153174
// Though compositeKeyPvtRWSet itself can be used to purge private write set by txid,
154-
// we create a separate composite key with a null byte as value. The reason is that
175+
// we create a separate composite key with a nil byte as value. The reason is that
176+
// if we use compositeKeyPvtRWSet, we unnecessarily read (potentially large) private write
177+
// set associated with the key from db. Note that this purge index is used to remove non-orphan
178+
// entries in the transient store and is used by PurgeTxids()
179+
// Note: We can create compositeKeyPurgeIndexByTxid by just replacing the prefix of compositeKeyPvtRWSet
180+
// with purgeIndexByTxidPrefix. For code readability and to be expressive, we use a
181+
// createCompositeKeyForPurgeIndexByTxid() instead.
182+
compositeKeyPurgeIndexByTxid := createCompositeKeyForPurgeIndexByTxid(txid, uuid, blockHeight)
183+
dbBatch.Put(compositeKeyPurgeIndexByTxid, emptyValue)
184+
185+
return s.db.WriteBatch(dbBatch, true)
186+
}
187+
188+
// PersistWithConfig stores the private write set of a transaction along with the collection config
189+
// in the transient store based on txid and the block height the private data was received at
190+
// TODO: Once the related gossip changes are made as per FAB-5096, rename this function to Persist
191+
// form PersistWithConfig.
192+
func (s *store) PersistWithConfig(txid string, blockHeight uint64,
193+
privateSimulationResultsWithConfig *transientstore.TxPvtReadWriteSetWithConfigInfo) error {
194+
195+
logger.Debugf("Persisting private data to transient store for txid = %s", txid)
196+
197+
dbBatch := leveldbhelper.NewUpdateBatch()
198+
199+
// Create compositeKey with appropriate prefix, txid, uuid and blockHeight
200+
// Due to the fact that the txid may have multiple private write sets persisted from different
201+
// endorsers (via Gossip), we postfix an uuid with the txid to avoid collision.
202+
uuid := util.GenerateUUID()
203+
compositeKeyPvtRWSet := createCompositeKeyForPvtRWSet(txid, uuid, blockHeight)
204+
privateSimulationResultsWithConfigBytes, err := proto.Marshal(privateSimulationResultsWithConfig)
205+
if err != nil {
206+
return err
207+
}
208+
209+
// Note that some rwset.TxPvtReadWriteSet may exist in the transient store immediately after
210+
// upgrading the peer to v1.2. In order to differentiate between new proto and old proto while
211+
// retrieving, a nil byte is prepended to the new proto, i.e., privateSimulationResultsWithConfigBytes,
212+
// as a marshaled message can never start with a nil byte. In v1.3, we can avoid prepending the
213+
// nil byte.
214+
value := append([]byte{nilByte}, privateSimulationResultsWithConfigBytes...)
215+
dbBatch.Put(compositeKeyPvtRWSet, value)
216+
217+
// Create two index: (i) by txid, and (ii) by height
218+
219+
// Create compositeKey for purge index by height with appropriate prefix, blockHeight,
220+
// txid, uuid and store the compositeKey (purge index) with a nil byte as value. Note that
221+
// the purge index is used to remove orphan entries in the transient store (which are not removed
222+
// by PurgeTxids()) using BTL policy by PurgeByHeight(). Note that orphan entries are due to transaction
223+
// that gets endorsed but not submitted by the client for commit)
224+
compositeKeyPurgeIndexByHeight := createCompositeKeyForPurgeIndexByHeight(blockHeight, txid, uuid)
225+
dbBatch.Put(compositeKeyPurgeIndexByHeight, emptyValue)
226+
227+
// Create compositeKey for purge index by txid with appropriate prefix, txid, uuid,
228+
// blockHeight and store the compositeKey (purge index) with a nil byte as value.
229+
// Though compositeKeyPvtRWSet itself can be used to purge private write set by txid,
230+
// we create a separate composite key with a nil byte as value. The reason is that
155231
// if we use compositeKeyPvtRWSet, we unnecessarily read (potentially large) private write
156232
// set associated with the key from db. Note that this purge index is used to remove non-orphan
157233
// entries in the transient store and is used by PurgeTxids()
@@ -288,6 +364,7 @@ func (s *store) Shutdown() {
288364

289365
// Next moves the iterator to the next key/value pair.
290366
// It returns whether the iterator is exhausted.
367+
// TODO: Once the related gossip changes are made as per FAB-5096, remove this function
291368
func (scanner *RwsetScanner) Next() (*EndorserPvtSimulationResults, error) {
292369
if !scanner.dbItr.Next() {
293370
return nil, nil
@@ -308,6 +385,43 @@ func (scanner *RwsetScanner) Next() (*EndorserPvtSimulationResults, error) {
308385
}, nil
309386
}
310387

388+
// Next moves the iterator to the next key/value pair.
389+
// It returns whether the iterator is exhausted.
390+
// TODO: Once the related gossip changes are made as per FAB-5096, rename this function to Next
391+
func (scanner *RwsetScanner) NextWithConfig() (*EndorserPvtSimulationResultsWithConfig, error) {
392+
if !scanner.dbItr.Next() {
393+
return nil, nil
394+
}
395+
dbKey := scanner.dbItr.Key()
396+
dbVal := scanner.dbItr.Value()
397+
_, blockHeight := splitCompositeKeyOfPvtRWSet(dbKey)
398+
399+
txPvtRWSet := &rwset.TxPvtReadWriteSet{}
400+
filteredTxPvtRWSet := &rwset.TxPvtReadWriteSet{}
401+
txPvtRWSetWithConfig := &transientstore.TxPvtReadWriteSetWithConfigInfo{}
402+
403+
if dbVal[0] == nilByte {
404+
// new proto, i.e., TxPvtReadWriteSetWithConfigInfo
405+
if err := proto.Unmarshal(dbVal[1:], txPvtRWSetWithConfig); err != nil {
406+
return nil, err
407+
}
408+
filteredTxPvtRWSet = pvtdatastorage.TrimPvtWSet(txPvtRWSetWithConfig.GetPvtRwset(), scanner.filter)
409+
} else {
410+
// old proto, i.e., TxPvtReadWriteSet
411+
if err := proto.Unmarshal(dbVal, txPvtRWSet); err != nil {
412+
return nil, err
413+
}
414+
filteredTxPvtRWSet = pvtdatastorage.TrimPvtWSet(txPvtRWSet, scanner.filter)
415+
}
416+
417+
txPvtRWSetWithConfig.PvtRwset = filteredTxPvtRWSet
418+
419+
return &EndorserPvtSimulationResultsWithConfig{
420+
ReceivedAtBlockHeight: blockHeight,
421+
PvtSimulationResultsWithConfig: txPvtRWSetWithConfig,
422+
}, nil
423+
}
424+
311425
// Close releases resource held by the iterator
312426
func (scanner *RwsetScanner) Close() {
313427
scanner.dbItr.Release()

0 commit comments

Comments
 (0)