Skip to content

Commit

Permalink
[FAB-5084] Push private data upon endorsement
Browse files Browse the repository at this point in the history
This change set makes the peer push private data to a few peers
upon endorsement.

Change-Id: Ib165c9469a2f601da1298c46cfe5d2cd5f8cdccc
Signed-off-by: yacovm <yacovm@il.ibm.com>
Signed-off-by: Artem Barger <bartem@il.ibm.com>
  • Loading branch information
C0rWin committed Sep 18, 2017
1 parent fe5b068 commit 21a97ba
Show file tree
Hide file tree
Showing 11 changed files with 466 additions and 159 deletions.
19 changes: 6 additions & 13 deletions gossip/privdata/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,17 @@ type TransientStore interface {
GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (transientstore.RWSetScanner, error)
}

// PrivateDataDistributor distributes private data to peers
type PrivateDataDistributor interface {
// Distribute distributes a given private data with a specific transactionID
// to peers according policies that are derived from the given PolicyStore and PolicyParser
Distribute(privateData *rwset.TxPvtReadWriteSet, txID string) error
}

// Coordinator orchestrates the flow of the new
// blocks arrival and in flight transient data, responsible
// to complete missing parts of transient data for given block.
type Coordinator interface {
PrivateDataDistributor
// StoreBlock deliver new block with underlined private data
// returns missing transaction ids
StoreBlock(block *common.Block, data util.PvtDataCollections) error

// StorePvtData used to persist private date into transient store
StorePvtData(txid string, privData *rwset.TxPvtReadWriteSet) error

// GetPvtDataAndBlockByNum get block by number and returns also all related private data
// the order of private data in slice of PvtDataCollections doesn't implies the order of
// transactions in the block related to these private data, to get the correct placement
Expand Down Expand Up @@ -86,11 +81,9 @@ func NewCoordinator(committer committer.Committer, store TransientStore, gossipF
return &coordinator{Committer: committer, TransientStore: store, gossipFetcher: gossipFetcher}
}

// Distribute distributes a given private data with a specific transactionID
// to peers according policies that are derived from the given PolicyStore and PolicyParser
func (c *coordinator) Distribute(privateData *rwset.TxPvtReadWriteSet, txID string) error {
// TODO: also need to distribute the data...
return c.TransientStore.Persist(txID, 0, privateData)
// StorePvtData used to persist private date into transient store
func (c *coordinator) StorePvtData(txID string, privData *rwset.TxPvtReadWriteSet) error {
return c.TransientStore.Persist(txID, 0, privData)
}

// StoreBlock stores block with private data into the ledger
Expand Down
126 changes: 126 additions & 0 deletions gossip/privdata/distributor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package privdata

import (
"time"

"github.com/hyperledger/fabric/core/common/privdata"
"github.com/hyperledger/fabric/gossip/api"
gossipCommon "github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"
"github.com/hyperledger/fabric/gossip/filter"
gossip2 "github.com/hyperledger/fabric/gossip/gossip"
"github.com/hyperledger/fabric/gossip/util"
"github.com/hyperledger/fabric/protos/common"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/hyperledger/fabric/protos/ledger/rwset"
"github.com/spf13/viper"
)

// gossipAdapter an adapter for API's required from gossip module
type gossipAdapter interface {
// SendByCriteria sends a given message to all peers that match the given SendCriteria
SendByCriteria(message *proto.SignedGossipMessage, criteria gossip2.SendCriteria) error

// PeerFilter receives a SubChannelSelectionCriteria and returns a RoutingFilter that selects
// only peer identities that match the given criteria, and that they published their channel participation
PeerFilter(channel gossipCommon.ChainID, messagePredicate api.SubChannelSelectionCriteria) (filter.RoutingFilter, error)
}

// PvtDataDistributor interface to defines API of distributing private data
type PvtDataDistributor interface {
// Distribute broadcast reliably private data read write set based on policies
Distribute(txID string, privData *rwset.TxPvtReadWriteSet, ps privdata.PolicyStore, pp privdata.PolicyParser) error
}

// distributorImpl the implementation of the private data distributor interface
type distributorImpl struct {
chainID string
minAck int
maxPeers int
gossipAdapter
}

// NewDistributor a constructor for private data distributor capable to send
// private read write sets for underlined collection
func NewDistributor(chainID string, gossip gossipAdapter) PvtDataDistributor {
return &distributorImpl{
chainID: chainID,
gossipAdapter: gossip,
minAck: viper.GetInt("peer.gossip.pvtData.minAck"),
maxPeers: viper.GetInt("peer.gossip.pvtData.maxPeers"),
}
}

// Distribute broadcast reliably private data read write set based on policies
func (d *distributorImpl) Distribute(txID string, privData *rwset.TxPvtReadWriteSet, ps privdata.PolicyStore, pp privdata.PolicyParser) error {
for _, pvtRwset := range privData.NsPvtRwset {
namespace := pvtRwset.Namespace
for _, collection := range pvtRwset.CollectionPvtRwset {
collectionName := collection.CollectionName
policyFilter := pp.Parse(ps.CollectionPolicy(rwset.CollectionCriteria{
Namespace: namespace,
Collection: collectionName,
TxId: txID,
Channel: d.chainID,
}))

routingFilter, err := d.gossipAdapter.PeerFilter(gossipCommon.ChainID(d.chainID), func(signature api.PeerSignature) bool {
return policyFilter(common.SignedData{
Data: signature.Message,
Signature: signature.Signature,
Identity: []byte(signature.PeerIdentity),
})
})

if err != nil {
logger.Error("Failed to retrieve peer routing filter, due to", err, "collection name", collectionName)
return err
}

msg := &proto.GossipMessage{
Channel: []byte(d.chainID),
Nonce: util.RandomUInt64(),
Tag: proto.GossipMessage_CHAN_ONLY,
Content: &proto.GossipMessage_PrivateData{
PrivateData: &proto.PrivateDataMessage{
Payload: &proto.PrivatePayload{
Namespace: namespace,
CollectionName: collectionName,
TxId: txID,
PrivateRwset: collection.Rwset,
},
},
},
}

pvtDataMsg, err := msg.NoopSign()
if err != nil {
return err
}

err = d.gossipAdapter.SendByCriteria(pvtDataMsg, gossip2.SendCriteria{
Timeout: time.Second,
Channel: gossipCommon.ChainID(d.chainID),
MaxPeers: d.maxPeers,
MinAck: d.minAck,
IsEligible: func(member discovery.NetworkMember) bool {
return routingFilter(member)
},
})

if err != nil {
logger.Warning("Could not send private data for channel", d.chainID,
"collection name", collectionName, "due to", err)

return err
}
}
}
return nil
}
40 changes: 33 additions & 7 deletions gossip/service/gossip_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,19 @@ func (*deliveryFactoryImpl) Service(g GossipService, endpoints []string, mcs api
})
}

