-
Notifications
You must be signed in to change notification settings - Fork 4.5k
idle: decrement active call count for streaming RPCs only when the call completes #6610
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
130dfcc
bd677f7
5f2e647
1aa48a5
fe8452c
c0ac1a0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ import ( | |
"context" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"strings" | ||
"testing" | ||
"time" | ||
|
@@ -179,80 +180,113 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) { | |
// Tests the case where channel idleness is enabled by passing a small value for | ||
// idle_timeout. Verifies that a READY channel with an ongoing RPC stays READY. | ||
func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) { | ||
// Create a ClientConn with a short idle_timeout. | ||
r := manual.NewBuilderWithScheme("whatever") | ||
dopts := []grpc.DialOption{ | ||
grpc.WithTransportCredentials(insecure.NewCredentials()), | ||
grpc.WithResolvers(r), | ||
grpc.WithIdleTimeout(defaultTestShortIdleTimeout), | ||
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`), | ||
} | ||
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...) | ||
if err != nil { | ||
t.Fatalf("grpc.Dial() failed: %v", err) | ||
} | ||
t.Cleanup(func() { cc.Close() }) | ||
|
||
// Start a test backend which keeps a unary RPC call active by blocking on a | ||
// channel that is closed by the test later on. Also push an address update | ||
// via the resolver. | ||
blockCh := make(chan struct{}) | ||
backend := &stubserver.StubServer{ | ||
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { | ||
<-blockCh | ||
return &testpb.Empty{}, nil | ||
tests := []struct { | ||
name string | ||
makeRPC func(ctx context.Context, client testgrpc.TestServiceClient) error | ||
}{ | ||
{ | ||
name: "unary", | ||
makeRPC: func(ctx context.Context, client testgrpc.TestServiceClient) error { | ||
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { | ||
return fmt.Errorf("EmptyCall RPC failed: %v", err) | ||
} | ||
return nil | ||
}, | ||
}, | ||
{ | ||
name: "streaming", | ||
makeRPC: func(ctx context.Context, client testgrpc.TestServiceClient) error { | ||
stream, err := client.FullDuplexCall(ctx) | ||
if err != nil { | ||
t.Fatalf("FullDuplexCall RPC failed: %v", err) | ||
} | ||
if _, err := stream.Recv(); err != nil && err != io.EOF { | ||
t.Fatalf("stream.Recv() failed: %v", err) | ||
} | ||
return nil | ||
}, | ||
}, | ||
} | ||
if err := backend.StartServer(); err != nil { | ||
t.Fatalf("Failed to start backend: %v", err) | ||
} | ||
t.Cleanup(backend.Stop) | ||
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}}) | ||
|
||
// Verify that the ClientConn moves to READY. | ||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||
defer cancel() | ||
testutils.AwaitState(ctx, t, cc, connectivity.Ready) | ||
|
||
// Spawn a goroutine which checks expected state transitions and idleness | ||
// channelz trace events. It eventually closes `blockCh`, thereby unblocking | ||
// the server RPC handler and the unary call below. | ||
errCh := make(chan error, 1) | ||
go func() { | ||
defer close(blockCh) | ||
// Verify that the ClientConn stays in READY. | ||
sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout) | ||
defer sCancel() | ||
testutils.AwaitNoStateChange(sCtx, t, cc, connectivity.Ready) | ||
|
||
// Verify that there are no idleness related channelz events. | ||
if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil { | ||
errCh <- err | ||
return | ||
} | ||
if err := channelzTraceEventNotFound(ctx, "exiting idle mode"); err != nil { | ||
errCh <- err | ||
return | ||
} | ||
for _, test := range tests { | ||
t.Run(test.name, func(t *testing.T) { | ||
// Create a ClientConn with a short idle_timeout. | ||
r := manual.NewBuilderWithScheme("whatever") | ||
dopts := []grpc.DialOption{ | ||
grpc.WithTransportCredentials(insecure.NewCredentials()), | ||
grpc.WithResolvers(r), | ||
grpc.WithIdleTimeout(defaultTestShortIdleTimeout), | ||
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`), | ||
} | ||
cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...) | ||
if err != nil { | ||
t.Fatalf("grpc.Dial() failed: %v", err) | ||
} | ||
t.Cleanup(func() { cc.Close() }) | ||
|
||
// Start a test backend which keeps a unary RPC call active by blocking on a | ||
// channel that is closed by the test later on. Also push an address update | ||
// via the resolver. | ||
blockCh := make(chan struct{}) | ||
backend := &stubserver.StubServer{ | ||
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { | ||
<-blockCh | ||
return &testpb.Empty{}, nil | ||
}, | ||
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { | ||
<-blockCh | ||
return nil | ||
}, | ||
} | ||
if err := backend.StartServer(); err != nil { | ||
t.Fatalf("Failed to start backend: %v", err) | ||
} | ||
t.Cleanup(backend.Stop) | ||
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}}) | ||
|
||
// Verify that the ClientConn moves to READY. | ||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||
defer cancel() | ||
testutils.AwaitState(ctx, t, cc, connectivity.Ready) | ||
|
||
// Spawn a goroutine which checks expected state transitions and idleness | ||
// channelz trace events. | ||
errCh := make(chan error, 1) | ||
go func() { | ||
defer close(blockCh) | ||
|
||
// Verify that the ClientConn stays in READY. | ||
sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout) | ||
defer sCancel() | ||
if cc.WaitForStateChange(sCtx, connectivity.Ready) { | ||
errCh <- fmt.Errorf("state changed from %q to %q when no state change was expected", connectivity.Ready, cc.GetState()) | ||
return | ||
} | ||
Comment on lines
+261
to
+264
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Couldn't you simplify this test a good amount, remove the errCh, and go back to the old There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we move the RPC to a goroutine and perform the rest of the checks here, there seems to be a race between how the RPC terminates:
So, I was having to do way too many checks in different places. The way it currently exists seems easier for error handling. |
||
|
||
// Unblock the unary RPC on the server. | ||
errCh <- nil | ||
}() | ||
// Verify that there are no idleness related channelz events. | ||
// | ||
// TODO: Improve the checks here. If these log strings are | ||
// changed in the code, these checks will continue to pass. | ||
if err := channelzTraceEventNotFound(ctx, "entering idle mode"); err != nil { | ||
errCh <- err | ||
return | ||
} | ||
errCh <- channelzTraceEventNotFound(ctx, "exiting idle mode") | ||
}() | ||
|
||
// Make a unary RPC that blocks on the server, thereby ensuring that the | ||
// count of active RPCs on the client is non-zero. | ||
client := testgrpc.NewTestServiceClient(cc) | ||
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { | ||
t.Errorf("EmptyCall RPC failed: %v", err) | ||
} | ||
if err := test.makeRPC(ctx, testgrpc.NewTestServiceClient(cc)); err != nil { | ||
t.Fatalf("%s rpc failed: %v", test.name, err) | ||
} | ||
|
||
select { | ||
case err := <-errCh: | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
case <-ctx.Done(): | ||
t.Fatalf("Timeout when trying to verify that an active RPC keeps channel from moving to IDLE") | ||
select { | ||
case err := <-errCh: | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
case <-ctx.Done(): | ||
t.Fatalf("Timeout when trying to verify that an active RPC keeps channel from moving to IDLE") | ||
} | ||
}) | ||
} | ||
} | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.