Skip to content

Commit 98d2747

Browse files
manish-sethidenyeart
authored andcommitted
[FAB-10381] Purge orphaned private keys
This CR fixes a bug that in the pvt data purge code that could leave the orphaned pvt keys un-purged. See Jira for a more detailed description of the bug. Also, a number of tests has been added to verify the above bug fix. Change-Id: I830ec704192977084398d91c29ddb7d364901640 Signed-off-by: manish <manish.sethi@gmail.com>
1 parent 186ae9f commit 98d2747

File tree

5 files changed

+294
-43
lines changed

5 files changed

+294
-43
lines changed

core/ledger/kvledger/txmgmt/privacyenabledstate/db.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func (b UpdateMap) Put(ns, coll, key string, value []byte, version *version.Heig
114114
b.getOrCreateNsBatch(ns).Put(coll, key, value, version)
115115
}
116116

117-
// Delete removes the entry from the batch for a given combination of namespace and collection name
117+
// Delete adds a delete marker in the batch for a given combination of namespace and collection name
118118
func (b UpdateMap) Delete(ns, coll, key string, version *version.Height) {
119119
b.getOrCreateNsBatch(ns).Delete(coll, key, version)
120120
}
@@ -128,6 +128,15 @@ func (b UpdateMap) Get(ns, coll, key string) *statedb.VersionedValue {
128128
return nsPvtBatch.Get(coll, key)
129129
}
130130

