Skip to content

Commit dab82c9

Browse files
committed
[FAB-7049] Expose deliver service on peer
This CR exposes the deliver service on a peer, which will allow clients to retrieve blocks using the same mechanism that is available on the orderer. Change-Id: Iafcbcf32c7a448680a6726ae4cca5fb94832e68b Signed-off-by: Will Lahti <wtlahti@us.ibm.com>
1 parent c39d69b commit dab82c9

File tree

8 files changed

+214
-77
lines changed

8 files changed

+214
-77
lines changed

bddtests/features/bootstrap.feature

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ Feature: Bootstrap
9696
| peerOrg1 |
9797
| peerOrg2 |
9898

99-
And user "configAdminOrdererOrg0" using cert alias "config-admin-cert" connects to deliver function on orderer "<orderer0>"
99+
And user "configAdminOrdererOrg0" using cert alias "config-admin-cert" connects to deliver function on orderer "<orderer0>" using port "7050"
100100

101101
And user "configAdminOrdererOrg0" retrieves the latest config update "latestOrdererConfig" from orderer "<orderer0>" for channel "{ordererSystemChannelId}"
102102

@@ -156,7 +156,7 @@ Feature: Bootstrap
156156
# Requesting a deliver earlier may result in a SERVICE_UNAVAILABLE response and a connection drop
157157
And I wait "<ChannelJoinDelay>" seconds
158158

159-
When user "dev0Org0" using cert alias "consortium1-cert" connects to deliver function on orderer "<orderer0>"
159+
When user "dev0Org0" using cert alias "consortium1-cert" connects to deliver function on orderer "<orderer0>" using port "7050"
160160
And user "dev0Org0" sends deliver a seek request on orderer "<orderer0>" with properties:
161161
| ChainId | Start | End |
162162
| com.acme.blockchain.jdoe.channel1 | 0 | 0 |
@@ -190,7 +190,7 @@ Feature: Bootstrap
190190
| User | Peer | Organization |
191191
| peer0Signer | peer0 | peerOrg0 |
192192

193-
And user "configAdminPeerOrg0" using cert alias "config-admin-cert" connects to deliver function on orderer "<orderer0>"
193+
And user "configAdminPeerOrg0" using cert alias "config-admin-cert" connects to deliver function on orderer "<orderer0>" using port "7050"
194194

195195
And user "configAdminPeerOrg0" retrieves the latest config update "latestChannelConfigUpdate" from orderer "<orderer0>" for channel "com.acme.blockchain.jdoe.channel1"
196196

@@ -249,6 +249,19 @@ Feature: Bootstrap
249249
| User | Peer | Organization |
250250
| peer2Signer | peer2 | peerOrg1 |
251251

252+
When user "dev0Org0" using cert alias "consortium1-cert" connects to deliver function on orderer "peer0" using port "7051"
253+
And user "dev0Org0" sends deliver a seek request on orderer "peer0" with properties:
254+
| ChainId | Start | End |
255+
| com.acme.blockchain.jdoe.channel1 | 0 | 0 |
256+
257+
Then user "dev0Org0" should get a delivery "genesisBlockForMyNewChannel" from "peer0" of "1" blocks with "1" messages within "1" seconds
258+
259+
When user "dev0Org0" using cert alias "consortium1-cert" connects to deliver function on orderer "peer2" using port "7051"
260+
And user "dev0Org0" sends deliver a seek request on orderer "peer2" with properties:
261+
| ChainId | Start | End |
262+
| com.acme.blockchain.jdoe.channel1 | 0 | 0 |
263+
264+
Then user "dev0Org0" should get a delivery "genesisBlockForMyNewChannelFromOtherOrgsPeer" from "peer2" of "1" blocks with "1" messages within "1" seconds
252265

253266
# Entry point for invoking on an existing channel
254267
When user "peer0Admin" creates a chaincode spec "ccSpec" with name "example02" of type "GOLANG" for chaincode "github.com/hyperledger/fabric/examples/chaincode/go/chaincode_example02" with args
@@ -378,9 +391,6 @@ Feature: Bootstrap
378391

