Skip to content

Commit 200ec09

Browse files
committed
[FAB-12502] Deliver client support for etcdraft
This change set prepares the block puller to be attached to the etcdraft consenter and chain. It adds the needed configuration in the orderer.yaml, and also adds code that creates a block puller. Change-Id: Ieb7c430613b16ba8625b323cdd8e170ed073ee1a Signed-off-by: yacovm <yacovm@il.ibm.com>
1 parent 365a710 commit 200ec09

File tree

13 files changed

+643
-27
lines changed

13 files changed

+643
-27
lines changed

orderer/common/cluster/util.go

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"bytes"
1111
"encoding/hex"
1212
"encoding/pem"
13+
"reflect"
1314
"sync/atomic"
1415

1516
"github.com/hyperledger/fabric/common/channelconfig"
@@ -106,6 +107,31 @@ func NewTLSPinningDialer(config comm.ClientConfig) *PredicateDialer {
106107
return d
107108
}
108109

110+
// ClientConfig returns the comm.ClientConfig, or an error
111+
// if they cannot be extracted.
112+
func (dialer *PredicateDialer) ClientConfig() (comm.ClientConfig, error) {
113+
val := dialer.Config.Load()
114+
if val == nil {
115+
return comm.ClientConfig{}, errors.New("client config not initialized")
116+
}
117+
if cc, isClientConfig := val.(comm.ClientConfig); !isClientConfig {
118+
err := errors.Errorf("value stored is %v, not comm.ClientConfig",
119+
reflect.TypeOf(val))
120+
return comm.ClientConfig{}, err
121+
} else {
122+
if cc.SecOpts == nil {
123+
return comm.ClientConfig{}, errors.New("SecOpts is nil")
124+
}
125+
// Copy by value the secure options
126+
secOpts := *cc.SecOpts
127+
return comm.ClientConfig{
128+
Timeout: cc.Timeout,
129+
SecOpts: &secOpts,
130+
KaOpts: cc.KaOpts,
131+
}, nil
132+
}
133+
}
134+
109135
// SetConfig sets the configuration of the PredicateDialer
110136
func (dialer *PredicateDialer) SetConfig(config comm.ClientConfig) {
111137
configCopy := comm.ClientConfig{
@@ -147,13 +173,13 @@ func DERtoPEM(der []byte) string {
147173
}))
148174
}
149175

