Skip to content

Commit 907bdaa

Browse files
alts: Read max number of concurrent ALTS handshakes from environment variable. (#6267)
* Read max number of concurrent ALTS handshakes from environment variable. * Refactor to use new envconfig file. * Remove impossible if condition in acquire(). * Use weighted semaphore. * Add e2e test for concurrent ALTS handshakes. * Separate into client and server semaphores. * Use TryAcquire instead of Acquire. * Attempt to fix go.sum error. * Run go mod tidy compat=1.17. * Update go.mod for examples subdirectory. * Run go mod tidy -compat=1.17 on examples subdirectory. * Update go.mod in subdirectories. * Update go.mod in security/advancedtls/examples. * Missed another go.mod update. * Do not upgrade glog because it requires Golang 1.19. * Fix glog version in examples/go.sum. * More glog cleanup. * Fix glog issue in gcp/observability/go.sum. * Move ALTS env var into envconfig.go. * Fix go.mod files. * Revert go.sum files. * Revert interop/observability/go.mod change. * Run go mod tidy -compat=1.17 on examples/. * Run gofmt. * Add comment describing test init function.
1 parent 2ac1aae commit 907bdaa

File tree

8 files changed

+95
-73
lines changed

8 files changed

+95
-73
lines changed

credentials/alts/alts_test.go

Lines changed: 65 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/golang/protobuf/proto"
3232
"google.golang.org/grpc"
3333
"google.golang.org/grpc/codes"
34+
"google.golang.org/grpc/credentials/alts/internal/handshaker"
3435
"google.golang.org/grpc/credentials/alts/internal/handshaker/service"
3536
altsgrpc "google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp"
3637
altspb "google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp"
@@ -51,6 +52,14 @@ type s struct {
5152
grpctest.Tester
5253
}
5354

55+
func init() {
56+
// The vmOnGCP global variable MUST be forced to true. Otherwise, if
57+
// this test is run anywhere except on a GCP VM, then an ALTS handshake
58+
// will immediately fail.
59+
once.Do(func() {})
60+
vmOnGCP = true
61+
}
62+
5463
func Test(t *testing.T) {
5564
grpctest.RunSubTests(t, s{})
5665
}
@@ -308,14 +317,6 @@ func (s) TestCheckRPCVersions(t *testing.T) {
308317
// server, where both client and server offload to a local, fake handshaker
309318
// service.
310319
func (s) TestFullHandshake(t *testing.T) {
311-
// The vmOnGCP global variable MUST be reset to true after the client
312-
// or server credentials have been created, but before the ALTS
313-
// handshake begins. If vmOnGCP is not reset and this test is run
314-
// anywhere except for a GCP VM, then the ALTS handshake will
315-
// immediately fail.
316-
once.Do(func() {})
317-
vmOnGCP = true
318-
319320
// Start the fake handshaker service and the server.
320321
var wait sync.WaitGroup
321322
defer wait.Wait()
@@ -325,26 +326,41 @@ func (s) TestFullHandshake(t *testing.T) {
325326
defer stopServer()
326327

327328
// Ping the server, authenticating with ALTS.
328-
clientCreds := NewClientCreds(&ClientOptions{HandshakerServiceAddress: handshakerAddress})
329-
conn, err := grpc.Dial(serverAddress, grpc.WithTransportCredentials(clientCreds))
330-
if err != nil {
331-
t.Fatalf("grpc.Dial(%v) failed: %v", serverAddress, err)
329+
establishAltsConnection(t, handshakerAddress, serverAddress)
330+
331+
// Close open connections to the fake handshaker service.
332+
if err := service.CloseForTesting(); err != nil {
333+
t.Errorf("service.CloseForTesting() failed: %v", err)
332334
}
333-
defer conn.Close()
334-
ctx, cancel := context.WithTimeout(context.Background(), defaultTestLongTimeout)
335-
defer cancel()
336-
c := testgrpc.NewTestServiceClient(conn)
337-
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
338-
_, err = c.UnaryCall(ctx, &testpb.SimpleRequest{})
339-
if err == nil {
340-
break
341-
}
342-
if code := status.Code(err); code == codes.Unavailable {
343-
// The server is not ready yet. Try again.
344-
continue
345-
}
346-
t.Fatalf("c.UnaryCall() failed: %v", err)
335+
}
336+
337+
// TestConcurrentHandshakes performs a several, concurrent ALTS handshakes
338+
// between a test client and server, where both client and server offload to a
339+
// local, fake handshaker service.
340+
func (s) TestConcurrentHandshakes(t *testing.T) {
341+
// Set the max number of concurrent handshakes to 3, so that we can
342+
// test the handshaker behavior when handshakes are queued by
343+
// performing more than 3 concurrent handshakes (specifically, 10).
344+
handshaker.ResetConcurrentHandshakeSemaphoreForTesting(3)
345+
346+
// Start the fake handshaker service and the server.
347+
var wait sync.WaitGroup
348+
defer wait.Wait()
349+
stopHandshaker, handshakerAddress := startFakeHandshakerService(t, &wait)
350+
defer stopHandshaker()
351+
stopServer, serverAddress := startServer(t, handshakerAddress, &wait)
352+
defer stopServer()
353+
354+
// Ping the server, authenticating with ALTS.
355+
var waitForConnections sync.WaitGroup
356+
for i := 0; i < 10; i++ {
357+
waitForConnections.Add(1)
358+
go func() {
359+
establishAltsConnection(t, handshakerAddress, serverAddress)
360+
waitForConnections.Done()
361+
}()
347362
}
363+
waitForConnections.Wait()
348364

349365
// Close open connections to the fake handshaker service.
350366
if err := service.CloseForTesting(); err != nil {
@@ -366,6 +382,29 @@ func versions(minMajor, minMinor, maxMajor, maxMinor uint32) *altspb.RpcProtocol
366382
}
367383
}
368384

385+
func establishAltsConnection(t *testing.T, handshakerAddress, serverAddress string) {
386+
clientCreds := NewClientCreds(&ClientOptions{HandshakerServiceAddress: handshakerAddress})
387+
conn, err := grpc.Dial(serverAddress, grpc.WithTransportCredentials(clientCreds))
388+
if err != nil {
389+
t.Fatalf("grpc.Dial(%v) failed: %v", serverAddress, err)
390+
}
391+
defer conn.Close()
392+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestLongTimeout)
393+
defer cancel()
394+
c := testgrpc.NewTestServiceClient(conn)
395+
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
396+
_, err = c.UnaryCall(ctx, &testpb.SimpleRequest{})
397+
if err == nil {
398+
break
399+
}
400+
if code := status.Code(err); code == codes.Unavailable {
401+
// The server is not ready yet. Try again.
402+
continue
403+
}
404+
t.Fatalf("c.UnaryCall() failed: %v", err)
405+
}
406+
}
407+
369408
func startFakeHandshakerService(t *testing.T, wait *sync.WaitGroup) (stop func(), address string) {
370409
listener, err := testutils.LocalTCPListener()
371410
if err != nil {

credentials/alts/internal/handshaker/handshaker.go

Lines changed: 16 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ import (
2525
"fmt"
2626
"io"
2727
"net"
28-
"sync"
2928

29+
"golang.org/x/sync/semaphore"
3030
grpc "google.golang.org/grpc"
3131
"google.golang.org/grpc/codes"
3232
"google.golang.org/grpc/credentials"
@@ -35,15 +35,13 @@ import (
3535
"google.golang.org/grpc/credentials/alts/internal/conn"
3636
altsgrpc "google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp"
3737
altspb "google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp"
38+
"google.golang.org/grpc/internal/envconfig"
3839
)
3940

4041
const (
4142
// The maximum byte size of receive frames.
4243
frameLimit = 64 * 1024 // 64 KB
4344
rekeyRecordProtocolName = "ALTSRP_GCM_AES128_REKEY"
44-
// maxPendingHandshakes represents the maximum number of concurrent
45-
// handshakes.
46-
maxPendingHandshakes = 100
4745
)
4846

4947
var (
@@ -59,9 +57,9 @@ var (
5957
return conn.NewAES128GCMRekey(s, keyData)
6058
},
6159
}
62-
// control number of concurrent created (but not closed) handshakers.
63-
mu sync.Mutex
64-
concurrentHandshakes = int64(0)
60+
// control number of concurrent created (but not closed) handshakes.
61+
clientHandshakes = semaphore.NewWeighted(int64(envconfig.ALTSMaxConcurrentHandshakes))
62+
serverHandshakes = semaphore.NewWeighted(int64(envconfig.ALTSMaxConcurrentHandshakes))
6563
// errDropped occurs when maxPendingHandshakes is reached.
6664
errDropped = errors.New("maximum number of concurrent ALTS handshakes is reached")
6765
// errOutOfBound occurs when the handshake service returns a consumed
@@ -77,30 +75,6 @@ func init() {
7775
}
7876
}
7977

80-
func acquire() bool {
81-
mu.Lock()
82-
// If we need n to be configurable, we can pass it as an argument.
83-
n := int64(1)
84-
success := maxPendingHandshakes-concurrentHandshakes >= n
85-
if success {
86-
concurrentHandshakes += n
87-
}
88-
mu.Unlock()
89-
return success
90-
}
91-
92-
func release() {
93-
mu.Lock()
94-
// If we need n to be configurable, we can pass it as an argument.
95-
n := int64(1)
96-
concurrentHandshakes -= n
97-
if concurrentHandshakes < 0 {
98-
mu.Unlock()
99-
panic("bad release")
100-
}
101-
mu.Unlock()
102-
}
103-
10478
// ClientHandshakerOptions contains the client handshaker options that can
10579
// provided by the caller.
10680
type ClientHandshakerOptions struct {
@@ -134,10 +108,6 @@ func DefaultServerHandshakerOptions() *ServerHandshakerOptions {
134108
return &ServerHandshakerOptions{}
135109
}
136110

137-
// TODO: add support for future local and remote endpoint in both client options
138-
// and server options (server options struct does not exist now. When
139-
// caller can provide endpoints, it should be created.
140-
141111
// altsHandshaker is used to complete an ALTS handshake between client and
142112
// server. This handshaker talks to the ALTS handshaker service in the metadata
143113
// server.
@@ -185,10 +155,10 @@ func NewServerHandshaker(ctx context.Context, conn *grpc.ClientConn, c net.Conn,
185155
// ClientHandshake starts and completes a client ALTS handshake for GCP. Once
186156
// done, ClientHandshake returns a secure connection.
187157
func (h *altsHandshaker) ClientHandshake(ctx context.Context) (net.Conn, credentials.AuthInfo, error) {
188-
if !acquire() {
158+
if !clientHandshakes.TryAcquire(1) {
189159
return nil, nil, errDropped
190160
}
191-
defer release()
161+
defer clientHandshakes.Release(1)
192162

193163
if h.side != core.ClientSide {
194164
return nil, nil, errors.New("only handshakers created using NewClientHandshaker can perform a client handshaker")
@@ -238,10 +208,10 @@ func (h *altsHandshaker) ClientHandshake(ctx context.Context) (net.Conn, credent
238208
// ServerHandshake starts and completes a server ALTS handshake for GCP. Once
239209
// done, ServerHandshake returns a secure connection.
240210
func (h *altsHandshaker) ServerHandshake(ctx context.Context) (net.Conn, credentials.AuthInfo, error) {
241-
if !acquire() {
211+
if !serverHandshakes.TryAcquire(1) {
242212
return nil, nil, errDropped
243213
}
244-
defer release()
214+
defer serverHandshakes.Release(1)
245215

246216
if h.side != core.ServerSide {
247217
return nil, nil, errors.New("only handshakers created using NewServerHandshaker can perform a server handshaker")
@@ -264,8 +234,6 @@ func (h *altsHandshaker) ServerHandshake(ctx context.Context) (net.Conn, credent
264234
}
265235

266236
// Prepare server parameters.
267-
// TODO: currently only ALTS parameters are provided. Might need to use
268-
// more options in the future.
269237
params := make(map[int32]*altspb.ServerHandshakeParameters)
270238
params[int32(altspb.HandshakeProtocol_ALTS)] = &altspb.ServerHandshakeParameters{
271239
RecordProtocols: recordProtocols,
@@ -391,3 +359,10 @@ func (h *altsHandshaker) Close() {
391359
h.stream.CloseSend()
392360
}
393361
}
362+
363+
// ResetConcurrentHandshakeSemaphoreForTesting resets the handshake semaphores
364+
// to allow numberOfAllowedHandshakes concurrent handshakes each.
365+
func ResetConcurrentHandshakeSemaphoreForTesting(numberOfAllowedHandshakes int64) {
366+
clientHandshakes = semaphore.NewWeighted(numberOfAllowedHandshakes)
367+
serverHandshakes = semaphore.NewWeighted(numberOfAllowedHandshakes)
368+
}

credentials/alts/internal/handshaker/handshaker_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
core "google.golang.org/grpc/credentials/alts/internal"
3232
altspb "google.golang.org/grpc/credentials/alts/internal/proto/grpc_gcp"
3333
"google.golang.org/grpc/credentials/alts/internal/testutil"
34+
"google.golang.org/grpc/internal/envconfig"
3435
"google.golang.org/grpc/internal/grpctest"
3536
)
3637

@@ -134,7 +135,7 @@ func (s) TestClientHandshake(t *testing.T) {
134135
numberOfHandshakes int
135136
}{
136137
{0 * time.Millisecond, 1},
137-
{100 * time.Millisecond, 10 * maxPendingHandshakes},
138+
{100 * time.Millisecond, 10 * int(envconfig.ALTSMaxConcurrentHandshakes)},
138139
} {
139140
errc := make(chan error)
140141
stat.Reset()
@@ -182,8 +183,8 @@ func (s) TestClientHandshake(t *testing.T) {
182183
}
183184

184185
// Ensure that there are no concurrent calls more than the limit.
185-
if stat.MaxConcurrentCalls > maxPendingHandshakes {
186-
t.Errorf("Observed %d concurrent handshakes; want <= %d", stat.MaxConcurrentCalls, maxPendingHandshakes)
186+
if stat.MaxConcurrentCalls > int(envconfig.ALTSMaxConcurrentHandshakes) {
187+
t.Errorf("Observed %d concurrent handshakes; want <= %d", stat.MaxConcurrentCalls, envconfig.ALTSMaxConcurrentHandshakes)
187188
}
188189
}
189190
}
@@ -194,7 +195,7 @@ func (s) TestServerHandshake(t *testing.T) {
194195
numberOfHandshakes int
195196
}{
196197
{0 * time.Millisecond, 1},
197-
{100 * time.Millisecond, 10 * maxPendingHandshakes},
198+
{100 * time.Millisecond, 10 * int(envconfig.ALTSMaxConcurrentHandshakes)},
198199
} {
199200
errc := make(chan error)
200201
stat.Reset()
@@ -239,8 +240,8 @@ func (s) TestServerHandshake(t *testing.T) {
239240
}
240241

241242
// Ensure that there are no concurrent calls more than the limit.
242-
if stat.MaxConcurrentCalls > maxPendingHandshakes {
243-
t.Errorf("Observed %d concurrent handshakes; want <= %d", stat.MaxConcurrentCalls, maxPendingHandshakes)
243+
if stat.MaxConcurrentCalls > int(envconfig.ALTSMaxConcurrentHandshakes) {
244+
t.Errorf("Observed %d concurrent handshakes; want <= %d", stat.MaxConcurrentCalls, envconfig.ALTSMaxConcurrentHandshakes)
244245
}
245246
}
246247
}

examples/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ require (
2020
github.com/envoyproxy/go-control-plane v0.11.1-0.20230524094728-9239064ad72f // indirect
2121
github.com/envoyproxy/protoc-gen-validate v0.10.1 // indirect
2222
golang.org/x/net v0.9.0 // indirect
23+
golang.org/x/sync v0.1.0 // indirect
2324
golang.org/x/sys v0.7.0 // indirect
2425
golang.org/x/text v0.9.0 // indirect
2526
google.golang.org/appengine v1.6.7 // indirect

examples/go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1005,6 +1005,7 @@ golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJ
10051005
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
10061006
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
10071007
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
1008+
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
10081009
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
10091010
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
10101011
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ require (
1313
github.com/google/uuid v1.3.0
1414
golang.org/x/net v0.9.0
1515
golang.org/x/oauth2 v0.7.0
16+
golang.org/x/sync v0.0.0-20190423024810-112230192c58
1617
golang.org/x/sys v0.7.0
1718
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19
1819
google.golang.org/protobuf v1.30.0

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ golang.org/x/oauth2 v0.7.0 h1:qe6s0zUXlPX80/dITx3440hWZ7GwMwgDDyrSGTPJG/g=
5858
golang.org/x/oauth2 v0.7.0/go.mod h1:hPLQkd9LyjfXTiRohC/41GhcFqxisoUQ99sCUOHO9x4=
5959
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
6060
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
61+
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
6162
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
6263
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
6364
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

internal/envconfig/envconfig.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ var (
4040
// pick_first LB policy, which can be enabled by setting the environment
4141
// variable "GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG" to "true".
4242
PickFirstLBConfig = boolFromEnv("GRPC_EXPERIMENTAL_PICKFIRST_LB_CONFIG", false)
43+
// ALTSMaxConcurrentHandshakes is the maximum number of concurrent ALTS
44+
// handshakes that can be performed.
45+
ALTSMaxConcurrentHandshakes = uint64FromEnv("GRPC_ALTS_MAX_CONCURRENT_HANDSHAKES", 100, 1, 100)
4346
)
4447

4548
func boolFromEnv(envVar string, def bool) bool {

0 commit comments

Comments
 (0)