Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: serving mode changes outlined in gRFC A36 #4328

Merged
merged 9 commits into from
Apr 26, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
xds: serving mode changes outlined in gRFC A36
  • Loading branch information
easwars committed Apr 26, 2021
commit d9a4551995e8719fd088571a9e62795e03fd3541
2 changes: 1 addition & 1 deletion examples/features/xds/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func main() {
}
}

greeterServer := xds.NewGRPCServer(grpc.Creds(creds))
greeterServer := xds.NewGRPCServer([]grpc.ServerOption{grpc.Creds(creds)}, nil)
pb.RegisterGreeterServer(greeterServer, &server{serverName: determineHostname()})

healthPort := fmt.Sprintf(":%d", *port+1)
Expand Down
7 changes: 6 additions & 1 deletion internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ var (
// GetServerCredentials returns the transport credentials configured on a
// gRPC server. An xDS-enabled server needs to know what type of credentials
// is configured on the underlying gRPC server. This is set by server.go.
GetServerCredentials interface{} // func (*grpc.Server) credentials.TransportCredentials
GetServerCredentials interface{} // func (*grpc.Server) credentials.TransportCredentialsa
// DrainServerTransports initiates a graceful close of existing connections
// on a gRPC server accepted on the provided listener address. An
// xDS-enabled server invokes this method on a grpc.Server when a particular
// listener moves to "not-serving" mode.
DrainServerTransports interface{} // func(*grpc.Server, string)
)

// HealthChecker defines the signature of the client-side LB channel health checking function.
Expand Down
79 changes: 59 additions & 20 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,31 @@ import (
const (
defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
defaultServerMaxSendMessageSize = math.MaxInt32

// Server transports are tracked in a map which is keyed on listener
// address. For regular gRPC traffic, connections are accepted in Serve()
// through a call to Accept(), and we use the actual listener address as key
// when we add it to the map. But for connections received through
// ServeHTTP(), we do not have a listener and hence use this dummy value.
listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
)

func init() {
internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
return srv.opts.creds
}
internal.DrainServerTransports = func(srv *Server, addr string) {
srv.mu.Lock()
for a, conns := range srv.conns {
if a != addr {
continue
}
for st := range conns {
st.Drain()
}
}
srv.mu.Unlock()
}
}

