Skip to content

Commit b4a1ec8

Browse files
committed
[FAB-7521] Lookup correct policy name
While reusing deliverer API as a blocks event source at peer we need to lookup for correct policy name based on the RSCC resource API definition name. This commit adds a factory method which introduces level of indirection to allow lookup of correct policy name based on the deliver API initialization handler. Change-Id: Ib896736793722549f035cca9e0b6a4c871050615 Signed-off-by: Artem Barger <bartem@il.ibm.com>
1 parent f6bb64b commit b4a1ec8

File tree

7 files changed

+65
-23
lines changed

7 files changed

+65
-23
lines changed

bddtests/features/bootstrap.feature

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,14 +254,14 @@ Feature: Bootstrap
254254
| ChainId | Start | End |
255255
| com.acme.blockchain.jdoe.channel1 | 0 | 0 |
256256

257-
Then user "dev0Org0" should get a delivery "genesisBlockForMyNewChannel" from "peer0" of "1" blocks with "1" messages within "1" seconds
257+
Then user "dev0Org0" should get a delivery "genesisBlockForMyNewChannel" from "peer0" of "0" blocks with "0" messages within "1" seconds
258258

259259
When user "dev0Org0" using cert alias "consortium1-cert" connects to deliver function on orderer "peer2" using port "7051"
260260
And user "dev0Org0" sends deliver a seek request on orderer "peer2" with properties:
261261
| ChainId | Start | End |
262262
| com.acme.blockchain.jdoe.channel1 | 0 | 0 |
263263

264-
Then user "dev0Org0" should get a delivery "genesisBlockForMyNewChannelFromOtherOrgsPeer" from "peer2" of "1" blocks with "1" messages within "1" seconds
264+
Then user "dev0Org0" should get a delivery "genesisBlockForMyNewChannelFromOtherOrgsPeer" from "peer2" of "0" blocks with "0" messages within "1" seconds
265265

266266
# Entry point for invoking on an existing channel
267267
When user "peer0Admin" creates a chaincode spec "ccSpec" with name "example02" of type "GOLANG" for chaincode "github.com/hyperledger/fabric/examples/chaincode/go/chaincode_example02" with args

common/config/api.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ SPDX-License-Identifier: Apache-2.0
66
package config
77

88
import (
9+
"github.com/hyperledger/fabric/common/resourcesconfig"
910
cb "github.com/hyperledger/fabric/protos/common"
1011
)
1112

@@ -25,4 +26,7 @@ type Manager interface {
2526

2627
// GetResourceConfig defines methods that are related to resource configuration
2728
GetResourceConfig(channel string) Config
29+
30+
// GetPolicyMapper returns API to the policy mapper
31+
GetPolicyMapper(channel string) resourcesconfig.PolicyMapper
2832
}

common/deliver/deliver.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,18 @@ type Support interface {
6868
Errored() <-chan struct{}
6969
}
7070

71+
// PolicyNameProvider provides a policy name given the channel id
72+
type PolicyNameProvider func(chainID string) (string, error)
73+
7174
type deliverServer struct {
7275
sm SupportManager
73-
policyName string
76+
policyProvider PolicyNameProvider
7477
timeWindow time.Duration
7578
bindingInspector comm.BindingInspector
7679
}
7780

