Skip to content

Commit 6e9e042

Browse files
committed
[FAB-5638] SideDB - ledger storage
This CR introduces ledger storage which maintains consistency in the underlying block storage and pvtdata store by ensuring an atomic operaton for writing blocks (to block storage) and writing the pvtdata (to pvtdata store) Change-Id: Ifb21f14d401eb10233db72c1473ba56f44bc119d Signed-off-by: manish <manish.sethi@gmail.com>
1 parent 5d47989 commit 6e9e042

File tree

5 files changed

+367
-5
lines changed

5 files changed

+367
-5
lines changed

core/ledger/ledgerstorage/pkg_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
Copyright IBM Corp. 2017 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package ledgerstorage
18+
19+
import (
20+
"os"
21+
"testing"
22+
23+
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
24+
)
25+
26+
type testEnv struct {
27+
t testing.TB
28+
}
29+
30+
func newTestEnv(t *testing.T) *testEnv {
31+
testEnv := &testEnv{t}
32+
testEnv.cleanup()
33+
return testEnv
34+
}
35+
36+
func (env *testEnv) cleanup() {
37+
path := ledgerconfig.GetRootPath()
38+
os.RemoveAll(path)
39+
}

core/ledger/ledgerstorage/store.go

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/*
2+
Copyright IBM Corp. 2017 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package ledgerstorage
18+
19+
import (
20+
"fmt"
21+
"sync"
22+
23+
"github.com/hyperledger/fabric/common/ledger/blkstorage"
24+
"github.com/hyperledger/fabric/common/ledger/blkstorage/fsblkstorage"
25+
"github.com/hyperledger/fabric/core/ledger"
26+
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
27+
"github.com/hyperledger/fabric/core/ledger/pvtdatastorage"
28+
"github.com/hyperledger/fabric/protos/common"
29+
)
30+
31+
// Provider encapusaltes two providers 1) block store provider and 2) and pvt data store provider
32+
type Provider struct {
33+
blkStoreProvider blkstorage.BlockStoreProvider
34+
pvtdataStoreProvider pvtdatastorage.Provider
35+
}
36+
37+
// Store encapsulates two stores 1) block store and pvt data store
38+
type Store struct {
39+
blkstorage.BlockStore
40+
pvtdataStore pvtdatastorage.Store
41+
rwlock *sync.RWMutex
42+
}
43+
44+
// NewProvider returns the handle to the provider
45+
func NewProvider() *Provider {
46+
// Initialize the block storage
47+
attrsToIndex := []blkstorage.IndexableAttr{
48+
blkstorage.IndexableAttrBlockHash,
49+
blkstorage.IndexableAttrBlockNum,
50+
blkstorage.IndexableAttrTxID,
51+
blkstorage.IndexableAttrBlockNumTranNum,
52+
blkstorage.IndexableAttrBlockTxID,
53+
blkstorage.IndexableAttrTxValidationCode,
54+
}
55+
indexConfig := &blkstorage.IndexConfig{AttrsToIndex: attrsToIndex}
56+
blockStoreProvider := fsblkstorage.NewProvider(
57+
fsblkstorage.NewConf(ledgerconfig.GetBlockStorePath(), ledgerconfig.GetMaxBlockfileSize()),
58+
indexConfig)
59+
60+
pvtStoreProvider := pvtdatastorage.NewProvider()
61+
return &Provider{blockStoreProvider, pvtStoreProvider}
62+
}
63+
64+
// Open opens the store
65+
func (p *Provider) Open(ledgerid string) (*Store, error) {
66+
var blockStore blkstorage.BlockStore
67+
var pvtdataStore pvtdatastorage.Store
68+
var err error
69+
if blockStore, err = p.blkStoreProvider.OpenBlockStore(ledgerid); err != nil {
70+
return nil, err
71+
}
72+
if pvtdataStore, err = p.pvtdataStoreProvider.OpenStore(ledgerid); err != nil {
73+
return nil, err
74+
}
75+
store := &Store{blockStore, pvtdataStore, &sync.RWMutex{}}
76+
if err := store.init(); err != nil {
77+
return nil, err
78+
}
79+
return store, nil
80+
}
81+
82+
// Close closes the provider
83+
func (p *Provider) Close() {
84+
p.blkStoreProvider.Close()
85+
p.pvtdataStoreProvider.Close()
86+
}
87+
88+
// CommitWithPvtData commits the block and the corresponding pvt data in an atomic operation
89+
func (s *Store) CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error {
90+
s.rwlock.Lock()
91+
defer s.rwlock.Unlock()
92+
var pvtdata []*ledger.TxPvtData
93+
for _, v := range blockAndPvtdata.BlockPvtData {
94+
pvtdata = append(pvtdata, v)
95+
}
96+
if err := s.pvtdataStore.Prepare(blockAndPvtdata.Block.Header.Number, pvtdata); err != nil {
97+
return err
98+
}
99+
if err := s.AddBlock(blockAndPvtdata.Block); err != nil {
100+
s.pvtdataStore.Rollback()
101+
return err
102+
}
103+
return s.pvtdataStore.Commit()
104+
}
105+
106+
// GetPvtDataAndBlockByNum returns the block and the corresponding pvt data.
107+
// The pvt data is filtered by the list of 'collections' supplied
108+
func (s *Store) GetPvtDataAndBlockByNum(blockNum uint64, filter ledger.PvtNsCollFilter) (*ledger.BlockAndPvtData, error) {
109+
s.rwlock.RLock()
110+
defer s.rwlock.RUnlock()
111+
112+
var block *common.Block
113+
var pvtdata []*ledger.TxPvtData
114+
var err error
115+
if block, err = s.RetrieveBlockByNumber(blockNum); err != nil {
116+
return nil, err
117+
}
118+
if pvtdata, err = s.GetPvtDataByNum(blockNum, filter); err != nil {
119+
return nil, err
120+
}
121+
return &ledger.BlockAndPvtData{Block: block, BlockPvtData: constructPvtdataMap(pvtdata)}, nil
122+
}
123+
124+
// GetPvtDataByNum returns only the pvt data corresponding to the given block number
125+
// The pvt data is filtered by the list of 'ns/collections' supplied in the filter
126+
// A nil filter does not filter any results
127+
func (s *Store) GetPvtDataByNum(blockNum uint64, filter ledger.PvtNsCollFilter) ([]*ledger.TxPvtData, error) {
128+
s.rwlock.RLock()
129+
defer s.rwlock.RUnlock()
130+
131+
var pvtdata []*ledger.TxPvtData
132+
var err error
133+
if pvtdata, err = s.pvtdataStore.GetPvtDataByBlockNum(blockNum, filter); err != nil {
134+
return nil, err
135+
}
136+
return pvtdata, nil
137+
}
138+
139+
// init checks whether the block storage and pvt data store are in sync
140+
// this is called when the store instance is constructed and handed over for the use.
141+
// this check whether there is a pending batch (possibly from a previous system crash)
142+
// of pvt data that was not committed. If a pending batch exists, the check is made
143+
// whether the associated block was successfully committed in the block storage (before the crash)
144+
// or not. If the block was committed, the private data batch is committed
145+
// otherwise, the pvt data batch is rolledback
146+
func (s *Store) init() error {
147+
var pendingPvtbatch bool
148+
var err error
149+
if pendingPvtbatch, err = s.pvtdataStore.HasPendingBatch(); err != nil {
150+
return err
151+
}
152+
if !pendingPvtbatch {
153+
return nil
154+
}
155+
var bcInfo *common.BlockchainInfo
156+
var pvtdataStoreHt uint64
157+
158+
if bcInfo, err = s.GetBlockchainInfo(); err != nil {
159+
return err
160+
}
161+
if pvtdataStoreHt, err = s.pvtdataStore.LastCommittedBlockHeight(); err != nil {
162+
return err
163+
}
164+
165+
if bcInfo.Height == pvtdataStoreHt {
166+
return s.pvtdataStore.Rollback()
167+
}
168+
169+
if bcInfo.Height == pvtdataStoreHt+1 {
170+
return s.pvtdataStore.Commit()
171+
}
172+
173+
return fmt.Errorf("This is not expected. blockStoreHeight=%d, pvtdataStoreHeight=%d", bcInfo.Height, pvtdataStoreHt)
174+
}
175+
176+
func constructPvtdataMap(pvtdata []*ledger.TxPvtData) map[uint64]*ledger.TxPvtData {
177+
if pvtdata == nil {
178+
return nil
179+
}
180+
m := make(map[uint64]*ledger.TxPvtData)
181+
for _, pvtdatum := range pvtdata {
182+
m[pvtdatum.SeqInBlock] = pvtdatum
183+
}
184+
return m
185+
}
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
Copyright IBM Corp. 2017 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package ledgerstorage
18+
19+
import (
20+
"os"
21+
"testing"
22+
23+
"github.com/hyperledger/fabric/common/flogging"
24+
"github.com/hyperledger/fabric/common/ledger/testutil"
25+
"github.com/hyperledger/fabric/core/ledger"
26+
"github.com/hyperledger/fabric/protos/ledger/rwset"
27+
"github.com/spf13/viper"
28+
"github.com/stretchr/testify/assert"
29+
)
30+
31+
func TestMain(m *testing.M) {
32+
flogging.SetModuleLevel("ledgerstorage", "debug")
33+
flogging.SetModuleLevel("pvtdatastorage", "debug")
34+
viper.Set("peer.fileSystemPath", "/tmp/fabric/core/ledger/ledgerstorage")
35+
os.Exit(m.Run())
36+
}
37+
38+
func TestStore(t *testing.T) {
39+
testEnv := newTestEnv(t)
40+
defer testEnv.cleanup()
41+
provider := NewProvider()
42+
defer provider.Close()
43+
store, err := provider.Open("testLedger")
44+
defer store.Shutdown()
45+
46+
assert.NoError(t, err)
47+
sampleData := sampleData(t)
48+
for _, sampleDatum := range sampleData {
49+
assert.NoError(t, store.CommitWithPvtData(sampleDatum))
50+
}
51+
52+
// block 1 has no pvt data
53+
pvtdata, err := store.GetPvtDataByNum(1, nil)
54+
assert.NoError(t, err)
55+
assert.Nil(t, pvtdata)
56+
57+
// block 4 has no pvt data
58+
pvtdata, err = store.GetPvtDataByNum(4, nil)
59+
assert.NoError(t, err)
60+
assert.Nil(t, pvtdata)
61+
62+
// block 2 has pvt data for tx 3 and 5 only
63+
pvtdata, err = store.GetPvtDataByNum(2, nil)
64+
assert.NoError(t, err)
65+
assert.Equal(t, 2, len(pvtdata))
66+
assert.Equal(t, uint64(3), pvtdata[0].SeqInBlock)
67+
assert.Equal(t, uint64(5), pvtdata[1].SeqInBlock)
68+
69+
// block 3 has pvt data for tx 4 and 6 only
70+
pvtdata, err = store.GetPvtDataByNum(3, nil)
71+
assert.NoError(t, err)
72+
assert.Equal(t, 2, len(pvtdata))
73+
assert.Equal(t, uint64(4), pvtdata[0].SeqInBlock)
74+
assert.Equal(t, uint64(6), pvtdata[1].SeqInBlock)
75+
76+
blockAndPvtdata, err := store.GetPvtDataAndBlockByNum(2, nil)
77+
assert.NoError(t, err)
78+
assert.Equal(t, sampleData[2], blockAndPvtdata)
79+
80+
blockAndPvtdata, err = store.GetPvtDataAndBlockByNum(3, nil)
81+
assert.NoError(t, err)
82+
assert.Equal(t, sampleData[3], blockAndPvtdata)
83+
84+
// pvt data retrieval for block 3 with filter should return filtered pvtdata
85+
filter := ledger.NewPvtNsCollFilter()
86+
filter.Add("ns-1", "coll-1")
87+
blockAndPvtdata, err = store.GetPvtDataAndBlockByNum(3, filter)
88+
assert.NoError(t, err)
89+
assert.Equal(t, sampleData[3].Block, blockAndPvtdata.Block)
90+
// two transactions should be present
91+
assert.Equal(t, 2, len(blockAndPvtdata.BlockPvtData))
92+
// both tran number 4 and 6 should have only one collection because of filter
93+
assert.Equal(t, 1, len(blockAndPvtdata.BlockPvtData[4].WriteSet.NsPvtRwset))
94+
assert.Equal(t, 1, len(blockAndPvtdata.BlockPvtData[6].WriteSet.NsPvtRwset))
95+
// any other transaction entry should be nil
96+
assert.Nil(t, blockAndPvtdata.BlockPvtData[2])
97+
}
98+
99+
func sampleData(t *testing.T) []*ledger.BlockAndPvtData {
100+
var blockAndpvtdata []*ledger.BlockAndPvtData
101+
blocks := testutil.ConstructTestBlocks(t, 10)
102+
for i := 0; i < 10; i++ {
103+
blockAndpvtdata = append(blockAndpvtdata, &ledger.BlockAndPvtData{Block: blocks[i]})
104+
}
105+
// txNum 3, 5 in block 2 has pvtdata
106+
blockAndpvtdata[2].BlockPvtData = samplePvtData(t, []uint64{3, 5})
107+
// txNum 4, 6 in block 3 has pvtdata
108+
blockAndpvtdata[3].BlockPvtData = samplePvtData(t, []uint64{4, 6})
109+
110+
return blockAndpvtdata
111+
}
112+
113+
func samplePvtData(t *testing.T, txNums []uint64) map[uint64]*ledger.TxPvtData {
114+
pvtWriteSet := &rwset.TxPvtReadWriteSet{DataModel: rwset.TxReadWriteSet_KV}
115+
pvtWriteSet.NsPvtRwset = []*rwset.NsPvtReadWriteSet{
116+
&rwset.NsPvtReadWriteSet{
117+
Namespace: "ns-1",
118+
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
119+
&rwset.CollectionPvtReadWriteSet{
120+
CollectionName: "coll-1",
121+
Rwset: []byte("RandomBytes-PvtRWSet-ns1-coll1"),
122+
},
123+
&rwset.CollectionPvtReadWriteSet{
124+
CollectionName: "coll-2",
125+
Rwset: []byte("RandomBytes-PvtRWSet-ns1-coll2"),
126+
},
127+
},
128+
},
129+
}
130+
var pvtData []*ledger.TxPvtData
131+
for _, txNum := range txNums {
132+
pvtData = append(pvtData, &ledger.TxPvtData{SeqInBlock: txNum, WriteSet: pvtWriteSet})
133+
}
134+
return constructPvtdataMap(pvtData)
135+
}

core/ledger/pvtdatastorage/store.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ type Store interface {
4444
Rollback() error
4545
// IsEmpty returns true if the store does not have any block committed yet
4646
IsEmpty() (bool, error)
47-
// LastCommittedBlock returns the last committed blocknum
48-
LastCommittedBlock() (uint64, error)
47+
// LastCommittedBlockHeight returns the height of the last committed block
48+
LastCommittedBlockHeight() (uint64, error)
4949
// HasPendingBatch returns if the store has a pending batch
5050
HasPendingBatch() (bool, error)
5151
// Shutdown stops the store

core/ledger/pvtdatastorage/store_impl.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,9 +173,12 @@ func (s *store) GetPvtDataByBlockNum(blockNum uint64, filter ledger.PvtNsCollFil
173173
return pvtData, nil
174174
}
175175

176-
// LastCommittedBlock implements the function in the interface `Store`
177-
func (s *store) LastCommittedBlock() (uint64, error) {
178-
return s.lastCommittedBlock, nil
176+
// LastCommittedBlockHeight implements the function in the interface `Store`
177+
func (s *store) LastCommittedBlockHeight() (uint64, error) {
178+
if s.isEmpty {
179+
return 0, nil
180+
}
181+
return s.lastCommittedBlock + 1, nil
179182
}
180183

181184
// HasPendingBatch implements the function in the interface `Store`

0 commit comments

Comments
 (0)