Skip to content

Commit

Permalink
Implement new Codec that uses mem.BufferSlice instead of []byte (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
PapaCharlie authored Aug 21, 2024
1 parent 7e12068 commit 9ab8b62
Show file tree
Hide file tree
Showing 39 changed files with 1,842 additions and 1,065 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ jobs:
goversion: '1.22'
testflags: -race

- type: tests
goversion: '1.22'
testflags: '-race -tags=buffer_pooling'

- type: tests
goversion: '1.22'
goarch: 386
Expand Down
34 changes: 30 additions & 4 deletions benchmark/benchmain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ import (
"google.golang.org/grpc/benchmark/stats"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/experimental"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/mem"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/test/bufconn"

Expand Down Expand Up @@ -153,6 +153,33 @@ const (
warmuptime = time.Second
)

var useNopBufferPool atomic.Bool

type swappableBufferPool struct {
mem.BufferPool
}

func (p swappableBufferPool) Get(length int) *[]byte {
var pool mem.BufferPool
if useNopBufferPool.Load() {
pool = mem.NopBufferPool{}
} else {
pool = p.BufferPool
}
return pool.Get(length)
}

func (p swappableBufferPool) Put(i *[]byte) {
if useNopBufferPool.Load() {
return
}
p.BufferPool.Put(i)
}

func init() {
internal.SetDefaultBufferPoolForTesting.(func(mem.BufferPool))(swappableBufferPool{mem.DefaultBufferPool()})
}

var (
allWorkloads = []string{workloadsUnary, workloadsStreaming, workloadsUnconstrained, workloadsAll}
allCompModes = []string{compModeOff, compModeGzip, compModeNop, compModeAll}
Expand Down Expand Up @@ -343,10 +370,9 @@ func makeClients(bf stats.Features) ([]testgrpc.BenchmarkServiceClient, func())
}
switch bf.RecvBufferPool {
case recvBufferPoolNil:
// Do nothing.
useNopBufferPool.Store(true)
case recvBufferPoolSimple:
opts = append(opts, experimental.WithRecvBufferPool(grpc.NewSharedBufferPool()))
sopts = append(sopts, experimental.RecvBufferPool(grpc.NewSharedBufferPool()))
// Do nothing as buffering is enabled by default.
default:
logger.Fatalf("Unknown shared recv buffer pool type: %v", bf.RecvBufferPool)
}
Expand Down
75 changes: 68 additions & 7 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,79 @@ package grpc
import (
"google.golang.org/grpc/encoding"
_ "google.golang.org/grpc/encoding/proto" // to register the Codec for "proto"
"google.golang.org/grpc/mem"
)

// baseCodec contains the functionality of both Codec and encoding.Codec, but
// omits the name/string, which vary between the two and are not needed for
// anything besides the registry in the encoding package.
// baseCodec captures the new encoding.CodecV2 interface without the Name
// function, allowing it to be implemented by older Codec and encoding.Codec
// implementations. The omitted Name function is only needed for the register in
// the encoding package and is not part of the core functionality.
type baseCodec interface {
Marshal(v any) ([]byte, error)
Unmarshal(data []byte, v any) error
Marshal(v any) (mem.BufferSlice, error)
Unmarshal(data mem.BufferSlice, v any) error
}

// getCodec returns an encoding.CodecV2 for the codec of the given name (if
// registered). Initially checks the V2 registry with encoding.GetCodecV2 and
// returns the V2 codec if it is registered. Otherwise, it checks the V1 registry
// with encoding.GetCodec and if it is registered wraps it with newCodecV1Bridge
// to turn it into an encoding.CodecV2. Returns nil otherwise.
func getCodec(name string) encoding.CodecV2 {
codecV2 := encoding.GetCodecV2(name)
if codecV2 != nil {
return codecV2
}

codecV1 := encoding.GetCodec(name)
if codecV1 != nil {
return newCodecV1Bridge(codecV1)
}

return nil
}

var _ baseCodec = Codec(nil)
var _ baseCodec = encoding.Codec(nil)
func newCodecV0Bridge(c Codec) baseCodec {
return codecV0Bridge{codec: c}
}

func newCodecV1Bridge(c encoding.Codec) encoding.CodecV2 {
return codecV1Bridge{
codecV0Bridge: codecV0Bridge{codec: c},
name: c.Name(),
}
}

var _ baseCodec = codecV0Bridge{}

type codecV0Bridge struct {
codec interface {
Marshal(v any) ([]byte, error)
Unmarshal(data []byte, v any) error
}
}

func (c codecV0Bridge) Marshal(v any) (mem.BufferSlice, error) {
data, err := c.codec.Marshal(v)
if err != nil {
return nil, err
}
return mem.BufferSlice{mem.NewBuffer(&data, nil)}, nil
}

func (c codecV0Bridge) Unmarshal(data mem.BufferSlice, v any) (err error) {
return c.codec.Unmarshal(data.Materialize(), v)
}

var _ encoding.CodecV2 = codecV1Bridge{}

type codecV1Bridge struct {
codecV0Bridge
name string
}

func (c codecV1Bridge) Name() string {
return c.name
}

// Codec defines the interface gRPC uses to encode and decode messages.
// Note that implementations of this interface must be thread safe;
Expand Down
27 changes: 5 additions & 22 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/mem"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/stats"
)
Expand Down Expand Up @@ -60,7 +61,7 @@ func init() {
internal.WithBinaryLogger = withBinaryLogger
internal.JoinDialOptions = newJoinDialOption
internal.DisableGlobalDialOptions = newDisableGlobalDialOptions
internal.WithRecvBufferPool = withRecvBufferPool
internal.WithBufferPool = withBufferPool
}

// dialOptions configure a Dial call. dialOptions are set by the DialOption
Expand Down Expand Up @@ -92,7 +93,6 @@ type dialOptions struct {
defaultServiceConfigRawJSON *string
resolvers []resolver.Builder
idleTimeout time.Duration
recvBufferPool SharedBufferPool
defaultScheme string
maxCallAttempts int
}
Expand Down Expand Up @@ -679,11 +679,11 @@ func defaultDialOptions() dialOptions {
WriteBufferSize: defaultWriteBufSize,
UseProxy: true,
UserAgent: grpcUA,
BufferPool: mem.DefaultBufferPool(),
},
bs: internalbackoff.DefaultExponential,
healthCheckFunc: internal.HealthCheckFunc,
idleTimeout: 30 * time.Minute,
recvBufferPool: nopBufferPool{},
defaultScheme: "dns",
maxCallAttempts: defaultMaxCallAttempts,
}
Expand Down Expand Up @@ -760,25 +760,8 @@ func WithMaxCallAttempts(n int) DialOption {
})
}