150-
// StandardDialerDialer wraps a PredicateDialer
176+
// StandardDialer wraps a PredicateDialer
151177
// to a standard cluster.Dialer that passes in a nil verify function
152-
type StandardDialerDialer struct {
178+
type StandardDialer struct {
153179
Dialer *PredicateDialer
154180
}
155181

156-
func (bdp *StandardDialerDialer) Dial(address string) (*grpc.ClientConn, error) {
182+
func (bdp *StandardDialer) Dial(address string) (*grpc.ClientConn, error) {
157183
return bdp.Dialer.Dial(address, nil)
158184
}
159185

@@ -264,17 +290,24 @@ func VerifyBlockSignature(block *common.Block, verifier BlockVerifier) error {
264290
return verifier.VerifyBlockSignature(signatureSet)
265291
}
266292

267-
// TLSCACertsFromConfigBlock retrieves TLS CA certificates
293+
// EndpointConfig defines a configuration
294+
// of endpoints of ordering service nodes
295+
type EndpointConfig struct {
296+
TLSRootCAs [][]byte
297+
Endpoints []string
298+
}
299+
300+
// EndpointconfigFromConfigBlock retrieves TLS CA certificates and endpoints
268301
// from a config block.
269-
func TLSCACertsFromConfigBlock(block *common.Block) ([][]byte, error) {
302+
func EndpointconfigFromConfigBlock(block *common.Block) (*EndpointConfig, error) {
270303
if block == nil {
271304
return nil, errors.New("nil block")
272305
}
273306
envelopeConfig, err := utils.ExtractEnvelope(block, 0)
274307
if err != nil {
275308
return nil, err
276309
}
277-
var res [][]byte
310+
var tlsCACerts [][]byte
278311
bundle, err := channelconfig.NewBundleFromEnvelope(envelopeConfig)
279312
if err != nil {
280313
return nil, errors.Wrap(err, "failed extracting bundle from envelope")
@@ -292,7 +325,10 @@ func TLSCACertsFromConfigBlock(block *common.Block) ([][]byte, error) {
292325
if msp == nil {
293326
return nil, errors.Errorf("no MSP found for MSP with ID of %s", org.MSPID())
294327
}
295-
res = append(res, msp.GetTLSRootCerts()...)
328+
tlsCACerts = append(tlsCACerts, msp.GetTLSRootCerts()...)
296329
}
297-
return res, nil
330+
return &EndpointConfig{
331+
Endpoints: bundle.ChannelConfig().OrdererAddresses(),
332+
TLSRootCAs: tlsCACerts,
333+
}, nil
298334
}

orderer/common/cluster/util_test.go

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func TestStandardDialerDialer(t *testing.T) {
105105
t.Parallel()
106106
emptyCertificate := []byte("-----BEGIN CERTIFICATE-----\n-----END CERTIFICATE-----")
107107
dialer := cluster.NewTLSPinningDialer(comm.ClientConfig{SecOpts: &comm.SecureOptions{UseTLS: true, ServerRootCAs: [][]byte{emptyCertificate}}})
108-
standardDialer := &cluster.StandardDialerDialer{Dialer: dialer}
108+
standardDialer := &cluster.StandardDialer{Dialer: dialer}
109109
_, err := standardDialer.Dial("127.0.0.1:8080")
110110
assert.EqualError(t, err, "error adding root certificate: asn1: syntax error: sequence truncated")
111111
}
@@ -332,48 +332,49 @@ func createBlockChain(start, end uint64) []*common.Block {
332332
return blockchain
333333
}
334334

335-
func TestTLSCACertsFromConfigBlockGreenPath(t *testing.T) {
335+
func TestEndpointconfigFromConfigBlockGreenPath(t *testing.T) {
336336
blockBytes, err := ioutil.ReadFile("testdata/mychannel.block")
337337
assert.NoError(t, err)
338338

339339
block := &common.Block{}
340340
assert.NoError(t, proto.Unmarshal(blockBytes, block))
341341

342-
certs, err := cluster.TLSCACertsFromConfigBlock(block)
342+
endpointConfig, err := cluster.EndpointconfigFromConfigBlock(block)
343343
assert.NoError(t, err)
344-
assert.Len(t, certs, 1)
344+
assert.Len(t, endpointConfig.TLSRootCAs, 1)
345+
assert.Equal(t, []string{"orderer.example.com:7050"}, endpointConfig.Endpoints)
345346

346-
bl, _ := pem.Decode(certs[0])
347+
bl, _ := pem.Decode(endpointConfig.TLSRootCAs[0])
347348
cert, err := x509.ParseCertificate(bl.Bytes)
348349
assert.NoError(t, err)
349350

350351
assert.True(t, cert.IsCA)
351352
assert.Equal(t, "tlsca.example.com", cert.Subject.CommonName)
352353
}
353354

354-
func TestTLSCACertsFromConfigBlockFailures(t *testing.T) {
355+
func TestEndpointconfigFromConfigBlockFailures(t *testing.T) {
355356
t.Run("nil block", func(t *testing.T) {
356-
certs, err := cluster.TLSCACertsFromConfigBlock(nil)
357+
certs, err := cluster.EndpointconfigFromConfigBlock(nil)
357358
assert.Nil(t, certs)
358359
assert.EqualError(t, err, "nil block")
359360
})
360361

361362
t.Run("nil block data", func(t *testing.T) {
362-
certs, err := cluster.TLSCACertsFromConfigBlock(&common.Block{})
363+
certs, err := cluster.EndpointconfigFromConfigBlock(&common.Block{})
363364
assert.Nil(t, certs)
364365
assert.EqualError(t, err, "block data is nil")
365366
})
366367

367368
t.Run("no envelope", func(t *testing.T) {
368-
certs, err := cluster.TLSCACertsFromConfigBlock(&common.Block{
369+
certs, err := cluster.EndpointconfigFromConfigBlock(&common.Block{
369370
Data: &common.BlockData{},
370371
})
371372
assert.Nil(t, certs)
372373
assert.EqualError(t, err, "envelope index out of bounds")
373374
})
374375

375376
t.Run("bad envelope", func(t *testing.T) {
376-
certs, err := cluster.TLSCACertsFromConfigBlock(&common.Block{
377+
certs, err := cluster.EndpointconfigFromConfigBlock(&common.Block{
377378
Data: &common.BlockData{
378379
Data: [][]byte{{}},
379380
},
@@ -382,3 +383,39 @@ func TestTLSCACertsFromConfigBlockFailures(t *testing.T) {
382383
assert.EqualError(t, err, "failed extracting bundle from envelope: envelope header cannot be nil")
383384
})
384385
}
386+
387+
func TestClientConfig(t *testing.T) {
388+
t.Run("Uninitialized dialer", func(t *testing.T) {
389+
dialer := &cluster.PredicateDialer{}
390+
_, err := dialer.ClientConfig()
391+
assert.EqualError(t, err, "client config not initialized")
392+
})
393+
394+
t.Run("Wrong type stored", func(t *testing.T) {
395+
dialer := &cluster.PredicateDialer{}
396+
dialer.Config.Store("foo")
397+
_, err := dialer.ClientConfig()
398+
assert.EqualError(t, err, "value stored is string, not comm.ClientConfig")
399+
})
400+
401+
t.Run("Nil secure options", func(t *testing.T) {
402+
dialer := &cluster.PredicateDialer{}
403+
dialer.Config.Store(comm.ClientConfig{
404+
SecOpts: nil,
405+
})
406+
_, err := dialer.ClientConfig()
407+
assert.EqualError(t, err, "SecOpts is nil")
408+
})
409+
410+
t.Run("Valid config", func(t *testing.T) {
411+
dialer := &cluster.PredicateDialer{}
412+
dialer.Config.Store(comm.ClientConfig{
413+
SecOpts: &comm.SecureOptions{
414+
Key: []byte{1, 2, 3},
415+
},
416+
})
417+
cc, err := dialer.ClientConfig()
418+
assert.NoError(t, err)
419+
assert.Equal(t, []byte{1, 2, 3}, cc.SecOpts.Key)
420+
})
421+
}

orderer/common/localconfig/config.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,14 @@ type General struct {
5757
}
5858

5959
type Cluster struct {
60-
RootCAs []string
61-
ClientCertificate string
62-
ClientPrivateKey string
63-
DialTimeout time.Duration
64-
RPCTimeout time.Duration
60+
RootCAs []string
61+
ClientCertificate string
62+
ClientPrivateKey string
63+
DialTimeout time.Duration
64+
RPCTimeout time.Duration
65+
ReplicationBufferSize int
66+
ReplicationPullTimeout time.Duration
67+
ReplicationRetryTimeout time.Duration
6568
}
6669

6770
// Keepalive contains configuration for gRPC servers.

orderer/common/multichannel/chainsupport.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ package multichannel
99
import (
1010
"github.com/hyperledger/fabric/common/crypto"
1111
"github.com/hyperledger/fabric/common/ledger/blockledger"
12+
"github.com/hyperledger/fabric/common/policies"
1213
"github.com/hyperledger/fabric/orderer/common/blockcutter"
1314
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
1415
"github.com/hyperledger/fabric/orderer/consensus"
@@ -138,3 +139,16 @@ func (cs *ChainSupport) ConfigProto() *cb.Config {
138139
func (cs *ChainSupport) Sequence() uint64 {
139140
return cs.ConfigtxValidator().Sequence()
140141
}
142+
143+
// VerifyBlockSignature verifies a signature of a block
144+
func (cs *ChainSupport) VerifyBlockSignature(sd []*cb.SignedData) error {
145+
policy, exists := cs.PolicyManager().GetPolicy(policies.BlockValidation)
146+
if !exists {
147+
return errors.Errorf("policy %s wasn't found", policies.BlockValidation)
148+
}
149+
err := policy.Evaluate(sd)
150+
if err != nil {
151+
return errors.Wrap(err, "block verification failed")
152+
}
153+
return nil
154+
}

orderer/common/multichannel/chainsupprt_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,15 @@ package multichannel
99
import (
1010
"testing"
1111

12+
"github.com/hyperledger/fabric/common/channelconfig"
1213
"github.com/hyperledger/fabric/common/deliver/mock"
1314
"github.com/hyperledger/fabric/common/ledger/blockledger/mocks"
15+
"github.com/hyperledger/fabric/common/mocks/config"
16+
mockpolicies "github.com/hyperledger/fabric/common/mocks/policies"
17+
"github.com/hyperledger/fabric/common/policies"
1418
"github.com/hyperledger/fabric/protos/common"
1519
"github.com/hyperledger/fabric/protos/orderer"
20+
"github.com/pkg/errors"
1621
"github.com/stretchr/testify/assert"
1722
)
1823

@@ -31,3 +36,48 @@ func TestChainSupportBlock(t *testing.T) {
3136
assert.Nil(t, cs.Block(100))
3237
assert.Equal(t, uint64(99), cs.Block(99).Header.Number)
3338
}
39+
40+
type mutableResourcesMock struct {
41+
config.Resources
42+
}
43+
44+
func (*mutableResourcesMock) Update(*channelconfig.Bundle) {
45+
panic("implement me")
46+
}
47+
48+
func TestVerifyBlockSignature(t *testing.T) {
49+
policyMgr := &mockpolicies.Manager{
50+
PolicyMap: make(map[string]policies.Policy),
51+
}
52+
ms := &mutableResourcesMock{
53+
Resources: config.Resources{
54+
PolicyManagerVal: policyMgr,
55+
},
56+
}
57+
cs := &ChainSupport{
58+
ledgerResources: &ledgerResources{
59+
configResources: &configResources{
60+
mutableResources: ms,
61+
},
62+
},
63+
}
64+
65+
// Scenario I: Policy manager isn't initialized
66+
// and thus policy cannot be found
67+
err := cs.VerifyBlockSignature([]*common.SignedData{})
68+
assert.EqualError(t, err, "policy /Channel/Orderer/BlockValidation wasn't found")
69+
70+
// Scenario II: Policy manager finds policy, but it evaluates
71+
// to error.
72+
policyMgr.PolicyMap["/Channel/Orderer/BlockValidation"] = &mockpolicies.Policy{
73+
Err: errors.New("invalid signature"),
74+
}
75+
err = cs.VerifyBlockSignature([]*common.SignedData{})
76+
assert.EqualError(t, err, "block verification failed: invalid signature")
77+
78+
// Scenario III: Policy manager finds policy, and it evaluates to success
79+
policyMgr.PolicyMap["/Channel/Orderer/BlockValidation"] = &mockpolicies.Policy{
80+
Err: nil,
81+
}
82+
assert.NoError(t, cs.VerifyBlockSignature([]*common.SignedData{}))
83+
}

orderer/consensus/consensus.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ type ConsenterSupport interface {
7474
crypto.LocalSigner
7575
msgprocessor.Processor
7676

77+
// VerifyBlockSignature verifies a signature of a block
78+
VerifyBlockSignature([]*cb.SignedData) error
79+
7780
// BlockCutter returns the block cutting helper for this channel.
7881
BlockCutter() blockcutter.Receiver
7982

17.4 KB
Binary file not shown.

0 commit comments

Comments
 (0)