Skip to content

Commit 21a97ba

Browse files
committed
[FAB-5084] Push private data upon endorsement
This change set makes the peer push private data to a few peers upon endorsement. Change-Id: Ib165c9469a2f601da1298c46cfe5d2cd5f8cdccc Signed-off-by: yacovm <yacovm@il.ibm.com> Signed-off-by: Artem Barger <bartem@il.ibm.com>
1 parent fe5b068 commit 21a97ba

File tree

11 files changed

+466
-159
lines changed

11 files changed

+466
-159
lines changed

gossip/privdata/coordinator.go

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,22 +39,17 @@ type TransientStore interface {
3939
GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (transientstore.RWSetScanner, error)
4040
}
4141

42-
// PrivateDataDistributor distributes private data to peers
43-
type PrivateDataDistributor interface {
44-
// Distribute distributes a given private data with a specific transactionID
45-
// to peers according policies that are derived from the given PolicyStore and PolicyParser
46-
Distribute(privateData *rwset.TxPvtReadWriteSet, txID string) error
47-
}
48-
4942
// Coordinator orchestrates the flow of the new
5043
// blocks arrival and in flight transient data, responsible
5144
// to complete missing parts of transient data for given block.
5245
type Coordinator interface {
53-
PrivateDataDistributor
5446
// StoreBlock deliver new block with underlined private data
5547
// returns missing transaction ids
5648
StoreBlock(block *common.Block, data util.PvtDataCollections) error
5749

50+
// StorePvtData used to persist private date into transient store
51+
StorePvtData(txid string, privData *rwset.TxPvtReadWriteSet) error
52+
5853
// GetPvtDataAndBlockByNum get block by number and returns also all related private data
5954
// the order of private data in slice of PvtDataCollections doesn't implies the order of
6055
// transactions in the block related to these private data, to get the correct placement
@@ -86,11 +81,9 @@ func NewCoordinator(committer committer.Committer, store TransientStore, gossipF
8681
return &coordinator{Committer: committer, TransientStore: store, gossipFetcher: gossipFetcher}
8782
}
8883

89-
// Distribute distributes a given private data with a specific transactionID
90-
// to peers according policies that are derived from the given PolicyStore and PolicyParser
91-
func (c *coordinator) Distribute(privateData *rwset.TxPvtReadWriteSet, txID string) error {
92-
// TODO: also need to distribute the data...
93-
return c.TransientStore.Persist(txID, 0, privateData)
84+
// StorePvtData used to persist private date into transient store
85+
func (c *coordinator) StorePvtData(txID string, privData *rwset.TxPvtReadWriteSet) error {
86+
return c.TransientStore.Persist(txID, 0, privData)
9487
}
9588

9689
// StoreBlock stores block with private data into the ledger

gossip/privdata/distributor.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package privdata
8+
9+
import (
10+
"time"
11+
12+
"github.com/hyperledger/fabric/core/common/privdata"
13+
"github.com/hyperledger/fabric/gossip/api"
14+
gossipCommon "github.com/hyperledger/fabric/gossip/common"
15+
"github.com/hyperledger/fabric/gossip/discovery"
16+
"github.com/hyperledger/fabric/gossip/filter"
17+
gossip2 "github.com/hyperledger/fabric/gossip/gossip"
18+
"github.com/hyperledger/fabric/gossip/util"
19+
"github.com/hyperledger/fabric/protos/common"
20+
proto "github.com/hyperledger/fabric/protos/gossip"
21+
"github.com/hyperledger/fabric/protos/ledger/rwset"
22+
"github.com/spf13/viper"
23+
)
24+
25+
// gossipAdapter an adapter for API's required from gossip module
26+
type gossipAdapter interface {
27+
// SendByCriteria sends a given message to all peers that match the given SendCriteria
28+
SendByCriteria(message *proto.SignedGossipMessage, criteria gossip2.SendCriteria) error
29+
30+
// PeerFilter receives a SubChannelSelectionCriteria and returns a RoutingFilter that selects
31+
// only peer identities that match the given criteria, and that they published their channel participation
32+
PeerFilter(channel gossipCommon.ChainID, messagePredicate api.SubChannelSelectionCriteria) (filter.RoutingFilter, error)
33+
}
34+
35+
// PvtDataDistributor interface to defines API of distributing private data
36+
type PvtDataDistributor interface {
37+
// Distribute broadcast reliably private data read write set based on policies
38+
Distribute(txID string, privData *rwset.TxPvtReadWriteSet, ps privdata.PolicyStore, pp privdata.PolicyParser) error
39+
}
40+
41+
// distributorImpl the implementation of the private data distributor interface
42+
type distributorImpl struct {
43+
chainID string
44+
minAck int
45+
maxPeers int
46+
gossipAdapter
47+
}
48+
49+
// NewDistributor a constructor for private data distributor capable to send
50+
// private read write sets for underlined collection
51+
func NewDistributor(chainID string, gossip gossipAdapter) PvtDataDistributor {
52+
return &distributorImpl{
53+
chainID: chainID,
54+
gossipAdapter: gossip,
55+
minAck: viper.GetInt("peer.gossip.pvtData.minAck"),
56+
maxPeers: viper.GetInt("peer.gossip.pvtData.maxPeers"),
57+
}
58+
}
59+
60+
// Distribute broadcast reliably private data read write set based on policies
61+
func (d *distributorImpl) Distribute(txID string, privData *rwset.TxPvtReadWriteSet, ps privdata.PolicyStore, pp privdata.PolicyParser) error {
62+
for _, pvtRwset := range privData.NsPvtRwset {
63+
namespace := pvtRwset.Namespace
64+
for _, collection := range pvtRwset.CollectionPvtRwset {
65+
collectionName := collection.CollectionName
66+
policyFilter := pp.Parse(ps.CollectionPolicy(rwset.CollectionCriteria{
67+
Namespace: namespace,
68+
Collection: collectionName,
69+
TxId: txID,
70+
Channel: d.chainID,
71+
}))
72+
73+
routingFilter, err := d.gossipAdapter.PeerFilter(gossipCommon.ChainID(d.chainID), func(signature api.PeerSignature) bool {
74+
return policyFilter(common.SignedData{
75+
Data: signature.Message,
76+
Signature: signature.Signature,
77+
Identity: []byte(signature.PeerIdentity),
78+
})
79+
})
80+
81+
if err != nil {
82+
logger.Error("Failed to retrieve peer routing filter, due to", err, "collection name", collectionName)
83+
return err
84+
}
85+
86+
msg := &proto.GossipMessage{
87+
Channel: []byte(d.chainID),
88+
Nonce: util.RandomUInt64(),
89+
Tag: proto.GossipMessage_CHAN_ONLY,
90+
Content: &proto.GossipMessage_PrivateData{
91+
PrivateData: &proto.PrivateDataMessage{
92+
Payload: &proto.PrivatePayload{
93+
Namespace: namespace,
94+
CollectionName: collectionName,
95+
TxId: txID,
96+
PrivateRwset: collection.Rwset,
97+
},
98+
},
99+
},
100+
}
101+
102+
pvtDataMsg, err := msg.NoopSign()
103+
if err != nil {
104+
return err
105+
}
106+
107+
err = d.gossipAdapter.SendByCriteria(pvtDataMsg, gossip2.SendCriteria{
108+
Timeout: time.Second,
109+
Channel: gossipCommon.ChainID(d.chainID),
110+
MaxPeers: d.maxPeers,
111+
MinAck: d.minAck,
112+
IsEligible: func(member discovery.NetworkMember) bool {
113+
return routingFilter(member)
114+
},
115+
})
116+
117+
if err != nil {
118+
logger.Warning("Could not send private data for channel", d.chainID,
119+
"collection name", collectionName, "due to", err)
120+
121+
return err
122+
}
123+
}
124+
}
125+
return nil
126+
}

gossip/service/gossip_service.go

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,19 @@ func (*deliveryFactoryImpl) Service(g GossipService, endpoints []string, mcs api
7575
})
7676
}
7777

