Skip to content

Commit

Permalink
support custom grpc.MaxConcurrentStreams
Browse files Browse the repository at this point in the history
There is no update on the original PR (see below) for more then 2
weeks. So Benjamin(@ahrtr) continues to work on the PR. The first
step is to rebase the PR, because there are lots of conflicts with
the main branch.

The change to go.mod and go.sum reverted, because they are not needed.
The e2e test cases are also reverted, because they are not correct.

```
etcd-io#14081
```

Signed-off-by: nic-chen <chenjunxu6@gmail.com>
Signed-off-by: Benjamin Wang <wachao@vmware.com>
  • Loading branch information
杨金珏 authored and ahrtr committed Jul 5, 2022
1 parent 07d6208 commit faaf6e0
Show file tree
Hide file tree
Showing 10 changed files with 152 additions and 3 deletions.
50 changes: 50 additions & 0 deletions pkg/flags/int.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package flags

import (
"flag"
"fmt"
"strconv"
)

type uint32Value uint32

func NewUint32Value(s string) *uint32Value {
var u uint32Value
if s == "" || s == "0" {
return &u
}
if err := u.Set(s); err != nil {
panic(fmt.Sprintf("new uint32Value should never fail: %v", err))
}
return &u
}

func (i *uint32Value) Set(s string) error {
v, err := strconv.ParseUint(s, 0, 32)
*i = uint32Value(v)
return err
}
func (i *uint32Value) Type() string {
return "uint32"
}
func (i *uint32Value) String() string { return strconv.FormatUint(uint64(*i), 10) }

// Uint32FromFlag return the uint32 value of a flag with the given name
func Uint32FromFlag(fs *flag.FlagSet, name string) uint32 {
val := *fs.Lookup(name).Value.(*uint32Value)
return uint32(val)
}
57 changes: 57 additions & 0 deletions pkg/flags/int_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package flags

import (
"reflect"
"testing"
)

func TestInvalidUint32(t *testing.T) {
tests := []string{
// string
"invalid",
// negative number
"-1",
// float number
"0.1",
"-0.2",
// larger than math.MaxUint32
"4294967296",
}
for i, in := range tests {
var u uint32Value
if err := u.Set(in); err == nil {
t.Errorf(`#%d: unexpected nil error for in=%q`, i, in)
}
}
}