379392
Then user "dev0Org0" should get a delivery "deliveredInvokeTx1Block" from "<orderer0>" of "1" blocks with "1" messages within "1" seconds
380393

381-
382-
# TODO: Once events are working, consider listen event listener as well.
383-
384394
Examples: Orderer Options
385395
| ComposeFile | SystemUpWaitTime | ConsensusType | ChannelJoinDelay | BroadcastWaitTime | orderer0 | orderer1 | orderer2 |Orderer Specific Info|
386396
| dc-base.yml | 0 | solo | 2 | 2 | orderer0 | orderer0 | orderer0 | |

bddtests/steps/bootstrap_impl.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -268,12 +268,12 @@ def step_impl(context, userName, transactionAlias, orderer):
268268
bootstrap_util.broadcastCreateChannelConfigTx(context=context, certAlias=None, composeService=orderer, user=user, configTxEnvelope=transaction)
269269

270270

271-
@when(u'user "{userName}" using cert alias "{certAlias}" connects to deliver function on orderer "{composeService}"')
272-
@Given(u'user "{userName}" using cert alias "{certAlias}" connects to deliver function on orderer "{composeService}"')
273-
def step_impl(context, userName, certAlias, composeService):
271+
@when(u'user "{userName}" using cert alias "{certAlias}" connects to deliver function on orderer "{composeService}" using port "{port}"')
272+
@Given(u'user "{userName}" using cert alias "{certAlias}" connects to deliver function on orderer "{composeService}" using port "{port}"')
273+
def step_impl(context, userName, certAlias, composeService, port):
274274
directory = bootstrap_util.getDirectory(context)
275275
user = directory.getUser(userName=userName)
276-
user.connectToDeliverFunction(context, composeService, nodeAdminTuple=user.tags[certAlias])
276+
user.connectToDeliverFunction(context, composeService, nodeAdminTuple=user.tags[certAlias], port=port)
277277

278278
@when(u'user "{userName}" sends deliver a seek request on orderer "{composeService}" with properties')
279279
def step_impl(context, userName, composeService):
@@ -390,4 +390,4 @@ def step_impl(context, certAlias):
390390
def step_impl(context, command, service_name):
391391
assert "composition" in context, "No composition found in context"
392392
composition = context.composition
393-
composition.issueCommand([command], [service_name])
393+
composition.issueCommand([command], [service_name])

bddtests/steps/orderer_util.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -190,12 +190,13 @@ def closeStreams(self):
190190
deliverStreamHelper.send(None)
191191
self.abDeliversStreamHelperDict.clear()
192192

