@@ -43,9 +43,7 @@ type PvtDataDistributor interface {
4343
4444// distributorImpl the implementation of the private data distributor interface
4545type distributorImpl struct {
46- chainID string
47- minAck int
48- maxPeers int
46+ chainID string
4947 gossipAdapter
5048}
5149
@@ -55,8 +53,6 @@ func NewDistributor(chainID string, gossip gossipAdapter) PvtDataDistributor {
5553 return & distributorImpl {
5654 chainID : chainID ,
5755 gossipAdapter : gossip ,
58- minAck : viper .GetInt ("peer.gossip.pvtData.minAck" ),
59- maxPeers : viper .GetInt ("peer.gossip.pvtData.maxPeers" ),
6056 }
6157}
6258
@@ -98,54 +94,64 @@ func (d *distributorImpl) computeDisseminationPlan(txID string, privData *rwset.
9894 return nil , errors .Errorf ("No collection access policy filter computed for %v" , cc )
9995 }
10096
101- routingFilter , err := d .gossipAdapter .PeerFilter (gossipCommon .ChainID (d .chainID ), func (signature api.PeerSignature ) bool {
102- return colFilter (common.SignedData {
103- Data : signature .Message ,
104- Signature : signature .Signature ,
105- Identity : []byte (signature .PeerIdentity ),
106- })
107- })
108-
97+ pvtDataMsg , err := d .createPrivateDataMessage (txID , namespace , collection .CollectionName , collection .Rwset )
10998 if err != nil {
110- logger .Error ("Failed to retrieve peer routing filter for" , cc , " due to" , err )
111- return nil , err
99+ return nil , errors .WithStack (err )
112100 }
113101
114- msg := & proto.GossipMessage {
115- Channel : []byte (d .chainID ),
116- Nonce : util .RandomUInt64 (),
117- Tag : proto .GossipMessage_CHAN_ONLY ,
118- Content : & proto.GossipMessage_PrivateData {
119- PrivateData : & proto.PrivateDataMessage {
120- Payload : & proto.PrivatePayload {
121- Namespace : namespace ,
122- CollectionName : collectionName ,
123- TxId : txID ,
124- PrivateRwset : collection .Rwset ,
125- },
126- },
127- },
128- }
129-
130- pvtDataMsg , err := msg .NoopSign ()
102+ dPlan , err := d .disseminationPlanForMsg (colAP , colFilter , pvtDataMsg )
131103 if err != nil {
132- return nil , err
104+ return nil , errors . WithStack ( err )
133105 }
106+ disseminationPlan = append (disseminationPlan , dPlan ... )
107+ }
108+ }
109+ return disseminationPlan , nil
110+ }
134111
135- sc := gossip2.SendCriteria {
136- Timeout : time .Second ,
137- Channel : gossipCommon .ChainID (d .chainID ),
138- MaxPeers : d .maxPeers ,
139- MinAck : d .minAck ,
140- IsEligible : func (member discovery.NetworkMember ) bool {
141- return routingFilter (member )
142- },
112+ func (d * distributorImpl ) disseminationPlanForMsg (colAP privdata.CollectionAccessPolicy , colFilter privdata.Filter , pvtDataMsg * proto.SignedGossipMessage ) ([]* dissemination , error ) {
113+ var disseminationPlan []* dissemination
114+ for _ , expectedToBeInOurOrg := range []bool {true , false } {
115+ var expectedToBeInOurOrg bool = expectedToBeInOurOrg
116+ minAck := colAP .RequiredExternalPeerCount ()
117+ if expectedToBeInOurOrg {
118+ minAck = colAP .RequiredInternalPeerCount ()
119+ }
120+
121+ routingFilter , err := d .gossipAdapter .PeerFilter (gossipCommon .ChainID (d .chainID ), func (signature api.PeerSignature , isOurOrg bool ) bool {
122+ if isOurOrg != expectedToBeInOurOrg {
123+ return false
143124 }
144- disseminationPlan = append (disseminationPlan , & dissemination {
145- criteria : sc ,
146- msg : pvtDataMsg ,
125+ return colFilter (common.SignedData {
126+ Data : signature .Message ,
127+ Signature : signature .Signature ,
128+ Identity : []byte (signature .PeerIdentity ),
147129 })
130+ })
131+
132+ if err != nil {
133+ logger .Error ("Failed to retrieve peer routing filter for channel" , d .chainID , ":" , err )
134+ return nil , err
135+ }
136+
137+ maxPeers := viper .GetInt ("peer.gossip.pvtData.maxExternalPeers" )
138+ if expectedToBeInOurOrg {
139+ maxPeers = viper .GetInt ("peer.gossip.pvtData.maxInternalPeers" )
148140 }
141+
142+ sc := gossip2.SendCriteria {
143+ Timeout : time .Second ,
144+ Channel : gossipCommon .ChainID (d .chainID ),
145+ MaxPeers : maxPeers ,
146+ MinAck : minAck ,
147+ IsEligible : func (member discovery.NetworkMember ) bool {
148+ return routingFilter (member )
149+ },
150+ }
151+ disseminationPlan = append (disseminationPlan , & dissemination {
152+ criteria : sc ,
153+ msg : pvtDataMsg ,
154+ })
149155 }
150156 return disseminationPlan , nil
151157}
@@ -172,3 +178,27 @@ func (d *distributorImpl) disseminate(disseminationPlan []*dissemination) error
172178 }
173179 return nil
174180}
181+
182+ func (d * distributorImpl ) createPrivateDataMessage (txID , namespace , collectionName string , rwset []byte ) (* proto.SignedGossipMessage , error ) {
183+ msg := & proto.GossipMessage {
184+ Channel : []byte (d .chainID ),
185+ Nonce : util .RandomUInt64 (),
186+ Tag : proto .GossipMessage_CHAN_ONLY ,
187+ Content : & proto.GossipMessage_PrivateData {
188+ PrivateData : & proto.PrivateDataMessage {
189+ Payload : & proto.PrivatePayload {
190+ Namespace : namespace ,
191+ CollectionName : collectionName ,
192+ TxId : txID ,
193+ PrivateRwset : rwset ,
194+ },
195+ },
196+ },
197+ }
198+
199+ pvtDataMsg , err := msg .NoopSign ()
200+ if err != nil {
201+ return nil , err
202+ }
203+ return pvtDataMsg , nil
204+ }
0 commit comments