Skip to content

Commit

Permalink
fix: fix the framer reading and writing without nhttp2
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangxuewu committed Apr 15, 2022
1 parent 9cbc511 commit 84fbb14
Show file tree
Hide file tree
Showing 13 changed files with 362 additions and 37 deletions.
28 changes: 28 additions & 0 deletions client/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,34 @@ func WithGRPCConnPoolSize(s uint32) Option {
}}
}

// WithGRPCWriteBufferSize determines how much data can be batched before doing a
// write on the wire. The corresponding memory allocation for this buffer will
// be twice the size to keep syscalls low. The default value for this buffer is
// 32KB.
//
// Zero will disable the write buffer such that each write will be on underlying
// connection. Note: A Send call may not directly translate to a write.
// It corresponds to the WithWriteBufferSize DialOption of gRPC.
func WithGRPCWriteBufferSize(s uint32) Option {
return Option{F: func(o *client.Options, di *utils.Slice) {
di.Push(fmt.Sprintf("WithGRPCWriteBufferSize(%d)", s))
o.GRPCConnectOpts.WriteBufferSize = s
}}
}

// WithGRPCReadBufferSize lets you set the size of read buffer, this determines how
// much data can be read at most for each read syscall.
//
// The default value for this buffer is 32KB. Zero will disable read buffer for
// a connection so data framer can access the underlying conn directly.
// It corresponds to the WithReadBufferSize DialOption of gRPC.
func WithGRPCReadBufferSize(s uint32) Option {
return Option{F: func(o *client.Options, di *utils.Slice) {
di.Push(fmt.Sprintf("WithGRPCReadBufferSize(%d)", s))
o.GRPCConnectOpts.ReadBufferSize = s
}}
}

// WithGRPCInitialWindowSize sets the value for initial window size on a grpc stream.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
// It corresponds to the WithInitialWindowSize DialOption of gRPC.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ require (
github.com/cespare/xxhash v1.1.0
github.com/choleraehyq/pid v0.0.13
github.com/cloudwego/netpoll v0.2.0
github.com/cloudwego/netpoll-http2 v0.0.6
github.com/cloudwego/thriftgo v0.1.2
github.com/json-iterator/go v1.1.11
github.com/tidwall/gjson v1.9.3
golang.org/x/net v0.0.0-20210614182718-04defd469f4e
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20211205182925-97ca703d548d
golang.org/x/tools v0.1.0
Expand Down
8 changes: 0 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,11 @@ github.com/bytedance/gopkg v0.0.0-20210910103821-e4efae9c17c3/go.mod h1:birsdqRC
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/choleraehyq/pid v0.0.12 h1:JLiTCsz2gStQZ3YWet+p9hktRnWzk7VJigpzvGV+I2o=
github.com/choleraehyq/pid v0.0.12/go.mod h1:uhzeFgxJZWQsZulelVQZwdASxQ9TIPZYL4TPkQMtL/U=
github.com/choleraehyq/pid v0.0.13 h1:Tc/jYjHC50SDCxSX+DWHfMmFqtwGR8EiQ08qJ/EK8zs=
github.com/choleraehyq/pid v0.0.13/go.mod h1:uhzeFgxJZWQsZulelVQZwdASxQ9TIPZYL4TPkQMtL/U=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudwego/netpoll v0.1.0/go.mod h1:rZOiNI0FYjuvNybXKKhAPUja03loJi/cdv2F55AE6E8=
github.com/cloudwego/netpoll v0.2.0 h1:MmZX/jS6ozso86mnbVJ7fUO1hL4LOH/XngXN7Pn347A=
github.com/cloudwego/netpoll v0.2.0/go.mod h1:rZOiNI0FYjuvNybXKKhAPUja03loJi/cdv2F55AE6E8=
github.com/cloudwego/netpoll-http2 v0.0.6 h1:+jdkMKGj7ifRqWOdyT/hqzhXklmqh/H4lyOdrAVkI/U=
github.com/cloudwego/netpoll-http2 v0.0.6/go.mod h1:+bjPyu2Cd4GDzKa0IegPgp1hjMjpZ6/kXTsSjIsmUk8=
github.com/cloudwego/thriftgo v0.1.2 h1:AXpGJiWE3VggfiRHwA6raRJUIcjxliEIfJfGlvRiYUA=
github.com/cloudwego/thriftgo v0.1.2/go.mod h1:LzeafuLSiHA9JTiWC8TIMIq64iadeObgRUhmVG1OC/w=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
Expand Down Expand Up @@ -79,7 +74,6 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
Expand All @@ -94,7 +88,6 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q=
golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
Expand All @@ -115,7 +108,6 @@ golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210415045647-66c3f260301c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211205182925-97ca703d548d h1:FjkYO/PPp4Wi0EAUOVLxePm7qVW4r4ctbWpURyuOD0E=
golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down
2 changes: 1 addition & 1 deletion internal/server/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func newServerOption() *remote.ServerOption {
ExitWaitTime: defaultExitWaitTime,
MaxConnectionIdleTime: defaultConnectionIdleTime,
AcceptFailedDelayTime: defaultAcceptFailedDelayTime,
GRPCCfg: new(grpc.ServerConfig),
GRPCCfg: grpc.DefaultServerConfig(),
}
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/remote/trans/nphttp2/grpc/controlbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
"sync/atomic"