193-
def connectToDeliverFunction(self, context, composeService, nodeAdminTuple, timeout=1):
193+
def connectToDeliverFunction(self, context, composeService, nodeAdminTuple, timeout=1, port=7050):
194194
'Connect to the deliver function and drain messages to associated orderer queue'
195195
assert not composeService in self.abDeliversStreamHelperDict, "Already connected to deliver stream on {0}".format(composeService)
196196
streamHelper = DeliverStreamHelper(directory=self.directory,
197197
ordererStub=self.getABStubForComposeService(context=context,
198-
composeService=composeService),
198+
composeService=composeService,
199+
port=port),
199200
entity=self, nodeAdminTuple=nodeAdminTuple)
200201
self.abDeliversStreamHelperDict[composeService] = streamHelper
201202
return streamHelper
@@ -220,15 +221,16 @@ def broadcastMessages(self, context, numMsgsToBroadcast, composeService, dataFun
220221
print("Done")
221222
assert counter == int(numMsgsToBroadcast), "counter = {0}, expected {1}".format(counter, numMsgsToBroadcast)
222223

223-
def getABStubForComposeService(self, context, composeService):
224+
def getABStubForComposeService(self, context, composeService,port=7050):
224225
'Return a Stub for the supplied composeService, will cache'
225226
if composeService in self.atomicBroadcastStubsDict:
226227
return self.atomicBroadcastStubsDict[composeService]
227228
# Get the IP address of the server that the user registered on
228229
root_certificates = self.directory.getTrustedRootsForOrdererNetworkAsPEM()
229-
ipAddress, port = bdd_test_util.getPortHostMapping(context.compose_containers, composeService, 7050)
230+
peer_root_certificates = self.directory.getTrustedRootsForPeerNetworkAsPEM()
231+
ipAddress, port_to_use = bdd_test_util.getPortHostMapping(context.compose_containers, composeService, port)
230232
# print("ipAddress in getABStubForComposeService == {0}:{1}".format(ipAddress, port))
231-
channel = bdd_grpc_util.getGRPCChannel(ipAddress=ipAddress, port=port, root_certificates=root_certificates, ssl_target_name_override=composeService)
233+
channel = bdd_grpc_util.getGRPCChannel(ipAddress=ipAddress, port=port_to_use, root_certificates="".join([root_certificates,peer_root_certificates]), ssl_target_name_override=composeService)
232234
newABStub = ab_pb2_grpc.AtomicBroadcastStub(channel)
233235
self.atomicBroadcastStubsDict[composeService] = newABStub
234236
return newABStub

core/peer/atomicbroadcast.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
Copyright IBM Corp. 2017 All Rights Reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package peer
17+
18+
import (
19+
"runtime/debug"
20+
21+
"github.com/hyperledger/fabric/common/deliver"
22+
"github.com/hyperledger/fabric/common/flogging"
23+
"github.com/hyperledger/fabric/common/policies"
24+
"github.com/hyperledger/fabric/orderer/common/broadcast"
25+
"github.com/hyperledger/fabric/protos/common"
26+
ab "github.com/hyperledger/fabric/protos/orderer"
27+
logging "github.com/op/go-logging"
28+
)
29+
30+
const pkgLogID = "common/peer"
31+
32+
var logger *logging.Logger
33+
34+
func init() {
35+
logger = flogging.MustGetLogger(pkgLogID)
36+
}
37+
38+
type server struct {
39+
bh broadcast.Handler
40+
dh deliver.Handler
41+
}
42+
43+
// Broadcast is not implemented/supported on a peer
44+
func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
45+
return srv.Send(&ab.BroadcastResponse{
46+
Status: common.Status_NOT_IMPLEMENTED,
47+
})
48+
}
49+
50+
// Deliver sends a stream of blocks to a client after commitment
51+
func (s *server) Deliver(srv ab.AtomicBroadcast_DeliverServer) error {
52+
logger.Debugf("Starting new Deliver handler")
53+
defer func() {
54+
if r := recover(); r != nil {
55+
logger.Criticalf("Deliver client triggered panic: %s\n%s", r, debug.Stack())
56+
}
57+
logger.Debugf("Closing Deliver stream")
58+
}()
59+
return s.dh.Handle(srv)
60+
}
61+
62+
// NewAtomicBroadcastServer creates an ab.AtomicBroadcastServer based on the
63+
// ledger Reader. Broadcast is not implemented/supported on the peer.
64+
func NewAtomicBroadcastServer() ab.AtomicBroadcastServer {
65+
s := &server{
66+
dh: deliver.NewHandlerImpl(DeliverSupportManager{}, policies.ChannelReaders),
67+
bh: broadcast.NewHandlerImpl(nil),
68+
}
69+
return s
70+
}

