Skip to content

Commit 8fc625b

Browse files
committed
leave Stop as not blocking on handlers by default for backward compatibility
1 parent c61a4d7 commit 8fc625b

File tree

4 files changed

+105
-17
lines changed

4 files changed

+105
-17
lines changed

server.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,8 @@ type Server struct {
139139
quit *grpcsync.Event
140140
done *grpcsync.Event
141141
channelzRemoveOnce sync.Once
142-
serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
142+
serveWG sync.WaitGroup // counts active Serve goroutines for Stop/GracefulStop
143+
handlersWG sync.WaitGroup // counts active method handler goroutines
143144

144145
channelzID *channelz.Identifier
145146
czData *channelzData
@@ -176,6 +177,7 @@ type serverOptions struct {
176177
headerTableSize *uint32
177178
numServerWorkers uint32
178179
recvBufferPool SharedBufferPool
180+
waitForHandlers bool
179181
}
180182

181183
var defaultServerOptions = serverOptions{
@@ -573,6 +575,21 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption {
573575
})
574576
}
575577

578+
// WaitForHandlers causes Stop to wait until all outstanding method handlers have
579+
// exited before returning. If false, Stop will return as soon as all
580+
// connections have closed, but method handlers may still be running. By
581+
// default, Stop does not wait for method handlers to return.
582+
//
583+
// # Experimental
584+
//
585+
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
586+
// later release.
587+
func WaitForHandlers(w bool) ServerOption {
588+
return newFuncServerOption(func(o *serverOptions) {
589+
o.waitForHandlers = w
590+
})
591+
}
592+
576593
// RecvBufferPool returns a ServerOption that configures the server
577594
// to use the provided shared buffer pool for parsing incoming messages. Depending
578595
// on the application's workload, this could result in reduced memory allocation.
@@ -1009,13 +1026,12 @@ func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport,
10091026
}()
10101027

10111028
streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
1012-
wg := &sync.WaitGroup{}
10131029
st.HandleStreams(ctx, func(stream *transport.Stream) {
1014-
wg.Add(1)
1030+
s.handlersWG.Add(1)
10151031
streamQuota.acquire()
10161032
f := func() {
10171033
defer streamQuota.release()
1018-
defer wg.Done()
1034+
defer s.handlersWG.Done()
10191035
s.handleStream(st, stream)
10201036
}
10211037

@@ -1029,7 +1045,6 @@ func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport,
10291045
}
10301046
go f()
10311047
})
1032-
wg.Wait()
10331048
}
10341049

10351050
var _ http.Handler = (*Server)(nil)
@@ -1915,6 +1930,10 @@ func (s *Server) stop(graceful bool) {
19151930
s.serverWorkerChannelClose()
19161931
}
19171932