"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/netpoll-http2"
"github.com/cloudwego/netpoll-http2/hpack"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
)

var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
Expand Down Expand Up @@ -547,7 +547,7 @@ func (l *loopyWriter) run(remoteAddr string) (err error) {
}
if gosched {
gosched = false
if l.framer.writer.MallocLen() < minBatchSize {
if l.framer.writer.offset < minBatchSize {
runtime.Gosched()
continue hasdata
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/remote/trans/nphttp2/grpc/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ const (
defaultWriteQuota = 64 * 1024
defaultClientMaxHeaderListSize = uint32(16 << 20)
defaultServerMaxHeaderListSize = uint32(16 << 20)
// http2IOBufSize specifies the buffer size for sending frames.
defaultWriteBufferSize = uint32(32 * 1024)
// http2IOBufSize specifies the buffer size for receiving frames.
defaultReadBufferSize = uint32(32 * 1024)

defaultUserAgent = "kitex/" + kitex.Version
)
Expand Down
16 changes: 9 additions & 7 deletions pkg/remote/trans/nphttp2/grpc/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,19 @@ import (
"github.com/cloudwego/kitex/pkg/gofunc"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/codes"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/grpc/syscall"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/metadata"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/status"
"github.com/cloudwego/netpoll"
"github.com/cloudwego/netpoll-http2"
"github.com/cloudwego/netpoll-http2/hpack"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
)

// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
ctx context.Context
cancel context.CancelFunc
conn netpoll.Connection // underlying communication channel
conn net.Conn // underlying communication channel
loopy *loopyWriter
remoteAddr net.Addr
localAddr net.Addr
Expand Down Expand Up @@ -105,7 +105,7 @@ type http2Client struct {
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
func newHTTP2Client(ctx context.Context, conn netpoll.Connection, opts ConnectOptions,
func newHTTP2Client(ctx context.Context, conn net.Conn, opts ConnectOptions,
remoteService string, onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
ctx, cancel := context.WithCancel(ctx)
defer func() {
Expand All @@ -124,7 +124,7 @@ func newHTTP2Client(ctx context.Context, conn netpoll.Connection, opts ConnectOp
}
keepaliveEnabled := false
if kp.Time != Infinity {
if err = conn.SetIdleTimeout(kp.Timeout); err != nil {
if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
}
keepaliveEnabled = true
Expand All @@ -137,6 +137,8 @@ func newHTTP2Client(ctx context.Context, conn netpoll.Connection, opts ConnectOp
dynamicWindow = false
}

writeBufSize := opts.WriteBufferSize
readBufSize := opts.ReadBufferSize
maxHeaderListSize := defaultClientMaxHeaderListSize
if opts.MaxHeaderListSize != nil {
maxHeaderListSize = *opts.MaxHeaderListSize
Expand All @@ -150,7 +152,7 @@ func newHTTP2Client(ctx context.Context, conn netpoll.Connection, opts ConnectOp
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
goAway: make(chan struct{}),
framer: newFramer(conn, maxHeaderListSize),
framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
fc: &trInFlow{limit: icwz},
activeStreams: make(map[uint32]*Stream),
kp: kp,
Expand Down
22 changes: 15 additions & 7 deletions pkg/remote/trans/nphttp2/grpc/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import (
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/metadata"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/status"
"github.com/cloudwego/netpoll"
http2 "github.com/cloudwego/netpoll-http2"
"github.com/cloudwego/netpoll-http2/hpack"
http2 "golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/protobuf/proto"
)

Expand All @@ -62,7 +62,7 @@ type http2Server struct {
lastRead int64
ctx context.Context
done chan struct{}
conn netpoll.Connection
conn net.Conn
loopy *loopyWriter
readerDone chan struct{} // sync point to enable testing.
writerDone chan struct{} // sync point to enable testing.
Expand Down Expand Up @@ -113,13 +113,13 @@ type http2Server struct {

// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
func newHTTP2Server(ctx context.Context, conn netpoll.Connection, config *ServerConfig) (_ ServerTransport, err error) {
func newHTTP2Server(ctx context.Context, conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
maxHeaderListSize := defaultServerMaxHeaderListSize
if config.MaxHeaderListSize != nil {
maxHeaderListSize = *config.MaxHeaderListSize
}

framer := newFramer(conn, maxHeaderListSize)
framer := newFramer(conn, config.WriteBufferSize, config.ReadBufferSize, maxHeaderListSize)
// Send initial settings as connection preface to client.
isettings := []http2.Setting{{
ID: http2.SettingMaxFrameSize,
Expand Down Expand Up @@ -230,8 +230,16 @@ func newHTTP2Server(ctx context.Context, conn netpoll.Connection, config *Server
}()

// Check the validity of client preface.
preface, err := t.conn.Reader().Next(ClientPrefaceLen)
if err != nil {
preface := make([]byte, len(ClientPreface))
if _, err := io.ReadFull(t.conn, preface); err != nil {
// In deployments where a gRPC server runs behind a cloud load balancer
// which performs regular TCP level health checks, the connection is
// closed immediately by the latter. Returning io.EOF here allows the
// grpc server implementation to recognize this scenario and suppress
// logging to reduce spam.
if err == io.EOF {
return nil, io.EOF
}
return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
}
if !bytes.Equal(preface, ClientPreface) {
Expand Down
75 changes: 68 additions & 7 deletions pkg/remote/trans/nphttp2/grpc/http_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
package grpc

import (
"bufio"
"bytes"
"encoding/base64"
"fmt"
"io"
"math"
"net"
"net/http"
"strconv"
"strings"
Expand All @@ -34,9 +37,8 @@ import (
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/codes"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/status"
"github.com/cloudwego/netpoll"
"github.com/cloudwego/netpoll-http2"
"github.com/cloudwego/netpoll-http2/hpack"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -603,13 +605,20 @@ func decodeGrpcMessageUnchecked(msg string) string {

type framer struct {
*http2.Framer
writer netpoll.Writer
writer *bufWriter
}

func newFramer(conn netpoll.Connection, maxHeaderListSize uint32) *framer {
func newFramer(conn net.Conn, writeBufferSize, readBufferSize, maxHeaderListSize uint32) *framer {
w := newBufWriter(conn, int(writeBufferSize))

var r io.Reader = conn
if readBufferSize > 0 {
r = bufio.NewReaderSize(r, int(readBufferSize))
}

fr := &framer{
writer: conn.Writer(),
Framer: http2.NewFramer(conn.Writer(), conn.Reader()),
writer: w,
Framer: http2.NewFramer(w, r),
}
fr.SetMaxReadFrameSize(http2MaxFrameLen)
// Opt-in to Frame reuse API on framer to reduce garbage.
Expand All @@ -619,3 +628,55 @@ func newFramer(conn netpoll.Connection, maxHeaderListSize uint32) *framer {
fr.ReadMetaHeaders = hpack.NewDecoder(http2InitHeaderTableSize, nil)
return fr
}

type bufWriter struct {
buf []byte
offset int
batchSize int
conn net.Conn
err error

onFlush func()
}

func newBufWriter(conn net.Conn, batchSize int) *bufWriter {
return &bufWriter{
buf: make([]byte, batchSize*2),
batchSize: batchSize,
conn: conn,
}
}

func (w *bufWriter) Write(b []byte) (n int, err error) {
if w.err != nil {
return 0, w.err
}
if w.batchSize == 0 { // Buffer has been disabled.
return w.conn.Write(b)
}
for len(b) > 0 {
nn := copy(w.buf[w.offset:], b)
b = b[nn:]
w.offset += nn
n += nn
if w.offset >= w.batchSize {
err = w.Flush()
}
}
return n, err
}

func (w *bufWriter) Flush() error {
if w.err != nil {
return w.err
}
if w.offset == 0 {
return nil
}
if w.onFlush != nil {
w.onFlush()
}
_, w.err = w.conn.Write(w.buf[:w.offset])
w.offset = 0
return w.err
}
Loading

0 comments on commit 84fbb14

Please sign in to comment.