diff --git a/CHANGELOG/CHANGELOG-3.6.md b/CHANGELOG/CHANGELOG-3.6.md index 1f785a52c59..8c6a06a873b 100644 --- a/CHANGELOG/CHANGELOG-3.6.md +++ b/CHANGELOG/CHANGELOG-3.6.md @@ -17,6 +17,7 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0). ### Deprecations - Deprecated [V2 discovery](https://etcd.io/docs/v3.5/dev-internal/discovery_protocol/). +- Deprecated [SetKeepAlive and SetKeepAlivePeriod in limitListenerConn](https://github.com/etcd-io/etcd/pull/14356). - Removed [etcdctl defrag --data-dir](https://github.com/etcd-io/etcd/pull/13793). - Removed [etcdctl snapshot status](https://github.com/etcd-io/etcd/pull/13809). - Removed [etcdctl snapshot restore](https://github.com/etcd-io/etcd/pull/13809). diff --git a/client/pkg/transport/keepalive_listener.go b/client/pkg/transport/keepalive_listener.go index 4ff8e7f0010..8e863d2c9e3 100644 --- a/client/pkg/transport/keepalive_listener.go +++ b/client/pkg/transport/keepalive_listener.go @@ -21,26 +21,29 @@ import ( "time" ) -type keepAliveConn interface { - SetKeepAlive(bool) error - SetKeepAlivePeriod(d time.Duration) error -} - // NewKeepAliveListener returns a listener that listens on the given address. // Be careful when wrap around KeepAliveListener with another Listener if TLSInfo is not nil. // Some pkgs (like go/http) might expect Listener to return TLSConn type to start TLS handshake. // http://tldp.org/HOWTO/TCP-Keepalive-HOWTO/overview.html +// +// Note(ahrtr): +// only `net.TCPConn` supports `SetKeepAlive` and `SetKeepAlivePeriod` +// by default, so if you want to wrap multiple layers of net.Listener, +// the `keepaliveListener` should be the one which is closest to the +// original `net.Listener` implementation, namely `TCPListener`. func NewKeepAliveListener(l net.Listener, scheme string, tlscfg *tls.Config) (net.Listener, error) { + kal := &keepaliveListener{ + Listener: l, + } + if scheme == "https" { if tlscfg == nil { return nil, fmt.Errorf("cannot listen on TLS for given listener: KeyFile and CertFile are not presented") } - return newTLSKeepaliveListener(l, tlscfg), nil + return newTLSKeepaliveListener(kal, tlscfg), nil } - return &keepaliveListener{ - Listener: l, - }, nil + return kal, nil } type keepaliveListener struct{ net.Listener } @@ -50,13 +53,43 @@ func (kln *keepaliveListener) Accept() (net.Conn, error) { if err != nil { return nil, err } - kac := c.(keepAliveConn) + + kac, err := createKeepaliveConn(c) + if err != nil { + return nil, fmt.Errorf("create keepalive connection failed, %w", err) + } // detection time: tcp_keepalive_time + tcp_keepalive_probes + tcp_keepalive_intvl // default on linux: 30 + 8 * 30 // default on osx: 30 + 8 * 75 - kac.SetKeepAlive(true) - kac.SetKeepAlivePeriod(30 * time.Second) - return c, nil + if err := kac.SetKeepAlive(true); err != nil { + return nil, fmt.Errorf("SetKeepAlive failed, %w", err) + } + if err := kac.SetKeepAlivePeriod(30 * time.Second); err != nil { + return nil, fmt.Errorf("SetKeepAlivePeriod failed, %w", err) + } + return kac, nil +} + +func createKeepaliveConn(c net.Conn) (*keepAliveConn, error) { + tcpc, ok := c.(*net.TCPConn) + if !ok { + return nil, ErrNotTCP + } + return &keepAliveConn{tcpc}, nil +} + +type keepAliveConn struct { + *net.TCPConn +} + +// SetKeepAlive sets keepalive +func (l *keepAliveConn) SetKeepAlive(doKeepAlive bool) error { + return l.TCPConn.SetKeepAlive(doKeepAlive) +} + +// SetKeepAlivePeriod sets keepalive period +func (l *keepAliveConn) SetKeepAlivePeriod(d time.Duration) error { + return l.TCPConn.SetKeepAlivePeriod(d) } // A tlsKeepaliveListener implements a network listener (net.Listener) for TLS connections. @@ -72,12 +105,7 @@ func (l *tlsKeepaliveListener) Accept() (c net.Conn, err error) { if err != nil { return } - kac := c.(keepAliveConn) - // detection time: tcp_keepalive_time + tcp_keepalive_probes + tcp_keepalive_intvl - // default on linux: 30 + 8 * 30 - // default on osx: 30 + 8 * 75 - kac.SetKeepAlive(true) - kac.SetKeepAlivePeriod(30 * time.Second) + c = tls.Server(c, l.config) return c, nil } diff --git a/client/pkg/transport/keepalive_listener_test.go b/client/pkg/transport/keepalive_listener_test.go index d5020544e4c..efe312d94a8 100644 --- a/client/pkg/transport/keepalive_listener_test.go +++ b/client/pkg/transport/keepalive_listener_test.go @@ -40,6 +40,9 @@ func TestNewKeepAliveListener(t *testing.T) { if err != nil { t.Fatalf("unexpected Accept error: %v", err) } + if _, ok := conn.(*keepAliveConn); !ok { + t.Fatalf("Unexpected conn type: %T, wanted *keepAliveConn", conn) + } conn.Close() ln.Close() diff --git a/client/pkg/transport/limit_listen.go b/client/pkg/transport/limit_listen.go index 930c542066f..404722ba76e 100644 --- a/client/pkg/transport/limit_listen.go +++ b/client/pkg/transport/limit_listen.go @@ -63,6 +63,9 @@ func (l *limitListenerConn) Close() error { return err } +// SetKeepAlive sets keepalive +// +// Deprecated: use (*keepAliveConn) SetKeepAlive instead. func (l *limitListenerConn) SetKeepAlive(doKeepAlive bool) error { tcpc, ok := l.Conn.(*net.TCPConn) if !ok { @@ -71,6 +74,9 @@ func (l *limitListenerConn) SetKeepAlive(doKeepAlive bool) error { return tcpc.SetKeepAlive(doKeepAlive) } +// SetKeepAlivePeriod sets keepalive period +// +// Deprecated: use (*keepAliveConn) SetKeepAlivePeriod instead. func (l *limitListenerConn) SetKeepAlivePeriod(d time.Duration) error { tcpc, ok := l.Conn.(*net.TCPConn) if !ok { diff --git a/client/pkg/transport/listener.go b/client/pkg/transport/listener.go index 3659491d7c0..cbe3b3f891a 100644 --- a/client/pkg/transport/listener.go +++ b/client/pkg/transport/listener.go @@ -68,7 +68,7 @@ func newListener(addr, scheme string, opts ...ListenerOption) (net.Listener, err fallthrough case lnOpts.IsTimeout(), lnOpts.IsSocketOpts(): // timeout listener with socket options. - ln, err := lnOpts.ListenConfig.Listen(context.TODO(), "tcp", addr) + ln, err := newKeepAliveListener(&lnOpts.ListenConfig, addr) if err != nil { return nil, err } @@ -78,7 +78,7 @@ func newListener(addr, scheme string, opts ...ListenerOption) (net.Listener, err writeTimeout: lnOpts.writeTimeout, } case lnOpts.IsTimeout(): - ln, err := net.Listen("tcp", addr) + ln, err := newKeepAliveListener(nil, addr) if err != nil { return nil, err } @@ -88,7 +88,7 @@ func newListener(addr, scheme string, opts ...ListenerOption) (net.Listener, err writeTimeout: lnOpts.writeTimeout, } default: - ln, err := net.Listen("tcp", addr) + ln, err := newKeepAliveListener(nil, addr) if err != nil { return nil, err } @@ -102,6 +102,19 @@ func newListener(addr, scheme string, opts ...ListenerOption) (net.Listener, err return wrapTLS(scheme, lnOpts.tlsInfo, lnOpts.Listener) } +func newKeepAliveListener(cfg *net.ListenConfig, addr string) (ln net.Listener, err error) { + if cfg != nil { + ln, err = cfg.Listen(context.TODO(), "tcp", addr) + } else { + ln, err = net.Listen("tcp", addr) + } + if err != nil { + return + } + + return NewKeepAliveListener(ln, "tcp", nil) +} + func wrapTLS(scheme string, tlsinfo *TLSInfo, l net.Listener) (net.Listener, error) { if scheme != "https" && scheme != "unixs" { return l, nil diff --git a/client/pkg/transport/listener_test.go b/client/pkg/transport/listener_test.go index ed64b7bf31d..11e2182fe38 100644 --- a/client/pkg/transport/listener_test.go +++ b/client/pkg/transport/listener_test.go @@ -205,6 +205,15 @@ func TestNewListenerWithSocketOpts(t *testing.T) { if !test.expectedErr && err != nil { t.Fatalf("unexpected error: %v", err) } + + if test.scheme == "http" { + lnOpts := newListenOpts(test.opts...) + if !lnOpts.IsSocketOpts() && !lnOpts.IsTimeout() { + if _, ok := ln.(*keepaliveListener); !ok { + t.Fatalf("ln: unexpected listener type: %T, wanted *keepaliveListener", ln) + } + } + } }) } } diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 3d94b63be1e..8a9ed897ff1 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -666,12 +666,6 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum)) } - if network == "tcp" { - if sctx.l, err = transport.NewKeepAliveListener(sctx.l, network, nil); err != nil { - return nil, err - } - } - defer func(u url.URL) { if err == nil { return diff --git a/tests/functional/agent/handler.go b/tests/functional/agent/handler.go index 057d539dc8c..6d6023064c4 100644 --- a/tests/functional/agent/handler.go +++ b/tests/functional/agent/handler.go @@ -125,7 +125,7 @@ func (srv *Server) createEtcd(fromSnapshot bool, failpoints string) error { func (srv *Server) runEtcd() error { errc := make(chan error) go func() { - time.Sleep(5 * time.Second) + time.Sleep(1 * time.Second) // server advertise client/peer listener had to start first // before setting up proxy listener errc <- srv.startProxy() @@ -137,17 +137,19 @@ func (srv *Server) runEtcd() error { zap.String("command-path", srv.etcdCmd.Path), ) err := srv.etcdCmd.Start() - perr := <-errc + srv.lg.Info( "started etcd command", zap.String("command-path", srv.etcdCmd.Path), zap.Strings("command-args", srv.etcdCmd.Args), - zap.Errors("errors", []error{err, perr}), + zap.Strings("envs", srv.etcdCmd.Env), + zap.Error(err), ) if err != nil { return err } - return perr + + return <-errc } select { @@ -218,6 +220,11 @@ func (srv *Server) startProxy() error { return err } + srv.lg.Info("Checking client target's connectivity", zap.String("target", listenClientURL.Host)) + if err := checkTCPConnect(srv.lg, listenClientURL.Host); err != nil { + return fmt.Errorf("check client target failed, %w", err) + } + srv.lg.Info("starting proxy on client traffic", zap.String("url", advertiseClientURL.String())) srv.advertiseClientPortToProxy[advertiseClientURLPort] = proxy.NewServer(proxy.ServerConfig{ Logger: srv.lg, @@ -226,6 +233,7 @@ func (srv *Server) startProxy() error { }) select { case err = <-srv.advertiseClientPortToProxy[advertiseClientURLPort].Error(): + srv.lg.Info("starting client proxy failed", zap.Error(err)) return err case <-time.After(2 * time.Second): srv.lg.Info("started proxy on client traffic", zap.String("url", advertiseClientURL.String())) @@ -242,6 +250,11 @@ func (srv *Server) startProxy() error { return err } + srv.lg.Info("Checking peer target's connectivity", zap.String("target", listenPeerURL.Host)) + if err := checkTCPConnect(srv.lg, listenPeerURL.Host); err != nil { + return fmt.Errorf("check peer target failed, %w", err) + } + srv.lg.Info("starting proxy on peer traffic", zap.String("url", advertisePeerURL.String())) srv.advertisePeerPortToProxy[advertisePeerURLPort] = proxy.NewServer(proxy.ServerConfig{ Logger: srv.lg, @@ -250,6 +263,7 @@ func (srv *Server) startProxy() error { }) select { case err = <-srv.advertisePeerPortToProxy[advertisePeerURLPort].Error(): + srv.lg.Info("starting peer proxy failed", zap.Error(err)) return err case <-time.After(2 * time.Second): srv.lg.Info("started proxy on peer traffic", zap.String("url", advertisePeerURL.String())) diff --git a/tests/functional/agent/utils.go b/tests/functional/agent/utils.go index 16931a2cbf9..98d88bd913a 100644 --- a/tests/functional/agent/utils.go +++ b/tests/functional/agent/utils.go @@ -126,6 +126,23 @@ func loadFileData(filePath string) ([]byte, error) { return data, nil } +func checkTCPConnect(lg *zap.Logger, target string) error { + for i := 0; i < 10; i++ { + if conn, err := net.Dial("tcp", target); err != nil { + lg.Error("The target isn't reachable", zap.Int("retries", i), zap.String("target", target), zap.Error(err)) + } else { + if conn != nil { + conn.Close() + lg.Info("The target is reachable", zap.Int("retries", i), zap.String("target", target)) + return nil + } + lg.Error("The target isn't reachable due to the returned conn is nil", zap.Int("retries", i), zap.String("target", target)) + } + time.Sleep(time.Second) + } + return fmt.Errorf("timed out waiting for the target (%s) to be reachable", target) +} + func cleanPageCache() error { // https://www.kernel.org/doc/Documentation/sysctl/vm.txt // https://github.com/torvalds/linux/blob/master/fs/drop_caches.c