Skip to content

Commit 56f3c90

Browse files
committed
[FAB-4975] SideDB: TransientStore for pvt writeset
This CR adds a transientstore to peer for temporarily holding the private write set of a transaction. Tranisent store will be used by endorser, committer, and gossip middleware. Currently, transientstore exposes following five APIs. 1. Persist(txid string, endorserid string, endorsementBlkHt uint64, privateSimulationResults []byte) 2. GetTxPvtRWSetByTxid(txid string) 3. GetSelfSimulatedTxPvtRWSetByTxid(txid string) 4. Purge(maxBlockNumToRetain uint64) 5. GetMinEndorsementBlkHt() Endorser: After simulating a transaction, endorsers will collect transaction simulation results (public rwset, hashed rwset, and private wset) from ledger and store the private write set in transientstore using Persist(). Gossip: When gossip receives a private write set from other peers, it will store the write set in transientstore using Persist(). When gossip receives a request from other peers for a private write set associated with a txid, it will retrieve the write set from the transientStore (using GetSelfSimulatedTxPvtRWSetByTxid()/GetTxPvtRWSetByTxid()) Committer: When a committer receives a block from gossip for commit, after performing VSCC validation, committer will fetch relevant private write sets from the transientStore (using GetSelfSimulatedTxPvtRWSet()/ GetTxPrivateRWSetByTxid()) and pass it to ledger along with the block. In the subsequent CRs, transientstore will be integrated with endorser and committer components. Further, endorserid needs to be defined. Change-Id: I957120af6c091c4d5c45c9e06b29025a2481bf49 Signed-off-by: senthil <cendhu@gmail.com>
1 parent a4b4107 commit 56f3c90

File tree

4 files changed

+676
-0
lines changed

4 files changed

+676
-0
lines changed

