From b84a7585e02355f713e75b7558691f82cb64cf31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=A8=E9=87=91=E7=8F=8F?= Date: Mon, 30 May 2022 17:03:27 +0800 Subject: [PATCH] support custom `grpc.MaxConcurrentStreams` There is no update on the original PR (see below) for more then 2 weeks. So Benjamin(@ahrtr) continue to work on the PR. The first step is to rebase the PR, because there are lots of conflicts with the main branch. ``` https://github.com/etcd-io/etcd/pull/14081 ``` Signed-off-by: nic-chen Signed-off-by: Benjamin Wang --- go.mod | 1 + go.sum | 3 +- pkg/flags/int.go | 50 +++++++++++++++ pkg/flags/int_test.go | 57 +++++++++++++++++ server/config/config.go | 4 ++ server/embed/config.go | 9 ++- server/embed/etcd.go | 1 + server/embed/serve.go | 14 ++++ server/etcdmain/config.go | 4 ++ server/etcdmain/grpc_proxy.go | 11 ++++ server/etcdmain/help.go | 2 + server/etcdserver/api/v3rpc/grpc.go | 3 +- tests/e2e/v3_curl_test.go | 95 ++++++++++++++++++++++++++-- tests/framework/e2e/cluster.go | 34 ++++++---- tests/framework/e2e/cluster_proxy.go | 5 ++ tests/framework/e2e/etcd_process.go | 2 + 16 files changed, 273 insertions(+), 22 deletions(-) create mode 100644 pkg/flags/int.go create mode 100644 pkg/flags/int_test.go diff --git a/go.mod b/go.mod index 214070e3ca81..7b7e7b6ff234 100644 --- a/go.mod +++ b/go.mod @@ -40,6 +40,7 @@ require ( require ( cloud.google.com/go v0.81.0 // indirect + github.com/BurntSushi/toml v1.1.0 // indirect github.com/VividCortex/ewma v1.1.1 // indirect github.com/benbjohnson/clock v1.1.0 // indirect github.com/beorn7/perks v1.0.1 // indirect diff --git a/go.sum b/go.sum index a122df246761..04c3a8649e9b 100644 --- a/go.sum +++ b/go.sum @@ -37,8 +37,9 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I= +github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdcM= diff --git a/pkg/flags/int.go b/pkg/flags/int.go new file mode 100644 index 000000000000..c3d763da44f5 --- /dev/null +++ b/pkg/flags/int.go @@ -0,0 +1,50 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flags + +import ( + "flag" + "fmt" + "strconv" +) + +type uint32Value uint32 + +func NewUint32Value(s string) *uint32Value { + var u uint32Value + if s == "" || s == "0" { + return &u + } + if err := u.Set(s); err != nil { + panic(fmt.Sprintf("new uint32Value should never fail: %v", err)) + } + return &u +} + +func (i *uint32Value) Set(s string) error { + v, err := strconv.ParseUint(s, 0, 32) + *i = uint32Value(v) + return err +} +func (i *uint32Value) Type() string { + return "uint32" +} +func (i *uint32Value) String() string { return strconv.FormatUint(uint64(*i), 10) } + +// Uint32FromFlag return the uint32 value of a flag with the given name +func Uint32FromFlag(fs *flag.FlagSet, name string) uint32 { + val := *fs.Lookup(name).Value.(*uint32Value) + return uint32(val) +} diff --git a/pkg/flags/int_test.go b/pkg/flags/int_test.go new file mode 100644 index 000000000000..2ebabd01f711 --- /dev/null +++ b/pkg/flags/int_test.go @@ -0,0 +1,57 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flags + +import ( + "reflect" + "testing" +) + +func TestInvalidUint32(t *testing.T) { + tests := []string{ + // string + "invalid", + // negative number + "-1", + // float number + "0.1", + "-0.2", + // larger than math.MaxUint32 + "4294967296", + } + for i, in := range tests { + var u uint32Value + if err := u.Set(in); err == nil { + t.Errorf(`#%d: unexpected nil error for in=%q`, i, in) + } + } +} + +func TestUint32Value(t *testing.T) { + tests := []struct { + s string + exp uint32 + }{ + {s: "0", exp: 0}, + {s: "1", exp: 1}, + {s: "", exp: 0}, + } + for i := range tests { + ss := uint32(*NewUint32Value(tests[i].s)) + if !reflect.DeepEqual(tests[i].exp, ss) { + t.Fatalf("#%d: expected %q, got %q", i, tests[i].exp, ss) + } + } +} diff --git a/server/config/config.go b/server/config/config.go index 75d7df6c4a68..625f019e4015 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -129,6 +129,10 @@ type ServerConfig struct { // MaxRequestBytes is the maximum request size to send over raft. MaxRequestBytes uint + // MaxConcurrentStreams optionally specifies the number of concurrent + // streams that each client may have open at a time. + MaxConcurrentStreams uint32 + WarningApplyDuration time.Duration WarningUnaryRequestDuration time.Duration diff --git a/server/embed/config.go b/server/embed/config.go index 2cacf5ea4f6d..21d38ca84541 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -17,6 +17,7 @@ package embed import ( "errors" "fmt" + "math" "net" "net/http" "net/url" @@ -59,6 +60,7 @@ const ( DefaultWarningApplyDuration = 100 * time.Millisecond DefaultWarningUnaryRequestDuration = 300 * time.Millisecond DefaultMaxRequestBytes = 1.5 * 1024 * 1024 + DefaultMaxConcurrentStreams = math.MaxUint32 DefaultGRPCKeepAliveMinTime = 5 * time.Second DefaultGRPCKeepAliveInterval = 2 * time.Hour DefaultGRPCKeepAliveTimeout = 20 * time.Second @@ -205,6 +207,10 @@ type Config struct { MaxTxnOps uint `json:"max-txn-ops"` MaxRequestBytes uint `json:"max-request-bytes"` + // MaxConcurrentStreams optionally specifies the number of concurrent + // streams that each client may have open at a time. + MaxConcurrentStreams uint32 `json:"max-concurrent-streams"` + LPUrls, LCUrls []url.URL APUrls, ACUrls []url.URL ClientTLSInfo transport.TLSInfo @@ -311,7 +317,7 @@ type Config struct { AuthToken string `json:"auth-token"` BcryptCost uint `json:"bcrypt-cost"` - //The AuthTokenTTL in seconds of the simple token + // AuthTokenTTL in seconds of the simple token AuthTokenTTL uint `json:"auth-token-ttl"` ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"` @@ -462,6 +468,7 @@ func NewConfig() *Config { MaxTxnOps: DefaultMaxTxnOps, MaxRequestBytes: DefaultMaxRequestBytes, + MaxConcurrentStreams: DefaultMaxConcurrentStreams, ExperimentalWarningApplyDuration: DefaultWarningApplyDuration, ExperimentalWarningUnaryRequestDuration: DefaultWarningUnaryRequestDuration, diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 766336ccb04b..b557e5d3e1ad 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -191,6 +191,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { BackendBatchInterval: cfg.BackendBatchInterval, MaxTxnOps: cfg.MaxTxnOps, MaxRequestBytes: cfg.MaxRequestBytes, + MaxConcurrentStreams: cfg.MaxConcurrentStreams, SocketOpts: cfg.SocketOpts, StrictReconfigCheck: cfg.StrictReconfigCheck, ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth, diff --git a/server/embed/serve.go b/server/embed/serve.go index cb3d1c72d9df..5e162f3c8a8f 100644 --- a/server/embed/serve.go +++ b/server/embed/serve.go @@ -44,6 +44,7 @@ import ( "github.com/soheilhy/cmux" "github.com/tmc/grpc-websocket-proxy/wsproxy" "go.uber.org/zap" + "golang.org/x/net/http2" "golang.org/x/net/trace" "google.golang.org/grpc" ) @@ -148,6 +149,9 @@ func (sctx *serveCtx) serve( Handler: createAccessController(sctx.lg, s, httpmux), ErrorLog: logger, // do not log user error } + if err := setMaxConcurrentStreams(srvhttp, s.Cfg.MaxConcurrentStreams); err != nil { + return err + } httpl := m.Match(cmux.HTTP1()) go func() { errHandler(srvhttp.Serve(httpl)) }() @@ -197,6 +201,9 @@ func (sctx *serveCtx) serve( TLSConfig: tlscfg, ErrorLog: logger, // do not log user error } + if err := setMaxConcurrentStreams(srv, s.Cfg.MaxConcurrentStreams); err != nil { + return err + } go func() { errHandler(srv.Serve(tlsl)) }() sctx.serversC <- &servers{secure: true, grpc: gs, http: srv} @@ -209,6 +216,13 @@ func (sctx *serveCtx) serve( return m.Serve() } +// setMaxConcurrentStreams sets the maxConcurrentStreams for the http server +func setMaxConcurrentStreams(srv *http.Server, maxConcurrentStreams uint32) error { + return http2.ConfigureServer(srv, &http2.Server{ + MaxConcurrentStreams: maxConcurrentStreams, + }) +} + // grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC // connections or otherHandler otherwise. Given in gRPC docs. func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler { diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 8091589dfcea..88f5b73ecead 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -142,6 +142,8 @@ func newConfig() *config { fs.DurationVar(&rafthttp.ConnReadTimeout, "raft-read-timeout", rafthttp.DefaultConnReadTimeout, "Read timeout set on each rafthttp connection") fs.DurationVar(&rafthttp.ConnWriteTimeout, "raft-write-timeout", rafthttp.DefaultConnWriteTimeout, "Write timeout set on each rafthttp connection") + fs.Var(flags.NewUint32Value(""), "max-concurrent-streams", "Maximum concurrent streams that each client may have open at a time.") + // clustering fs.Var( flags.NewUniqueURLsWithExceptions(embed.DefaultInitialAdvertisePeerURLs, ""), @@ -380,6 +382,8 @@ func (cfg *config) configFromCmdLine() error { cfg.ec.CipherSuites = flags.StringsFromFlag(cfg.cf.flagSet, "cipher-suites") + cfg.ec.MaxConcurrentStreams = flags.Uint32FromFlag(cfg.cf.flagSet, "max-concurrent-streams") + cfg.ec.LogOutputs = flags.UniqueStringsFromFlag(cfg.cf.flagSet, "log-outputs") cfg.ec.ClusterState = cfg.cf.clusterState.String() diff --git a/server/etcdmain/grpc_proxy.go b/server/etcdmain/grpc_proxy.go index 55cc96d0cf9f..63f4d2e1f189 100644 --- a/server/etcdmain/grpc_proxy.go +++ b/server/etcdmain/grpc_proxy.go @@ -47,6 +47,7 @@ import ( "github.com/soheilhy/cmux" "github.com/spf13/cobra" "go.uber.org/zap" + "golang.org/x/net/http2" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/keepalive" @@ -95,6 +96,8 @@ var ( grpcKeepAliveMinTime time.Duration grpcKeepAliveTimeout time.Duration grpcKeepAliveInterval time.Duration + + maxConcurrentStreams uint32 ) const defaultGRPCMaxCallSendMsgSize = 1.5 * 1024 * 1024 @@ -159,6 +162,8 @@ func newGRPCProxyStartCommand() *cobra.Command { cmd.Flags().BoolVar(&grpcProxyDebug, "debug", false, "Enable debug-level logging for grpc-proxy.") + cmd.Flags().Uint32Var(&maxConcurrentStreams, "max-concurrent-streams", math.MaxUint32, "Maximum concurrent streams that each client may have open at a time.") + return &cmd } @@ -212,6 +217,12 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { httpClient := mustNewHTTPClient(lg) srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client, proxyClient) + if err := http2.ConfigureServer(srvhttp, &http2.Server{ + MaxConcurrentStreams: maxConcurrentStreams, + }); err != nil { + panic(err) + } + errc := make(chan error, 3) go func() { errc <- newGRPCProxyServer(lg, client).Serve(grpcl) }() go func() { errc <- srvhttp.Serve(httpl) }() diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go index efa2a77e8e08..3483e96bfd05 100644 --- a/server/etcdmain/help.go +++ b/server/etcdmain/help.go @@ -81,6 +81,8 @@ Member: Maximum number of operations permitted in a transaction. --max-request-bytes '1572864' Maximum client request size in bytes the server will accept. + --max-concurrent-streams '0' + Maximum concurrent streams that each client may have open at a time. --grpc-keepalive-min-time '5s' Minimum duration interval that a client should wait before pinging server. --grpc-keepalive-interval '2h' diff --git a/server/etcdserver/api/v3rpc/grpc.go b/server/etcdserver/api/v3rpc/grpc.go index ea3dd75705fd..349ebea40074 100644 --- a/server/etcdserver/api/v3rpc/grpc.go +++ b/server/etcdserver/api/v3rpc/grpc.go @@ -32,7 +32,6 @@ import ( const ( grpcOverheadBytes = 512 * 1024 - maxStreams = math.MaxUint32 maxSendBytes = math.MaxInt32 ) @@ -68,7 +67,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes))) opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes)) - opts = append(opts, grpc.MaxConcurrentStreams(maxStreams)) + opts = append(opts, grpc.MaxConcurrentStreams(s.Cfg.MaxConcurrentStreams)) grpcServer := grpc.NewServer(append(opts, gopts...)...) diff --git a/tests/e2e/v3_curl_test.go b/tests/e2e/v3_curl_test.go index 35450bbe418c..367f198622fc 100644 --- a/tests/e2e/v3_curl_test.go +++ b/tests/e2e/v3_curl_test.go @@ -21,7 +21,13 @@ import ( "math/rand" "path" "strconv" + "sync" + "sync/atomic" "testing" + "time" + + "github.com/grpc-ecosystem/grpc-gateway/runtime" + "github.com/stretchr/testify/assert" "go.etcd.io/etcd/api/v3/authpb" pb "go.etcd.io/etcd/api/v3/etcdserverpb" @@ -29,11 +35,13 @@ import ( "go.etcd.io/etcd/client/pkg/v3/testutil" epb "go.etcd.io/etcd/server/v3/etcdserver/api/v3election/v3electionpb" "go.etcd.io/etcd/tests/v3/framework/e2e" - - "github.com/grpc-ecosystem/grpc-gateway/runtime" ) -var apiPrefix = []string{"/v3"} +var ( + apiPrefix = []string{"/v3"} + maxStreams uint32 = 6 + exceededStreams uint32 +) func TestV3CurlPutGetNoTLS(t *testing.T) { for _, p := range apiPrefix { @@ -60,6 +68,24 @@ func TestV3CurlPutGetClientTLS(t *testing.T) { testCtl(t, testV3CurlPutGet, withApiPrefix(p), withCfg(*e2e.NewConfigClientTLS())) } } +func TestV3CurlWatchConcurrent(t *testing.T) { + cfg := *e2e.NewConfigNoTLS() + cfg.ClusterSize = 1 + cfg.MaxConcurrentStreams = maxStreams + exceededStreams = 1 + for _, p := range apiPrefix { + testCtl(t, testV3CurlWatchConcurrent, withApiPrefix(p), withCfg(cfg)) + } +} +func TestV3CurlWatchConcurrentClientTLS(t *testing.T) { + cfg := *e2e.NewConfigClientTLS() + cfg.ClusterSize = 1 + exceededStreams = 0 + cfg.MaxConcurrentStreams = maxStreams + for _, p := range apiPrefix { + testCtl(t, testV3CurlWatchConcurrent, withApiPrefix(p), withCfg(cfg)) + } +} func TestV3CurlWatch(t *testing.T) { for _, p := range apiPrefix { testCtl(t, testV3CurlWatch, withApiPrefix(p)) @@ -145,6 +171,66 @@ func testV3CurlWatch(cx ctlCtx) { } } +func testV3CurlWatchConcurrent(cx ctlCtx) { + // store "bar" into "foo" + putreq, err := json.Marshal(&pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}) + if err != nil { + cx.t.Fatal(err) + } + // watch for first update to "foo" + wcr := &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 1} + wreq, err := json.Marshal(wcr) + if err != nil { + cx.t.Fatal(err) + } + // marshaling the grpc to json gives: + // "{"RequestUnion":{"CreateRequest":{"key":"Zm9v","start_revision":1}}}" + // but the gprc-gateway expects a different format.. + wstr := `{"create_request" : ` + string(wreq) + "}" + p := cx.apiPrefix + + // concurrent watch + var ( + wg sync.WaitGroup + failedStreams int32 + ) + + wg.Add(int(maxStreams + exceededStreams)) + + for i := 0; i < int(maxStreams+exceededStreams); i++ { + go func() { + defer wg.Done() + // expects "bar", timeout after 2 seconds since stream waits forever + if err := e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: path.Join(p, "/watch"), Value: wstr, Expected: `"YmFy"`, Timeout: 2}); err != nil { + // make sure the error is timeout + assert.Contains(cx.t, err.Error(), "curl: (28) Operation timed out") + atomic.AddInt32(&failedStreams, 1) + } + }() + // sleep for concurrent stream limit + time.Sleep(1 * time.Millisecond) + } + // wait for watch connection establishment + time.Sleep(20 * time.Millisecond) + // put KV + if err = e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: path.Join(p, "/kv/put"), Value: string(putreq), Expected: "revision"}); err != nil { + cx.t.Fatalf("failed testV3CurlWatch put with curl using prefix (%s) (%v)", p, err) + } + // wait for watch timeout + wg.Wait() + + // because thread contention between streams trying to write to the connection, + // when the `MaxConcurrentStreams` is exceeded, the number of failed streams will + // not be exactly equal to the number of exceeded streams. + // so we need to make two different assertions depending on the situation. + // (concurrentWatches + basicStreams) is the actual number of concurrent streams required + if exceededStreams < 1 { + assert.Equal(cx.t, int32(0), failedStreams) + } else { + assert.GreaterOrEqual(cx.t, uint32(failedStreams), exceededStreams) + } +} + func testV3CurlTxn(cx ctlCtx) { txn := &pb.TxnRequest{ Compare: []*pb.Compare{ @@ -182,7 +268,6 @@ func testV3CurlTxn(cx ctlCtx) { if err := e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: path.Join(p, "/kv/txn"), Value: malformed, Expected: "error"}); err != nil { cx.t.Fatalf("failed testV3CurlTxn put with curl using prefix (%s) (%v)", p, err) } - } func testV3CurlAuth(cx ctlCtx) { @@ -209,7 +294,7 @@ func testV3CurlAuth(cx ctlCtx) { cx.t.Fatalf("failed testV3CurlAuth create role with curl using prefix (%s) (%v)", p, err) } - //grant root role + // grant root role for i := 0; i < len(usernames); i++ { grantroleroot, err := json.Marshal(&pb.AuthUserGrantRoleRequest{User: usernames[i], Role: "root"}) testutil.AssertNil(cx.t, err) diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 4b1daf93d784..52e78661dfad 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -148,7 +148,8 @@ type EtcdProcessClusterConfig struct { MetricsURLScheme string - SnapshotCount int // default is 10000 + SnapshotCount int // default is 10000 + MaxConcurrentStreams uint32 // default is math.MaxUint32 ClientTLS ClientConnType ClientCertAuthEnabled bool @@ -341,19 +342,26 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []* args = append(args, "--log-level", cfg.LogLevel) } + if cfg.MaxConcurrentStreams != 0 { + args = append(args, "--max-concurrent-streams", + fmt.Sprintf("%d", cfg.MaxConcurrentStreams), + ) + } + etcdCfgs[i] = &EtcdServerProcessConfig{ - lg: lg, - ExecPath: cfg.ExecPath, - Args: args, - EnvVars: cfg.EnvVars, - TlsArgs: cfg.TlsArgs(), - DataDirPath: dataDirPath, - KeepDataDir: cfg.KeepDataDir, - Name: name, - Purl: purl, - Acurl: curl, - Murl: murl, - InitialToken: cfg.InitialToken, + lg: lg, + ExecPath: cfg.ExecPath, + Args: args, + EnvVars: cfg.EnvVars, + TlsArgs: cfg.TlsArgs(), + DataDirPath: dataDirPath, + KeepDataDir: cfg.KeepDataDir, + Name: name, + Purl: purl, + Acurl: curl, + Murl: murl, + InitialToken: cfg.InitialToken, + MaxConcurrentStreams: cfg.MaxConcurrentStreams, } } diff --git a/tests/framework/e2e/cluster_proxy.go b/tests/framework/e2e/cluster_proxy.go index 26094eefc43c..08b85bea6667 100644 --- a/tests/framework/e2e/cluster_proxy.go +++ b/tests/framework/e2e/cluster_proxy.go @@ -218,6 +218,11 @@ func newProxyV3Proc(cfg *EtcdServerProcessConfig) *proxyV3Proc { murl = proxyListenURL(cfg, 4) args = append(args, "--metrics-addr", murl) } + if cfg.MaxConcurrentStreams > 0 { + args = append(args, "--max-concurrent-streams", + fmt.Sprintf("%d", cfg.MaxConcurrentStreams), + ) + } tlsArgs := []string{} for i := 0; i < len(cfg.TlsArgs); i++ { switch cfg.TlsArgs[i] { diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index d80710c4048c..2dc2afd9cc0b 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -77,6 +77,8 @@ type EtcdServerProcessConfig struct { InitialToken string InitialCluster string + + MaxConcurrentStreams uint32 // default is math.MaxUint32 } func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, error) {