core/peer/peer.go

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@ import (
1515
"github.com/hyperledger/fabric/common/channelconfig"
1616
"github.com/hyperledger/fabric/common/configtx"
1717
configtxtest "github.com/hyperledger/fabric/common/configtx/test"
18+
"github.com/hyperledger/fabric/common/deliver"
1819
"github.com/hyperledger/fabric/common/flogging"
20+
commonledger "github.com/hyperledger/fabric/common/ledger"
21+
"github.com/hyperledger/fabric/common/ledger/blockledger"
22+
"github.com/hyperledger/fabric/common/ledger/blockledger/file"
1923
mockchannelconfig "github.com/hyperledger/fabric/common/mocks/config"
2024
mockconfigtx "github.com/hyperledger/fabric/common/mocks/configtx"
2125
mockpolicies "github.com/hyperledger/fabric/common/mocks/policies"
@@ -58,7 +62,8 @@ type chainSupport struct {
5862
bundleSource *resourcesconfig.BundleSource
5963
channelconfig.Resources
6064
channelconfig.Application
61-
ledger ledger.PeerLedger
65+
ledger ledger.PeerLedger
66+
fileLedger *fileledger.FileLedger
6267
}
6368

6469
var transientStoreFactory = &storeProvider{}
@@ -132,6 +137,17 @@ func (cs *chainSupport) GetMSPIDs(cid string) []string {
132137
return GetMSPIDs(cid)
133138
}
134139

140+
// Sequence passes through to the underlying configtx.Validator
141+
func (cs *chainSupport) Sequence() uint64 {
142+
return cs.ConfigtxValidator().Sequence()
143+
}
144+
func (cs *chainSupport) Reader() blockledger.Reader {
145+
return cs.fileLedger
146+
}
147+
func (cs *chainSupport) Errored() <-chan struct{} {
148+
return nil
149+
}
150+
135151
// chain is a local struct to manage objects in a chain
136152
type chain struct {
137153
cs *chainSupport
@@ -300,6 +316,7 @@ func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block) error {
300316
cs := &chainSupport{
301317
Application: ac, // TODO, refactor as this is accessible through Manager
302318
ledger: ledger,
319+
fileLedger: fileledger.NewFileLedger(fileLedgerBlockStore{ledger}),
303320
}
304321

305322
peerSingletonCallback := func(bundle *resourcesconfig.Bundle) {
@@ -684,3 +701,30 @@ func CreatePeerServer(listenAddress string,
684701
func GetPeerServer() comm.GRPCServer {
685702
return peerServer
686703
}
704+
705+
//
706+
// Deliver service support structs for the peer
707+
//
708+
709+
// DeliverSupportManager provides access to a channel for performing deliver
710+
type DeliverSupportManager struct {
711+
}
712+
713+
func (dsm DeliverSupportManager) GetChain(chainID string) (deliver.Support, bool) {
714+
channel, ok := chains.list[chainID]
715+
return channel.cs, ok
716+
}
717+
718+
// fileLedgerBlockStore implements the interface expected by
719+
// common/ledger/blockledger/file to interact with a file ledger for deliver
720+
type fileLedgerBlockStore struct {
721+
ledger.PeerLedger
722+
}
723+
724+
func (flbs fileLedgerBlockStore) AddBlock(*common.Block) error {
725+
return nil
726+
}
727+
728+
func (flbs fileLedgerBlockStore) RetrieveBlocks(startBlockNumber uint64) (commonledger.ResultsIterator, error) {
729+
return flbs.GetBlocksIterator(startBlockNumber)
730+
}

peer/node/start.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import (
4343
"github.com/hyperledger/fabric/peer/version"
4444
cb "github.com/hyperledger/fabric/protos/common"
4545
"github.com/hyperledger/fabric/protos/ledger/rwset"
46+
ab "github.com/hyperledger/fabric/protos/orderer"
4647
pb "github.com/hyperledger/fabric/protos/peer"
4748
"github.com/pkg/errors"
4849
"github.com/spf13/cobra"
@@ -157,6 +158,11 @@ func serve(args []string) error {
157158
grpclog.Fatalf("Failed to create ehub server: %v", err)
158159
}
159160

161+
// create the peer's AtomicBroadcastServer, which supports deliver but not
162+
// broadcast
163+
abServer := peer.NewAtomicBroadcastServer()
164+
ab.RegisterAtomicBroadcastServer(peerServer.Server(), abServer)
165+
160166
// enable the cache of chaincode info
161167
ccprovider.EnableCCInfoCache()
162168

0 commit comments

Comments
 (0)