78+
type privateHandler struct {
79+
support Support
80+
coordinator privdata2.Coordinator
81+
distributor privdata2.PvtDataDistributor
82+
}
83+
84+
func (p privateHandler) close() {
85+
p.coordinator.Close()
86+
}
87+
7888
type gossipServiceImpl struct {
7989
gossipSvc
80-
coordinators map[string]privdata2.Coordinator
90+
privateHandlers map[string]privateHandler
8191
chains map[string]state.GossipStateProvider
8292
leaderElection map[string]election.LeaderElectionService
8393
deliveryService map[string]deliverclient.DeliverService
@@ -146,7 +156,7 @@ func InitGossipServiceCustomDeliveryFactory(peerIdentity []byte, endpoint string
146156
gossipServiceInstance = &gossipServiceImpl{
147157
mcs: mcs,
148158
gossipSvc: gossip,
149-
coordinators: make(map[string]privdata2.Coordinator),
159+
privateHandlers: make(map[string]privateHandler),
150160
chains: make(map[string]state.GossipStateProvider),
151161
leaderElection: make(map[string]election.LeaderElectionService),
152162
deliveryService: make(map[string]deliverclient.DeliverService),
@@ -163,14 +173,26 @@ func GetGossipService() GossipService {
163173
return gossipServiceInstance
164174
}
165175

176+
// DistributePrivateData distribute private read write set inside the channel based on the collections policies
166177
func (g *gossipServiceImpl) DistributePrivateData(chainID string, txID string, privData *rwset.TxPvtReadWriteSet) error {
167178
g.lock.RLock()
168-
coord, exists := g.coordinators[chainID]
179+
handler, exists := g.privateHandlers[chainID]
169180
g.lock.RUnlock()
170181
if !exists {
171-
return errors.Errorf("No coordinator for %s", chainID)
182+
return errors.Errorf("No private data handler for %s", chainID)
172183
}
173-
return coord.Distribute(privData, txID)
184+
185+
if err := handler.distributor.Distribute(txID, privData, handler.support.Ps, handler.support.Pp); err != nil {
186+
logger.Error("Failed to distributed private collection, txID", txID, "channel", chainID, "due to", err)
187+
return err
188+
}
189+
190+
if err := handler.coordinator.StorePvtData(txID, privData); err != nil {
191+
logger.Error("Failed to store private data into transient store, txID",
192+
txID, "channel", chainID, "due to", err)
193+
return err
194+
}
195+
return nil
174196
}
175197

176198
// NewConfigEventer creates a ConfigProcessor which the configtx.Manager can ultimately route config updates to
@@ -194,7 +216,11 @@ func (g *gossipServiceImpl) InitializeChannel(chainID string, endpoints []string
194216
servicesAdapter := &state.ServicesMediator{GossipAdapter: g, MCSAdapter: g.mcs}
195217
fetcher := privdata2.NewPuller(support.Ps, support.Pp, g.gossipSvc, NewDataRetriever(support.Store), chainID)
196218
coordinator := privdata2.NewCoordinator(support.Committer, support.Store, fetcher)
197-
g.coordinators[chainID] = coordinator
219+
g.privateHandlers[chainID] = privateHandler{
220+
support: support,
221+
coordinator: coordinator,
222+
distributor: privdata2.NewDistributor(chainID, g),
223+
}
198224
g.chains[chainID] = state.NewGossipStateProvider(chainID, servicesAdapter, coordinator)
199225
if g.deliveryService[chainID] == nil {
200226
var err error
@@ -286,7 +312,7 @@ func (g *gossipServiceImpl) Stop() {
286312
le.Stop()
287313
}
288314
g.chains[chainID].Stop()
289-
g.coordinators[chainID].Close()
315+
g.privateHandlers[chainID].close()
290316

291317
if g.deliveryService[chainID] != nil {
292318
g.deliveryService[chainID].Stop()

gossip/service/gossip_service_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
gossipCommon "github.com/hyperledger/fabric/gossip/common"
2727
"github.com/hyperledger/fabric/gossip/election"
2828
"github.com/hyperledger/fabric/gossip/gossip"
29-
"github.com/hyperledger/fabric/gossip/privdata"
3029
"github.com/hyperledger/fabric/gossip/state"
3130
"github.com/hyperledger/fabric/gossip/util"
3231
"github.com/hyperledger/fabric/msp/mgmt"
@@ -648,7 +647,7 @@ func newGossipInstance(portPrefix int, id int, maxMsgCount int, boot ...int) Gos
648647
gossipSvc: gossip,
649648
chains: make(map[string]state.GossipStateProvider),
650649
leaderElection: make(map[string]election.LeaderElectionService),
651-
coordinators: make(map[string]privdata.Coordinator),
650+
privateHandlers: make(map[string]privateHandler),
652651
deliveryService: make(map[string]deliverclient.DeliverService),
653652
deliveryFactory: &deliveryFactoryImpl{},
654653
peerIdentity: api.PeerIdentityType(conf.InternalEndpoint),

gossip/state/state.go

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/hyperledger/fabric/gossip/util"
2121
"github.com/hyperledger/fabric/protos/common"
2222
proto "github.com/hyperledger/fabric/protos/gossip"
23+
"github.com/hyperledger/fabric/protos/ledger/rwset"
2324
"github.com/op/go-logging"
2425
"github.com/pkg/errors"
2526
"github.com/spf13/viper"
@@ -95,6 +96,9 @@ type ledgerResources interface {
9596
// returns missing transaction ids
9697
StoreBlock(block *common.Block, data util.PvtDataCollections) error
9798

99+
// StorePvtData used to persist private date into transient store
100+
StorePvtData(txid string, privData *rwset.TxPvtReadWriteSet) error
101+
98102
// GetPvtDataAndBlockByNum get block by number and returns also all related private data
99103
// the order of private data in slice of PvtDataCollections doesn't implies the order of
100104
// transactions in the block related to these private data, to get the correct placement
@@ -122,7 +126,6 @@ type ServicesMediator struct {
122126
// the struct to handle in memory sliding window of
123127
// new ledger block to be acquired by hyper ledger
124128
type GossipStateProviderImpl struct {
125-
126129
// Chain id
127130
chainID string
128131

@@ -133,6 +136,8 @@ type GossipStateProviderImpl struct {
133136

134137
commChan <-chan proto.ReceivedMessage
135138

139+
pvtDataChan <-chan proto.ReceivedMessage
140+
136141
// Queue of payloads which wasn't acquired yet
137142
payloads PayloadsBuffer
138143

@@ -187,6 +192,23 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
187192
// Filter message which are only relevant for nodeMetastate transfer
188193
_, commChan := services.Accept(remoteStateMsgFilter, true)
189194

195+
// Filter private data messages
196+
_, pvtDataChan := services.Accept(func(message interface{}) bool {
197+
receivedMsg := message.(proto.ReceivedMessage)
198+
msg := receivedMsg.GetGossipMessage()
199+
if msg.GetPrivateData() == nil {
200+
return false
201+
}
202+
connInfo := receivedMsg.GetConnectionInfo()
203+
authErr := services.VerifyByChannel(msg.Channel, connInfo.Identity, connInfo.Auth.Signature, connInfo.Auth.SignedData)
204+
if authErr != nil {
205+
logger.Warning("Got unauthorized private data message from", string(connInfo.Identity))
206+
return false
207+
}
208+
return true
209+
210+
}, true)
211+
190212
height, err := ledger.LedgerHeight()
191213
if height == 0 {
192214
// Panic here since this is an indication of invalid situation which should not happen in normal
@@ -214,6 +236,9 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
214236
// Channel to read direct messages from other peers
215237
commChan: commChan,
216238

239+
// Channel for private data messages
240+
pvtDataChan: pvtDataChan,
241+
217242
// Create a queue for payload received
218243
payloads: NewPayloadsBuffer(height),
219244

@@ -268,13 +293,54 @@ func (s *GossipStateProviderImpl) listen() {
268293
case msg := <-s.commChan:
269294
logger.Debug("Direct message ", msg)
270295
go s.directMessage(msg)
296+
case msg := <-s.pvtDataChan:
297+
logger.Debug("Private data message ", msg)
298+
go s.privateDataMessage(msg)
271299
case <-s.stopCh:
272300
s.stopCh <- struct{}{}
273301
logger.Debug("Stop listening for new messages")
274302
return
275303
}
276304
}
277305
}
306+
func (s *GossipStateProviderImpl) privateDataMessage(msg proto.ReceivedMessage) {
307+
if !bytes.Equal(msg.GetGossipMessage().Channel, []byte(s.chainID)) {
308+
logger.Warning("Received state transfer request for channel",
309+
string(msg.GetGossipMessage().Channel), "while expecting channel", s.chainID, "skipping request...")
310+
return
311+
}
312+
313+
gossipMsg := msg.GetGossipMessage()
314+
pvtDataMsg := gossipMsg.GetPrivateData()
315+
316+
collectionName := pvtDataMsg.Payload.CollectionName
317+
txID := pvtDataMsg.Payload.TxId
318+
pvtRwSet := pvtDataMsg.Payload.PrivateRwset
319+
320+
if len(pvtRwSet) == 0 {
321+
logger.Warning("Malformed private data message, no rwset provided, collection name = ", collectionName)
322+
return
323+
}
324+
325+
txPvtRwSet := &rwset.TxPvtReadWriteSet{
326+
DataModel: rwset.TxReadWriteSet_KV,
327+
NsPvtRwset: []*rwset.NsPvtReadWriteSet{{
328+
Namespace: pvtDataMsg.Payload.Namespace,
329+
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{{
330+
CollectionName: collectionName,
331+
Rwset: pvtRwSet,
332+
}}},
333+
},
334+
}
335+
336+
if err := s.ledger.StorePvtData(txID, txPvtRwSet); err != nil {
337+
logger.Errorf("Wasn't able to persist private data for collection %s, due to %s", collectionName, err)
338+
msg.Ack(err) // Sending NACK to indicate failure of storing collection
339+
}
340+
341+
msg.Ack(nil)
342+
logger.Debug("Private data for collection", collectionName, "has been stored")
343+
}
278344

279345
func (s *GossipStateProviderImpl) directMessage(msg proto.ReceivedMessage) {
280346
logger.Debug("[ENTER] -> directMessage")

0 commit comments

Comments
 (0)