core/transientstore/store.go

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
/*
2+
Copyright IBM Corp. 2017 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package transientstore
18+
19+
import (
20+
"errors"
21+
22+
commonledger "github.com/hyperledger/fabric/common/ledger"
23+
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
24+
"github.com/syndtr/goleveldb/leveldb/iterator"
25+
)
26+
27+
var emptyValue = []byte{}
28+
29+
// ErrStoreEmpty is used to indicate that there are no entries in transient store
30+
var ErrStoreEmpty = errors.New("Transient store is empty")
31+
32+
//////////////////////////////////////////////
33+
// Interfaces and data types
34+
/////////////////////////////////////////////
35+
36+
// StoreProvider provides an instance of a TransientStore
37+
type StoreProvider interface {
38+
OpenStore(ledgerID string) (Store, error)
39+
Close()
40+
}
41+
42+
// Store manages the storage of private read-write sets for a ledgerId.
43+
// Ideally, a ledger can remove the data from this storage when it is committed to
44+
// the permanent storage or the pruning of some data items is enforced by the policy
45+
type Store interface {
46+
// Persist stores the private read-write set of a transaction in the transient store
47+
Persist(txid string, endorserid string, endorsementBlkHt uint64, privateSimulationResults []byte) error
48+
// GetTxPvtRWSetByTxid returns an iterator due to the fact that the txid may have multiple private
49+
// RWSets persisted from different endorsers (via Gossip)
50+
GetTxPvtRWSetByTxid(txid string) (commonledger.ResultsIterator, error)
51+
// GetSelfSimulatedTxPvtRWSetByTxid returns the private read-write set generated from the simulation
52+
// performed by the peer itself
53+
GetSelfSimulatedTxPvtRWSetByTxid(txid string) (*EndorserPvtSimulationResults, error)
54+
// Purge removes private read-writes set generated by endorsers at block height lesser than
55+
// a given maxBlockNumToRetain. In other words, Purge only retains private read-write sets
56+
// that were generated at block height of maxBlockNumToRetain or higher.
57+
Purge(maxBlockNumToRetain uint64) error
58+
// GetMinEndorsementBlkHt returns the lowest retained endorsement block height
59+
GetMinEndorsementBlkHt() (uint64, error)
60+
Shutdown()
61+
}
62+
63+
// EndorserPvtSimulationResults captures the deatils of the simulation results specific to an endorser
64+
type EndorserPvtSimulationResults struct {
65+
EndorserID string
66+
EndorsementBlockHeight uint64
67+
PvtSimulationResults []byte
68+
}
69+
70+
//////////////////////////////////////////////
71+
// Implementation
72+
/////////////////////////////////////////////
73+
74+
// storeProvider encapsulates a leveldb provider which is used to store
75+
// private read-write set of simulated transactions, and implements TransientStoreProvider
76+
// interface.
77+
type storeProvider struct {
78+
dbProvider *leveldbhelper.Provider
79+
}
80+
81+
// store holds an instance of a levelDB.
82+
type store struct {
83+
db *leveldbhelper.DBHandle
84+
ledgerID string
85+
}
86+
87+
type rwsetScanner struct {
88+
txid string
89+
dbItr iterator.Iterator
90+
}
91+
92+
// NewStoreProvider instantiates TransientStoreProvider
93+
func NewStoreProvider() StoreProvider {
94+
dbProvider := leveldbhelper.NewProvider(&leveldbhelper.Conf{DBPath: getTransientStorePath()})
95+
return &storeProvider{dbProvider: dbProvider}
96+
}
97+
98+
// OpenStore returns a handle to a ledgerId in Store
99+
func (provider *storeProvider) OpenStore(ledgerID string) (Store, error) {
100+
dbHandle := provider.dbProvider.GetDBHandle(ledgerID)
101+
return &store{db: dbHandle, ledgerID: ledgerID}, nil
102+
}
103+
104+
// Close closes the TransientStoreProvider
105+
func (provider *storeProvider) Close() {
106+
provider.dbProvider.Close()
107+
}
108+
109+
// Persist stores the private read-write set of a transaction in the transient store
110+
func (s *store) Persist(txid string, endorserid string,
111+
endorsementBlkHt uint64, privateSimulationResults []byte) error {
112+
dbBatch := leveldbhelper.NewUpdateBatch()
113+
114+
// Create compositeKey with appropriate prefix, txid, endorserid and endorsementBlkHt
115+
compositeKey := createCompositeKeyForPvtRWSet(txid, endorserid, endorsementBlkHt)
116+
dbBatch.Put(compositeKey, privateSimulationResults)
117+
118+
// Create compositeKey with appropriate prefix, endorsementBlkHt, txid, endorserid & Store
119+
// the compositeKey (purge index) a null byte as value.
120+
compositeKey = createCompositeKeyForPurgeIndex(endorsementBlkHt, txid, endorserid)
121+
dbBatch.Put(compositeKey, emptyValue)
122+
123+
return s.db.WriteBatch(dbBatch, true)
124+
}
125+
126+
// GetTxPvtRWSetByTxid returns an iterator due to the fact that the txid may have multiple private
127+
// RWSets persisted from different endorserids. Eventually, we may pass a set of collections name
128+
// (filters) along with txid.
129+
func (s *store) GetTxPvtRWSetByTxid(txid string) (commonledger.ResultsIterator, error) {
130+
// Construct startKey and endKey to do an range query
131+
startKey := createTxidRangeStartKey(txid)
132+
endKey := createTxidRangeEndKey(txid)
133+
134+
iter := s.db.GetIterator(startKey, endKey)
135+
return &rwsetScanner{txid, iter}, nil
136+
}
137+
138+
// GetSelfSimulatedTxPvtRWSetByTxid returns the private write set generated from the simulation performed
139+
// by the peer itself. Eventually, we may pass a set of collections name (filters) along with txid.
140+
func (s *store) GetSelfSimulatedTxPvtRWSetByTxid(txid string) (*EndorserPvtSimulationResults, error) {
141+
var err error
142+
var itr commonledger.ResultsIterator
143+
if itr, err = s.GetTxPvtRWSetByTxid(txid); err != nil {
144+
return nil, err
145+
}
146+
defer itr.Close()
147+
var temp commonledger.QueryResult
148+
var res *EndorserPvtSimulationResults
149+
for {
150+
if temp, err = itr.Next(); err != nil {
151+
return nil, err
152+
}
153+
if temp == nil {
154+
return nil, nil
155+
}
156+
res = temp.(*EndorserPvtSimulationResults)
157+
if selfSimulated(res) {
158+
return res, nil
159+
}
160+
}
161+
}
162+
163+
// Purge removes private read-writes set generated by endorsers at block height lesser than
164+
// a given maxBlockNumToRetain. In other words, Purge only retains private read-write sets
165+
// that were generated at block height of maxBlockNumToRetain or higher.
166+
func (s *store) Purge(maxBlockNumToRetain uint64) error {
167+
// Do a range query with 0 as startKey and maxBlockNumToRetain-1 as endKey
168+
startKey := createEndorsementBlkHtRangeStartKey(0)
169+
endKey := createEndorsementBlkHtRangeEndKey(maxBlockNumToRetain - 1)
170+
iter := s.db.GetIterator(startKey, endKey)
171+
172+
dbBatch := leveldbhelper.NewUpdateBatch()
173+
174+
// Get all txid and endorserid from above result and remove it from transient store (both
175+
// read/write set and the corresponding index.
176+
for iter.Next() {
177+
dbKey := iter.Key()
178+
txid, endorserid, endorsementBlkHt := splitCompositeKeyOfPurgeIndex(dbKey)
179+
compositeKey := createCompositeKeyForPvtRWSet(txid, endorserid, endorsementBlkHt)
180+
dbBatch.Delete(compositeKey)
181+
dbBatch.Delete(dbKey)
182+
}
183+
return s.db.WriteBatch(dbBatch, true)
184+
}
185+
186+
// GetMinEndorsementBlkHt returns the lowest retained endorsement block height
187+
func (s *store) GetMinEndorsementBlkHt() (uint64, error) {
188+
// Current approach performs a range query on purgeIndex with startKey
189+
// as 0 (i.e., endorsementBlkHt) and returns the first key which denotes
190+
// the lowest retained endorsement block height. An alternative approach
191+
// is to explicitly store the minEndorsementBlkHt in the transientStore.
192+
startKey := createEndorsementBlkHtRangeStartKey(0)
193+
iter := s.db.GetIterator(startKey, nil)
194+
// Fetch the minimum endorsement block height
195+
if iter.Next() {
196+
dbKey := iter.Key()
197+
_, _, endorsementBlkHt := splitCompositeKeyOfPurgeIndex(dbKey)
198+
return endorsementBlkHt, nil
199+
}
200+
// Returning an error may not be the right thing to do here. May be
201+
// return a bool. -1 is not possible due to unsigned int as first
202+
// return value
203+
return 0, ErrStoreEmpty
204+
}
205+
206+
func (s *store) Shutdown() {
207+
// do nothing because shared db is used
208+
}
209+
210+
// Next moves the iterator to the next key/value pair.
211+
// It returns whether the iterator is exhausted.
212+
func (scanner *rwsetScanner) Next() (commonledger.QueryResult, error) {
213+
if !scanner.dbItr.Next() {
214+
return nil, nil
215+
}
216+
dbKey := scanner.dbItr.Key()
217+
dbVal := scanner.dbItr.Value()
218+
endorserid, endorsementBlkHt := splitCompositeKeyOfPvtRWSet(dbKey)
219+
return &EndorserPvtSimulationResults{
220+
EndorserID: endorserid,
221+
EndorsementBlockHeight: endorsementBlkHt,
222+
PvtSimulationResults: dbVal,
223+
}, nil
224+
}
225+
226+
// Close releases resource held by the iterator
227+
func (scanner *rwsetScanner) Close() {
228+
scanner.dbItr.Release()
229+
}
230+
231+
func selfSimulated(pvtSimRes *EndorserPvtSimulationResults) bool {
232+
return pvtSimRes.EndorserID == ""
233+
}

core/transientstore/store_helper.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
Copyright IBM Corp. 2017 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package transientstore
18+
19+
import (
20+
"bytes"
21+
"path/filepath"
22+
23+
"github.com/hyperledger/fabric/common/ledger/util"
24+
"github.com/hyperledger/fabric/core/config"
25+
)
26+
27+
var (
28+
prwsetPrefix = []byte("P")[0] // key prefix for storing private read-write set in transient store.
29+
purgeIndexPrefix = []byte("I")[0] // key prefix for storing index on private read-write set using endorsement block height.
30+
compositeKeySep = byte(0x00)
31+
)
32+
33+
// createCompositeKeyForPvtRWSet creates a key for storing private read-write set
34+
// in the transient store. The structure of the key is <prwsetPrefix>~txid~endorserid~endorsementBlkHt.
35+
func createCompositeKeyForPvtRWSet(txid string, endorserid string, endorsementBlkHt uint64) []byte {
36+
var compositeKey []byte
37+
compositeKey = append(compositeKey, prwsetPrefix)
38+
compositeKey = append(compositeKey, compositeKeySep)
39+
compositeKey = append(compositeKey, []byte(txid)...)
40+
compositeKey = append(compositeKey, compositeKeySep)
41+
compositeKey = append(compositeKey, []byte(endorserid)...)
42+
compositeKey = append(compositeKey, compositeKeySep)
43+
compositeKey = append(compositeKey, util.EncodeOrderPreservingVarUint64(endorsementBlkHt)...)
44+
45+
return compositeKey
46+
}
47+
48+
// createCompositeKeyForPurgeIndex creates a key to index private read-write set based on
49+
// endorsement block height such that purge based on block height can be achieved. The structure
50+
// of the key is <purgeIndexPrefix>~endorsementBlkHt~txid~endorserid.
51+
func createCompositeKeyForPurgeIndex(endorsementBlkHt uint64, txid string, endorserid string) []byte {
52+
var compositeKey []byte
53+
compositeKey = append(compositeKey, purgeIndexPrefix)
54+
compositeKey = append(compositeKey, compositeKeySep)
55+
compositeKey = append(compositeKey, util.EncodeOrderPreservingVarUint64(endorsementBlkHt)...)
56+
compositeKey = append(compositeKey, compositeKeySep)
57+
compositeKey = append(compositeKey, []byte(txid)...)
58+
compositeKey = append(compositeKey, compositeKeySep)
59+
compositeKey = append(compositeKey, []byte(endorserid)...)
60+
61+
return compositeKey
62+
}
63+
64+
// splitCompositeKeyOfPvtRWSet splits the compositeKey (<prwsetPrefix>~txid~endorserid~endorsementBlkHt) into endorserId and endorsementBlkHt.
65+
func splitCompositeKeyOfPvtRWSet(compositeKey []byte) (endorserid string, endorsementBlkHt uint64) {
66+
compositeKey = compositeKey[2:]
67+
firstSepIndex := bytes.IndexByte(compositeKey, compositeKeySep)
68+
secondSepIndex := firstSepIndex + bytes.IndexByte(compositeKey[firstSepIndex+1:], compositeKeySep) + 1
69+
endorserid = string(compositeKey[firstSepIndex+1 : secondSepIndex])
70+
endorsementBlkHt, _ = util.DecodeOrderPreservingVarUint64(compositeKey[secondSepIndex+1:])
71+
return endorserid, endorsementBlkHt
72+
}
73+
74+
// splitCompositeKeyOfPurgeIndex splits the compositeKey (<purgeIndexPrefix>~endorsementBlkHt~txid~endorserid) into txid, endorserid and endorsementBlkHt.
75+
func splitCompositeKeyOfPurgeIndex(compositeKey []byte) (txid string, endorserid string, endorsementBlkHt uint64) {
76+
var n int
77+
endorsementBlkHt, n = util.DecodeOrderPreservingVarUint64(compositeKey[2:])
78+
splits := bytes.Split(compositeKey[n+3:], []byte{compositeKeySep})
79+
txid = string(splits[0])
80+
endorserid = string(splits[1])
81+
return
82+
}
83+
84+
// createTxidRangeStartKey returns a startKey to do a range query on transient store using txid
85+
func createTxidRangeStartKey(txid string) []byte {
86+
var startKey []byte
87+
startKey = append(startKey, prwsetPrefix)
88+
startKey = append(startKey, compositeKeySep)
89+
startKey = append(startKey, []byte(txid)...)
90+
startKey = append(startKey, compositeKeySep)
91+
return startKey
92+
}
93+
94+
// createTxidRangeEndKey returns a endKey to do a range query on transient store using txid
95+
func createTxidRangeEndKey(txid string) []byte {
96+
var endKey []byte
97+
endKey = append(endKey, prwsetPrefix)
98+
endKey = append(endKey, compositeKeySep)
99+
endKey = append(endKey, []byte(txid)...)
100+
endKey = append(endKey, byte(0xff))
101+
return endKey
102+
}
103+
104+
// createEndorsementBlkHtRangeStartKey returns a startKey to do a range query on index stored in transient store
105+
// using endorsementBlkHt
106+
func createEndorsementBlkHtRangeStartKey(endorsementBlkHt uint64) []byte {
107+
var startKey []byte
108+
startKey = append(startKey, purgeIndexPrefix)
109+
startKey = append(startKey, compositeKeySep)
110+
startKey = append(startKey, util.EncodeOrderPreservingVarUint64(endorsementBlkHt)...)
111+
startKey = append(startKey, compositeKeySep)
112+
return startKey
113+
}
114+
115+
// createEndorsementBlkHtRangeStartKey returns a endKey to do a range query on index stored in transient store
116+
// using endorsementBlkHt
117+
func createEndorsementBlkHtRangeEndKey(endorsementBlkHt uint64) []byte {
118+
var endKey []byte
119+
endKey = append(endKey, purgeIndexPrefix)
120+
endKey = append(endKey, compositeKeySep)
121+
endKey = append(endKey, util.EncodeOrderPreservingVarUint64(endorsementBlkHt)...)
122+
endKey = append(endKey, byte(0xff))
123+
return endKey
124+
}
125+
126+
// GetTransientStorePath returns the filesystem path for temporarily storing the private rwset
127+
func getTransientStorePath() string {
128+
sysPath := config.GetPath("peer.fileSystemPath")
129+
return filepath.Join(sysPath, "transientStore")
130+
}

0 commit comments

Comments
 (0)