Skip to content

Commit 6105903

Browse files
manish-sethiyacovm
authored andcommitted
[FAB-6053] Enhance transient store retrieval api
This CR enhances the transient store retrieval api with an additional parameter `filter` that acts as a projection for the results and retains only namespace/collection specified in the filter Change-Id: I2dd20f4382f1566a65c0bd23689c77a0ec60de24 Signed-off-by: manish <manish.sethi@gmail.com> Signed-off-by: yacovm <yacovm@il.ibm.com>
1 parent 3c53aab commit 6105903

File tree

13 files changed

+167
-114
lines changed

13 files changed

+167
-114
lines changed

core/endorser/endorser.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ package endorser
99
import (
1010
"fmt"
1111

12+
"github.com/hyperledger/fabric/protos/ledger/rwset"
13+
1214
"github.com/golang/protobuf/proto"
1315
"github.com/hyperledger/fabric/common/flogging"
1416
"golang.org/x/net/context"
@@ -52,7 +54,7 @@ var endorserLogger = flogging.MustGetLogger("endorser")
5254
// The Jira issue that documents Endorser flow along with its relationship to
5355
// the lifecycle chaincode - https://jira.hyperledger.org/browse/FAB-181
5456

55-
type privateDataDistributor func(channel string, txID string, privateData []byte) error
57+
type privateDataDistributor func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error
5658

5759
// Endorser provides the Endorser service ProcessProposal
5860
type Endorser struct {
@@ -261,7 +263,6 @@ func (e *Endorser) simulateProposal(ctx context.Context, chainID string, txid st
261263
//---3. execute the proposal and get simulation results
262264
var simResult *ledger.TxSimulationResults
263265
var pubSimResBytes []byte
264-
var prvtSimResBytes []byte
265266
var res *pb.Response
266267
var ccevent *pb.ChaincodeEvent
267268
res, ccevent, err = e.callChaincode(ctx, chainID, version, txid, signedProp, prop, cis, cid, txsim)
@@ -275,16 +276,11 @@ func (e *Endorser) simulateProposal(ctx context.Context, chainID string, txid st
275276
return nil, nil, nil, nil, err
276277
}
277278

278-
if prvtSimResBytes, err = simResult.GetPvtSimulationBytes(); err != nil {
279-
return nil, nil, nil, nil, err
280-
}
281-
282-
if len(prvtSimResBytes) > 0 {
283-
if err := e.distributePrivateData(chainID, txid, prvtSimResBytes); err != nil {
279+
if simResult.PvtSimulationResults != nil {
280+
if err := e.distributePrivateData(chainID, txid, simResult.PvtSimulationResults); err != nil {
284281
return nil, nil, nil, nil, err
285282
}
286283
}
287-
288284
if pubSimResBytes, err = simResult.GetPubSimulationBytes(); err != nil {
289285
return nil, nil, nil, nil, err
290286
}

core/endorser/endorser_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ import (
3030
"testing"
3131
"time"
3232

33+
"github.com/hyperledger/fabric/protos/ledger/rwset"
34+
3335
"github.com/golang/protobuf/proto"
3436
"github.com/hyperledger/fabric/bccsp"
3537
"github.com/hyperledger/fabric/bccsp/factory"
@@ -746,7 +748,7 @@ func TestMain(m *testing.M) {
746748
return
747749
}
748750

749-
endorserServer = NewEndorserServer(func(channel string, txID string, privateData []byte) error {
751+
endorserServer = NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
750752
return nil
751753
})
752754

core/ledger/kvledger/example/committer.go

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,7 @@
11
/*
2-
Copyright IBM Corp. 2016 All Rights Reserved.
2+
Copyright IBM Corp. All Rights Reserved.
33
4-
Licensed under the Apache License, Version 2.0 (the "License");
5-
you may not use this file except in compliance with the License.
6-
You may obtain a copy of the License at
7-
8-
http://www.apache.org/licenses/LICENSE-2.0
9-
10-
Unless required by applicable law or agreed to in writing, software
11-
distributed under the License is distributed on an "AS IS" BASIS,
12-
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
See the License for the specific language governing permissions and
14-
limitations under the License.
4+
SPDX-License-Identifier: Apache-2.0
155
*/
166

177
package example
@@ -35,7 +25,7 @@ func ConstructCommitter(ledger ledger.PeerLedger) *Committer {
3525
// Commit commits the block
3626
func (c *Committer) Commit(rawBlock *common.Block) error {
3727
logger.Debugf("Committer validating the block...")
38-
if err := c.ledger.Commit(rawBlock); err != nil {
28+
if err := c.ledger.CommitWithPvtData(&ledger.BlockAndPvtData{Block: rawBlock}); err != nil {
3929
return err
4030
}
4131
return nil

core/ledger/pvtdatastorage/store_impl.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func (s *store) GetPvtDataByBlockNum(blockNum uint64, filter ledger.PvtNsCollFil
167167
return nil, err
168168
}
169169
logger.Debugf("Retrieving pvtdata for bNum=%d, tNum=%d", bNum, tNum)
170-
filteredWSet := trimPvtWSet(pvtWSet, filter)
170+
filteredWSet := TrimPvtWSet(pvtWSet, filter)
171171
pvtData = append(pvtData, &ledger.TxPvtData{SeqInBlock: tNum, WriteSet: filteredWSet})
172172
}
173173
return pvtData, nil
@@ -246,7 +246,9 @@ func (s *store) getLastCommittedBlockNum() (bool, uint64, error) {
246246
return false, decodeBlockNum(v), nil
247247
}
248248

249-
func trimPvtWSet(pvtWSet *rwset.TxPvtReadWriteSet, filter ledger.PvtNsCollFilter) *rwset.TxPvtReadWriteSet {
249+
// TrimPvtWSet returns a `TxPvtReadWriteSet` that retains only list of 'ns/collections' supplied in the filter
250+
// A nil filter does not filter any results and returns the original `pvtWSet` as is
251+
func TrimPvtWSet(pvtWSet *rwset.TxPvtReadWriteSet, filter ledger.PvtNsCollFilter) *rwset.TxPvtReadWriteSet {
250252
if filter == nil {
251253
return pvtWSet
252254
}

core/transientstore/store.go

Lines changed: 33 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,20 @@
11
/*
2-
Copyright IBM Corp. 2017 All Rights Reserved.
2+
Copyright IBM Corp. All Rights Reserved.
33
4-
Licensed under the Apache License, Version 2.0 (the "License");
5-
you may not use this file except in compliance with the License.
6-
You may obtain a copy of the License at
7-
8-
http://www.apache.org/licenses/LICENSE-2.0
9-
10-
Unless required by applicable law or agreed to in writing, software
11-
distributed under the License is distributed on an "AS IS" BASIS,
12-
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
See the License for the specific language governing permissions and
14-
limitations under the License.
4+
SPDX-License-Identifier: Apache-2.0
155
*/
166

177
package transientstore
188

199
import (
2010
"errors"
2111

22-
commonledger "github.com/hyperledger/fabric/common/ledger"
12+
"github.com/golang/protobuf/proto"
13+
"github.com/hyperledger/fabric/protos/ledger/rwset"
14+
2315
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
16+
"github.com/hyperledger/fabric/core/ledger"
17+
"github.com/hyperledger/fabric/core/ledger/pvtdatastorage"
2418
"github.com/syndtr/goleveldb/leveldb/iterator"
2519
)
2620

@@ -44,10 +38,10 @@ type StoreProvider interface {
4438
// the permanent storage or the pruning of some data items is enforced by the policy
4539
type Store interface {
4640
// Persist stores the private read-write set of a transaction in the transient store
47-
Persist(txid string, endorserid string, endorsementBlkHt uint64, privateSimulationResults []byte) error
41+
Persist(txid string, endorserid string, endorsementBlkHt uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error
4842
// GetTxPvtRWSetByTxid returns an iterator due to the fact that the txid may have multiple private
4943
// RWSets persisted from different endorsers (via Gossip)
50-
GetTxPvtRWSetByTxid(txid string) (commonledger.ResultsIterator, error)
44+
GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (*rwsetScanner, error)
5145
// GetSelfSimulatedTxPvtRWSetByTxid returns the private read-write set generated from the simulation
5246
// performed by the peer itself
5347
GetSelfSimulatedTxPvtRWSetByTxid(txid string) (*EndorserPvtSimulationResults, error)
@@ -64,7 +58,7 @@ type Store interface {
6458
type EndorserPvtSimulationResults struct {
6559
EndorserID string
6660
EndorsementBlockHeight uint64
67-
PvtSimulationResults []byte
61+
PvtSimulationResults *rwset.TxPvtReadWriteSet
6862
}
6963

7064
//////////////////////////////////////////////
@@ -85,8 +79,9 @@ type store struct {
8579
}
8680

8781
type rwsetScanner struct {
88-
txid string
89-
dbItr iterator.Iterator
82+
txid string
83+
dbItr iterator.Iterator
84+
filter ledger.PvtNsCollFilter
9085
}
9186

9287
// NewStoreProvider instantiates TransientStoreProvider
@@ -95,16 +90,6 @@ func NewStoreProvider() StoreProvider {
9590
return &storeProvider{dbProvider: dbProvider}
9691
}
9792

98-
// NewCustomPathStoreProvider constructs a StoreProvider at the given path
99-
// This function is used by ledger to construct a dedicated store till the time the store is managed
100-
// by the transient coordinator. A separate store location by the ledger is desired so that
101-
// the coordinator can be developed independently to handle the management of the transient store at default location.
102-
// Once the coordinator is developed, the ledger can stop using the transient store and this function can be removed.
103-
func NewCustomPathStoreProvider(path string) StoreProvider {
104-
dbProvider := leveldbhelper.NewProvider(&leveldbhelper.Conf{DBPath: path})
105-
return &storeProvider{dbProvider: dbProvider}
106-
}
107-
10893
// OpenStore returns a handle to a ledgerId in Store
10994
func (provider *storeProvider) OpenStore(ledgerID string) (Store, error) {
11095
dbHandle := provider.dbProvider.GetDBHandle(ledgerID)
@@ -118,12 +103,16 @@ func (provider *storeProvider) Close() {
118103

119104
// Persist stores the private read-write set of a transaction in the transient store
120105
func (s *store) Persist(txid string, endorserid string,
121-
endorsementBlkHt uint64, privateSimulationResults []byte) error {
106+
endorsementBlkHt uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error {
122107
dbBatch := leveldbhelper.NewUpdateBatch()
123108

124109
// Create compositeKey with appropriate prefix, txid, endorserid and endorsementBlkHt
125110
compositeKey := createCompositeKeyForPvtRWSet(txid, endorserid, endorsementBlkHt)
126-
dbBatch.Put(compositeKey, privateSimulationResults)
111+
privateSimulationResultsBytes, err := proto.Marshal(privateSimulationResults)
112+
if err != nil {
113+
return err
114+
}
115+
dbBatch.Put(compositeKey, privateSimulationResultsBytes)
127116

128117
// Create compositeKey with appropriate prefix, endorsementBlkHt, txid, endorserid & Store
129118
// the compositeKey (purge index) a null byte as value.
@@ -136,34 +125,29 @@ func (s *store) Persist(txid string, endorserid string,
136125
// GetTxPvtRWSetByTxid returns an iterator due to the fact that the txid may have multiple private
137126
// RWSets persisted from different endorserids. Eventually, we may pass a set of collections name
138127
// (filters) along with txid.
139-
func (s *store) GetTxPvtRWSetByTxid(txid string) (commonledger.ResultsIterator, error) {
128+
func (s *store) GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (*rwsetScanner, error) {
140129
// Construct startKey and endKey to do an range query
141130
startKey := createTxidRangeStartKey(txid)
142131
endKey := createTxidRangeEndKey(txid)
143132

144133
iter := s.db.GetIterator(startKey, endKey)
145-
return &rwsetScanner{txid, iter}, nil
134+
return &rwsetScanner{txid, iter, filter}, nil
146135
}
147136

148137
// GetSelfSimulatedTxPvtRWSetByTxid returns the private write set generated from the simulation performed
149138
// by the peer itself. Eventually, we may pass a set of collections name (filters) along with txid.
150139
func (s *store) GetSelfSimulatedTxPvtRWSetByTxid(txid string) (*EndorserPvtSimulationResults, error) {
151140
var err error
152-
var itr commonledger.ResultsIterator
153-
if itr, err = s.GetTxPvtRWSetByTxid(txid); err != nil {
141+
var itr *rwsetScanner
142+
if itr, err = s.GetTxPvtRWSetByTxid(txid, nil); err != nil {
154143
return nil, err
155144
}
156145
defer itr.Close()
157-
var temp commonledger.QueryResult
158146
var res *EndorserPvtSimulationResults
159147
for {
160-
if temp, err = itr.Next(); err != nil {
148+
if res, err = itr.Next(); res == nil || err != nil {
161149
return nil, err
162150
}
163-
if temp == nil {
164-
return nil, nil
165-
}
166-
res = temp.(*EndorserPvtSimulationResults)
167151
if selfSimulated(res) {
168152
return res, nil
169153
}
@@ -219,17 +203,24 @@ func (s *store) Shutdown() {
219203

220204
// Next moves the iterator to the next key/value pair.
221205
// It returns whether the iterator is exhausted.
222-
func (scanner *rwsetScanner) Next() (commonledger.QueryResult, error) {
206+
func (scanner *rwsetScanner) Next() (*EndorserPvtSimulationResults, error) {
223207
if !scanner.dbItr.Next() {
224208
return nil, nil
225209
}
226210
dbKey := scanner.dbItr.Key()
227211
dbVal := scanner.dbItr.Value()
228212
endorserid, endorsementBlkHt := splitCompositeKeyOfPvtRWSet(dbKey)
213+
214+
txPvtRWSet := &rwset.TxPvtReadWriteSet{}
215+
if err := proto.Unmarshal(dbVal, txPvtRWSet); err != nil {
216+
return nil, err
217+
}
218+
filteredTxPvtRWSet := pvtdatastorage.TrimPvtWSet(txPvtRWSet, scanner.filter)
219+
229220
return &EndorserPvtSimulationResults{
230221
EndorserID: endorserid,
231222
EndorsementBlockHeight: endorsementBlkHt,
232-
PvtSimulationResults: dbVal,
223+
PvtSimulationResults: filteredTxPvtRWSet,
233224
}, nil
234225
}
235226

0 commit comments

Comments
 (0)