Filter out purged data from private data store
This PR introduces support for the purge marker(s) and filters out the data marked for purging. In addition, it removes the now-unused code for the V11 format, since the data format is upgraded, along with retroactive creation of the hashed index, at peer start.

Signed-off-by: manish <manish.sethi@gmail.com>
manish-sethi authored and denyeart committed Feb 16, 2022
1 parent 7b7b6e9 commit cedbd51
Showing 5 changed files with 387 additions and 65 deletions.
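For orientation before the diff: the rule this change enforces is that a private write committed at height (blkNum, txNum) is hidden from reads whenever a purge marker for the same <ns, coll, keyHash> exists at an equal or higher height. A minimal, self-contained Go sketch of that comparison follows; the height type and the filteredOut helper are illustrative stand-ins for version.Height and the store's internal checks, not part of this PR.

package main

import "fmt"

// height mirrors the (block, tx) pair used by the store; illustrative only.
type height struct {
	blkNum, txNum uint64
}

// compare returns -1, 0, or 1 as h is lower than, equal to, or higher than other.
func (h height) compare(other height) int {
	switch {
	case h.blkNum < other.blkNum:
		return -1
	case h.blkNum > other.blkNum:
		return 1
	case h.txNum < other.txNum:
		return -1
	case h.txNum > other.txNum:
		return 1
	default:
		return 0
	}
}

// filteredOut encodes the rule applied by this PR: a private write committed at
// keyHt is hidden from reads when a purge marker for the same <ns, coll, keyHash>
// exists at markerHt and keyHt <= markerHt.
func filteredOut(keyHt, markerHt height) bool {
	return keyHt.compare(markerHt) <= 0
}

func main() {
	marker := height{blkNum: 50, txNum: 2}
	fmt.Println(filteredOut(height{40, 0}, marker)) // true: committed before the purge
	fmt.Println(filteredOut(height{50, 2}, marker)) // true: committed at the marker height
	fmt.Println(filteredOut(height{51, 0}, marker)) // false: re-written after the purge
}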
27 changes: 27 additions & 0 deletions core/ledger/pvtdatastorage/kv_encoding.go
@@ -31,6 +31,8 @@ var (
bootKVHashesKeyPrefix = []byte{9}
lastBlockInBootSnapshotKey = []byte{'a'}
hashedIndexKeyPrefix = []byte{'b'}
purgeMarkerKeyPrefix = []byte{'c'}
purgeMarkerCollKeyPrefix = []byte{'d'}

nilByte = byte(0)
emptyValue = []byte{}
@@ -284,6 +286,31 @@ func encodeHashedIndexKey(k *hashedIndexKey) []byte {
return append(encKey, version.NewHeight(k.blkNum, k.txNum).ToBytes()...)
}

func encodePurgeMarkerCollKey(k *purgeMarkerCollKey) []byte {
encKey := append(purgeMarkerCollKeyPrefix, []byte(k.ns)...)
encKey = append(encKey, nilByte)
encKey = append(encKey, []byte(k.coll)...)
return encKey
}

func encodePurgeMarkerKey(k *purgeMarkerKey) []byte {
encKey := append(purgeMarkerKeyPrefix, []byte(k.ns)...)
encKey = append(encKey, nilByte)
encKey = append(encKey, []byte(k.coll)...)
encKey = append(encKey, nilByte)
encKey = append(encKey, k.pvtkeyHash...)
return encKey
}

func encodePurgeMarkerVal(v *purgeMarkerVal) []byte {
return version.NewHeight(v.blkNum, v.txNum).ToBytes()
}

func decodePurgeMarkerVal(b []byte) (*version.Height, error) {
v, _, err := version.NewHeightFromBytes(b)
return v, err
}

func deriveDataKeyFromEncodedHashedIndexKey(encHashedIndexKey []byte) ([]byte, error) {
firstNilByteIndex := 0
secondNilByteIndex := 0
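The two marker records introduced above differ only in whether the hashed private key is appended; both are stored with the same value, the purge height serialized with version.NewHeight(blkNum, txNum).ToBytes(). The following standalone sketch mimics the byte layout; the mimic* helpers and sample values are illustrative and not part of the store's API.

package main

import "fmt"

// Prefixes and separator copied from the diff above; everything else is illustrative.
const (
	purgeMarkerKeyPrefix     = byte('c') // per-key marker: <'c'> ns <nil> coll <nil> keyHash
	purgeMarkerCollKeyPrefix = byte('d') // per-collection marker: <'d'> ns <nil> coll
	nilByte                  = byte(0)
)

func mimicPurgeMarkerCollKey(ns, coll string) []byte {
	k := []byte{purgeMarkerCollKeyPrefix}
	k = append(k, ns...)
	k = append(k, nilByte)
	k = append(k, coll...)
	return k
}

func mimicPurgeMarkerKey(ns, coll string, pvtkeyHash []byte) []byte {
	k := []byte{purgeMarkerKeyPrefix}
	k = append(k, ns...)
	k = append(k, nilByte)
	k = append(k, coll...)
	k = append(k, nilByte)
	k = append(k, pvtkeyHash...)
	return k
}

func main() {
	hash := []byte{0xde, 0xad, 0xbe, 0xef} // stand-in for the hash of a private key
	fmt.Printf("%x\n", mimicPurgeMarkerCollKey("marbles", "collA"))
	fmt.Printf("%x\n", mimicPurgeMarkerKey("marbles", "collA", hash))
	// In the real code both entries carry the purge height as their value,
	// produced by encodePurgeMarkerVal.
}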
2 changes: 1 addition & 1 deletion core/ledger/pvtdatastorage/retroactive_hashed_index.go
@@ -101,7 +101,7 @@ func constructHashedIndexFor(ledgerID string, db *leveldbhelper.DBHandle) error
return err
}

txPvtData, err := v11DecodeKV(k, v, nil)
txPvtData, err := v11DecodeKV(k, v)
if err != nil {
return err
}
166 changes: 155 additions & 11 deletions core/ledger/pvtdatastorage/store.go
@@ -14,11 +14,15 @@ import (
"github.com/bits-and-blooms/bitset"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/ledger/rwset"
"github.com/hyperledger/fabric-protos-go/ledger/rwset/kvrwset"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/confighistory"
"github.com/hyperledger/fabric/core/ledger/internal/version"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/pkg/errors"
)

@@ -125,6 +129,25 @@ type hashedIndexKey struct {
blkNum, txNum uint64
}

type PurgeMarker struct {
Ns, Coll string
PvtkeyHash []byte
BlkNum, TxNum uint64
}

type purgeMarkerKey struct {
ns, coll string
pvtkeyHash []byte
}

type purgeMarkerVal struct {
blkNum, txNum uint64
}

type purgeMarkerCollKey struct {
ns, coll string
}

type storeEntries struct {
dataEntries []*dataEntry
hashedIndexEntries []*hashedIndexEntry
@@ -406,6 +429,40 @@ func (s *Store) ResetLastUpdatedOldBlocksList() error {
return nil
}

func (s *Store) addPurgeMarkers(p *PurgeMarker) error {
b := s.db.NewUpdateBatch()
b.Put(
encodePurgeMarkerCollKey(
&purgeMarkerCollKey{
ns: p.Ns,
coll: p.Coll,
},
),
encodePurgeMarkerVal(
&purgeMarkerVal{
blkNum: p.BlkNum,
txNum: p.TxNum,
},
),
)
b.Put(
encodePurgeMarkerKey(
&purgeMarkerKey{
ns: p.Ns,
coll: p.Coll,
pvtkeyHash: p.PvtkeyHash,
},
),
encodePurgeMarkerVal(
&purgeMarkerVal{
blkNum: p.BlkNum,
txNum: p.TxNum,
},
),
)
return s.db.WriteBatch(b, true)
}

// GetPvtDataByBlockNum returns only the pvt data corresponding to the given block number
// The pvt data is filtered by the list of 'ns/collections' supplied in the filter
// A nil filter does not filter any results
@@ -433,13 +490,6 @@ func (s *Store) GetPvtDataByBlockNum(blockNum uint64, filter ledger.PvtNsCollFil

for itr.Next() {
dataKeyBytes := itr.Key()
v11Fmt, err := v11Format(dataKeyBytes)
if err != nil {
return nil, err
}
if v11Fmt {
return v11RetrievePvtdata(itr, filter)
}
dataValueBytes := itr.Value()
dataKey, err := decodeDatakey(dataKeyBytes)
if err != nil {
@@ -452,10 +502,6 @@ func (s *Store) GetPvtDataByBlockNum(blockNum uint64, filter ledger.PvtNsCollFil
if expired || !passesFilter(dataKey, filter) {
continue
}
dataValue, err := decodeDataValue(dataValueBytes)
if err != nil {
return nil, err
}

if firstItr {
currentTxNum = dataKey.txNum
@@ -468,6 +514,16 @@ func (s *Store) GetPvtDataByBlockNum(blockNum uint64, filter ledger.PvtNsCollFil
currentTxNum = dataKey.txNum
currentTxWsetAssember = newTxPvtdataAssembler(blockNum, currentTxNum)
}

dataValue, err := decodeDataValue(dataValueBytes)
if err != nil {
return nil, err
}

if err := s.removePurgedDataFromCollPvtRWset(dataKey, dataValue); err != nil {
return nil, err
}

currentTxWsetAssember.add(dataKey.ns, dataValue)
}
if currentTxWsetAssember != nil {
@@ -476,6 +532,94 @@ func (s *Store) GetPvtDataByBlockNum(blockNum uint64, filter ledger.PvtNsCollFil
return blockPvtdata, nil
}

func (s *Store) retrieveLatestPurgeKeyCollMarkerHt(ns, coll string) (*version.Height, error) {
encVal, err := s.db.Get(
encodePurgeMarkerCollKey(
&purgeMarkerCollKey{
ns: ns,
coll: coll,
},
),
)
if err != nil {
return nil, err
}
if encVal == nil {
return nil, nil
}
return decodePurgeMarkerVal(encVal)
}

// keyPotentiallyPurged returns false if the `purgeMarkerCollKey` does not exist (which means that no key has ever been purged from the given collection)
// or if the height recorded under `purgeMarkerCollKey` is lower than the height of the <ns, coll> data key (which means that the last purge of any key from the
// collection took place before the given key was committed). The main purpose of this function is to optimize the filtering of purged data by avoiding the
// computation of hashes for individual keys every time
func (s *Store) keyPotentiallyPurged(k *dataKey) (bool, error) {
purgeKeyCollMarkerHt, err := s.retrieveLatestPurgeKeyCollMarkerHt(k.ns, k.coll)
if purgeKeyCollMarkerHt == nil || err != nil {
return false, err
}

keyHt := &version.Height{
BlockNum: k.blkNum,
TxNum: k.txNum,
}

return keyHt.Compare(purgeKeyCollMarkerHt) <= 0, nil
}

func (s *Store) removePurgedDataFromCollPvtRWset(k *dataKey, v *rwset.CollectionPvtReadWriteSet) error {
purgePossible, err := s.keyPotentiallyPurged(k)
if !purgePossible || err != nil {
return err
}

collRWSet, err := rwsetutil.CollPvtRwSetFromProtoMsg(v)
if err != nil {
return err
}

keyHt := &version.Height{
BlockNum: k.blkNum,
TxNum: k.txNum,
}

filterInKVWrites := []*kvrwset.KVWrite{}
for _, w := range collRWSet.KvRwSet.Writes {
potentialPurgeMarker := encodePurgeMarkerKey(&purgeMarkerKey{
ns: k.ns,
coll: k.coll,
pvtkeyHash: util.ComputeStringHash(w.Key),
})

encPurgeMarkerVal, err := s.db.Get(potentialPurgeMarker)
if err != nil {
return err
}

if encPurgeMarkerVal == nil {
filterInKVWrites = append(filterInKVWrites, w)
continue
}

purgeMarkerHt, err := decodePurgeMarkerVal(encPurgeMarkerVal)
if err != nil {
return err
}

if keyHt.Compare(purgeMarkerHt) > 0 {
filterInKVWrites = append(filterInKVWrites, w)
continue
}
}

collRWSet.KvRwSet.Writes = filterInKVWrites
if v.Rwset, err = proto.Marshal(collRWSet.KvRwSet); err != nil {
return err
}
return nil
}

// GetMissingPvtDataInfoForMostRecentBlocks returns the missing private data information for the
// most recent `maxBlock` blocks which miss at least one private data item of an eligible collection.
func (s *Store) GetMissingPvtDataInfoForMostRecentBlocks(maxBlock int) (ledger.MissingPvtDataInfo, error) {
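Tying the read path together: GetPvtDataByBlockNum now passes each data entry through removePurgedDataFromCollPvtRWset, which first consults the collection-level marker (keyPotentiallyPurged) and only hashes individual keys when that pre-check indicates a purge may apply. Below is a self-contained approximation of this two-step filter, with plain maps standing in for the LevelDB lookups and sha256 standing in for util.ComputeStringHash; all names are illustrative, not the store's API.

package main

import (
	"crypto/sha256"
	"fmt"
)

// height and markerStore are stand-ins for version.Height and the LevelDB
// handle used by the real store; everything below is an illustrative sketch.
type height struct{ blkNum, txNum uint64 }

func (h height) lessOrEqual(o height) bool {
	if h.blkNum != o.blkNum {
		return h.blkNum < o.blkNum
	}
	return h.txNum <= o.txNum
}

type markerStore struct {
	collMarkers map[string]height // "<ns>/<coll>"           -> latest purge height in the collection
	keyMarkers  map[string]height // "<ns>/<coll>/<keyHash>" -> purge height for that key
}

// filterWrites mirrors the shape of removePurgedDataFromCollPvtRWset: the
// collection-level marker acts as a cheap pre-check, so individual keys are
// only hashed when the collection saw a purge at or above the data's commit height.
func (m *markerStore) filterWrites(ns, coll string, commitHt height, keys []string) []string {
	collMarker, ok := m.collMarkers[ns+"/"+coll]
	if !ok || !commitHt.lessOrEqual(collMarker) {
		return keys // no purge in this collection can affect data committed at commitHt
	}
	kept := []string{}
	for _, key := range keys {
		h := sha256.Sum256([]byte(key))
		marker, ok := m.keyMarkers[fmt.Sprintf("%s/%s/%x", ns, coll, h)]
		if ok && commitHt.lessOrEqual(marker) {
			continue // purged: marker at or above the key's commit height
		}
		kept = append(kept, key)
	}
	return kept
}

func main() {
	m := &markerStore{
		collMarkers: map[string]height{"marbles/collA": {50, 2}},
		keyMarkers: map[string]height{
			fmt.Sprintf("marbles/collA/%x", sha256.Sum256([]byte("marble1"))): {50, 2},
		},
	}
	fmt.Println(m.filterWrites("marbles", "collA", height{40, 0}, []string{"marble1", "marble2"}))
	// [marble2] -- marble1 carries a purge marker at a later height, marble2 has none
}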
