Skip to content

Commit

Permalink
feat: support custom grpc.MaxConcurrentStreams
Browse files Browse the repository at this point in the history
  • Loading branch information
杨金珏 authored and nic-chen committed Jun 5, 2022
1 parent 08f4c34 commit 0bd38a5
Show file tree
Hide file tree
Showing 11 changed files with 188 additions and 8 deletions.
36 changes: 36 additions & 0 deletions pkg/flags/int.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package flags

import (
"flag"
"fmt"
"strconv"
)

type uint32Value uint32

func NewUint32Value(s string) *uint32Value {
var u uint32Value
if s == "" || s == "0" {
return &u
}
if err := u.Set(s); err != nil {
panic(fmt.Sprintf("new uint32Value should never fail: %v", err))
}
return &u
}

func (i *uint32Value) Set(s string) error {
v, err := strconv.ParseUint(s, 0, 32)
*i = uint32Value(v)
return err
}
func (i *uint32Value) Type() string {
return "uint32"
}
func (i *uint32Value) String() string { return strconv.FormatUint(uint64(*i), 10) }

// Uint32FromFlag return the uint32 value of a flag with the given name
func Uint32FromFlag(fs *flag.FlagSet, name string) uint32 {
val := *fs.Lookup(name).Value.(*uint32Value)
return uint32(val)
}
43 changes: 43 additions & 0 deletions pkg/flags/int_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package flags

import (
"reflect"
"testing"
)

func TestInvalidUint32(t *testing.T) {
tests := []string{
// string
"invalid",
// negative number
"-1",
// float number
"0.1",
"-0.2",
// larger than math.MaxUint32
"4294967296",
}
for i, in := range tests {
var u uint32Value
if err := u.Set(in); err == nil {
t.Errorf(`#%d: unexpected nil error for in=%q`, i, in)
}
}
}