1933+
if graceful || s.opts.waitForHandlers {
1934+
s.handlersWG.Wait()
1935+
}
1936+
19181937
if s.events != nil {
19191938
s.events.Finish()
19201939
s.events = nil

server_ext_test.go

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,9 @@ func (s) TestStreamWorkers_GracefulStopAndStop(t *testing.T) {
186186
ss.S.GracefulStop()
187187
}
188188

189-
func (s) TestHandlersReturnBeforeStop(t *testing.T) {
189+
// Tests the WaitForHandlers ServerOption by leaving an RPC running while Stop
190+
// is called, and ensures Stop doesn't return until the handler returns.
191+
func (s) TestServer_WaitForHandlers(t *testing.T) {
190192
started := grpcsync.NewEvent()
191193
blockCalls := grpcsync.NewEvent()
192194

@@ -199,7 +201,7 @@ func (s) TestHandlersReturnBeforeStop(t *testing.T) {
199201
return nil
200202
},
201203
}
202-
if err := ss.Start([]grpc.ServerOption{grpc.MaxConcurrentStreams(1)}); err != nil {
204+
if err := ss.Start([]grpc.ServerOption{grpc.WaitForHandlers(true)}); err != nil {
203205
t.Fatal("Error starting server:", err)
204206
}
205207
defer ss.Stop()
@@ -255,3 +257,76 @@ func (s) TestHandlersReturnBeforeStop(t *testing.T) {
255257
t.Fatalf("Timed out waiting for second RPC to start on server.")
256258
}
257259
}
260+
261+
// Tests that GracefulStop will wait for all method handlers to return by
262+
// blocking a handler and ensuring GracefulStop doesn't return until after it is
263+
// unblocked.
264+
func (s) TestServer_GracefulStopWaits(t *testing.T) {
265+
started := grpcsync.NewEvent()
266+
blockCalls := grpcsync.NewEvent()
267+
268+
// This stub server does not properly respect the stream context, so it will
269+
// not exit when the context is canceled.
270+
ss := stubserver.StubServer{
271+
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
272+
started.Fire()
273+
<-blockCalls.Done()
274+
return nil
275+
},
276+
}
277+
if err := ss.Start(nil); err != nil {
278+
t.Fatal("Error starting server:", err)
279+
}
280+
defer ss.Stop()
281+
282+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
283+
defer cancel()
284+
285+
// Start one RPC to the server.
286+
ctx1, cancel1 := context.WithCancel(ctx)
287+
_, err := ss.Client.FullDuplexCall(ctx1)
288+
if err != nil {
289+
t.Fatal("Error staring call:", err)
290+
}
291+
292+
// Wait for the handler to be invoked.
293+
select {
294+
case <-started.Done():
295+
case <-ctx.Done():
296+
t.Fatalf("Timed out waiting for RPC to start on server.")
297+
}
298+
299+
// Cancel it on the client. The server handler will still be running.
300+
cancel1()
301+
302+
// Close the connection. This might be sufficient to allow the server to
303+
// return if it doesn't properly wait for outstanding method handlers to
304+
// return.
305+
ss.CC.Close()
306+
307+
// Try to GracefulStop() the server, which should block indefinitely (until
308+
// blockCalls is fired).
309+
stopped := grpcsync.NewEvent()
310+
go func() {
311+
ss.S.GracefulStop()
312+
stopped.Fire()
313+
}()
314+
315+
// Wait 100ms and ensure stopped does not fire.
316+
select {
317+
case <-stopped.Done():
318+
trace := make([]byte, 4096)
319+
trace = trace[0:runtime.Stack(trace, true)]
320+
blockCalls.Fire()
321+
t.Fatalf("Server returned from Stop() illegally. Stack trace:\n%v", string(trace))
322+
case <-time.After(100 * time.Millisecond):
323+
// Success; unblock the call and wait for stopped.
324+
blockCalls.Fire()
325+
}
326+
327+
select {
328+
case <-stopped.Done():
329+
case <-ctx.Done():
330+
t.Fatalf("Timed out waiting for second RPC to start on server.")
331+
}
332+
}

test/end2end_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1035,7 +1035,7 @@ func (s) TestDetailedConnectionCloseErrorPropagatesToRpcError(t *testing.T) {
10351035
// connection for the RPC to go out on initially, and that the TCP connection will shut down strictly after
10361036
// the RPC has been started on it.
10371037
<-rpcStartedOnServer
1038-
go ss.S.Stop()
1038+
ss.S.Stop()
10391039
// The precise behavior of this test is subject to raceyness around the timing
10401040
// of when TCP packets are sent from client to server, and when we tell the
10411041
// server to stop, so we need to account for both possible error messages.

test/gracefulstop_test.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"google.golang.org/grpc"
3131
"google.golang.org/grpc/codes"
3232
"google.golang.org/grpc/credentials/insecure"
33-
"google.golang.org/grpc/internal/grpcsync"
3433
"google.golang.org/grpc/internal/stubserver"
3534
"google.golang.org/grpc/status"
3635

@@ -270,7 +269,7 @@ func (s) TestGracefulStopBlocksUntilGRPCConnectionsTerminate(t *testing.T) {
270269
// TestStopAbortsBlockingGRPCCall ensures that when Stop() is called while an ongoing RPC
271270
// is blocking that:
272271
// - Stop() returns
273-
// - and the RPC fails with an connection closed error on the client-side
272+
// - and the RPC fails with a connection closed error on the client-side
274273
func (s) TestStopAbortsBlockingGRPCCall(t *testing.T) {
275274
unblockGRPCCall := make(chan struct{})
276275
grpcCallExecuting := make(chan struct{})
@@ -299,13 +298,8 @@ func (s) TestStopAbortsBlockingGRPCCall(t *testing.T) {
299298
}()
300299

301300
<-grpcCallExecuting
302-
stopReturned := grpcsync.NewEvent()
303-
go func() {
304-
ss.S.Stop()
305-
stopReturned.Fire()
306-
}()
301+
ss.S.Stop()
307302

308-
<-grpcClientCallReturned
309303
unblockGRPCCall <- struct{}{}
310-
<-stopReturned.Done()
304+
<-grpcClientCallReturned
311305
}

0 commit comments

Comments
 (0)