7881
// NewHandlerImpl creates an implementation of the Handler interface
79-
func NewHandlerImpl(sm SupportManager, policyName string, timeWindow time.Duration, mutualTLS bool) Handler {
82+
func NewHandlerImpl(sm SupportManager, policyProvider PolicyNameProvider, timeWindow time.Duration, mutualTLS bool) Handler {
8083
// function to extract the TLS cert hash from a channel header
8184
extract := func(msg proto.Message) []byte {
8285
chdr, isChannelHeader := msg.(*cb.ChannelHeader)
@@ -89,7 +92,7 @@ func NewHandlerImpl(sm SupportManager, policyName string, timeWindow time.Durati
8992

9093
return &deliverServer{
9194
sm: sm,
92-
policyName: policyName,
95+
policyProvider: policyProvider,
9396
timeWindow: timeWindow,
9497
bindingInspector: bindingInspector,
9598
}
@@ -163,7 +166,12 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env
163166

164167
lastConfigSequence := chain.Sequence()
165168

166-
sf := NewSigFilter(ds.policyName, chain)
169+
policyName, err := ds.policyProvider(chdr.ChannelId)
170+
if err != nil {
171+
logger.Warningf("[channel: %s] failed to obtain policy name due to %s", chdr.ChannelId, err)
172+
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
173+
}
174+
sf := NewSigFilter(policyName, chain)
167175
if err := sf.Apply(envelope); err != nil {
168176
logger.Warningf("[channel: %s] Received unauthorized deliver request from %s: %s", chdr.ChannelId, addr, err)
169177
return sendStatusReply(srv, cb.Status_FORBIDDEN)

common/deliver/deliver_test.go

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ import (
2525
"time"
2626

2727
"github.com/golang/protobuf/ptypes/timestamp"
28+
"github.com/hyperledger/fabric/bccsp/factory"
2829
"github.com/hyperledger/fabric/common/flogging"
2930
"github.com/hyperledger/fabric/common/ledger/blockledger"
30-
ramledger "github.com/hyperledger/fabric/common/ledger/blockledger/ram"
31+
"github.com/hyperledger/fabric/common/ledger/blockledger/ram"
3132
mockpolicies "github.com/hyperledger/fabric/common/mocks/policies"
3233
"github.com/hyperledger/fabric/common/policies"
3334
genesisconfig "github.com/hyperledger/fabric/common/tools/configtxgen/localconfig"
@@ -44,7 +45,10 @@ import (
4445

4546
var genesisBlock = cb.NewBlock(0, nil)
4647
var systemChainID = "systemChain"
47-
var policyName = policies.ChannelReaders
48+
var policyNameProvider = func(_ string) (string, error) {
49+
return policies.ChannelReaders, nil
50+
}
51+
4852
var timeWindow = time.Duration(15 * time.Minute)
4953
var testCert = &x509.Certificate{
5054
Raw: []byte("test"),
@@ -55,6 +59,7 @@ const mutualTLS = true
5559

5660
func init() {
5761
flogging.SetModuleLevel(pkgLogID, "DEBUG")
62+
factory.InitFactories(nil)
5863
}
5964

6065
type mockStream struct {
@@ -174,7 +179,7 @@ func initializeDeliverHandler() Handler {
174179
l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}}))
175180
}
176181

177-
return NewHandlerImpl(mm, policyName, timeWindow, !mutualTLS)
182+
return NewHandlerImpl(mm, policyNameProvider, timeWindow, !mutualTLS)
178183
}
179184

180185
func newMockMultichainManager() *mockSupportManager {
@@ -323,7 +328,7 @@ func TestUnauthorizedSeek(t *testing.T) {
323328

324329
m := newMockD()
325330
defer close(m.recvChan)
326-
ds := NewHandlerImpl(mm, policyName, timeWindow, !mutualTLS)
331+
ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, !mutualTLS)
327332

328333
go ds.Handle(m)
329334

@@ -348,7 +353,7 @@ func TestRevokedAuthorizationSeek(t *testing.T) {
348353

349354
m := newMockD()
350355
defer close(m.recvChan)
351-
ds := NewHandlerImpl(mm, policyName, timeWindow, !mutualTLS)
356+
ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, !mutualTLS)
352357

353358
go ds.Handle(m)
354359

@@ -431,7 +436,7 @@ func TestBlockingSeek(t *testing.T) {
431436

432437
m := newMockD()
433438
defer close(m.recvChan)
434-
ds := NewHandlerImpl(mm, policyName, timeWindow, !mutualTLS)
439+
ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, !mutualTLS)
435440

436441
go ds.Handle(m)
437442

@@ -485,7 +490,7 @@ func TestErroredSeek(t *testing.T) {
485490

486491
m := newMockD()
487492
defer close(m.recvChan)
488-
ds := NewHandlerImpl(mm, policyName, timeWindow, !mutualTLS)
493+
ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, !mutualTLS)
489494

490495
go ds.Handle(m)
491496

@@ -509,7 +514,7 @@ func TestErroredBlockingSeek(t *testing.T) {
509514

510515
m := newMockD()
511516
defer close(m.recvChan)
512-
ds := NewHandlerImpl(mm, policyName, timeWindow, !mutualTLS)
517+
ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, !mutualTLS)
513518

514519
go ds.Handle(m)
515520

@@ -534,7 +539,7 @@ func TestErroredBlockingSeek(t *testing.T) {
534539

535540
func TestSGracefulShutdown(t *testing.T) {
536541
m := newMockD()
537-
ds := NewHandlerImpl(nil, policyName, timeWindow, !mutualTLS)
542+
ds := NewHandlerImpl(nil, policyNameProvider, timeWindow, !mutualTLS)
538543

539544
close(m.recvChan)
540545
assert.NoError(t, ds.Handle(m), "Expected no error for hangup")
@@ -562,7 +567,7 @@ func TestReversedSeqSeek(t *testing.T) {
562567
}
563568

564569
func TestBadStreamRecv(t *testing.T) {
565-
bh := NewHandlerImpl(nil, policyName, timeWindow, !mutualTLS)
570+
bh := NewHandlerImpl(nil, policyNameProvider, timeWindow, !mutualTLS)
566571
assert.Error(t, bh.Handle(&erroneousRecvMockD{}), "Should catch unexpected stream error")
567572
}
568573

@@ -651,7 +656,7 @@ func TestChainNotFound(t *testing.T) {
651656
m := newMockD()
652657
defer close(m.recvChan)
653658

654-
ds := NewHandlerImpl(mm, policyName, timeWindow, !mutualTLS)
659+
ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, !mutualTLS)
655660
go ds.Handle(m)
656661

657662
m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekNewest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY})
@@ -786,7 +791,7 @@ func TestSeekWithMutualTLS(t *testing.T) {
786791
m := newMockD()
787792
defer close(m.recvChan)
788793

789-
ds := NewHandlerImpl(mm, policyName, timeWindow, mutualTLS)
794+
ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, mutualTLS)
790795
go ds.Handle(m)
791796

792797
m.recvChan <- makeSeekWithTLSCertHash(systemChainID, &ab.SeekInfo{Start: seekNewest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}, testCert)
@@ -815,7 +820,7 @@ func TestSeekWithMutualTLS_wrongTLSCert(t *testing.T) {
815820
m := newMockD()
816821
defer close(m.recvChan)
817822

818-
ds := NewHandlerImpl(mm, policyName, timeWindow, mutualTLS)
823+
ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, mutualTLS)
819824
go ds.Handle(m)
820825
wrongCert := &x509.Certificate{
821826
Raw: []byte("wrong"),
@@ -841,7 +846,7 @@ func TestSeekWithMutualTLS_noTLSCert(t *testing.T) {
841846
m := newMockD()
842847
defer close(m.recvChan)
843848

844-
ds := NewHandlerImpl(mm, policyName, timeWindow, mutualTLS)
849+
ds := NewHandlerImpl(mm, policyNameProvider, timeWindow, mutualTLS)
845850
go ds.Handle(m)
846851

847852
m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekNewest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY})

core/peer/atomicbroadcast.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,17 @@ limitations under the License.
1616
package peer
1717

1818
import (
19+
"fmt"
1920
"runtime/debug"
2021
"time"
2122

2223
"github.com/hyperledger/fabric/common/deliver"
2324
"github.com/hyperledger/fabric/common/flogging"
24-
"github.com/hyperledger/fabric/common/policies"
25+
"github.com/hyperledger/fabric/core/aclmgmt/resources"
2526
"github.com/hyperledger/fabric/protos/common"
2627
ab "github.com/hyperledger/fabric/protos/orderer"
2728
"github.com/op/go-logging"
29+
"github.com/pkg/errors"
2830
)
2931

3032
const pkgLogID = "common/peer"
@@ -61,8 +63,15 @@ func (s *server) Deliver(srv ab.AtomicBroadcast_DeliverServer) error {
6163
// NewAtomicBroadcastServer creates an ab.AtomicBroadcastServer based on the
6264
// ledger Reader. Broadcast is not implemented/supported on the peer.
6365
func NewAtomicBroadcastServer(timeWindow time.Duration, mutualTLS bool) ab.AtomicBroadcastServer {
66+
configSupport := NewConfigSupport()
6467
s := &server{
65-
dh: deliver.NewHandlerImpl(DeliverSupportManager{}, policies.ChannelReaders, timeWindow, mutualTLS),
68+
dh: deliver.NewHandlerImpl(DeliverSupportManager{}, func(chainID string) (string, error) {
69+
policyMapper := configSupport.GetPolicyMapper(chainID)
70+
if policyMapper == nil {
71+
return "", errors.New(fmt.Sprintf("cannot find policy mapper for channel %s", chainID))
72+
}
73+
return policyMapper.PolicyRefForAPI(resources.BLOCKEVENT), nil
74+
}, timeWindow, mutualTLS),
6675
}
6776
return s
6877
}

core/peer/peer.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -809,3 +809,17 @@ func (*configSupport) GetResourceConfig(channel string) cc.Config {
809809
}
810810
return chain.cs.bundleSource.ConfigtxValidator()
811811
}
812+
813+
// GetPolicyMapper returns an instance of a object that represents
814+
// an API policy mapper which provides a mapping from specific API
815+
// function to its policy
816+
func (*configSupport) GetPolicyMapper(channel string) resourcesconfig.PolicyMapper {
817+
chains.RLock()
818+
defer chains.RUnlock()
819+
chain := chains.list[channel]
820+
if chain == nil {
821+
peerLogger.Error("GetPolicyMapper: channel", channel, "not found in the list of channels associated with this peer")
822+
return nil
823+
}
824+
return chain.cs.bundleSource.APIPolicyMapper()
825+
}

orderer/common/server/server.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@ type server struct {
4949
// NewServer creates an ab.AtomicBroadcastServer based on the broadcast target and ledger Reader
5050
func NewServer(r *multichannel.Registrar, _ crypto.LocalSigner, debug *localconfig.Debug, timeWindow time.Duration, mutualTLS bool) ab.AtomicBroadcastServer {
5151
s := &server{
52-
dh: deliver.NewHandlerImpl(deliverSupport{Registrar: r}, policies.ChannelReaders, timeWindow, mutualTLS),
52+
dh: deliver.NewHandlerImpl(deliverSupport{Registrar: r}, func(_ string) (string, error) {
53+
return policies.ChannelReaders, nil
54+
}, timeWindow, mutualTLS),
5355
bh: broadcast.NewHandlerImpl(broadcastSupport{Registrar: r}),
5456
debug: debug,
5557
}

0 commit comments

Comments
 (0)