Skip to content

Commit a2ea1ef

Browse files
yacovmmastersingh24
authored andcommitted
[FAB-11047] Enlarge discovery client memoization cache
The discovery server side has a credential cache that reuses results of access control checks for tuples of requests and signatures to save CPU cycles. The idea is that when clients send a request that was sent in the past, they can skip signinig it and send the signature that was produced in the past in order for the service on the peer to find the signature and request digest in its cache and not do costly signature checks. The discovery client implementation which is used by the go-SDK, caches only the latest request sent to the service which isn't ideal. This change set changes this, and introduces a small cache to the client implementation. Change-Id: Iade112d2a0cd1fac7dc7968105c652ad7f53d88c Signed-off-by: yacovm <yacovm@il.ibm.com>
1 parent 6be352a commit a2ea1ef

File tree

6 files changed

+199
-27
lines changed

6 files changed

+199
-27
lines changed

discovery/client/client.go

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ SPDX-License-Identifier: Apache-2.0
77
package discovery
88

99
import (
10-
"bytes"
1110
"context"
1211
"encoding/json"
1312
"fmt"
@@ -27,8 +26,6 @@ var (
2726

2827
// Client interacts with the discovery server
2928
type Client struct {
30-
lastRequest []byte
31-
lastSignature []byte
3229
createConnection Dialer
3330
signRequest Signer
3431
}
@@ -161,23 +158,11 @@ func (c *Client) Send(ctx context.Context, req *Request, auth *discovery.AuthInf
161158
if err != nil {
162159
return nil, errors.Wrap(err, "failed marshaling Request to bytes")
163160
}
164-
sig := c.lastSignature
165-
// Only sign the Request if it is different than the previous Request sent.
166-
// Otherwise, use the last signature from the previous send.
167-
// This is not only to save CPU cycles in the Client-side,
168-
// but also for the server side to be able to memoize the signature verification.
169-
// We have the use the previous signature, because many signature schemes are not deterministic.
170-
if !bytes.Equal(c.lastRequest, payload) {
171-
sig, err = c.signRequest(payload)
172-
if err != nil {
173-
return nil, errors.Wrap(err, "failed signing Request")
174-
}
175-
}
176161

177-
// Remember this Request and the corresponding signature, in order to skip signing next time
178-
// and reuse the signature
179-
c.lastRequest = payload
180-
c.lastSignature = sig
162+
sig, err := c.signRequest(payload)
163+
if err != nil {
164+
return nil, errors.Wrap(err, "failed signing Request")
165+
}
181166

182167
conn, err := c.createConnection()
183168
if err != nil {
@@ -550,10 +535,10 @@ type endorsementDescriptor struct {
550535
}
551536

552537
// NewClient creates a new Client instance
553-
func NewClient(createConnection Dialer, s Signer) *Client {
538+
func NewClient(createConnection Dialer, s Signer, signerCacheSize int) *Client {
554539
return &Client{
555540
createConnection: createConnection,
556-
signRequest: s,
541+
signRequest: NewMemoizeSigner(s, signerCacheSize).Sign,
557542
}
558543
}
559544

discovery/client/client_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ import (
4141
"google.golang.org/grpc/credentials"
4242
)
4343

44+
const (
45+
signerCacheSize = 1
46+
)
47+
4448
var (
4549
ctx = context.Background()
4650

@@ -328,7 +332,7 @@ func TestClient(t *testing.T) {
328332
ClientIdentity: []byte{1, 2, 3},
329333
ClientTlsCertHash: util.ComputeSHA256(clientTLSCert.Certificate[0]),
330334
}
331-
cl := NewClient(connect, signer)
335+
cl := NewClient(connect, signer, signerCacheSize)
332336

333337
sup.On("PeersOfChannel").Return(channelPeersWithoutChaincodes).Times(2)
334338
req := NewRequest()
@@ -450,7 +454,7 @@ func TestUnableToSign(t *testing.T) {
450454
authInfo := &discovery.AuthInfo{
451455
ClientIdentity: []byte{1, 2, 3},
452456
}
453-
cl := NewClient(failToConnect, signer)
457+
cl := NewClient(failToConnect, signer, signerCacheSize)
454458
req := NewRequest()
455459
req = req.OfChannel("mychannel")
456460
resp, err := cl.Send(ctx, req, authInfo)
@@ -468,7 +472,7 @@ func TestUnableToConnect(t *testing.T) {
468472
auth := &discovery.AuthInfo{
469473
ClientIdentity: []byte{1, 2, 3},
470474
}
471-
cl := NewClient(failToConnect, signer)
475+
cl := NewClient(failToConnect, signer, signerCacheSize)
472476
req := NewRequest()
473477
req = req.OfChannel("mychannel")
474478
resp, err := cl.Send(ctx, req, auth)
@@ -491,7 +495,7 @@ func TestBadResponses(t *testing.T) {
491495
auth := &discovery.AuthInfo{
492496
ClientIdentity: []byte{1, 2, 3},
493497
}
494-
cl := NewClient(connect, signer)
498+
cl := NewClient(connect, signer, signerCacheSize)
495499

496500
// Scenario I: discovery service sends back an error
497501
svc.On("Discover").Return(nil, errors.New("foo")).Once()

discovery/client/signer.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
Copyright IBM Corp All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package discovery
8+
9+
import (
10+
"encoding/hex"
11+
"sync"
12+
13+
"github.com/hyperledger/fabric/common/util"
14+
)
15+
16+
// MemoizeSigner signs messages with the same signature
17+
// if the message was signed recently
18+
type MemoizeSigner struct {
19+
maxMemorySize int
20+
sync.RWMutex
21+
memory map[string][]byte
22+
sign Signer
23+
}
24+
25+
// NewMemoizeSigner creates a new MemoizeSigner that signs
26+
// message with the given sign function
27+
func NewMemoizeSigner(signFunc Signer, maxMemorySize int) *MemoizeSigner {
28+
return &MemoizeSigner{
29+
maxMemorySize: maxMemorySize,
30+
memory: make(map[string][]byte),
31+
sign: signFunc,
32+
}
33+
}
34+
35+
// Signer signs a message and returns the signature and nil,
36+
// or nil and error on failure
37+
func (ms *MemoizeSigner) Sign(msg []byte) ([]byte, error) {
38+
sig, isInMemory := ms.lookup(msg)
39+
if isInMemory {
40+
return sig, nil
41+
}
42+
sig, err := ms.sign(msg)
43+
if err != nil {
44+
return nil, err
45+
}
46+
ms.memorize(msg, sig)
47+
return sig, nil
48+
}
49+
50+
// lookup looks up the given message in memory and returns
51+
// the signature, if the message is in memory
52+
func (ms *MemoizeSigner) lookup(msg []byte) ([]byte, bool) {
53+
ms.RLock()
54+
defer ms.RUnlock()
55+
sig, exists := ms.memory[msgDigest(msg)]
56+
return sig, exists
57+
}
58+
59+
func (ms *MemoizeSigner) memorize(msg, signature []byte) {
60+
ms.RLock()
61+
shouldShrink := len(ms.memory) >= ms.maxMemorySize
62+
ms.RUnlock()
63+
64+
if shouldShrink {
65+
ms.shrinkMemory()
66+
}
67+
68+
ms.Lock()
69+
defer ms.Unlock()
70+
ms.memory[msgDigest(msg)] = signature
71+
}
72+
73+
// evict evicts random messages from memory
74+
// until its size is smaller than maxMemorySize
75+
func (ms *MemoizeSigner) shrinkMemory() {
76+
ms.Lock()
77+
defer ms.Unlock()
78+
for len(ms.memory) > ms.maxMemorySize {
79+
ms.evictFromMemory()
80+
}
81+
}
82+
83+
// evictFromMemory evicts a random message from memory
84+
func (ms *MemoizeSigner) evictFromMemory() {
85+
for dig := range ms.memory {
86+
delete(ms.memory, dig)
87+
return
88+
}
89+
}
90+
91+
// msgDigest returns a digest of a given message
92+
func msgDigest(msg []byte) string {
93+
return hex.EncodeToString(util.ComputeSHA256(msg))
94+
}

discovery/client/signer_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
Copyright IBM Corp All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package discovery
8+
9+
import (
10+
"sync"
11+
"sync/atomic"
12+
"testing"
13+
14+
"github.com/hyperledger/fabric/bccsp/factory"
15+
"github.com/pkg/errors"
16+
"github.com/stretchr/testify/assert"
17+
)
18+
19+
func init() {
20+
factory.InitFactories(nil)
21+
}
22+
23+
func TestSameMessage(t *testing.T) {
24+
var signedInvokedCount int
25+
sign := func(msg []byte) ([]byte, error) {
26+
signedInvokedCount++
27+
return msg, nil
28+
}
29+
30+
ms := NewMemoizeSigner(sign, 10)
31+
for i := 0; i < 5; i++ {
32+
sig, err := ms.Sign([]byte{1, 2, 3})
33+
assert.NoError(t, err)
34+
assert.Equal(t, []byte{1, 2, 3}, sig)
35+
assert.Equal(t, 1, signedInvokedCount)
36+
}
37+
}
38+
39+
func TestDifferentMessages(t *testing.T) {
40+
n := 5
41+
var signedInvokedCount uint32
42+
sign := func(msg []byte) ([]byte, error) {
43+
atomic.AddUint32(&signedInvokedCount, 1)
44+
return msg, nil
45+
}
46+
47+
ms := NewMemoizeSigner(sign, n)
48+
parallelSignRange := func(start, end int) {
49+
var wg sync.WaitGroup
50+
wg.Add(end - start)
51+
for i := start; i < end; i++ {
52+
i := i
53+
go func() {
54+
defer wg.Done()
55+
sig, err := ms.Sign([]byte{byte(i)})
56+
assert.NoError(t, err)
57+
assert.Equal(t, []byte{byte(i)}, sig)
58+
}()
59+
}
60+
wg.Wait()
61+
}
62+
63+
// Query once
64+
parallelSignRange(0, n)
65+
assert.Equal(t, uint32(n), atomic.LoadUint32(&signedInvokedCount))
66+
67+
// Query twice
68+
parallelSignRange(0, n)
69+
assert.Equal(t, uint32(n), atomic.LoadUint32(&signedInvokedCount))
70+
71+
// Query thrice on a disjoint range
72+
parallelSignRange(n+1, 2*n)
73+
oldSignedInvokedCount := atomic.LoadUint32(&signedInvokedCount)
74+
75+
// Ensure that some of the early messages 0-n were purged from memory
76+
parallelSignRange(0, n)
77+
assert.True(t, oldSignedInvokedCount < atomic.LoadUint32(&signedInvokedCount))
78+
}
79+
80+
func TestFailure(t *testing.T) {
81+
sign := func(_ []byte) ([]byte, error) {
82+
return nil, errors.New("something went wrong")
83+
}
84+
85+
ms := NewMemoizeSigner(sign, 1)
86+
_, err := ms.Sign([]byte{1, 2, 3})
87+
assert.Equal(t, "something went wrong", err.Error())
88+
}

discovery/cmd/stub.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func (stub *ClientStub) Send(server string, conf common.Config, req *discovery.R
6161
timeout, cancel := context.WithTimeout(context.Background(), defaultTimeout)
6262
defer cancel()
6363

64-
disc := discovery.NewClient(comm.NewDialer(server), signer.Sign)
64+
disc := discovery.NewClient(comm.NewDialer(server), signer.Sign, 10)
6565

6666
resp, err := disc.Send(timeout, req, &AuthInfo{
6767
ClientIdentity: signer.Creator,

discovery/test/integration_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,8 @@ func createClientAndService(t *testing.T, testdir string) (*client, *service) {
494494
assert.NoError(t, err)
495495

496496
wrapperClient := &client{AuthInfo: authInfo, conn: conn}
497-
c := disc.NewClient(wrapperClient.newConnection, signer.Sign)
497+
signerCacheSize := 10
498+
c := disc.NewClient(wrapperClient.newConnection, signer.Sign, signerCacheSize)
498499
wrapperClient.Client = c
499500
service := &service{Server: gRPCServer.Server(), lc: lc, sup: sup}
500501
return wrapperClient, service

0 commit comments

Comments
 (0)