Skip to content

Commit f37beaa

Browse files
committed
Missingdata-recon: Handle coll eligibility change
This CR handles the event when a peer becomes eligible for receiving data for an existing collection. All the missing data entries for the collection that were previously marked as 'ineligible' are converted to 'eligible' in a background goroutine so that the query results for reporting missing data also include these entries for previous blocks FAB-11437 #done Change-Id: I145a079b69e8bf02b4c97da23fbf08d7ce2ae268 Signed-off-by: manish <manish.sethi@gmail.com>
1 parent a2bf9dc commit f37beaa

File tree

12 files changed

+620
-186
lines changed

12 files changed

+620
-186
lines changed

common/ledger/util/leveldbhelper/leveldb_provider.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,11 @@ func (batch *UpdateBatch) Delete(key []byte) {
136136
batch.KVs[string(key)] = nil
137137
}
138138

139+
// Len returns the number of entries in the batch
140+
func (batch *UpdateBatch) Len() int {
141+
return len(batch.KVs)
142+
}
143+
139144
// Iterator extends actual leveldb iterator
140145
type Iterator struct {
141146
iterator.Iterator

core/ledger/ledgerconfig/ledger_config.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ const confMaxBatchSize = "ledger.state.couchDBConfig.maxBatchUpdateSize"
3838
const confAutoWarmIndexes = "ledger.state.couchDBConfig.autoWarmIndexes"
3939
const confWarmIndexesAfterNBlocks = "ledger.state.couchDBConfig.warmIndexesAfterNBlocks"
4040

41+
var confCollElgProcMaxDbBatchSize = &conf{"ledger.pvtdataStore.collElgProcMaxDbBatchSize", 5000}
42+
var confCollElgProcDbBatchesInterval = &conf{"ledger.pvtdataStore.collElgProcDbBatchesInterval", 1000}
43+
4144
// GetRootPath returns the filesystem path.
4245
// All ledger related contents are expected to be stored under this path
4346
func GetRootPath() string {
@@ -85,7 +88,7 @@ func GetMaxBlockfileSize() int {
8588
return 64 * 1024 * 1024
8689
}
8790

88-
//GetTotalLimit exposes the totalLimit variable
91+
// GetTotalQueryLimit exposes the totalLimit variable
8992
func GetTotalQueryLimit() int {
9093
totalQueryLimit := viper.GetInt(confTotalQueryLimit)
9194
// if queryLimit was unset, default to 10000
@@ -95,7 +98,7 @@ func GetTotalQueryLimit() int {
9598
return totalQueryLimit
9699
}
97100

98-
//GetQueryLimit exposes the queryLimit variable
101+
// GetInternalQueryLimit exposes the queryLimit variable
99102
func GetInternalQueryLimit() int {
100103
internalQueryLimit := viper.GetInt(confInternalQueryLimit)
101104
// if queryLimit was unset, default to 1000
@@ -125,6 +128,26 @@ func GetPvtdataStorePurgeInterval() uint64 {
125128
return uint64(purgeInterval)
126129
}
127130

131+
// GetPvtdataStoreCollElgProcMaxDbBatchSize returns the maximum db batch size for converting
132+
// the ineligible missing data entries to eligible missing data entries
133+
func GetPvtdataStoreCollElgProcMaxDbBatchSize() int {
134+
collElgProcMaxDbBatchSize := viper.GetInt(confCollElgProcMaxDbBatchSize.Name)
135+
if collElgProcMaxDbBatchSize <= 0 {
136+
collElgProcMaxDbBatchSize = confCollElgProcMaxDbBatchSize.DefaultVal
137+
}
138+
return collElgProcMaxDbBatchSize
139+
}
140+
141+
// GetPvtdataStoreCollElgProcDbBatchesInterval returns the minimum duration (in milliseconds) between writing
142+
// two consecutive db batches for converting the ineligible missing data entries to eligible missing data entries
143+
func GetPvtdataStoreCollElgProcDbBatchesInterval() int {
144+
collElgProcDbBatchesInterval := viper.GetInt(confCollElgProcDbBatchesInterval.Name)
145+
if collElgProcDbBatchesInterval <= 0 {
146+
collElgProcDbBatchesInterval = confCollElgProcDbBatchesInterval.DefaultVal
147+
}
148+
return collElgProcDbBatchesInterval
149+
}
150+
128151
//IsHistoryDBEnabled exposes the historyDatabase variable
129152
func IsHistoryDBEnabled() bool {
130153
return viper.GetBool(confEnableHistoryDatabase)
@@ -162,3 +185,8 @@ func GetWarmIndexesAfterNBlocks() int {
162185
}
163186
return warmAfterNBlocks
164187
}
188+
189+
type conf struct {
190+
Name string
191+
DefaultVal int
192+
}

core/ledger/ledgerconfig/ledger_config_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,22 @@ func TestPvtdataStorePurgeInterval(t *testing.T) {
158158
assert.Equal(t, uint64(1000), updatedValue) //test config returns 1000
159159
}
160160

161+
func TestPvtdataStoreCollElgProcMaxDbBatchSize(t *testing.T) {
162+
defaultVal := confCollElgProcMaxDbBatchSize.DefaultVal
163+
testVal := defaultVal + 1
164+
assert.Equal(t, defaultVal, GetPvtdataStoreCollElgProcMaxDbBatchSize())
165+
viper.Set("ledger.pvtdataStore.collElgProcMaxDbBatchSize", testVal)
166+
assert.Equal(t, testVal, GetPvtdataStoreCollElgProcMaxDbBatchSize())
167+
}
168+
169+
func TestCollElgProcDbBatchesInterval(t *testing.T) {
170+
defaultVal := confCollElgProcDbBatchesInterval.DefaultVal
171+
testVal := defaultVal + 1
172+
assert.Equal(t, defaultVal, GetPvtdataStoreCollElgProcDbBatchesInterval())
173+
viper.Set("ledger.pvtdataStore.collElgProcDbBatchesInterval", testVal)
174+
assert.Equal(t, testVal, GetPvtdataStoreCollElgProcDbBatchesInterval())
175+
}
176+
161177
func TestIsHistoryDBEnabledDefault(t *testing.T) {
162178
setUpCoreYAMLConfig()
163179
defaultValue := IsHistoryDBEnabled()

core/ledger/pvtdatastorage/expiry_data.pb.go

Lines changed: 0 additions & 180 deletions
This file was deleted.

core/ledger/pvtdatastorage/kv_encoding.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@ package pvtdatastorage
88

99
import (
1010
"bytes"
11+
"math"
1112

1213
"github.com/golang/protobuf/proto"
1314
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
1415
"github.com/hyperledger/fabric/core/ledger/util"
1516
"github.com/hyperledger/fabric/protos/ledger/rwset"
17+
"github.com/pkg/errors"
1618
"github.com/willf/bitset"
1719
)
1820

@@ -23,6 +25,7 @@ var (
2325
expiryKeyPrefix = []byte{3}
2426
eligibleMissingDataKeyPrefix = []byte{4}
2527
ineligibleMissingDataKeyPrefix = []byte{5}
28+
collElgKeyPrefix = []byte{6}
2629

2730
nilByte = byte(0)
2831
emptyValue = []byte{}
@@ -145,9 +148,51 @@ func decodeMissingDataValue(bitmapBytes []byte) (*bitset.BitSet, error) {
145148
return bitmap, nil
146149
}
147150

151+
func encodeCollElgKey(blkNum uint64) []byte {
152+
return append(collElgKeyPrefix, util.EncodeReverseOrderVarUint64(blkNum)...)
153+
}
154+
155+
func decodeCollElgKey(b []byte) uint64 {
156+
blkNum, _ := util.DecodeReverseOrderVarUint64(b[1:])
157+
return blkNum
158+
}
159+
160+
func encodeCollElgVal(m *CollElgInfo) ([]byte, error) {
161+
return proto.Marshal(m)
162+
}
163+
164+
func decodeCollElgVal(b []byte) (*CollElgInfo, error) {
165+
m := &CollElgInfo{}
166+
if err := proto.Unmarshal(b, m); err != nil {
167+
return nil, errors.WithStack(err)
168+
}
169+
return m, nil
170+
}
171+
148172
func createRangeScanKeysForEligibleMissingDataEntries(blkNum uint64) (startKey, endKey []byte) {
149173
startKey = append(eligibleMissingDataKeyPrefix, util.EncodeReverseOrderVarUint64(blkNum)...)
150174
endKey = append(eligibleMissingDataKeyPrefix, util.EncodeReverseOrderVarUint64(0)...)
151175

152176
return startKey, endKey
153177
}
178+
179+
func createRangeScanKeysForIneligibleMissingData(maxBlkNum uint64, ns, coll string) (startKey, endKey []byte) {
180+
startKey = encodeMissingDataKey(
181+
&missingDataKey{
182+
nsCollBlk: nsCollBlk{ns: ns, coll: coll, blkNum: maxBlkNum},
183+
isEligible: false,
184+
},
185+
)
186+
endKey = encodeMissingDataKey(
187+
&missingDataKey{
188+
nsCollBlk: nsCollBlk{ns: ns, coll: coll, blkNum: 0},
189+
isEligible: false,
190+
},
191+
)
192+
return
193+
}
194+
195+
func createRangeScanKeysForCollElg() (startKey, endKey []byte) {
196+
return encodeCollElgKey(math.MaxUint64),
197+
encodeCollElgKey(0)
198+
}

0 commit comments

Comments
 (0)