Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

https server will not be impacted by the MaxConcurrentStreams settin… #14081

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ require (

require (
cloud.google.com/go v0.81.0 // indirect
github.com/BurntSushi/toml v1.1.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.1.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
Expand Down Expand Up @@ -68,7 +69,7 @@ require (
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/soheilhy/cmux v0.1.5 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.7.0 // indirect
github.com/stretchr/testify v1.7.1 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.26.1 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I=
github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down Expand Up @@ -297,8 +298,9 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 h1:uruHq4dN7GR16kFc5fp3d1RIYzJW5onx8Ybykw2YQFA=
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8=
Expand Down
50 changes: 50 additions & 0 deletions pkg/flags/int.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package flags

import (
"flag"
"fmt"
"strconv"
)

type uint32Value uint32

func NewUint32Value(s string) *uint32Value {
var u uint32Value
if s == "" || s == "0" {
return &u
}
if err := u.Set(s); err != nil {
panic(fmt.Sprintf("new uint32Value should never fail: %v", err))
}
return &u
}

func (i *uint32Value) Set(s string) error {
v, err := strconv.ParseUint(s, 0, 32)
*i = uint32Value(v)
return err
}
func (i *uint32Value) Type() string {
return "uint32"
}
func (i *uint32Value) String() string { return strconv.FormatUint(uint64(*i), 10) }

// Uint32FromFlag return the uint32 value of a flag with the given name
func Uint32FromFlag(fs *flag.FlagSet, name string) uint32 {
val := *fs.Lookup(name).Value.(*uint32Value)
return uint32(val)
}
57 changes: 57 additions & 0 deletions pkg/flags/int_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package flags

import (
"reflect"
"testing"
)

func TestInvalidUint32(t *testing.T) {
tests := []string{
// string
"invalid",
// negative number
"-1",
// float number
"0.1",
"-0.2",
// larger than math.MaxUint32
"4294967296",
}
for i, in := range tests {
var u uint32Value
if err := u.Set(in); err == nil {
t.Errorf(`#%d: unexpected nil error for in=%q`, i, in)
}
}
}

func TestUint32Value(t *testing.T) {
tests := []struct {
s string
exp uint32
}{
{s: "0", exp: 0},
{s: "1", exp: 1},
{s: "", exp: 0},
}
for i := range tests {
ss := uint32(*NewUint32Value(tests[i].s))
if !reflect.DeepEqual(tests[i].exp, ss) {
t.Fatalf("#%d: expected %q, got %q", i, tests[i].exp, ss)
}
}
}
4 changes: 4 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ type ServerConfig struct {
// MaxRequestBytes is the maximum request size to send over raft.
MaxRequestBytes uint

// MaxConcurrentStreams optionally specifies the number of concurrent
// streams that each client may have open at a time.
MaxConcurrentStreams uint32

WarningApplyDuration time.Duration
WarningUnaryRequestDuration time.Duration

Expand Down
9 changes: 8 additions & 1 deletion server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package embed
import (
"errors"
"fmt"
"math"
"net"
"net/http"
"net/url"
Expand Down Expand Up @@ -59,6 +60,7 @@ const (
DefaultWarningApplyDuration = 100 * time.Millisecond
DefaultWarningUnaryRequestDuration = 300 * time.Millisecond
DefaultMaxRequestBytes = 1.5 * 1024 * 1024
DefaultMaxConcurrentStreams = math.MaxUint32
DefaultGRPCKeepAliveMinTime = 5 * time.Second
DefaultGRPCKeepAliveInterval = 2 * time.Hour
DefaultGRPCKeepAliveTimeout = 20 * time.Second
Expand Down Expand Up @@ -205,6 +207,10 @@ type Config struct {
MaxTxnOps uint `json:"max-txn-ops"`
MaxRequestBytes uint `json:"max-request-bytes"`

// MaxConcurrentStreams optionally specifies the number of concurrent
// streams that each client may have open at a time.
MaxConcurrentStreams uint32 `json:"max-concurrent-streams"`

LPUrls, LCUrls []url.URL
APUrls, ACUrls []url.URL
ClientTLSInfo transport.TLSInfo
Expand Down Expand Up @@ -311,7 +317,7 @@ type Config struct {
AuthToken string `json:"auth-token"`
BcryptCost uint `json:"bcrypt-cost"`

//The AuthTokenTTL in seconds of the simple token
// AuthTokenTTL in seconds of the simple token
AuthTokenTTL uint `json:"auth-token-ttl"`

ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"`
Expand Down Expand Up @@ -462,6 +468,7 @@ func NewConfig() *Config {

MaxTxnOps: DefaultMaxTxnOps,
MaxRequestBytes: DefaultMaxRequestBytes,
MaxConcurrentStreams: DefaultMaxConcurrentStreams,
ExperimentalWarningApplyDuration: DefaultWarningApplyDuration,

ExperimentalWarningUnaryRequestDuration: DefaultWarningUnaryRequestDuration,
Expand Down
1 change: 1 addition & 0 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
BackendBatchInterval: cfg.BackendBatchInterval,
MaxTxnOps: cfg.MaxTxnOps,
MaxRequestBytes: cfg.MaxRequestBytes,
MaxConcurrentStreams: cfg.MaxConcurrentStreams,
SocketOpts: cfg.SocketOpts,
StrictReconfigCheck: cfg.StrictReconfigCheck,
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
Expand Down
14 changes: 14 additions & 0 deletions server/embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/soheilhy/cmux"
"github.com/tmc/grpc-websocket-proxy/wsproxy"
"go.uber.org/zap"
"golang.org/x/net/http2"
"golang.org/x/net/trace"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -142,6 +143,9 @@ func (sctx *serveCtx) serve(
Handler: createAccessController(sctx.lg, s, httpmux),
ErrorLog: logger, // do not log user error
}
if err := setMaxConcurrentStreams(srvhttp, s.Cfg.MaxConcurrentStreams); err != nil {
return err
}
httpl := m.Match(cmux.HTTP1())
go func() { errHandler(srvhttp.Serve(httpl)) }()

Expand Down Expand Up @@ -191,6 +195,9 @@ func (sctx *serveCtx) serve(
TLSConfig: tlscfg,
ErrorLog: logger, // do not log user error
}
if err := setMaxConcurrentStreams(srv, s.Cfg.MaxConcurrentStreams); err != nil {
return err
}
ahrtr marked this conversation as resolved.
Show resolved Hide resolved
go func() { errHandler(srv.Serve(tlsl)) }()

sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
Expand All @@ -204,6 +211,13 @@ func (sctx *serveCtx) serve(
return m.Serve()
}

// setMaxConcurrentStreams sets the maxConcurrentStreams for the http server
func setMaxConcurrentStreams(srv *http.Server, maxConcurrentStreams uint32) error {
return http2.ConfigureServer(srv, &http2.Server{
MaxConcurrentStreams: maxConcurrentStreams,
})
}

// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC
// connections or otherHandler otherwise. Given in gRPC docs.
func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler {
Expand Down
4 changes: 4 additions & 0 deletions server/etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ func newConfig() *config {
fs.DurationVar(&rafthttp.ConnReadTimeout, "raft-read-timeout", rafthttp.DefaultConnReadTimeout, "Read timeout set on each rafthttp connection")
fs.DurationVar(&rafthttp.ConnWriteTimeout, "raft-write-timeout", rafthttp.DefaultConnWriteTimeout, "Write timeout set on each rafthttp connection")

fs.Var(flags.NewUint32Value(""), "max-concurrent-streams", "Maximum concurrent streams that each client may have open at a time.")

// clustering
fs.Var(
flags.NewUniqueURLsWithExceptions(embed.DefaultInitialAdvertisePeerURLs, ""),
Expand Down Expand Up @@ -380,6 +382,8 @@ func (cfg *config) configFromCmdLine() error {

cfg.ec.CipherSuites = flags.StringsFromFlag(cfg.cf.flagSet, "cipher-suites")

cfg.ec.MaxConcurrentStreams = flags.Uint32FromFlag(cfg.cf.flagSet, "max-concurrent-streams")

cfg.ec.LogOutputs = flags.UniqueStringsFromFlag(cfg.cf.flagSet, "log-outputs")

cfg.ec.ClusterState = cfg.cf.clusterState.String()
Expand Down
11 changes: 11 additions & 0 deletions server/etcdmain/grpc_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/soheilhy/cmux"
"github.com/spf13/cobra"
"go.uber.org/zap"
"golang.org/x/net/http2"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
Expand Down Expand Up @@ -95,6 +96,8 @@ var (
grpcKeepAliveMinTime time.Duration
grpcKeepAliveTimeout time.Duration
grpcKeepAliveInterval time.Duration

maxConcurrentStreams uint32
)

const defaultGRPCMaxCallSendMsgSize = 1.5 * 1024 * 1024
Expand Down Expand Up @@ -159,6 +162,8 @@ func newGRPCProxyStartCommand() *cobra.Command {

cmd.Flags().BoolVar(&grpcProxyDebug, "debug", false, "Enable debug-level logging for grpc-proxy.")

cmd.Flags().Uint32Var(&maxConcurrentStreams, "max-concurrent-streams", math.MaxUint32, "Maximum concurrent streams that each client may have open at a time.")

return &cmd
}

Expand Down Expand Up @@ -212,6 +217,12 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
httpClient := mustNewHTTPClient(lg)

srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client, proxyClient)
if err := http2.ConfigureServer(srvhttp, &http2.Server{
MaxConcurrentStreams: maxConcurrentStreams,
}); err != nil {
panic(err)
}

errc := make(chan error, 3)
go func() { errc <- newGRPCProxyServer(lg, client).Serve(grpcl) }()
go func() { errc <- srvhttp.Serve(httpl) }()
Expand Down
2 changes: 2 additions & 0 deletions server/etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ Member:
Maximum number of operations permitted in a transaction.
--max-request-bytes '1572864'
Maximum client request size in bytes the server will accept.
--max-concurrent-streams '0'
Maximum concurrent streams that each client may have open at a time.
--grpc-keepalive-min-time '5s'
Minimum duration interval that a client should wait before pinging server.
--grpc-keepalive-interval '2h'
Expand Down
3 changes: 1 addition & 2 deletions server/etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (

const (
grpcOverheadBytes = 512 * 1024
maxStreams = math.MaxUint32
maxSendBytes = math.MaxInt32
)

Expand Down Expand Up @@ -68,7 +67,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer

opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))
opts = append(opts, grpc.MaxConcurrentStreams(s.Cfg.MaxConcurrentStreams))

grpcServer := grpc.NewServer(append(opts, gopts...)...)

Expand Down
Loading