type privateHandler struct {
support Support
coordinator privdata2.Coordinator
distributor privdata2.PvtDataDistributor
}

func (p privateHandler) close() {
p.coordinator.Close()
}

type gossipServiceImpl struct {
gossipSvc
coordinators map[string]privdata2.Coordinator
privateHandlers map[string]privateHandler
chains map[string]state.GossipStateProvider
leaderElection map[string]election.LeaderElectionService
deliveryService map[string]deliverclient.DeliverService
Expand Down Expand Up @@ -146,7 +156,7 @@ func InitGossipServiceCustomDeliveryFactory(peerIdentity []byte, endpoint string
gossipServiceInstance = &gossipServiceImpl{
mcs: mcs,
gossipSvc: gossip,
coordinators: make(map[string]privdata2.Coordinator),
privateHandlers: make(map[string]privateHandler),
chains: make(map[string]state.GossipStateProvider),
leaderElection: make(map[string]election.LeaderElectionService),
deliveryService: make(map[string]deliverclient.DeliverService),
Expand All @@ -163,14 +173,26 @@ func GetGossipService() GossipService {
return gossipServiceInstance
}

// DistributePrivateData distribute private read write set inside the channel based on the collections policies
func (g *gossipServiceImpl) DistributePrivateData(chainID string, txID string, privData *rwset.TxPvtReadWriteSet) error {
g.lock.RLock()
coord, exists := g.coordinators[chainID]
handler, exists := g.privateHandlers[chainID]
g.lock.RUnlock()
if !exists {
return errors.Errorf("No coordinator for %s", chainID)
return errors.Errorf("No private data handler for %s", chainID)
}
return coord.Distribute(privData, txID)

if err := handler.distributor.Distribute(txID, privData, handler.support.Ps, handler.support.Pp); err != nil {
logger.Error("Failed to distributed private collection, txID", txID, "channel", chainID, "due to", err)
return err
}

if err := handler.coordinator.StorePvtData(txID, privData); err != nil {
logger.Error("Failed to store private data into transient store, txID",
txID, "channel", chainID, "due to", err)
return err
}
return nil
}

// NewConfigEventer creates a ConfigProcessor which the configtx.Manager can ultimately route config updates to
Expand All @@ -194,7 +216,11 @@ func (g *gossipServiceImpl) InitializeChannel(chainID string, endpoints []string
servicesAdapter := &state.ServicesMediator{GossipAdapter: g, MCSAdapter: g.mcs}
fetcher := privdata2.NewPuller(support.Ps, support.Pp, g.gossipSvc, NewDataRetriever(support.Store), chainID)
coordinator := privdata2.NewCoordinator(support.Committer, support.Store, fetcher)
g.coordinators[chainID] = coordinator
g.privateHandlers[chainID] = privateHandler{
support: support,
coordinator: coordinator,
distributor: privdata2.NewDistributor(chainID, g),
}
g.chains[chainID] = state.NewGossipStateProvider(chainID, servicesAdapter, coordinator)
if g.deliveryService[chainID] == nil {
var err error
Expand Down Expand Up @@ -286,7 +312,7 @@ func (g *gossipServiceImpl) Stop() {
le.Stop()
}
g.chains[chainID].Stop()
g.coordinators[chainID].Close()
g.privateHandlers[chainID].close()

if g.deliveryService[chainID] != nil {
g.deliveryService[chainID].Stop()
Expand Down
3 changes: 1 addition & 2 deletions gossip/service/gossip_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
gossipCommon "github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/election"
"github.com/hyperledger/fabric/gossip/gossip"
"github.com/hyperledger/fabric/gossip/privdata"
"github.com/hyperledger/fabric/gossip/state"
"github.com/hyperledger/fabric/gossip/util"
"github.com/hyperledger/fabric/msp/mgmt"
Expand Down Expand Up @@ -648,7 +647,7 @@ func newGossipInstance(portPrefix int, id int, maxMsgCount int, boot ...int) Gos
gossipSvc: gossip,
chains: make(map[string]state.GossipStateProvider),
leaderElection: make(map[string]election.LeaderElectionService),
coordinators: make(map[string]privdata.Coordinator),
privateHandlers: make(map[string]privateHandler),
deliveryService: make(map[string]deliverclient.DeliverService),
deliveryFactory: &deliveryFactoryImpl{},
peerIdentity: api.PeerIdentityType(conf.InternalEndpoint),
Expand Down
68 changes: 67 additions & 1 deletion gossip/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/hyperledger/fabric/gossip/util"
"github.com/hyperledger/fabric/protos/common"
proto "github.com/hyperledger/fabric/protos/gossip"
"github.com/hyperledger/fabric/protos/ledger/rwset"
"github.com/op/go-logging"
"github.com/pkg/errors"
"github.com/spf13/viper"
Expand Down Expand Up @@ -95,6 +96,9 @@ type ledgerResources interface {
// returns missing transaction ids
StoreBlock(block *common.Block, data util.PvtDataCollections) error

// StorePvtData used to persist private date into transient store
StorePvtData(txid string, privData *rwset.TxPvtReadWriteSet) error

// GetPvtDataAndBlockByNum get block by number and returns also all related private data
// the order of private data in slice of PvtDataCollections doesn't implies the order of
// transactions in the block related to these private data, to get the correct placement
Expand Down Expand Up @@ -122,7 +126,6 @@ type ServicesMediator struct {
// the struct to handle in memory sliding window of
// new ledger block to be acquired by hyper ledger
type GossipStateProviderImpl struct {

// Chain id
chainID string

Expand All @@ -133,6 +136,8 @@ type GossipStateProviderImpl struct {

commChan <-chan proto.ReceivedMessage

pvtDataChan <-chan proto.ReceivedMessage

// Queue of payloads which wasn't acquired yet
payloads PayloadsBuffer

Expand Down Expand Up @@ -187,6 +192,23 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
// Filter message which are only relevant for nodeMetastate transfer
_, commChan := services.Accept(remoteStateMsgFilter, true)

// Filter private data messages
_, pvtDataChan := services.Accept(func(message interface{}) bool {
receivedMsg := message.(proto.ReceivedMessage)
msg := receivedMsg.GetGossipMessage()
if msg.GetPrivateData() == nil {
return false
}
connInfo := receivedMsg.GetConnectionInfo()
authErr := services.VerifyByChannel(msg.Channel, connInfo.Identity, connInfo.Auth.Signature, connInfo.Auth.SignedData)
if authErr != nil {
logger.Warning("Got unauthorized private data message from", string(connInfo.Identity))
return false
}
return true

}, true)

height, err := ledger.LedgerHeight()
if height == 0 {
// Panic here since this is an indication of invalid situation which should not happen in normal
Expand Down Expand Up @@ -214,6 +236,9 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
// Channel to read direct messages from other peers
commChan: commChan,

// Channel for private data messages
pvtDataChan: pvtDataChan,

// Create a queue for payload received
payloads: NewPayloadsBuffer(height),

Expand Down Expand Up @@ -268,13 +293,54 @@ func (s *GossipStateProviderImpl) listen() {
case msg := <-s.commChan:
logger.Debug("Direct message ", msg)
go s.directMessage(msg)
case msg := <-s.pvtDataChan:
logger.Debug("Private data message ", msg)
go s.privateDataMessage(msg)
case <-s.stopCh:
s.stopCh <- struct{}{}
logger.Debug("Stop listening for new messages")
return
}
}
}
func (s *GossipStateProviderImpl) privateDataMessage(msg proto.ReceivedMessage) {
if !bytes.Equal(msg.GetGossipMessage().Channel, []byte(s.chainID)) {
logger.Warning("Received state transfer request for channel",
string(msg.GetGossipMessage().Channel), "while expecting channel", s.chainID, "skipping request...")
return
}

gossipMsg := msg.GetGossipMessage()
pvtDataMsg := gossipMsg.GetPrivateData()

collectionName := pvtDataMsg.Payload.CollectionName
txID := pvtDataMsg.Payload.TxId
pvtRwSet := pvtDataMsg.Payload.PrivateRwset

if len(pvtRwSet) == 0 {
logger.Warning("Malformed private data message, no rwset provided, collection name = ", collectionName)
return
}

txPvtRwSet := &rwset.TxPvtReadWriteSet{
DataModel: rwset.TxReadWriteSet_KV,
NsPvtRwset: []*rwset.NsPvtReadWriteSet{{
Namespace: pvtDataMsg.Payload.Namespace,
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{{
CollectionName: collectionName,
Rwset: pvtRwSet,
}}},
},
}

if err := s.ledger.StorePvtData(txID, txPvtRwSet); err != nil {
logger.Errorf("Wasn't able to persist private data for collection %s, due to %s", collectionName, err)
msg.Ack(err) // Sending NACK to indicate failure of storing collection
}

msg.Ack(nil)
logger.Debug("Private data for collection", collectionName, "has been stored")
}

func (s *GossipStateProviderImpl) directMessage(msg proto.ReceivedMessage) {
logger.Debug("[ENTER] -> directMessage")
Expand Down
Loading

0 comments on commit 21a97ba

Please sign in to comment.