// WithRecvBufferPool returns a DialOption that configures the ClientConn
// to use the provided shared buffer pool for parsing incoming messages. Depending
// on the application's workload, this could result in reduced memory allocation.
//
// If you are unsure about how to implement a memory pool but want to utilize one,
// begin with grpc.NewSharedBufferPool.
//
// Note: The shared buffer pool feature will not be active if any of the following
// options are used: WithStatsHandler, EnableTracing, or binary logging. In such
// cases, the shared buffer pool will be ignored.
//
// Deprecated: use experimental.WithRecvBufferPool instead. Will be deleted in
// v1.60.0 or later.
func WithRecvBufferPool(bufferPool SharedBufferPool) DialOption {
return withRecvBufferPool(bufferPool)
}

func withRecvBufferPool(bufferPool SharedBufferPool) DialOption {
func withBufferPool(bufferPool mem.BufferPool) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.recvBufferPool = bufferPool
o.copts.BufferPool = bufferPool
})
}
82 changes: 82 additions & 0 deletions encoding/encoding_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package encoding

import (
"strings"

"google.golang.org/grpc/mem"
)

// CodecV2 defines the interface gRPC uses to encode and decode messages. Note
// that implementations of this interface must be thread safe; a CodecV2's
// methods can be called from concurrent goroutines.
type CodecV2 interface {
// Marshal returns the wire format of v. The buffers in the returned
// [mem.BufferSlice] must have at least one reference each, which will be freed
// by gRPC when they are no longer needed.
Marshal(v any) (out mem.BufferSlice, err error)
// Unmarshal parses the wire format into v. Note that data will be freed as soon
// as this function returns. If the codec wishes to guarantee access to the data
// after this function, it must take its own reference that it frees when it is
// no longer needed.
Unmarshal(data mem.BufferSlice, v any) error
// Name returns the name of the Codec implementation. The returned string
// will be used as part of content type in transmission. The result must be
// static; the result cannot change between calls.
Name() string
}