func TestUint32Value(t *testing.T) {
tests := []struct {
s string
exp uint32
}{
{s: "0", exp: 0},
{s: "1", exp: 1},
{s: "", exp: 0},
}
for i := range tests {
ss := uint32(*NewUint32Value(tests[i].s))
if !reflect.DeepEqual(tests[i].exp, ss) {
t.Fatalf("#%d: expected %q, got %q", i, tests[i].exp, ss)
}
}
}
4 changes: 4 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ type ServerConfig struct {
// MaxRequestBytes is the maximum request size to send over raft.
MaxRequestBytes uint

// MaxConcurrentStreams optionally specifies the number of concurrent
// streams that each client may have open at a time.
MaxConcurrentStreams uint32

WarningApplyDuration time.Duration
WarningUnaryRequestDuration time.Duration

Expand Down
9 changes: 8 additions & 1 deletion server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package embed
import (
"errors"
"fmt"
"math"
"net"
"net/http"
"net/url"
Expand Down Expand Up @@ -59,6 +60,7 @@ const (
DefaultWarningApplyDuration = 100 * time.Millisecond
DefaultWarningUnaryRequestDuration = 300 * time.Millisecond
DefaultMaxRequestBytes = 1.5 * 1024 * 1024
DefaultMaxConcurrentStreams = math.MaxUint32
DefaultGRPCKeepAliveMinTime = 5 * time.Second
DefaultGRPCKeepAliveInterval = 2 * time.Hour
DefaultGRPCKeepAliveTimeout = 20 * time.Second
Expand Down Expand Up @@ -205,6 +207,10 @@ type Config struct {
MaxTxnOps uint `json:"max-txn-ops"`
MaxRequestBytes uint `json:"max-request-bytes"`

// MaxConcurrentStreams optionally specifies the number of concurrent
// streams that each client may have open at a time.
MaxConcurrentStreams uint32 `json:"max-concurrent-streams"`

LPUrls, LCUrls []url.URL
APUrls, ACUrls []url.URL
ClientTLSInfo transport.TLSInfo
Expand Down Expand Up @@ -311,7 +317,7 @@ type Config struct {
AuthToken string `json:"auth-token"`
BcryptCost uint `json:"bcrypt-cost"`

//The AuthTokenTTL in seconds of the simple token
// The AuthTokenTTL in seconds of the simple token
AuthTokenTTL uint `json:"auth-token-ttl"`

ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"`
Expand Down Expand Up @@ -462,6 +468,7 @@ func NewConfig() *Config {

MaxTxnOps: DefaultMaxTxnOps,
MaxRequestBytes: DefaultMaxRequestBytes,
MaxConcurrentStreams: DefaultMaxConcurrentStreams,
ExperimentalWarningApplyDuration: DefaultWarningApplyDuration,

ExperimentalWarningUnaryRequestDuration: DefaultWarningUnaryRequestDuration,
Expand Down
1 change: 1 addition & 0 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
BackendBatchInterval: cfg.BackendBatchInterval,
MaxTxnOps: cfg.MaxTxnOps,
MaxRequestBytes: cfg.MaxRequestBytes,
MaxConcurrentStreams: cfg.MaxConcurrentStreams,
SocketOpts: cfg.SocketOpts,
StrictReconfigCheck: cfg.StrictReconfigCheck,
ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth,
Expand Down
14 changes: 14 additions & 0 deletions server/embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/soheilhy/cmux"
"github.com/tmc/grpc-websocket-proxy/wsproxy"
"go.uber.org/zap"
"golang.org/x/net/http2"
"golang.org/x/net/trace"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -142,6 +143,9 @@ func (sctx *serveCtx) serve(
Handler: createAccessController(sctx.lg, s, httpmux),
ErrorLog: logger, // do not log user error
}
if err := setMaxConcurrentStreams(srvhttp, s.Cfg.MaxConcurrentStreams); err != nil {
return err
}
httpl := m.Match(cmux.HTTP1())
go func() { errHandler(srvhttp.Serve(httpl)) }()

Expand Down Expand Up @@ -191,6 +195,9 @@ func (sctx *serveCtx) serve(
TLSConfig: tlscfg,
ErrorLog: logger, // do not log user error
}
if err := setMaxConcurrentStreams(srv, s.Cfg.MaxConcurrentStreams); err != nil {
return err
}
go func() { errHandler(srv.Serve(tlsl)) }()

sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
Expand All @@ -204,6 +211,13 @@ func (sctx *serveCtx) serve(
return m.Serve()
}

// setMaxConcurrentStreams sets the maxConcurrentStreams for the http server
func setMaxConcurrentStreams(srv *http.Server, maxConcurrentStreams uint32) error {
return http2.ConfigureServer(srv, &http2.Server{
MaxConcurrentStreams: maxConcurrentStreams,
})
}

// grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC
// connections or otherHandler otherwise. Given in gRPC docs.
func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler {
Expand Down
4 changes: 4 additions & 0 deletions server/etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ func newConfig() *config {
fs.DurationVar(&rafthttp.ConnReadTimeout, "raft-read-timeout", rafthttp.DefaultConnReadTimeout, "Read timeout set on each rafthttp connection")
fs.DurationVar(&rafthttp.ConnWriteTimeout, "raft-write-timeout", rafthttp.DefaultConnWriteTimeout, "Write timeout set on each rafthttp connection")

fs.Var(flags.NewUint32Value(""), "max-concurrent-streams", "Maximum concurrent streams that each client may have open at a time.")

// clustering
fs.Var(
flags.NewUniqueURLsWithExceptions(embed.DefaultInitialAdvertisePeerURLs, ""),
Expand Down Expand Up @@ -380,6 +382,8 @@ func (cfg *config) configFromCmdLine() error {

cfg.ec.CipherSuites = flags.StringsFromFlag(cfg.cf.flagSet, "cipher-suites")

cfg.ec.MaxConcurrentStreams = flags.Uint32FromFlag(cfg.cf.flagSet, "max-concurrent-streams")

cfg.ec.LogOutputs = flags.UniqueStringsFromFlag(cfg.cf.flagSet, "log-outputs")

cfg.ec.ClusterState = cfg.cf.clusterState.String()
Expand Down
2 changes: 2 additions & 0 deletions server/etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ Member:
Maximum number of operations permitted in a transaction.
--max-request-bytes '1572864'
Maximum client request size in bytes the server will accept.
--max-concurrent-streams '0'
Maximum concurrent streams that each client may have open at a time.
--grpc-keepalive-min-time '5s'
Minimum duration interval that a client should wait before pinging server.
--grpc-keepalive-interval '2h'
Expand Down
3 changes: 1 addition & 2 deletions server/etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (

const (
grpcOverheadBytes = 512 * 1024
maxStreams = math.MaxUint32
maxSendBytes = math.MaxInt32
)

Expand Down Expand Up @@ -68,7 +67,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer

opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))
opts = append(opts, grpc.MaxConcurrentStreams(s.Cfg.MaxConcurrentStreams))

grpcServer := grpc.NewServer(append(opts, gopts...)...)

Expand Down
71 changes: 67 additions & 4 deletions tests/e2e/v3_curl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,24 @@ import (
"math/rand"
"path"
"strconv"
"sync"
"testing"

"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/stretchr/testify/assert"

"go.etcd.io/etcd/api/v3/authpb"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/testutil"
epb "go.etcd.io/etcd/server/v3/etcdserver/api/v3election/v3electionpb"
"go.etcd.io/etcd/tests/v3/framework/e2e"

"github.com/grpc-ecosystem/grpc-gateway/runtime"
)

var apiPrefix = []string{"/v3"}
var (
apiPrefix = []string{"/v3"}
maxConcurrent uint32 = 3
)

func TestV3CurlPutGetNoTLS(t *testing.T) {
for _, p := range apiPrefix {
Expand All @@ -60,6 +65,20 @@ func TestV3CurlPutGetClientTLS(t *testing.T) {
testCtl(t, testV3CurlPutGet, withApiPrefix(p), withCfg(*e2e.NewConfigClientTLS()))
}
}
func TestV3CurlWatchConcurrent(t *testing.T) {
cfg := *e2e.NewConfigNoTLS()
cfg.MaxConcurrentStreams = maxConcurrent
for _, p := range apiPrefix {
testCtl(t, testV3CurlWatchConcurrent, withApiPrefix(p), withCfg(cfg))
}
}
func TestV3CurlWatchConcurrentClientTLS(t *testing.T) {
cfg := *e2e.NewConfigClientTLS()
cfg.MaxConcurrentStreams = maxConcurrent
for _, p := range apiPrefix {
testCtl(t, testV3CurlWatchConcurrent, withApiPrefix(p), withCfg(cfg))
}
}
func TestV3CurlWatch(t *testing.T) {
for _, p := range apiPrefix {
testCtl(t, testV3CurlWatch, withApiPrefix(p))
Expand Down Expand Up @@ -145,6 +164,50 @@ func testV3CurlWatch(cx ctlCtx) {
}
}

func testV3CurlWatchConcurrent(cx ctlCtx) {
// store "bar" into "foo"
putreq, err := json.Marshal(&pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
if err != nil {
cx.t.Fatal(err)
}
// watch for first update to "foo"
wcr := &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 1}
wreq, err := json.Marshal(wcr)
if err != nil {
cx.t.Fatal(err)
}
// marshaling the grpc to json gives:
// "{"RequestUnion":{"CreateRequest":{"key":"Zm9v","start_revision":1}}}"
// but the gprc-gateway expects a different format..
wstr := `{"create_request" : ` + string(wreq) + "}"
p := cx.apiPrefix

// concurrent watch
var wg sync.WaitGroup
excessConcurrent := 2
wg.Add(int(maxConcurrent) + excessConcurrent)
timeoutConcurrent := 0
for i := 0; i < int(maxConcurrent)+excessConcurrent; i++ {
go func() {
defer wg.Done()
// expects "bar", timeout after 2 seconds since stream waits forever
if err = e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: path.Join(p, "/watch"), Value: wstr, Expected: `"YmFy"`, Timeout: 2}); err != nil {
// make sure the error is timeout
assert.Contains(cx.t, err.Error(), "curl: (28) Operation timed out")
timeoutConcurrent += 1
}
}()
}
// update
if err = e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: path.Join(p, "/kv/put"), Value: string(putreq), Expected: "revision"}); err != nil {
cx.t.Fatalf("failed testV3CurlWatch put with curl using prefix (%s) (%v)", p, err)
}
// wait for watch
wg.Wait()
// assert that timeoutConcurrent should equal excessConcurrent
assert.Equal(cx.t, excessConcurrent, timeoutConcurrent)
}

func testV3CurlTxn(cx ctlCtx) {
txn := &pb.TxnRequest{
Compare: []*pb.Compare{
Expand Down Expand Up @@ -209,7 +272,7 @@ func testV3CurlAuth(cx ctlCtx) {
cx.t.Fatalf("failed testV3CurlAuth create role with curl using prefix (%s) (%v)", p, err)
}

//grant root role
// grant root role
for i := 0; i < len(usernames); i++ {
grantroleroot, err := json.Marshal(&pb.AuthUserGrantRoleRequest{User: usernames[i], Role: "root"})
testutil.AssertNil(cx.t, err)
Expand Down
9 changes: 8 additions & 1 deletion tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ type EtcdProcessClusterConfig struct {

MetricsURLScheme string

SnapshotCount int // default is 10000
SnapshotCount int // default is 10000
MaxConcurrentStreams uint32 // default is math.MaxUint32

ClientTLS ClientConnType
ClientCertAuthEnabled bool
Expand Down Expand Up @@ -341,6 +342,12 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []*
args = append(args, "--log-level", cfg.LogLevel)
}

if cfg.MaxConcurrentStreams != 0 {
args = append(args, "--max-concurrent-streams",
fmt.Sprintf("%d", cfg.MaxConcurrentStreams),
)
}

etcdCfgs[i] = &EtcdServerProcessConfig{
lg: lg,
ExecPath: cfg.ExecPath,
Expand Down

0 comments on commit 0bd38a5

Please sign in to comment.