diff --git a/server/config/config.go b/server/config/config.go index 53e888c8d33..4c9e95ad06d 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -120,6 +120,10 @@ type ServerConfig struct { // MaxRequestBytes is the maximum request size to send over raft. MaxRequestBytes uint + // MaxConcurrentStreams specifies the maximum number of concurrent + // streams that each client can open at a time. + MaxConcurrentStreams uint32 + WarningApplyDuration time.Duration StrictReconfigCheck bool diff --git a/server/embed/config.go b/server/embed/config.go index bb81181f31b..60ff1b66243 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -17,6 +17,7 @@ package embed import ( "fmt" "io/ioutil" + "math" "net" "net/http" "net/url" @@ -56,6 +57,7 @@ const ( DefaultMaxTxnOps = uint(128) DefaultWarningApplyDuration = 100 * time.Millisecond DefaultMaxRequestBytes = 1.5 * 1024 * 1024 + DefaultMaxConcurrentStreams = math.MaxUint32 DefaultGRPCKeepAliveMinTime = 5 * time.Second DefaultGRPCKeepAliveInterval = 2 * time.Hour DefaultGRPCKeepAliveTimeout = 20 * time.Second @@ -199,6 +201,10 @@ type Config struct { MaxTxnOps uint `json:"max-txn-ops"` MaxRequestBytes uint `json:"max-request-bytes"` + // MaxConcurrentStreams specifies the maximum number of concurrent + // streams that each client can open at a time. + MaxConcurrentStreams uint32 `json:"max-concurrent-streams"` + LPUrls, LCUrls []url.URL APUrls, ACUrls []url.URL ClientTLSInfo transport.TLSInfo @@ -306,7 +312,7 @@ type Config struct { AuthToken string `json:"auth-token"` BcryptCost uint `json:"bcrypt-cost"` - //The AuthTokenTTL in seconds of the simple token + // AuthTokenTTL specifies the TTL in seconds of the simple token AuthTokenTTL uint `json:"auth-token-ttl"` ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"` @@ -451,6 +457,7 @@ func NewConfig() *Config { MaxTxnOps: DefaultMaxTxnOps, MaxRequestBytes: DefaultMaxRequestBytes, + MaxConcurrentStreams: DefaultMaxConcurrentStreams, ExperimentalWarningApplyDuration: DefaultWarningApplyDuration, GRPCKeepAliveMinTime: DefaultGRPCKeepAliveMinTime, diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 02fc93c70aa..2e5f68afafc 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -199,6 +199,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { BackendBatchInterval: cfg.BackendBatchInterval, MaxTxnOps: cfg.MaxTxnOps, MaxRequestBytes: cfg.MaxRequestBytes, + MaxConcurrentStreams: cfg.MaxConcurrentStreams, SocketOpts: cfg.SocketOpts, StrictReconfigCheck: cfg.StrictReconfigCheck, ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth, @@ -337,7 +338,10 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized zap.String("initial-cluster", sc.InitialPeerURLsMap.String()), zap.String("initial-cluster-state", ec.ClusterState), zap.String("initial-cluster-token", sc.InitialClusterToken), - zap.Int64("quota-size-bytes", quota), + zap.Int64("quota-backend-bytes", quota), + zap.Uint("max-request-bytes", sc.MaxRequestBytes), + zap.Uint32("max-concurrent-streams", sc.MaxConcurrentStreams), + zap.Bool("pre-vote", sc.PreVote), zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck), zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()), diff --git a/server/embed/serve.go b/server/embed/serve.go index c3e786321cd..579e22a369f 100644 --- a/server/embed/serve.go +++ b/server/embed/serve.go @@ -29,6 +29,7 @@ import ( "go.etcd.io/etcd/client/v3/credentials" "go.etcd.io/etcd/pkg/v3/debugutil" "go.etcd.io/etcd/pkg/v3/httputil" + "go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver/api/v3client" "go.etcd.io/etcd/server/v3/etcdserver/api/v3election" @@ -43,6 +44,7 @@ import ( "github.com/soheilhy/cmux" "github.com/tmc/grpc-websocket-proxy/wsproxy" "go.uber.org/zap" + "golang.org/x/net/http2" "golang.org/x/net/trace" "google.golang.org/grpc" ) @@ -133,6 +135,10 @@ func (sctx *serveCtx) serve( Handler: createAccessController(sctx.lg, s, httpmux), ErrorLog: logger, // do not log user error } + if err := configureHttpServer(srvhttp, s.Cfg); err != nil { + sctx.lg.Error("Configure http server failed", zap.Error(err)) + return err + } httpl := m.Match(cmux.HTTP1()) go func() { errHandler(srvhttp.Serve(httpl)) }() @@ -182,6 +188,10 @@ func (sctx *serveCtx) serve( TLSConfig: tlscfg, ErrorLog: logger, // do not log user error } + if err := configureHttpServer(srv, s.Cfg); err != nil { + sctx.lg.Error("Configure https server failed", zap.Error(err)) + return err + } go func() { errHandler(srv.Serve(tlsl)) }() sctx.serversC <- &servers{secure: true, grpc: gs, http: srv} @@ -195,6 +205,13 @@ func (sctx *serveCtx) serve( return m.Serve() } +func configureHttpServer(srv *http.Server, cfg config.ServerConfig) error { + // todo (ahrtr): should we support configuring other parameters in the future as well? + return http2.ConfigureServer(srv, &http2.Server{ + MaxConcurrentStreams: cfg.MaxConcurrentStreams, + }) +} + // grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC // connections or otherHandler otherwise. Given in gRPC docs. func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler { diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 26db0e67d64..195ef1cb147 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -172,6 +172,8 @@ func newConfig() *config { fs.BoolVar(&cfg.ec.SocketOpts.ReusePort, "socket-reuse-port", cfg.ec.SocketOpts.ReusePort, "Enable to set socket option SO_REUSEPORT on listeners allowing rebinding of a port already in use.") fs.BoolVar(&cfg.ec.SocketOpts.ReuseAddress, "socket-reuse-address", cfg.ec.SocketOpts.ReuseAddress, "Enable to set socket option SO_REUSEADDR on listeners allowing binding to an address in `TIME_WAIT` state.") + fs.Var(flags.NewUint32Value(cfg.ec.MaxConcurrentStreams), "max-concurrent-streams", "Maximum concurrent streams that each client can open at a time.") + // raft connection timeouts fs.DurationVar(&rafthttp.ConnReadTimeout, "raft-read-timeout", rafthttp.DefaultConnReadTimeout, "Read timeout set on each rafthttp connection") fs.DurationVar(&rafthttp.ConnWriteTimeout, "raft-write-timeout", rafthttp.DefaultConnWriteTimeout, "Write timeout set on each rafthttp connection") @@ -398,6 +400,8 @@ func (cfg *config) configFromCmdLine() error { cfg.ec.CipherSuites = flags.StringsFromFlag(cfg.cf.flagSet, "cipher-suites") + cfg.ec.MaxConcurrentStreams = flags.Uint32FromFlag(cfg.cf.flagSet, "max-concurrent-streams") + cfg.ec.LogOutputs = flags.UniqueStringsFromFlag(cfg.cf.flagSet, "log-outputs") cfg.ec.ClusterState = cfg.cf.clusterState.String() diff --git a/server/etcdmain/grpc_proxy.go b/server/etcdmain/grpc_proxy.go index f832ca5afac..15c0f14c0b4 100644 --- a/server/etcdmain/grpc_proxy.go +++ b/server/etcdmain/grpc_proxy.go @@ -47,6 +47,7 @@ import ( "github.com/soheilhy/cmux" "github.com/spf13/cobra" "go.uber.org/zap" + "golang.org/x/net/http2" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/keepalive" @@ -95,6 +96,8 @@ var ( grpcKeepAliveMinTime time.Duration grpcKeepAliveTimeout time.Duration grpcKeepAliveInterval time.Duration + + maxConcurrentStreams uint32 ) const defaultGRPCMaxCallSendMsgSize = 1.5 * 1024 * 1024 @@ -159,6 +162,8 @@ func newGRPCProxyStartCommand() *cobra.Command { cmd.Flags().BoolVar(&grpcProxyDebug, "debug", false, "Enable debug-level logging for grpc-proxy.") + cmd.Flags().Uint32Var(&maxConcurrentStreams, "max-concurrent-streams", math.MaxUint32, "Maximum concurrent streams that each client can open at a time.") + return &cmd } @@ -212,6 +217,13 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { httpClient := mustNewHTTPClient(lg) srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client, proxyClient) + + if err := http2.ConfigureServer(srvhttp, &http2.Server{ + MaxConcurrentStreams: maxConcurrentStreams, + }); err != nil { + lg.Fatal("Failed to configure the http server", zap.Error(err)) + } + errc := make(chan error, 3) go func() { errc <- newGRPCProxyServer(lg, client).Serve(grpcl) }() go func() { errc <- srvhttp.Serve(httpl) }() diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go index 56ca5285c46..46c4487d396 100644 --- a/server/etcdmain/help.go +++ b/server/etcdmain/help.go @@ -80,6 +80,8 @@ Member: Maximum number of operations permitted in a transaction. --max-request-bytes '1572864' Maximum client request size in bytes the server will accept. + --max-concurrent-streams 'math.MaxUint32' + Maximum concurrent streams that each client can open at a time. --grpc-keepalive-min-time '5s' Minimum duration interval that a client should wait before pinging server. --grpc-keepalive-interval '2h' diff --git a/server/etcdserver/api/v3rpc/grpc.go b/server/etcdserver/api/v3rpc/grpc.go index ea3dd75705f..349ebea4007 100644 --- a/server/etcdserver/api/v3rpc/grpc.go +++ b/server/etcdserver/api/v3rpc/grpc.go @@ -32,7 +32,6 @@ import ( const ( grpcOverheadBytes = 512 * 1024 - maxStreams = math.MaxUint32 maxSendBytes = math.MaxInt32 ) @@ -68,7 +67,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes))) opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes)) - opts = append(opts, grpc.MaxConcurrentStreams(maxStreams)) + opts = append(opts, grpc.MaxConcurrentStreams(s.Cfg.MaxConcurrentStreams)) grpcServer := grpc.NewServer(append(opts, gopts...)...)