Skip to content

Commit

Permalink
[FAB-5874] Support for queries over pvtdata
Browse files Browse the repository at this point in the history
This CR allows queries over pvt data in addition to the
key based (Get/Set) operations. The queries over pvt
data is allowed in read-only transactions. Because, for
the peers that do not have pvt data, there is no easy way to
detect phantom items during commit time and hence guaranteeing
the serializability of trans.

If a tran performs both 'queries over pvt data' and
update operations an error is raised during simulation.

Change-Id: Ia607c13d16a20e939bbe978476757086866fa84a
Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi authored and rameshthoomu committed Sep 5, 2017
1 parent 590dce1 commit a0ad3d0
Show file tree
Hide file tree
Showing 8 changed files with 432 additions and 9 deletions.
2 changes: 1 addition & 1 deletion core/ledger/kvledger/example/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func ConstructCommitter(ledger ledger.PeerLedger) *Committer {
// Commit commits the block
func (c *Committer) Commit(rawBlock *common.Block) error {
logger.Debugf("Committer validating the block...")
if err := c.ledger.Commit(rawBlock); err != nil {
if err := c.ledger.CommitWithPvtData(&ledger.BlockAndPvtData{Block: rawBlock}); err != nil {
return err
}
return nil
Expand Down
217 changes: 217 additions & 0 deletions core/ledger/kvledger/txmgmt/privacyenabledstate/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ package privacyenabledstate
import (
"fmt"
"os"
"strings"
"testing"

"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/hyperledger/fabric/core/ledger/util"
Expand Down Expand Up @@ -120,6 +122,221 @@ func testDB(t *testing.T, env TestEnv) {

//TODO add tests for functions GetPrivateStateMultipleKeys and GetPrivateStateRangeScanIterator

func TestGetStateMultipleKeys(t *testing.T) {
for _, env := range testEnvs {
t.Run(env.GetName(), func(t *testing.T) {
testGetStateMultipleKeys(t, env)
})
}
}

func testGetStateMultipleKeys(t *testing.T, env TestEnv) {
env.Init(t)
defer env.Cleanup()
db := env.GetDBHandle("test-ledger-id")

updates := NewUpdateBatch()

updates.PubUpdates.Put("ns1", "key1", []byte("value1"), version.NewHeight(1, 1))
updates.PubUpdates.Put("ns1", "key2", []byte("value2"), version.NewHeight(1, 2))
updates.PubUpdates.Put("ns1", "key3", []byte("value3"), version.NewHeight(1, 3))

putPvtUpdates(t, updates, "ns1", "coll1", "key1", []byte("pvt_value1"), version.NewHeight(1, 4))
putPvtUpdates(t, updates, "ns1", "coll1", "key2", []byte("pvt_value2"), version.NewHeight(1, 5))
putPvtUpdates(t, updates, "ns1", "coll1", "key3", []byte("pvt_value3"), version.NewHeight(1, 6))
db.ApplyPrivacyAwareUpdates(updates, version.NewHeight(2, 6))

versionedVals, err := db.GetStateMultipleKeys("ns1", []string{"key1", "key3"})
assert.NoError(t, err)
assert.Equal(t,
[]*statedb.VersionedValue{
&statedb.VersionedValue{Value: []byte("value1"), Version: version.NewHeight(1, 1)},
&statedb.VersionedValue{Value: []byte("value3"), Version: version.NewHeight(1, 3)},
},
versionedVals)

pvtVersionedVals, err := db.GetPrivateDataMultipleKeys("ns1", "coll1", []string{"key1", "key3"})
assert.NoError(t, err)
assert.Equal(t,
[]*statedb.VersionedValue{
&statedb.VersionedValue{Value: []byte("pvt_value1"), Version: version.NewHeight(1, 4)},
&statedb.VersionedValue{Value: []byte("pvt_value3"), Version: version.NewHeight(1, 6)},
},
pvtVersionedVals)
}

func TestGetStateRangeScanIterator(t *testing.T) {
for _, env := range testEnvs {
t.Run(env.GetName(), func(t *testing.T) {
testGetStateMultipleKeys(t, env)
})
}
}

func testGetStateRangeScanIterator(t *testing.T, env TestEnv) {
env.Init(t)
defer env.Cleanup()
db := env.GetDBHandle("test-ledger-id")

updates := NewUpdateBatch()

updates.PubUpdates.Put("ns1", "key1", []byte("value1"), version.NewHeight(1, 1))
updates.PubUpdates.Put("ns1", "key2", []byte("value2"), version.NewHeight(1, 2))
updates.PubUpdates.Put("ns1", "key3", []byte("value3"), version.NewHeight(1, 3))
updates.PubUpdates.Put("ns1", "key4", []byte("value4"), version.NewHeight(1, 4))
updates.PubUpdates.Put("ns2", "key5", []byte("value5"), version.NewHeight(1, 5))
updates.PubUpdates.Put("ns2", "key6", []byte("value6"), version.NewHeight(1, 6))
updates.PubUpdates.Put("ns3", "key7", []byte("value7"), version.NewHeight(1, 7))

putPvtUpdates(t, updates, "ns1", "coll1", "key1", []byte("pvt_value1"), version.NewHeight(1, 1))
putPvtUpdates(t, updates, "ns1", "coll1", "key2", []byte("pvt_value2"), version.NewHeight(1, 2))
putPvtUpdates(t, updates, "ns1", "coll1", "key3", []byte("pvt_value3"), version.NewHeight(1, 3))
putPvtUpdates(t, updates, "ns1", "coll1", "key4", []byte("pvt_value4"), version.NewHeight(1, 4))
putPvtUpdates(t, updates, "ns2", "coll1", "key5", []byte("pvt_value5"), version.NewHeight(1, 5))
putPvtUpdates(t, updates, "ns2", "coll1", "key6", []byte("pvt_value6"), version.NewHeight(1, 6))
putPvtUpdates(t, updates, "ns3", "coll1", "key7", []byte("pvt_value7"), version.NewHeight(1, 7))
db.ApplyPrivacyAwareUpdates(updates, version.NewHeight(2, 7))

itr1, _ := db.GetStateRangeScanIterator("ns1", "key1", "")
testItr(t, itr1, []string{"key1", "key2", "key3", "key4"})

itr2, _ := db.GetStateRangeScanIterator("ns1", "key2", "key3")
testItr(t, itr2, []string{"key2"})

itr3, _ := db.GetStateRangeScanIterator("ns1", "", "")
testItr(t, itr3, []string{"key1", "key2", "key3", "key4"})

itr4, _ := db.GetStateRangeScanIterator("ns2", "", "")
testItr(t, itr4, []string{"key5", "key6"})

pvtItr1, _ := db.GetPrivateDataRangeScanIterator("ns1", "coll1", "key1", "")
testItr(t, pvtItr1, []string{"key1", "key2", "key3", "key4"})

pvtItr2, _ := db.GetPrivateDataRangeScanIterator("ns1", "coll1", "key2", "key3")
testItr(t, pvtItr2, []string{"key2"})

pvtItr3, _ := db.GetPrivateDataRangeScanIterator("ns1", "coll1", "", "")
testItr(t, pvtItr3, []string{"key1", "key2", "key3", "key4"})

pvtItr4, _ := db.GetPrivateDataRangeScanIterator("ns2", "coll1", "", "")
testItr(t, pvtItr4, []string{"key5", "key6"})
}

func TestQueryOnCouchDB(t *testing.T) {
for _, env := range testEnvs {
_, ok := env.(*CouchDBCommonStorageTestEnv)
if !ok {
continue
}
t.Run(env.GetName(), func(t *testing.T) {
testQueryOnCouchDB(t, env)
})
}
}

func testQueryOnCouchDB(t *testing.T, env TestEnv) {
env.Init(t)
defer env.Cleanup()
db := env.GetDBHandle("test-ledger-id")
updates := NewUpdateBatch()

jsonValues := []string{
`{"asset_name": "marble1", "color": "blue", "size": 1, "owner": "tom"}`,
`{"asset_name": "marble2","color": "blue","size": 2,"owner": "jerry"}`,
`{"asset_name": "marble3","color": "blue","size": 3,"owner": "fred"}`,
`{"asset_name": "marble4","color": "blue","size": 4,"owner": "martha"}`,
`{"asset_name": "marble5","color": "blue","size": 5,"owner": "fred"}`,
`{"asset_name": "marble6","color": "blue","size": 6,"owner": "elaine"}`,
`{"asset_name": "marble7","color": "blue","size": 7,"owner": "fred"}`,
`{"asset_name": "marble8","color": "blue","size": 8,"owner": "elaine"}`,
`{"asset_name": "marble9","color": "green","size": 9,"owner": "fred"}`,
`{"asset_name": "marble10","color": "green","size": 10,"owner": "mary"}`,
`{"asset_name": "marble11","color": "cyan","size": 1000007,"owner": "joe"}`,
}

for i, jsonValue := range jsonValues {
updates.PubUpdates.Put("ns1", testKey(i), []byte(jsonValue), version.NewHeight(1, uint64(i)))
updates.PubUpdates.Put("ns2", testKey(i), []byte(jsonValue), version.NewHeight(1, uint64(i)))
putPvtUpdates(t, updates, "ns1", "coll1", testKey(i), []byte(jsonValue), version.NewHeight(1, uint64(i)))
putPvtUpdates(t, updates, "ns2", "coll1", testKey(i), []byte(jsonValue), version.NewHeight(1, uint64(i)))
}
db.ApplyPrivacyAwareUpdates(updates, version.NewHeight(1, 11))

// query for owner=jerry, use namespace "ns1"
itr, err := db.ExecuteQuery("ns1", `{"selector":{"owner":"jerry"}}`)
testutil.AssertNoError(t, err, "")
testQueryItr(t, itr, []string{testKey(1)}, []string{"jerry"})

// query for owner=jerry, use namespace "ns2"
itr, err = db.ExecuteQuery("ns2", `{"selector":{"owner":"jerry"}}`)
testutil.AssertNoError(t, err, "")
testQueryItr(t, itr, []string{testKey(1)}, []string{"jerry"})

// query for pvt data owner=jerry, use namespace "ns1"
itr, err = db.ExecuteQueryOnPrivateData("ns1", "coll1", `{"selector":{"owner":"jerry"}}`)
testutil.AssertNoError(t, err, "")
testQueryItr(t, itr, []string{testKey(1)}, []string{"jerry"})

// query for pvt data owner=jerry, use namespace "ns2"
itr, err = db.ExecuteQueryOnPrivateData("ns2", "coll1", `{"selector":{"owner":"jerry"}}`)
testutil.AssertNoError(t, err, "")
testQueryItr(t, itr, []string{testKey(1)}, []string{"jerry"})

// query using bad query string
itr, err = db.ExecuteQueryOnPrivateData("ns1", "coll1", "this is an invalid query string")
testutil.AssertError(t, err, "Should have received an error for invalid query string")

// query returns 0 records
itr, err = db.ExecuteQueryOnPrivateData("ns1", "coll1", `{"selector":{"owner":"not_a_valid_name"}}`)
testutil.AssertNoError(t, err, "")
testQueryItr(t, itr, []string{}, []string{})

// query with embedded implicit "AND" and explicit "OR", namespace "ns1"
itr, err = db.ExecuteQueryOnPrivateData("ns1", "coll1", `{"selector":{"color":"green","$or":[{"owner":"fred"},{"owner":"mary"}]}}`)
testutil.AssertNoError(t, err, "")
testQueryItr(t, itr, []string{testKey(8), testKey(9)}, []string{"green"}, []string{"green"})

// query with integer with digit-count equals 7 and response received is also received
// with same digit-count and there is no float transformation
itr, err = db.ExecuteQueryOnPrivateData("ns2", "coll1", `{"selector":{"$and":[{"size":{"$eq": 1000007}}]}}`)
testutil.AssertNoError(t, err, "")
testQueryItr(t, itr, []string{testKey(10)}, []string{"joe", "1000007"})
}

func testItr(t *testing.T, itr statedb.ResultsIterator, expectedKeys []string) {
defer itr.Close()
for _, expectedKey := range expectedKeys {
queryResult, _ := itr.Next()
vkv := queryResult.(*statedb.VersionedKV)
key := vkv.Key
testutil.AssertEquals(t, key, expectedKey)
}
last, err := itr.Next()
testutil.AssertNoError(t, err, "")
testutil.AssertNil(t, last)
}

func testQueryItr(t *testing.T, itr statedb.ResultsIterator, expectedKeys []string, expectedValStrs ...[]string) {
defer itr.Close()
for i, expectedKey := range expectedKeys {
queryResult, _ := itr.Next()
vkv := queryResult.(*statedb.VersionedKV)
key := vkv.Key
valStr := string(vkv.Value)
testutil.AssertEquals(t, key, expectedKey)
for _, expectedValStr := range expectedValStrs[i] {
testutil.AssertEquals(t, strings.Contains(valStr, expectedValStr), true)
}
}
last, err := itr.Next()
testutil.AssertNoError(t, err, "")
testutil.AssertNil(t, last)
}

func testKey(i int) string {
return fmt.Sprintf("key%d", i)
}

func putPvtUpdates(t *testing.T, updates *UpdateBatch, ns, coll, key string, value []byte, ver *version.Height) {
updates.PvtUpdates.Put(ns, coll, key, value, ver)
updates.HashUpdates.Put(ns, coll, util.ComputeStringHash(key), util.ComputeHash(value), ver)
Expand Down
49 changes: 45 additions & 4 deletions core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,25 @@ func (h *queryHelper) getPrivateDataMultipleKeys(ns, coll string, keys []string)
}

func (h *queryHelper) getPrivateDataRangeScanIterator(namespace, collection, startKey, endKey string) (commonledger.ResultsIterator, error) {
// TODO
return nil, errors.New("Not Yet Supported")
if err := h.checkDone(); err != nil {
return nil, err
}
dbItr, err := h.txmgr.db.GetPrivateDataRangeScanIterator(namespace, collection, startKey, endKey)
if err != nil {
return nil, err
}
return &pvtdataResultsItr{namespace, collection, dbItr}, nil
}

func (h *queryHelper) executeQueryOnPrivateData(namespace, collection, query string) (commonledger.ResultsIterator, error) {
// TODO
return nil, errors.New("Not Yet Supported")
if err := h.checkDone(); err != nil {
return nil, err
}
dbItr, err := h.txmgr.db.ExecuteQueryOnPrivateData(namespace, collection, query)
if err != nil {
return nil, err
}
return &pvtdataResultsItr{namespace, collection, dbItr}, nil
}

func (h *queryHelper) done() {
Expand Down Expand Up @@ -298,3 +310,32 @@ func decomposeVersionedValue(versionedValue *statedb.VersionedValue) ([]byte, *v
}
return value, ver
}

// pvtdataResultsItr iterates over results of a query on pvt data
type pvtdataResultsItr struct {
ns string
coll string
dbItr statedb.ResultsIterator
}

// Next implements method in interface ledger.ResultsIterator
func (itr *pvtdataResultsItr) Next() (commonledger.QueryResult, error) {
queryResult, err := itr.dbItr.Next()
if err != nil {
return nil, err
}
if queryResult == nil {
return nil, nil
}
versionedQueryRecord := queryResult.(*statedb.VersionedKV)
return &queryresult.KV{
Namespace: itr.ns,
Key: versionedQueryRecord.Key,
Value: versionedQueryRecord.Value,
}, nil
}

// Close implements method in interface ledger.ResultsIterator
func (itr *pvtdataResultsItr) Close() {
itr.dbItr.Close()
}
76 changes: 76 additions & 0 deletions core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package lockbasedtxmgr

import (
"testing"

commonledger "github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/privacyenabledstate"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/protos/ledger/queryresult"
)

func TestPvtdataResultsItr(t *testing.T) {
testEnv := testEnvs[0]
testEnv.init(t, "test-pvtdata-range-queries")
defer testEnv.cleanup()

txMgr := testEnv.getTxMgr().(*LockBasedTxMgr)

updates := privacyenabledstate.NewUpdateBatch()
putPvtUpdates(t, updates, "ns1", "coll1", "key1", []byte("pvt_value1"), version.NewHeight(1, 1))
putPvtUpdates(t, updates, "ns1", "coll1", "key2", []byte("pvt_value2"), version.NewHeight(1, 2))
putPvtUpdates(t, updates, "ns1", "coll1", "key3", []byte("pvt_value3"), version.NewHeight(1, 3))
putPvtUpdates(t, updates, "ns1", "coll1", "key4", []byte("pvt_value4"), version.NewHeight(1, 4))
putPvtUpdates(t, updates, "ns2", "coll1", "key5", []byte("pvt_value5"), version.NewHeight(1, 5))
putPvtUpdates(t, updates, "ns2", "coll1", "key6", []byte("pvt_value6"), version.NewHeight(1, 6))
putPvtUpdates(t, updates, "ns3", "coll1", "key7", []byte("pvt_value7"), version.NewHeight(1, 7))
txMgr.db.ApplyPrivacyAwareUpdates(updates, version.NewHeight(2, 7))
queryHelper := &queryHelper{txmgr: txMgr}

resItr, err := queryHelper.getPrivateDataRangeScanIterator("ns1", "coll1", "key1", "key3")
testutil.AssertNoError(t, err, "")
testItr(t, resItr, "ns1", "coll1", []string{"key1", "key2"})

resItr, err = queryHelper.getPrivateDataRangeScanIterator("ns4", "coll1", "key1", "key3")
testutil.AssertNoError(t, err, "")
testItr(t, resItr, "ns4", "coll1", []string{})
}

func putPvtUpdates(t *testing.T, updates *privacyenabledstate.UpdateBatch, ns, coll, key string, value []byte, ver *version.Height) {
updates.PvtUpdates.Put(ns, coll, key, value, ver)
updates.HashUpdates.Put(ns, coll, util.ComputeStringHash(key), util.ComputeHash(value), ver)
}

func testItr(t *testing.T, itr commonledger.ResultsIterator, expectedNs string, expectedColl string, expectedKeys []string) {
t.Logf("Testing itr for [%d] keys", len(expectedKeys))
defer itr.Close()
for _, expectedKey := range expectedKeys {
queryResult, _ := itr.Next()
pvtdataKV := queryResult.(*queryresult.KV)
ns := pvtdataKV.Namespace
key := pvtdataKV.Key
testutil.AssertEquals(t, ns, expectedNs)
testutil.AssertEquals(t, key, expectedKey)
}
last, err := itr.Next()
testutil.AssertNoError(t, err, "")
testutil.AssertNil(t, last)
}
Loading

0 comments on commit a0ad3d0

Please sign in to comment.