var registeredV2Codecs = make(map[string]CodecV2)

// RegisterCodecV2 registers the provided CodecV2 for use with all gRPC clients and
// servers.
//
// The CodecV2 will be stored and looked up by result of its Name() method, which
// should match the content-subtype of the encoding handled by the CodecV2. This
// is case-insensitive, and is stored and looked up as lowercase. If the
// result of calling Name() is an empty string, RegisterCodecV2 will panic. See
// Content-Type on
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details.
//
// If both a Codec and CodecV2 are registered with the same name, the CodecV2
// will be used.
//
// NOTE: this function must only be called during initialization time (i.e. in
// an init() function), and is not thread-safe. If multiple Codecs are
// registered with the same name, the one registered last will take effect.
func RegisterCodecV2(codec CodecV2) {
if codec == nil {
panic("cannot register a nil CodecV2")
}
if codec.Name() == "" {
panic("cannot register CodecV2 with empty string result for Name()")
}
contentSubtype := strings.ToLower(codec.Name())
registeredV2Codecs[contentSubtype] = codec
}

// GetCodecV2 gets a registered CodecV2 by content-subtype, or nil if no CodecV2 is
// registered for the content-subtype.
//
// The content-subtype is expected to be lowercase.
func GetCodecV2(contentSubtype string) CodecV2 {
return registeredV2Codecs[contentSubtype]
}
81 changes: 81 additions & 0 deletions encoding/proto/proto_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package proto

import (
"fmt"

"google.golang.org/grpc/encoding"
"google.golang.org/grpc/mem"
"google.golang.org/protobuf/proto"
)

func init() {
encoding.RegisterCodecV2(&codecV2{})
}

// codec is a CodecV2 implementation with protobuf. It is the default codec for
// gRPC.
type codecV2 struct{}

var _ encoding.CodecV2 = (*codecV2)(nil)

func (c *codecV2) Marshal(v any) (data mem.BufferSlice, err error) {
vv := messageV2Of(v)
if vv == nil {
return nil, fmt.Errorf("proto: failed to marshal, message is %T, want proto.Message", v)
}

size := proto.Size(vv)
if mem.IsBelowBufferPoolingThreshold(size) {
buf, err := proto.Marshal(vv)
if err != nil {
return nil, err
}
data = append(data, mem.SliceBuffer(buf))
} else {
pool := mem.DefaultBufferPool()
buf := pool.Get(size)
if _, err := (proto.MarshalOptions{}).MarshalAppend((*buf)[:0], vv); err != nil {
pool.Put(buf)
return nil, err
}
data = append(data, mem.NewBuffer(buf, pool))
}

return data, nil
}

func (c *codecV2) Unmarshal(data mem.BufferSlice, v any) (err error) {
vv := messageV2Of(v)
if vv == nil {
return fmt.Errorf("failed to unmarshal, message is %T, want proto.Message", v)
}

buf := data.MaterializeToBuffer(mem.DefaultBufferPool())
defer buf.Free()
// TODO: Upgrade proto.Unmarshal to support mem.BufferSlice. Right now, it's not
// really possible without a major overhaul of the proto package, but the
// vtprotobuf library may be able to support this.
return proto.Unmarshal(buf.ReadOnlyData(), vv)
}

func (c *codecV2) Name() string {
return Name
}
Loading

0 comments on commit 9ab8b62

Please sign in to comment.