131+
// Contains returns true if the given <ns,coll,key> tuple is present in the batch
132+
func (b UpdateMap) Contains(ns, coll, key string) bool {
133+
nsBatch, ok := b[ns]
134+
if !ok {
135+
return false
136+
}
137+
return nsBatch.Exists(coll, key)
138+
}
139+
131140
func (nsb nsBatch) GetCollectionNames() []string {
132141
return nsb.GetUpdatedNamespaces()
133142
}
@@ -143,11 +152,7 @@ func (b UpdateMap) getOrCreateNsBatch(ns string) nsBatch {
143152

144153
// Contains returns true if the given <ns,coll,keyHash> tuple is present in the batch
145154
func (h HashedUpdateBatch) Contains(ns, coll string, keyHash []byte) bool {
146-
nsBatch, ok := h.UpdateMap[ns]
147-
if !ok {
148-
return false
149-
}
150-
return nsBatch.Exists(coll, string(keyHash))
155+
return h.UpdateMap.Contains(ns, coll, string(keyHash))
151156
}
152157

153158
// Put overrides the function in UpdateMap for allowing the key to be a []byte instead of a string

core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/expiry_schedule_builder.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,22 +68,30 @@ func buildExpirySchedule(
6868
hashedUpdateKeys := hashedUpdates.ToCompositeKeyMap()
6969
expiryScheduleBuilder := newExpiryScheduleBuilder(btlPolicy)
7070

71+
logger.Debugf("Building the expiry schedules based on the update batch")
72+
7173
// Iterate through the private data updates and for each key add into the expiry schedule
7274
// i.e., when these private data key and it's hashed-keys are going to be expired
7375
// Note that the 'hashedUpdateKeys' may be superset of the pvtUpdates. This is because,
7476
// the peer may not receive all the private data either because the peer is not eligible for certain private data
7577
// or because we allow proceeding with the missing private data data
7678
for pvtUpdateKey, vv := range pvtUpdates.ToCompositeKeyMap() {
7779
keyHash := util.ComputeStringHash(pvtUpdateKey.Key)
80+
hashedCompisiteKey := privacyenabledstate.HashedCompositeKey{
81+
Namespace: pvtUpdateKey.Namespace,
82+
CollectionName: pvtUpdateKey.CollectionName,
83+
KeyHash: string(keyHash),
84+
}
85+
logger.Debugf("Adding expiry schedule for key and key hash [%s]", &hashedCompisiteKey)
7886
if err := expiryScheduleBuilder.add(pvtUpdateKey.Namespace, pvtUpdateKey.CollectionName, pvtUpdateKey.Key, keyHash, vv); err != nil {
7987
return nil, err
8088
}
81-
delete(hashedUpdateKeys, privacyenabledstate.HashedCompositeKey{
82-
Namespace: pvtUpdateKey.Namespace, CollectionName: pvtUpdateKey.CollectionName, KeyHash: string(keyHash)})
89+
delete(hashedUpdateKeys, hashedCompisiteKey)
8390
}
8491

8592
// Add entries for the leftover key hashes i.e., the hashes corresponding to which there is not private key is present
8693
for hashedUpdateKey, vv := range hashedUpdateKeys {
94+
logger.Debugf("Adding expiry schedule for key hash [%s]", &hashedUpdateKey)
8795
if err := expiryScheduleBuilder.add(hashedUpdateKey.Namespace, hashedUpdateKey.CollectionName, "", []byte(hashedUpdateKey.KeyHash), vv); err != nil {
8896
return nil, err
8997
}

core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/expiry_schedule_builder_test.go

Lines changed: 62 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ func TestBuildExpirySchedule(t *testing.T) {
2929
btlPolicy := pvtdatapolicy.ConstructBTLPolicy(cs)
3030
updates := privacyenabledstate.NewUpdateBatch()
3131
updates.PubUpdates.Put("ns1", "pubkey1", []byte("pubvalue1"), version.NewHeight(1, 1))
32-
putPvtUpdates(t, updates, "ns1", "coll1", "pvtkey1", []byte("pvtvalue1"), version.NewHeight(1, 1))
33-
putPvtUpdates(t, updates, "ns1", "coll2", "pvtkey2", []byte("pvtvalue2"), version.NewHeight(2, 1))
34-
putPvtUpdates(t, updates, "ns2", "coll3", "pvtkey3", []byte("pvtvalue3"), version.NewHeight(3, 1))
35-
putPvtUpdates(t, updates, "ns3", "coll4", "pvtkey4", []byte("pvtvalue4"), version.NewHeight(4, 1))
32+
putPvtAndHashUpdates(t, updates, "ns1", "coll1", "pvtkey1", []byte("pvtvalue1"), version.NewHeight(1, 1))
33+
putPvtAndHashUpdates(t, updates, "ns1", "coll2", "pvtkey2", []byte("pvtvalue2"), version.NewHeight(2, 1))
34+
putPvtAndHashUpdates(t, updates, "ns2", "coll3", "pvtkey3", []byte("pvtvalue3"), version.NewHeight(3, 1))
35+
putPvtAndHashUpdates(t, updates, "ns3", "coll4", "pvtkey4", []byte("pvtvalue4"), version.NewHeight(4, 1))
3636

3737
listExpinfo, err := buildExpirySchedule(btlPolicy, updates.PvtUpdates, updates.HashUpdates)
3838
testutil.AssertNoError(t, err, "")
@@ -57,12 +57,67 @@ func TestBuildExpirySchedule(t *testing.T) {
5757
testutil.AssertContainsAll(t, listExpinfo, expectedListExpInfo)
5858
}
5959

60-
func putPvtUpdates(t *testing.T, updates *privacyenabledstate.UpdateBatch, ns, coll, key string, value []byte, ver *version.Height) {
60+
func TestBuildExpiryScheduleWithMissingPvtdata(t *testing.T) {
61+
cs := btltestutil.NewMockCollectionStore()
62+
cs.SetBTL("ns1", "coll1", 1)
63+
cs.SetBTL("ns1", "coll2", 2)
64+
cs.SetBTL("ns2", "coll3", 3)
65+
cs.SetBTL("ns3", "coll4", 0)
66+
cs.SetBTL("ns3", "coll5", 20)
67+
btlPolicy := pvtdatapolicy.ConstructBTLPolicy(cs)
68+
updates := privacyenabledstate.NewUpdateBatch()
69+
70+
// This update should appear in the expiry schedule with both the key and the hash
71+
putPvtAndHashUpdates(t, updates, "ns1", "coll1", "pvtkey1", []byte("pvtvalue1"), version.NewHeight(50, 1))
72+
73+
// This update should appear in the expiry schedule with only the key-hash
74+
putHashUpdates(updates, "ns1", "coll2", "pvtkey2", []byte("pvtvalue2"), version.NewHeight(50, 2))
75+
76+
// This update should appear in the expiry schedule with only the key-hash
77+
putHashUpdates(updates, "ns2", "coll3", "pvtkey3", []byte("pvtvalue3"), version.NewHeight(50, 3))
78+
79+
// this update is not expectd to appear in the expiry schdule as this collection is configured to expire - 'never'
80+
putPvtAndHashUpdates(t, updates, "ns3", "coll4", "pvtkey4", []byte("pvtvalue4"), version.NewHeight(50, 4))
81+
82+
// the following two updates are not expected to appear in the expiry schdule as they are deletes
83+
deletePvtAndHashUpdates(t, updates, "ns3", "coll5", "pvtkey5", version.NewHeight(50, 5))
84+
deleteHashUpdates(updates, "ns3", "coll5", "pvtkey6", version.NewHeight(50, 6))
85+
86+
listExpinfo, err := buildExpirySchedule(btlPolicy, updates.PvtUpdates, updates.HashUpdates)
87+
testutil.AssertNoError(t, err, "")
88+
t.Logf("listExpinfo=%s", spew.Sdump(listExpinfo))
89+
90+
pvtdataKeys1 := newPvtdataKeys()
91+
pvtdataKeys1.add("ns1", "coll1", "pvtkey1", util.ComputeStringHash("pvtkey1"))
92+
pvtdataKeys2 := newPvtdataKeys()
93+
pvtdataKeys2.add("ns1", "coll2", "", util.ComputeStringHash("pvtkey2"))
94+
pvtdataKeys3 := newPvtdataKeys()
95+
pvtdataKeys3.add("ns2", "coll3", "", util.ComputeStringHash("pvtkey3"))
96+
97+
expectedListExpInfo := []*expiryInfo{
98+
{expiryInfoKey: &expiryInfoKey{expiryBlk: 52, committingBlk: 50}, pvtdataKeys: pvtdataKeys1},
99+
{expiryInfoKey: &expiryInfoKey{expiryBlk: 53, committingBlk: 50}, pvtdataKeys: pvtdataKeys2},
100+
{expiryInfoKey: &expiryInfoKey{expiryBlk: 54, committingBlk: 50}, pvtdataKeys: pvtdataKeys3},
101+
}
102+
103+
testutil.AssertEquals(t, len(listExpinfo), 3)
104+
testutil.AssertContainsAll(t, listExpinfo, expectedListExpInfo)
105+
}
106+
107+
func putPvtAndHashUpdates(t *testing.T, updates *privacyenabledstate.UpdateBatch, ns, coll, key string, value []byte, ver *version.Height) {
61108
updates.PvtUpdates.Put(ns, coll, key, value, ver)
62-
updates.HashUpdates.Put(ns, coll, util.ComputeStringHash(key), util.ComputeHash(value), ver)
109+
putHashUpdates(updates, ns, coll, key, value, ver)
63110
}
64111

65-
func deletePvtUpdates(t *testing.T, updates *privacyenabledstate.UpdateBatch, ns, coll, key string, ver *version.Height) {
112+
func deletePvtAndHashUpdates(t *testing.T, updates *privacyenabledstate.UpdateBatch, ns, coll, key string, ver *version.Height) {
66113
updates.PvtUpdates.Delete(ns, coll, key, ver)
114+
deleteHashUpdates(updates, ns, coll, key, ver)
115+
}
116+
117+
func putHashUpdates(updates *privacyenabledstate.UpdateBatch, ns, coll, key string, value []byte, ver *version.Height) {
118+
updates.HashUpdates.Put(ns, coll, util.ComputeStringHash(key), util.ComputeHash(value), ver)
119+
}
120+
121+
func deleteHashUpdates(updates *privacyenabledstate.UpdateBatch, ns, coll, key string, ver *version.Height) {
67122
updates.HashUpdates.Delete(ns, coll, util.ComputeStringHash(key), ver)
68123
}

core/ledger/kvledger/txmgmt/pvtstatepurgemgmt/purge_mgr.go

Lines changed: 65 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/hyperledger/fabric/core/ledger/kvledger/bookkeeping"
1414
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/privacyenabledstate"
15+
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
1516
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
1617
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
1718
)
@@ -31,6 +32,7 @@ type PurgeMgr interface {
3132
type keyAndVersion struct {
3233
key string
3334
committingBlock uint64
35+
purgeKeyOnly bool
3436
}
3537

3638
type expiryInfoMap map[*privacyenabledstate.HashedCompositeKey]*keyAndVersion
@@ -85,29 +87,37 @@ func (p *purgeMgr) DeleteExpiredAndUpdateBookkeeping(
8587
if p.workingset.err != nil {
8688
return p.workingset.err
8789
}
90+
91+
listExpiryInfo, err := buildExpirySchedule(p.btlPolicy, pvtUpdates, hashedUpdates)
92+
if err != nil {
93+
return err
94+
}
95+
8896
// For each key selected for purging, check if the key is not getting updated in the current block,
8997
// add its deletion in the update batches for pvt and hashed updates
9098
for compositeHashedKey, keyAndVersion := range p.workingset.toPurge {
9199
ns := compositeHashedKey.Namespace
92100
coll := compositeHashedKey.CollectionName
93101
keyHash := []byte(compositeHashedKey.KeyHash)
94102
key := keyAndVersion.key
95-
if hashedUpdates.Contains(ns, coll, keyHash) {
96-
logger.Debugf("Skipping the key [%s] from purging because it is already updated in the current batch", compositeHashedKey)
97-
continue
98-
}
103+
purgeKeyOnly := keyAndVersion.purgeKeyOnly
104+
hashUpdated := hashedUpdates.Contains(ns, coll, keyHash)
105+
pvtKeyUpdated := pvtUpdates.Contains(ns, coll, key)
106+
107+
logger.Debugf("Checking whether the key [ns=%s, coll=%s, keyHash=%x, purgeKeyOnly=%t] "+
108+
"is updated in the update batch for the committing block - hashUpdated=%t, and pvtKeyUpdated=%t",
109+
ns, coll, keyHash, purgeKeyOnly, hashUpdated, pvtKeyUpdated)
110+
99111
expiringTxVersion := version.NewHeight(p.workingset.expiringBlk, math.MaxUint64)
100-
logger.Debugf("Purging the hashed key [%s]", compositeHashedKey)
101-
hashedUpdates.Delete(ns, coll, keyHash, expiringTxVersion)
102-
if keyAndVersion.key != "" {
103-
logger.Debugf("Purging the pvt key corresponding to hashed key [%s]", compositeHashedKey)
112+
if !hashUpdated && !purgeKeyOnly {
113+
logger.Debugf("Adding the hashed key to be purged to the delete list in the update batch")
114+
hashedUpdates.Delete(ns, coll, keyHash, expiringTxVersion)
115+
}
116+
if key != "" && !pvtKeyUpdated {
117+
logger.Debugf("Adding the pvt key to be purged to the delete list in the update batch")
104118
pvtUpdates.Delete(ns, coll, key, expiringTxVersion)
105119
}
106120
}
107-
listExpiryInfo, err := buildExpirySchedule(p.btlPolicy, pvtUpdates, hashedUpdates)
108-
if err != nil {
109-
return err
110-
}
111121
return p.expKeeper.updateBookkeeping(listExpiryInfo, nil)
112122
}
113123

@@ -127,6 +137,7 @@ func (p *purgeMgr) BlockCommitDone() {
127137
// prepareWorkingsetFor returns a working set for a given expiring block 'expiringAtBlk'.
128138
// This working set contains the pvt data keys that will expire with the commit of block 'expiringAtBlk'.
129139
func (p *purgeMgr) prepareWorkingsetFor(expiringAtBlk uint64) *workingset {
140+
logger.Debugf("Preparing potential purge list working-set for expiringAtBlk [%d]", expiringAtBlk)
130141
workingset := &workingset{expiringBlk: expiringAtBlk}
131142
// Retrieve the keys from bookkeeper
132143
expiryInfo, err := p.expKeeper.retrieve(expiringAtBlk)
@@ -140,18 +151,49 @@ func (p *purgeMgr) prepareWorkingsetFor(expiringAtBlk uint64) *workingset {
140151
p.preloadCommittedVersionsInCache(toPurge)
141152
var expiryInfoKeysToClear []*expiryInfoKey
142153

143-
// for each hashed key, check whether the committed version is still the same (i.e, the key was not overwritten already)
144-
for hashedKey, keyAndVersion := range toPurge {
145-
expiryInfoKeysToClear = append(expiryInfoKeysToClear, &expiryInfoKey{committingBlk: keyAndVersion.committingBlock, expiryBlk: expiringAtBlk})
146-
currentVersion, err := p.db.GetKeyHashVersion(hashedKey.Namespace, hashedKey.CollectionName, []byte(hashedKey.KeyHash))
154+
if len(toPurge) == 0 {
155+
logger.Debugf("No expiry entry found for expiringAtBlk [%d]", expiringAtBlk)
156+
return workingset
157+
}
158+
logger.Debugf("Total [%d] expiring entries found. Evaluaitng whether some of these keys have been overwritten in later blocks...", len(toPurge))
159+
160+
for purgeEntryK, purgeEntryV := range toPurge {
161+
logger.Debugf("Evaluating for hashedKey [%s]", purgeEntryK)
162+
expiryInfoKeysToClear = append(expiryInfoKeysToClear, &expiryInfoKey{committingBlk: purgeEntryV.committingBlock, expiryBlk: expiringAtBlk})
163+
currentVersion, err := p.db.GetKeyHashVersion(purgeEntryK.Namespace, purgeEntryK.CollectionName, []byte(purgeEntryK.KeyHash))
147164
if err != nil {
148165
workingset.err = err
149166
return workingset
150167
}
151-
// if the committed version is different from the expiry entry in the bookkeeper, remove the key from the purge-list (the key was overwritten already)
152-
if !sameVersion(currentVersion, keyAndVersion.committingBlock) {
153-
delete(toPurge, hashedKey)
168+
169+
if sameVersion(currentVersion, purgeEntryV.committingBlock) {
170+
logger.Debugf(
171+
"The version of the hashed key in the committed state and in the expiry entry is same " +
172+
"hence, keeping the entry in the purge list")
173+
continue
174+
}
175+
176+
logger.Debugf("The version of the hashed key in the committed state and in the expiry entry is different")
177+
if purgeEntryV.key != "" {
178+
logger.Debugf("The expiry entry also contains the raw key along with the key hash")
179+
committedPvtVerVal, err := p.db.GetPrivateData(purgeEntryK.Namespace, purgeEntryK.CollectionName, purgeEntryV.key)
180+
if err != nil {
181+
workingset.err = err
182+
return workingset
183+
}
184+
185+
if sameVersionFromVal(committedPvtVerVal, purgeEntryV.committingBlock) {
186+
logger.Debugf(
187+
"The version of the pvt key in the committed state and in the expiry entry is same" +
188+
"Including only key in the purge list and not the hashed key")
189+
purgeEntryV.purgeKeyOnly = true
190+
continue
191+
}
154192
}
193+
194+
// If we reached here, the keyhash and private key (if present, in the expiry entry) have been updated in a later block, therefore remove from current purge list
195+
logger.Debugf("Removing from purge list - the key hash and key (if present, in the expiry entry)")
196+
delete(toPurge, purgeEntryK)
155197
}
156198
// Final keys to purge from state
157199
workingset.toPurge = toPurge
@@ -189,3 +231,7 @@ func transformToExpiryInfoMap(expiryInfo []*expiryInfo) expiryInfoMap {
189231
func sameVersion(version *version.Height, blockNum uint64) bool {
190232
return version != nil && version.BlockNum == blockNum
191233
}
234+
235+
func sameVersionFromVal(vv *statedb.VersionedValue, blockNum uint64) bool {
236+
return vv != nil && sameVersion(vv.Version, blockNum)
237+
}

0 commit comments

Comments
 (0)