var statusOK = status.New(codes.OK, "")
Expand Down Expand Up @@ -107,9 +126,12 @@ type serverWorkerData struct {
type Server struct {
opts serverOptions

mu sync.Mutex // guards following
lis map[net.Listener]bool
conns map[transport.ServerTransport]bool
mu sync.Mutex // guards following
lis map[net.Listener]bool
// conns contains all active server transports. It is a map keyed on a
// listener address with the value being the set of active transports
// belonging to that listener.
conns map[string]map[transport.ServerTransport]bool
serve bool
drain bool
cv *sync.Cond // signaled when connections close for GracefulStop
Expand Down Expand Up @@ -519,7 +541,7 @@ func NewServer(opt ...ServerOption) *Server {
s := &Server{
lis: make(map[net.Listener]bool),
opts: opts,
conns: make(map[transport.ServerTransport]bool),
conns: make(map[string]map[transport.ServerTransport]bool),
services: make(map[string]*serviceInfo),
quit: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
Expand Down Expand Up @@ -778,15 +800,15 @@ func (s *Server) Serve(lis net.Listener) error {
// s.conns before this conn can be added.
s.serveWG.Add(1)
go func() {
s.handleRawConn(rawConn)
s.handleRawConn(lis.Addr().String(), rawConn)
s.serveWG.Done()
}()
}
}

// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
if s.quit.HasFired() {
rawConn.Close()
return
Expand Down Expand Up @@ -814,12 +836,12 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
}

rawConn.SetDeadline(time.Time{})
if !s.addConn(st) {
if !s.addConn(lisAddr, st) {
return
}
go func() {
s.serveStreams(st)
s.removeConn(st)
s.removeConn(lisAddr, st)
}()
}

Expand Down Expand Up @@ -924,10 +946,10 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if !s.addConn(st) {
if !s.addConn(listenerAddressForServeHTTP, st) {
return
}
defer s.removeConn(st)
defer s.removeConn(listenerAddressForServeHTTP, st)
s.serveStreams(st)
}

Expand Down Expand Up @@ -955,7 +977,7 @@ func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Strea
return trInfo
}

func (s *Server) addConn(st transport.ServerTransport) bool {
func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
s.mu.Lock()
defer s.mu.Unlock()
if s.conns == nil {
Expand All @@ -967,15 +989,28 @@ func (s *Server) addConn(st transport.ServerTransport) bool {
// immediately.
st.Drain()
}
s.conns[st] = true

if s.conns[addr] == nil {
// Create a map entry if this is the first connection on this listener.
s.conns[addr] = make(map[transport.ServerTransport]bool)
}
s.conns[addr][st] = true
return true
}

func (s *Server) removeConn(st transport.ServerTransport) {
func (s *Server) removeConn(addr string, st transport.ServerTransport) {
s.mu.Lock()
defer s.mu.Unlock()
if s.conns != nil {
delete(s.conns, st)

conns := s.conns[addr]
if conns != nil {
delete(conns, st)
if len(conns) == 0 {
// If the last connection for this address is being removed, also
// remove the map entry corresponding to the address. This is used
// in GracefulStop() when waiting for all connections to be closed.
delete(s.conns, addr)
}
s.cv.Broadcast()
}
}
Expand Down Expand Up @@ -1639,7 +1674,7 @@ func (s *Server) Stop() {
s.mu.Lock()
listeners := s.lis
s.lis = nil
st := s.conns
conns := s.conns
s.conns = nil
// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
s.cv.Broadcast()
Expand All @@ -1648,8 +1683,10 @@ func (s *Server) Stop() {
for lis := range listeners {
lis.Close()
}
for c := range st {
c.Close()
for _, cs := range conns {
for st := range cs {
st.Close()
}
}
if s.opts.numServerWorkers > 0 {
s.stopServerWorkers()
Expand Down Expand Up @@ -1686,8 +1723,10 @@ func (s *Server) GracefulStop() {
}
s.lis = nil
if !s.drain {
for st := range s.conns {
st.Drain()
for _, conns := range s.conns {
for st := range conns {
st.Drain()
}
}
s.drain = true
}
Expand Down
107 changes: 85 additions & 22 deletions xds/internal/server/listener_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,42 @@ var (
backoffFunc = bs.Backoff
)

// ServingMode indicates the current mode of operation of the server.
//
// This API exactly mirrors the one in the public xds package. We have to
// redefine it here to avoid a cyclic dependency.
type ServingMode int

const (
// ServingModeStarting indicates that the serving is starting up.
ServingModeStarting ServingMode = iota
// ServingModeServing indicates the the server contains all required xDS
// configuration is serving RPCs.
ServingModeServing
// ServingModeNotServing indicates that the server is not accepting new
// connections. Existing connections will be closed gracefully, allowing
// in-progress RPCs to complete. A server enters this mode when it does not
// contain the required xDS configuration to serve RPCs.
ServingModeNotServing
)

func (s ServingMode) String() string {
switch s {
case ServingModeNotServing:
return "not-serving"
case ServingModeServing:
return "serving"
default:
return "starting"
}
}

// ServingModeCallback is the callback that users can register to get notified
// about the server's serving mode changes. The callback is invoked with the
// address of the listener and its new mode. The err parameter is set to a
// non-nil error if the server has transitioned into not-serving mode.
type ServingModeCallback func(addr net.Addr, mode ServingMode, err error)

func prefixLogger(p *listenerWrapper) *internalgrpclog.PrefixLogger {
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[xds-server-listener %p] ", p))
}
Expand All @@ -70,6 +106,8 @@ type ListenerWrapperParams struct {
XDSCredsInUse bool
// XDSClient provides the functionality from the xdsClient required here.
XDSClient XDSClientInterface
// ModeCallback is the callback to invoke when the serving mode changes.
ModeCallback ServingModeCallback
}

// NewListenerWrapper creates a new listenerWrapper with params. It returns a
Expand All @@ -83,6 +121,7 @@ func NewListenerWrapper(params ListenerWrapperParams) (net.Listener, <-chan stru
name: params.ListenerResourceName,
xdsCredsInUse: params.XDSCredsInUse,
xdsC: params.XDSClient,
modeCallback: params.ModeCallback,
isUnspecifiedAddr: params.Listener.Addr().(*net.TCPAddr).IP.IsUnspecified(),

closed: grpcsync.NewEvent(),
Expand Down Expand Up @@ -111,12 +150,11 @@ type listenerWrapper struct {
net.Listener
logger *internalgrpclog.PrefixLogger

// TODO: Maintain serving state of this listener.

name string
xdsCredsInUse bool
xdsC XDSClientInterface
cancelWatch func()
modeCallback ServingModeCallback

// Set to true if the listener is bound to the IP_ANY address (which is
// "0.0.0.0" for IPv4 and "::" for IPv6).
Expand All @@ -138,11 +176,14 @@ type listenerWrapper struct {
// updates received in the callback if this event has fired.
closed *grpcsync.Event

// Filter chains received as part of the last good update. The reason for
// using an rw lock here is that this field will be read by all connections
// during their server-side handshake (in the hot path), but writes to this
// happen rarely (when we get a Listener resource update).
mu sync.RWMutex
// mu guards access to the current serving mode and the filter chains. The
// reason for using an rw lock here is that these fields are read in
// Accept() for all incoming connections, but writes happen rarely (when we
// get a Listener resource update).
mu sync.RWMutex
// Current serving mode.
mode ServingMode
// Filter chains received as part of the last good update.
filterChains *xdsclient.FilterChainManager
}

Expand Down Expand Up @@ -175,8 +216,6 @@ func (l *listenerWrapper) Accept() (net.Conn, error) {
// Reset retries after a successful Accept().
retries = 0

// TODO: Close connections if in "non-serving" state

// Since the net.Conn represents an incoming connection, the source and
// destination address can be retrieved from the local address and
// remote address of the net.Conn respectively.
Expand All @@ -191,6 +230,17 @@ func (l *listenerWrapper) Accept() (net.Conn, error) {
}

l.mu.RLock()
if l.mode == ServingModeNotServing {
// Close connections as soon as we accept them when we are in
// "not-serving" mode. Since we accept a net.Listener from the user
// in Serve(), we cannot close the listener when we move to
// "not-serving". Closing the connection immediately upon accepting
// is one of the other ways to implement the "not-serving" mode as
// outlined in gRFC A36.
l.mu.RUnlock()
conn.Close()
continue
}
fc, err := l.filterChains.Lookup(xdsclient.FilterChainLookupParams{
IsUnspecifiedListener: l.isUnspecifiedAddr,
DestAddr: destAddr.IP,
Expand Down Expand Up @@ -236,14 +286,13 @@ func (l *listenerWrapper) handleListenerUpdate(update xdsclient.ListenerUpdate,
return
}

// TODO: Handle resource-not-found errors by moving to not-serving state.
if err != nil {
// We simply log an error here and hope we get a successful update
// in the future. The error could be because of a timeout or an
// actual error, like the requested resource not found. In any case,
// it is fine for the server to hang indefinitely until Stop() is
// called.
l.logger.Warningf("Received error for resource %q: %+v", l.name, err)
if xdsclient.ErrType(err) == xdsclient.ErrorTypeResourceNotFound {
l.switchMode(nil, ServingModeNotServing, err)
}
// For errors which are anything other than "resource-not-found", we
// continue to use the old configuration.
return
}
l.logger.Infof("Received update for resource %q: %+v", l.name, update)
Expand All @@ -258,18 +307,32 @@ func (l *listenerWrapper) handleListenerUpdate(update xdsclient.ListenerUpdate,
// appropriate context to perform this check.
//
// What this means is that the xdsClient has ACKed a resource which can push
// the server into a "not serving" state. This is not ideal, but this is
// the server into a "not serving" mode. This is not ideal, but this is
// what we have decided to do. See gRPC A36 for more details.
ilc := update.InboundListenerCfg
if ilc.Address != l.addr || ilc.Port != l.port {
// TODO: Switch to "not serving" if the host:port does not match.
l.logger.Warningf("Received host:port (%s:%d) in Listener update does not match local listening address: (%s:%s", ilc.Address, ilc.Port, l.addr, l.port)
l.switchMode(nil, ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port))
return
}

l.mu.Lock()
l.filterChains = ilc.FilterChains
l.mu.Unlock()
l.switchMode(ilc.FilterChains, ServingModeServing, nil)
l.goodUpdate.Fire()
// TODO: Move to serving state on receipt of a good response.
}

func (l *listenerWrapper) switchMode(fcs *xdsclient.FilterChainManager, newMode ServingMode, err error) {
l.mu.Lock()
defer l.mu.Unlock()

l.filterChains = fcs
if l.mode == newMode {
// Since we cannot guarantee that the control plane does not send the
// same response again and again, we don't want to invoke the registered
// callback when the mode has not changed.
return
}
l.mode = newMode
if l.modeCallback != nil {
l.modeCallback(l.Listener.Addr(), newMode, err)
}
l.logger.Warningf("Listener %q entering mode: %q", l.Addr(), newMode)
}
3 changes: 2 additions & 1 deletion xds/internal/test/xds_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import (
)

const (
defaultTestTimeout = 10 * time.Second
defaultTestTimeout = 10 * time.Second
defaultTestShortTimeout = 100 * time.Millisecond
)

type s struct {
Expand Down
Loading