func TestUint32Value(t *testing.T) {
tests := []struct {
s string
exp uint32
}{
{s: "0", exp: 0},
{s: "1", exp: 1},
{s: "", exp: 0},
}
for i := range tests {
ss := uint32(*NewUint32Value(tests[i].s))
if !reflect.DeepEqual(tests[i].exp, ss) {
t.Fatalf("#%d: expected %q, got %q", i, tests[i].exp, ss)
}
}
}
4 changes: 4 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ type ServerConfig struct {
// MaxRequestBytes is the maximum request size to send over raft.
MaxRequestBytes uint

// MaxConcurrentStreams optionally specifies the number of concurrent
// streams that each client may have open at a time.
MaxConcurrentStreams uint32

WarningApplyDuration time.Duration
WarningUnaryRequestDuration time.Duration

Expand Down
9 changes: 8 additions & 1 deletion server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package embed
import (
"errors"
"fmt"
"math"
"net"
"net/http"
"net/url"
Expand Down Expand Up @@ -59,6 +60,7 @@ const (
DefaultWarningApplyDuration = 100 * time.Millisecond
DefaultWarningUnaryRequestDuration = 300 * time.Millisecond
DefaultMaxRequestBytes = 1.5 * 1024 * 1024
DefaultMaxConcurrentStreams = math.MaxUint32
DefaultGRPCKeepAliveMinTime = 5 * time.Second
DefaultGRPCKeepAliveInterval = 2 * time.Hour
DefaultGRPCKeepAliveTimeout = 20 * time.Second
Expand Down Expand Up @@ -205,6 +207,10 @@ type Config struct {
MaxTxnOps uint `json:"max-txn-ops"`
MaxRequestBytes uint `json:"max-request-bytes"`

// MaxConcurrentStreams optionally specifies the number of concurrent
// streams that each client may have open at a time.
MaxConcurrentStreams uint32 `json:"max-concurrent-streams"`

LPUrls, LCUrls []url.URL
APUrls, ACUrls []url.URL
ClientTLSInfo transport.TLSInfo
Expand Down Expand Up @@ -311,7 +317,7 @@ type Config struct {
AuthToken string `json:"auth-token"`
BcryptCost uint `json:"bcrypt-cost"`

//The AuthTokenTTL in seconds of the simple token
// AuthTokenTTL in seconds of the simple token
AuthTokenTTL uint `json:"auth-token-ttl"`

ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"`
Expand Down Expand Up @@ -462,6 +468,7 @@ func NewConfig() *Config {

MaxTxnOps: DefaultMaxTxnOps,
MaxRequestBytes: DefaultMaxRequestBytes,
MaxConcurrentStreams: DefaultMaxConcurrentStreams,
ExperimentalWarningApplyDuration: DefaultWarningApplyDuration,

ExperimentalWarningUnaryRequestDuration: DefaultWarningUnaryRequestDuration,
Expand Down
1 change: 1 addition & 0 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
BackendBatchInterval: cfg.BackendBatchInterval,
MaxTxnOps: cfg.MaxTxnOps,
MaxRequestBytes: cfg.MaxRequestBytes,
MaxConcurrentStreams: cfg.MaxConcurrentStreams,
SocketOpts: cfg.SocketOpts,
StrictReconfigCheck: cfg.StrictReconfigCheck,
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
Expand Down
14 changes: 14 additions & 0 deletions server/embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/soheilhy/cmux"
"github.com/tmc/grpc-websocket-proxy/wsproxy"
"go.uber.org/zap"
"golang.org/x/net/http2"
"golang.org/x/net/trace"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -148,6 +149,9 @@ func (sctx *serveCtx) serve(
Handler: createAccessController(sctx.lg, s, httpmux),
ErrorLog: logger, // do not log user error
}
if err := setMaxConcurrentStreams(srvhttp, s.Cfg.MaxConcurrentStreams); err != nil {
return err
}
httpl := m.Match(cmux.HTTP1())
go func() { errHandler(srvhttp.Serve(httpl)) }()

Expand Down Expand Up @@ -197,6 +201,9 @@ func (sctx *serveCtx) serve(
TLSConfig: tlscfg,
ErrorLog: logger, // do not log user error
}
if err := setMaxConcurrentStreams(srv, s.Cfg.MaxConcurrentStreams); err != nil {
return err
}
go func() { errHandler(srv.Serve(tlsl)) }()

sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
Expand All @@ -209,6 +216,13 @@ func (sctx *serveCtx) serve(
return m.Serve()
}

// setMaxConcurrentStreams sets the maxConcurrentStreams for the http server
func setMaxConcurrentStreams(srv *http.Server, maxConcurrentStreams uint32) error {
return http2.ConfigureServer(srv, &http2.Server{
MaxConcurrentStreams: maxConcurrentStreams,
})
}

// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC
// connections or otherHandler otherwise. Given in gRPC docs.
func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler {
Expand Down
4 changes: 4 additions & 0 deletions server/etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ func newConfig() *config {
fs.DurationVar(&rafthttp.ConnReadTimeout, "raft-read-timeout", rafthttp.DefaultConnReadTimeout, "Read timeout set on each rafthttp connection")
fs.DurationVar(&rafthttp.ConnWriteTimeout, "raft-write-timeout", rafthttp.DefaultConnWriteTimeout, "Write timeout set on each rafthttp connection")

fs.Var(flags.NewUint32Value(""), "max-concurrent-streams", "Maximum concurrent streams that each client may have open at a time.")

// clustering
fs.Var(
flags.NewUniqueURLsWithExceptions(embed.DefaultInitialAdvertisePeerURLs, ""),
Expand Down Expand Up @@ -380,6 +382,8 @@ func (cfg *config) configFromCmdLine() error {

cfg.ec.CipherSuites = flags.StringsFromFlag(cfg.cf.flagSet, "cipher-suites")

cfg.ec.MaxConcurrentStreams = flags.Uint32FromFlag(cfg.cf.flagSet, "max-concurrent-streams")

cfg.ec.LogOutputs = flags.UniqueStringsFromFlag(cfg.cf.flagSet, "log-outputs")

cfg.ec.ClusterState = cfg.cf.clusterState.String()
Expand Down
11 changes: 11 additions & 0 deletions server/etcdmain/grpc_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/soheilhy/cmux"
"github.com/spf13/cobra"
"go.uber.org/zap"
"golang.org/x/net/http2"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
Expand Down Expand Up @@ -95,6 +96,8 @@ var (
grpcKeepAliveMinTime time.Duration
grpcKeepAliveTimeout time.Duration
grpcKeepAliveInterval time.Duration

maxConcurrentStreams uint32
)

const defaultGRPCMaxCallSendMsgSize = 1.5 * 1024 * 1024
Expand Down Expand Up @@ -159,6 +162,8 @@ func newGRPCProxyStartCommand() *cobra.Command {

cmd.Flags().BoolVar(&grpcProxyDebug, "debug", false, "Enable debug-level logging for grpc-proxy.")

cmd.Flags().Uint32Var(&maxConcurrentStreams, "max-concurrent-streams", math.MaxUint32, "Maximum concurrent streams that each client may have open at a time.")

return &cmd
}

Expand Down Expand Up @@ -212,6 +217,12 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
httpClient := mustNewHTTPClient(lg)

srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client, proxyClient)
if err := http2.ConfigureServer(srvhttp, &http2.Server{
MaxConcurrentStreams: maxConcurrentStreams,
}); err != nil {
panic(err)
}

errc := make(chan error, 3)
go func() { errc <- newGRPCProxyServer(lg, client).Serve(grpcl) }()
go func() { errc <- srvhttp.Serve(httpl) }()
Expand Down
2 changes: 2 additions & 0 deletions server/etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ Member:
Maximum number of operations permitted in a transaction.
--max-request-bytes '1572864'
Maximum client request size in bytes the server will accept.
--max-concurrent-streams '0'
Maximum concurrent streams that each client may have open at a time.
--grpc-keepalive-min-time '5s'
Minimum duration interval that a client should wait before pinging server.
--grpc-keepalive-interval '2h'
Expand Down
3 changes: 1 addition & 2 deletions server/etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (

const (
grpcOverheadBytes = 512 * 1024
maxStreams = math.MaxUint32
maxSendBytes = math.MaxInt32
)

Expand Down Expand Up @@ -68,7 +67,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer

opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))
opts = append(opts, grpc.MaxConcurrentStreams(s.Cfg.MaxConcurrentStreams))

grpcServer := grpc.NewServer(append(opts, gopts...)...)

Expand Down

0 comments on commit faaf6e0

Please sign in to comment.