
xds/server: Fix xDS Server leak #7664


Merged · 6 commits · Sep 27, 2024

1 change: 1 addition & 0 deletions xds/internal/server/conn_wrapper.go
@@ -161,6 +161,7 @@ func (c *connWrapper) Close() error {
if c.rootProvider != nil {
c.rootProvider.Close()
}
+ c.parent.removeConn(c)
return c.Conn.Close()
}

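The change above is the core of the leak fix: connWrapper.Close now deregisters the connection from its parent listenerWrapper, so the wrapper's tracking map stops growing as connections come and go. As a minimal, self-contained sketch of that ownership pattern (hypothetical simplified types, not the actual grpc-go ones):

```go
package main

import (
	"fmt"
	"sync"
)

// wrappedConn stands in for connWrapper: it knows its parent so that
// closing the connection can also drop the parent's reference to it.
type wrappedConn struct {
	parent *trackingListener
}

func (c *wrappedConn) Close() {
	// Without this call, the parent's map keeps growing for the lifetime
	// of the listener even though the connection is gone -- the leak
	// this PR fixes.
	c.parent.removeConn(c)
}

// trackingListener stands in for listenerWrapper: it tracks accepted
// connections so it can drain them when the configuration changes.
type trackingListener struct {
	mu    sync.Mutex
	conns map[*wrappedConn]bool
}

func (l *trackingListener) accept() *wrappedConn {
	c := &wrappedConn{parent: l}
	l.mu.Lock()
	l.conns[c] = true
	l.mu.Unlock()
	return c
}

func (l *trackingListener) removeConn(c *wrappedConn) {
	l.mu.Lock()
	defer l.mu.Unlock()
	delete(l.conns, c)
}

func main() {
	l := &trackingListener{conns: make(map[*wrappedConn]bool)}
	c := l.accept()
	c.Close()
	l.mu.Lock()
	fmt.Println("tracked connections after Close:", len(l.conns)) // prints 0
	l.mu.Unlock()
}
```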
34 changes: 19 additions & 15 deletions xds/internal/server/listener_wrapper.go
@@ -86,6 +86,7 @@
xdsC: params.XDSClient,
modeCallback: params.ModeCallback,
isUnspecifiedAddr: params.Listener.Addr().(*net.TCPAddr).IP.IsUnspecified(),
+ conns: make(map[*connWrapper]bool),

mode: connectivity.ServingModeNotServing,
closed: grpcsync.NewEvent(),
@@ -135,13 +136,13 @@

// mu guards access to the current serving mode and the active filter chain
// manager.
- mu sync.RWMutex
+ mu sync.Mutex
// Current serving mode.
mode connectivity.ServingMode
// Filter chain manager currently serving.
activeFilterChainManager *xdsresource.FilterChainManager
// conns accepted with configuration from activeFilterChainManager.
- conns []*connWrapper
+ conns map[*connWrapper]bool

// These fields are read/written to in the context of xDS updates, which are
// guaranteed to be emitted synchronously from the xDS Client. Thus, they do
@@ -202,17 +203,14 @@
// gracefully shut down with a grace period of 10 minutes for long-lived
// RPC's, such that clients will reconnect and have the updated
// configuration apply." - A36
- var connsToClose []*connWrapper
- if l.activeFilterChainManager != nil { // If there is a filter chain manager to clean up.
-     connsToClose = l.conns
-     l.conns = nil
- }
+ connsToClose := l.conns
+ l.conns = make(map[*connWrapper]bool)
l.activeFilterChainManager = l.pendingFilterChainManager
l.pendingFilterChainManager = nil
l.instantiateFilterChainRoutingConfigurationsLocked()
l.mu.Unlock()
go func() {
- for _, conn := range connsToClose {
+ for conn := range connsToClose {
conn.Drain()
}
}()
@@ -304,15 +302,15 @@
return nil, fmt.Errorf("received connection with non-TCP address (local: %T, remote %T)", conn.LocalAddr(), conn.RemoteAddr())
}

- l.mu.RLock()
+ l.mu.Lock()

[Review comment — Member] Is this field still used as an RWMutex in other places? If not, the type should change to a regular mutex.

[Reply — Contributor, PR author] Ah ok, will switch.

if l.mode == connectivity.ServingModeNotServing {
// Close connections as soon as we accept them when we are in
// "not-serving" mode. Since we accept a net.Listener from the user
// in Serve(), we cannot close the listener when we move to
// "not-serving". Closing the connection immediately upon accepting
// is one of the other ways to implement the "not-serving" mode as
// outlined in gRFC A36.
- l.mu.RUnlock()
+ l.mu.Unlock()
conn.Close()
continue
}
@@ -324,7 +322,7 @@
SourcePort: srcAddr.Port,
})
if err != nil {
- l.mu.RUnlock()
+ l.mu.Unlock()

[Codecov / codecov/patch — warning] Added line xds/internal/server/listener_wrapper.go#L325 was not covered by tests.
// When a matching filter chain is not found, we close the
// connection right away, but do not return an error back to
// `grpc.Serve()` from where this Accept() was invoked. Returning an
@@ -341,12 +339,18 @@
continue
}
cw := &connWrapper{Conn: conn, filterChain: fc, parent: l, urc: fc.UsableRouteConfiguration}
- l.conns = append(l.conns, cw)
- l.mu.RUnlock()
+ l.conns[cw] = true
+ l.mu.Unlock()
return cw, nil
}
}

+ func (l *listenerWrapper) removeConn(conn *connWrapper) {
+ l.mu.Lock()
+ defer l.mu.Unlock()
+ delete(l.conns, conn)
+ }

// Close closes the underlying listener. It also cancels the xDS watch
// registered in Serve() and closes any certificate provider instances created
// based on security configuration received in the LDS response.
@@ -376,9 +380,9 @@
l.mode = newMode
if l.mode == connectivity.ServingModeNotServing {
connsToClose := l.conns
- l.conns = nil
+ l.conns = make(map[*connWrapper]bool)
go func() {
- for _, conn := range connsToClose {
+ for conn := range connsToClose {
conn.Drain()
}
}()
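Both places in this file that install a new configuration now follow the same drain-on-update pattern: under the mutex, take the set of connections accepted with the old configuration and swap in a fresh map, then drain the old connections on a separate goroutine so in-flight RPCs get the grace period described in the gRFC A36 quote above. Switching from a slice to a map is also what makes removeConn an O(1) delete when an individual connection closes. A standalone sketch of that pattern, with hypothetical simplified types rather than the real listenerWrapper:

```go
package main

import (
	"fmt"
	"sync"
)

// conn stands in for connWrapper; the real Drain lets in-flight RPCs
// finish within a grace period instead of closing abruptly.
type conn struct{ id int }

func (c *conn) Drain() { fmt.Printf("draining conn %d\n", c.id) }

type wrapper struct {
	mu    sync.Mutex
	conns map[*conn]bool
}

// switchConfig mirrors the pattern in the diff: under the lock, grab the
// connections accepted under the old configuration and install a fresh
// map, then drain the old connections outside the lock.
func (w *wrapper) switchConfig(done *sync.WaitGroup) {
	w.mu.Lock()
	connsToClose := w.conns
	w.conns = make(map[*conn]bool)
	w.mu.Unlock()

	done.Add(1)
	go func() {
		defer done.Done()
		for c := range connsToClose {
			c.Drain()
		}
	}()
}

func main() {
	w := &wrapper{conns: make(map[*conn]bool)}
	w.conns[&conn{id: 1}] = true
	w.conns[&conn{id: 2}] = true

	var wg sync.WaitGroup
	w.switchConfig(&wg)
	wg.Wait()
}
```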
105 changes: 105 additions & 0 deletions xds/internal/server/listener_wrapper_test.go
@@ -23,12 +23,18 @@ import (
"fmt"
"net"
"strconv"
"sync"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/xds/e2e"
testgrpc "google.golang.org/grpc/interop/grpc_testing"
testpb "google.golang.org/grpc/interop/grpc_testing"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
@@ -151,5 +157,104 @@ func (s) TestListenerWrapper(t *testing.T) {
t.Fatalf("mode change received: %v, want: %v", mode, connectivity.ServingModeNotServing)
}
}
}

type testService struct {
testgrpc.TestServiceServer
}

func (*testService) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
}

// TestConnsCleanup tests that the listener wrapper clears its connection
// references when connections close. It sets up a listener wrapper and a gRPC
// server, connects to the server 100 times, makes an RPC on each connection,
// and then closes it. After these 100 connections close, the listener wrapper
// should hold no references to any of them.
func (s) TestConnsCleanup(t *testing.T) {
mgmtServer, nodeID, _, _, xdsC := xdsSetupForTests(t)
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("Failed to create a local TCP listener: %v", err)
}

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

modeCh := make(chan connectivity.ServingMode, 1)
vm := verifyMode{
modeCh: modeCh,
}

host, port := hostPortFromListener(t, lis)
lisResourceName := fmt.Sprintf(e2e.ServerListenerResourceNameTemplate, net.JoinHostPort(host, strconv.Itoa(int(port))))
params := ListenerWrapperParams{
Listener: lis,
ListenerResourceName: lisResourceName,
XDSClient: xdsC,
ModeCallback: vm.verifyModeCallback,
}
lw := NewListenerWrapper(params)
if lw == nil {
t.Fatalf("NewListenerWrapper(%+v) returned nil", params)
}
defer lw.Close()

resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone, route1)},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Wait for Listener Mode to go serving.
select {
case <-ctx.Done():
t.Fatalf("timeout waiting for mode change")
case mode := <-modeCh:
if mode != connectivity.ServingModeServing {
t.Fatalf("mode change received: %v, want: %v", mode, connectivity.ServingModeServing)
}
}

server := grpc.NewServer(grpc.Creds(insecure.NewCredentials()))
testgrpc.RegisterTestServiceServer(server, &testService{})
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
if err := server.Serve(lw); err != nil {
t.Errorf("failed to serve: %v", err)
}
}()

// Make 100 connections to the server, and make an RPC on each one.
for i := 0; i < 100; i++ {
cc, err := grpc.NewClient(lw.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.NewClient failed with err: %v", err)
}
client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("client.EmptyCall() failed: %v", err)
}
cc.Close()
}

lisWrapper := lw.(*listenerWrapper)
// Eventually, once the server has processed the connection shutdowns, the
// listener wrapper should clear its references to the wrapped connections.
lenConns := 1
for ; ctx.Err() == nil && lenConns > 0; <-time.After(time.Millisecond) {
lisWrapper.mu.Lock()
lenConns = len(lisWrapper.conns)
lisWrapper.mu.Unlock()
}
if lenConns > 0 {
t.Fatalf("timeout waiting for lis wrapper conns to clear, size: %v", lenConns)
}

server.Stop()
wg.Wait()
}
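The final polling loop in the test re-checks len(lisWrapper.conns) every millisecond until it drains to zero or the test context expires. The same idiom could be factored into a small helper; the following is a hypothetical sketch (not part of the PR), written against the same package types and imports the test already uses:

```go
// waitForEmptyConns is a hypothetical helper showing the poll-until-timeout
// idiom the test uses inline: re-check the tracked-connection count until it
// reaches zero or the context expires.
func waitForEmptyConns(ctx context.Context, lw *listenerWrapper) error {
	for {
		lw.mu.Lock()
		n := len(lw.conns)
		lw.mu.Unlock()
		if n == 0 {
			return nil
		}
		if ctx.Err() != nil {
			return fmt.Errorf("timed out waiting for conns to clear, still tracked: %d", n)
		}
		<-time.After(time.Millisecond)
	}
}
```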