Provide a mechanism for encoded message buffer recycling #6619

Open
HippoBaro opened this issue Sep 11, 2023 · 28 comments
Labels: Area: RPC Features (Compression, Encoding, Attributes/Metadata, Interceptors) · P2 · Type: Feature (new features or improvements in behavior)

@HippoBaro commented Sep 11, 2023

Use case(s) - what problem will this feature solve?

On their way to the wire, messages are encoded through user-provided encoding.Codec modules. These modules implement the following interface:

// Codec defines the interface gRPC uses to encode and decode messages.  Note
// that implementations of this interface must be thread safe; a Codec's
// methods can be called from concurrent goroutines.
type Codec interface {
	// Marshal returns the wire format of v.
	Marshal(v any) ([]byte, error)
        [...]
}

The implementation of this interface is responsible for allocating the resulting byte slice. Once gRPC is done writing the message to the underlying transport, the reference to the slice is dropped, and garbage collection has to scavenge it. This means memory used for message encoding is never directly reused but has to go through a GC cycle before it can be allocated again.
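
For reference, the built-in proto codec currently allocates a fresh slice on every call, roughly like this (a paraphrase, not an exact copy of grpc-go's encoding/proto package):

// Simplified paraphrase of the default proto codec: each Marshal call
// produces a brand-new []byte that is dropped once the write completes.
func (codec) Marshal(v any) ([]byte, error) {
	vv, ok := v.(proto.Message)
	if !ok {
		return nil, fmt.Errorf("failed to marshal, message is %T, want proto.Message", v)
	}
	return proto.Marshal(vv) // new allocation for every message
}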

Proposed Solution

We propose adding an additional codec interface that complements the existing one:

// BufferedCodec is an optional interface Codec may implement.
// It signals the ability of the codec to use pre-existing memory
// when writing the wire format of messages.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
type BufferedCodec interface {
	// MarshalWithBuffer returns the wire format of v.
	//
	// Implementations may use a buffer from the provided buffer
	// pool when marshalling. Doing so enables memory reuse.
	MarshalWithBuffer(v any, pool SharedBufferPool) ([]byte, error)
}

This API takes the existing grpc.SharedBufferPool type, which provides an allocator-like API. Codecs that can marshal into pre-existing buffers, such as protobuf, may implement it.

This interface should be optional because codecs need to estimate an upper bound for the size of the marshaled message to allocate a sufficiently large buffer. However, this estimate need not be accurate, as Go will reallocate buffers transparently if needed, and the GC will safely collect the original slice.
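
To make the intent concrete, here is a rough sketch (not part of the proposal text) of how the proto codec could implement MarshalWithBuffer; it assumes SharedBufferPool exposes Get(length int) []byte as in the existing experimental shared buffer pool API:

// Sketch only. proto.Size provides the size estimate mentioned above, and
// MarshalAppend writes into the pooled buffer, growing it only if the
// estimate turns out to be too small.
func (codec) MarshalWithBuffer(v any, pool SharedBufferPool) ([]byte, error) {
	vv, ok := v.(proto.Message)
	if !ok {
		return nil, fmt.Errorf("failed to marshal, message is %T, want proto.Message", v)
	}
	buf := pool.Get(proto.Size(vv))[:0] // pooled memory, reset to zero length
	return proto.MarshalOptions{}.MarshalAppend(buf, vv)
}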

This new interface allows the allocation side to reuse buffers, but gRPC needs to return them to be recycled. We propose that gRPC return these encoded message buffers to the buffer pool once messages have been fully written to the transport and are no longer read from.

This implies adding new APIs for clients and servers, such as:

// ClientEncoderBufferPool is a CallOption to provide a SharedBufferPool
// used for the purpose of encoding messages. Buffers from this pool are
// used when encoding messages and returned once they have been transmitted
// over the network to be reused.
//
// Note that a compatible encoding.Codec is needed for buffer reuse. See
// encoding.BufferedCodec for additional details. If a non-compatible codec
// is used, buffer reuse will not apply.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ClientEncoderBufferPool(bufferPool SharedBufferPool) CallOption {
	return EncoderBufferPoolCallOption{BufferPool: bufferPool}
}

and

// ServerEncoderBufferPool is a ServerOption to provide a SharedBufferPool
// used for the purpose of encoding messages. Buffers from this pool are
// used when encoding messages and returned once they have been transmitted
// over the network to be reused.
//
// Note that a compatible encoding.Codec is needed for buffer reuse. See
// encoding.BufferedCodec for additional details. If a non-compatible codec
// is used, buffer reuse will not apply.
//
// # Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
func ServerEncoderBufferPool(bufferPool SharedBufferPool) ServerOption {
	return newFuncServerOption(func(o *serverOptions) {
		o.encoderBufferPool = bufferPool
	})
}

Giving users the ability to provide their own buffer pool implementation is advantageous for workloads that deal with messages of known sizes, where a tailored pool can be close to optimal. It also leverages prior art (see grpc.WithRecvBufferPool, for example).
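
As a sketch of what such a user-provided pool could look like (assuming the SharedBufferPool interface exposes Get(length int) []byte and Put(*[]byte)), a workload whose messages never exceed a known maximum could back the pool with a single sync.Pool of fixed-capacity buffers:

// fixedSizePool is a sketch, not a proposed API: a SharedBufferPool tuned for
// a workload whose encoded messages never exceed maxSize (e.g. file chunks).
type fixedSizePool struct {
	pool sync.Pool
}

func newFixedSizePool(maxSize int) *fixedSizePool {
	return &fixedSizePool{pool: sync.Pool{
		New: func() any { b := make([]byte, maxSize); return &b },
	}}
}

func (p *fixedSizePool) Get(length int) []byte {
	buf := *(p.pool.Get().(*[]byte))
	if cap(buf) < length {
		// The message is larger than expected; fall back to a fresh allocation.
		return make([]byte, length)
	}
	return buf[:length]
}

func (p *fixedSizePool) Put(buf *[]byte) {
	p.pool.Put(buf)
}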

Alternatives Considered

Changes to the codec could be omitted entirely: simply returning buffers to a user-provided pool and letting them coordinate with their codec to pull from it. However, this would make it difficult for the default proto codec to support this feature.

Alternatively, this change could be made without any external API changes by internally creating and managing the buffer pool. This would prevent third-party codecs from benefitting from the improvement, however.

Additional Context

We are currently running a service that uses gRPC streaming to stream (potentially) large files, in chunks, back to gRPC clients over the network. We measured that the Go allocation volume per second is roughly equal to the network throughput of the host. This creates GC cycles that introduce latency spikes and prevent us from predictably saturating the network at a reasonable CPU cost. After investigation, we have isolated the source of most of these allocations to protobuf slice creation during message serialization.

This service demonstrated the benefit of this approach with a very significant reduction in allocation volume (>90% fewer bytes allocated per second and a 7x reduction in allocations) and much lower CPU usage (>30% less). We offer an implementation of this proposal for context and as a conversation starter: #6613

I should note that there was prior work on this issue in #2817 and #2816, but those issues didn't attract much discussion.

Thank you!

@HippoBaro added the Type: Feature label on Sep 11, 2023
@easwars (Contributor) commented Sep 12, 2023

Thanks for the detailed issue. We will get back to you shortly.

@atollena (Collaborator) commented Oct 5, 2023

Gentle ping on this. We discussed this issue and the associated PR with easwars, dfawley, zasweq, joybestourous and s-matyukevich during our trip to the Google office.

@dfawley expressed concerns about where buffers are released:

  1. Stats handlers may keep a copy (and in fact I think you know existing usages that do, such as handlers that keep the n last RPCs for debugging and make them available via a mechanism similar to channelz). This makes the PR a breaking change.
  2. I think you mentioned the tap mechanism, but from what I see only the method name is exposed there in tap.go, so perhaps you were thinking of something else.

Anything you'd like to add?

@easwars (Contributor) commented Oct 5, 2023

Apologies for the delay. I've been swamped with other things and I haven't been able to get to this yet.

@easwars (Contributor) commented Oct 13, 2023

Unfortunately I could not get to this in time and I'm going to be away from work for a while. Someone else from the team will pick this up. Thanks for your patience.

@easwars removed their assignment on Oct 13, 2023
@arvindbr8 self-assigned this on Oct 17, 2023
@HippoBaro (Author)

> Stats handlers may keep a copy (and in fact I think you know existing usages that do, such as handlers that keep the n last RPCs for debugging and make them available via a mechanism similar to channelz). This makes the PR a breaking change.

Do you mean, "Stats handlers may keep a reference"?

This is what I was worried about (I mentioned this in #6613). My initial thought was that doing so should not be allowed (if messages need to be preserved beyond the scope of an interceptor, they should be memcpy'd). There's no way to enforce that, though, and we'll likely make the ecosystem more brittle if we change the rule there.

There's precedent for merging these kinds of PRs and automatically turning off the feature if any interceptors are added (see #5862). Could we take a similar approach here?

grpc-go/dialoptions.go

Lines 705 to 707 in e14d583

// Note: The shared buffer pool feature will not be active if any of the following
// options are used: WithStatsHandler, EnableTracing, or binary logging. In such
// cases, the shared buffer pool will be ignored.

@ash2k (Contributor) commented Oct 18, 2023

Some discussion on the same issue of stats handlers is in #6660.

@arvindbr8 (Member)

We have to take a step back and look at the best way to implement these buffer recycling enhancements. However, our team needs more cycles to investigate and review the proposed changes. We will keep posting updates as our investigation progresses.

@arvindbr8 removed their assignment on Oct 19, 2023
@HippoBaro (Author)

@arvindbr8 Do you have any update to share? Is there a way for me to help?

@arvindbr8 (Member)

@HippoBaro -- Our team is tracking this item internally - which includes a wider analysis and review with other implementations of gRPC.

However, this item could not be assigned this quarter alongside the other high-priority projects in the pipeline. I'm not sure there is much to do until we pick this up. We will keep you updated.

@PapaCharlie (Contributor)

Hey @arvindbr8, I'm following up on this since it's the end of the quarter. What are the odds this issue will be picked up this coming quarter? We'd see some pretty significant performance improvements on our end if this became a reality. Thanks in advance!

@dfawley (Member) commented Mar 15, 2024

We might be doing some work on codecs next quarter depending on internal priorities. If so, the most likely change would be to use a scatter-gather API that references the transport's buffers directly and takes ownership of the buffers. It would probably also want to have a way to indicate that the buffers are no longer needed, as we already re-use these buffers. The two would enable "zero"-copy in gRPC (excluding the Go std HTTP/2 framer which doesn't support it) which should improve performance on data heavy workloads for codecs that can take advantage of it (proto doesn't).

@PapaCharlie (Contributor)

OK makes sense. Is there a way for us to provide any help here? For our project, we'd really like to avoid running off of a fork but we're seeing memory allocation rates on the order of multiple gigabytes per second... We even had to set up some rate limits around actually generating/sending responses to prevent the process from OOMing. We'd save a pretty significant amount of hardware with this.

@dfawley (Member) commented Mar 15, 2024

Apologies - now that I looked more closely at the proposal itself, I noticed that you're looking at the sending of data (Marshal), whereas my comment was mostly about the receiving side (Unmarshal).

For the encoding.Codec API (both sides), generally I'd prefer to have one style of API for recycling the buffer(s).

In both directions, the buffers being passed around might want a method on them to enable the side that produced them to re-use them. The API needs to support buffer ownership transfer, re-use, and scatter-gather, even if any given codec (including protobuf) doesn't support those features.

Maybe something like this?

type Codec interface {
	Marshal(msg any) (Buffer, error)
	Unmarshal(buf Buffer, msg any) error
}

type Buffer interface {
	Data() [][]byte
	// Free is called by the consumer of the buffer when it is done using the data buffers.
	// The producer of the buffer may then reuse it.
	Free()
}
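
For illustration only (the pooling details are assumptions, not part of the proposal), a pool-backed implementation of this interface might look like:

// Sketch: a single-chunk Buffer whose backing slice goes back to a sync.Pool
// when the consumer calls Free.
type pooledBuffer struct {
	data []byte
	pool *sync.Pool
}

func (b *pooledBuffer) Data() [][]byte { return [][]byte{b.data} }

func (b *pooledBuffer) Free() {
	if b.pool != nil {
		b.pool.Put(&b.data)
		b.pool = nil // guard against double free
	}
}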

It would be interesting to brainstorm other ideas and see which one is the most ergonomic. If you're able to do that and also implement the solution that we find will be best, then that would be extremely helpful.

@PapaCharlie (Contributor)

Funny you should mention that! I did do that: #6608. It's still rough, but it basically just extends the existing Codec interface to include MarshalAppend, which invokes the corresponding method in google.golang.org/protobuf/proto. This way it invokes the existing methods out of the box without explicit wiring, other than specifying the buffer pool.

The author of this issue also provided his own thoughts/approach in #6613. I think the right answer probably lies somewhere in between.

I've spent a decent amount of time looking into this issue and how it should be solved, so I'm very happy to help brainstorm. I however haven't spent much time actually contributing to gRPC-go so I don't want to presume my approach really matches the existing style/standards in the code.

Maybe it'd be worth having an actual meeting to talk about this? I understand that your team has a lot on its plate and may not have time to address this, but this is a big enough problem for us at LinkedIn that my team would be happy to allocate engineer power to tackle this. All you need to do is point us in the right direction!

@PapaCharlie (Contributor)

So, after my meeting with @dfawley, we arrived at a proposal. The Codec interface can be upgraded to something like the following:

type Codec interface {
	Marshal(v any) (length int, seq iter.Seq2[[]byte, error])
	ReturnBuffer([]byte)

	GetBuffer(len int) []byte
	Unmarshal(v any, length int, seq iter.Seq2[[]byte, error]) error
}

(this could be done by having a new google.golang.org/grpc/encoding/v2 package)

The point of this new interface is to allow zero-copy codecs to exist and to let those codecs (optionally) reuse buffers. In this case, upgrading the existing gRPC codec to this interface is trivial. Both ReturnBuffer and GetBuffer could be made optional to keep the interface simple. The server implementation can iterate over the returned sequence of buffers without ever materializing the entire message.

The iter.Seq2[[]byte, error] type doesn't exist for older versions of go but is simply func(yield func([]byte, error) bool), and can be upgraded when the oldest supported version of grpc-go is go1.22.
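
As a sketch of what adoption could look like (the wrapper type and its legacy field are hypothetical, introduced only for illustration), an existing []byte-returning codec can be adapted to the proposed shape by yielding its single buffer, using the expanded function form so it compiles on older Go versions:

// codecAdapter is a hypothetical wrapper around a legacy Codec; a true
// zero-copy codec would instead yield several buffers without concatenating.
type codecAdapter struct {
	legacy Codec // existing Marshal(v any) ([]byte, error) implementation
}

func (c codecAdapter) Marshal(v any) (int, func(yield func([]byte, error) bool)) {
	data, err := c.legacy.Marshal(v)
	if err != nil {
		return 0, func(yield func([]byte, error) bool) { yield(nil, err) }
	}
	return len(data), func(yield func([]byte, error) bool) {
		yield(data, nil) // one contiguous chunk
	}
}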

We want to open the discussion on this before jumping in head first, so please let us know what you think!

@vmg (Contributor) commented Mar 22, 2024

This works for us at Vitess. We're very interested in having a scatter-gather kind of API, particularly for serialization. Thumbs up overall. Anything I can do to help with the implementation and/or early testing?

@PapaCharlie (Contributor)

Hey @vmg, good to hear! Can you give me some more insight into how you'd use this? Just trying to get a better feel for what should be prioritized first. There are some interesting implications to what I've proposed. For example, some of the public APIs may have to be changed to accept the new iter.Seq2[[]byte, error] type, among other things.

@jzelinskie commented Mar 31, 2024

We're also interested in this for SpiceDB. Most of our allocations are coming from gRPC messages, so anything to reuse those helps our overall scalability and latency. I'm not sure we have strong opinions on the hook to provide the data slice, just that the slices can be reused. A simple sync.Pool implementation would be heaps (pun intended) of help.

EDIT: FWIW, SpiceDB also uses vtprotobuf (as @vmg mentions below).

@vmg (Contributor) commented Apr 2, 2024

@PapaCharlie: for Vitess, we happen to often work with very large packets coming from the MySQL binary protocol which we must make available through the distributed system via GRPC. Although the default ProtoBuf Go library does not support incremental/chunked serialization, I maintain an alternative implementation (https://github.com/planetscale/vtprotobuf/) that Vitess and many other OSS projects use. Being able to serialize as an iter.Seq2[[]byte, error] would be a game changer for these large ProtoBuf messages that contain binary blobs inside of them.

PapaCharlie added a commit to PapaCharlie/grpc-go that referenced this issue Apr 9, 2024
This new codec, as outlined in grpc#6619 will
allow the reuse of buffers to their fullest extent. Note that this deliberately
does not (yet) implement support for stathandlers, however all the relevant APIs
have been updated and both old and new Codec implementations are supported.
PapaCharlie added a commit to PapaCharlie/grpc-go that referenced this issue Apr 10, 2024
This new codec, as outlined in grpc#6619 will allow the reuse of buffers to their fullest extent. Note that this deliberately does not (yet) implement support for stathandlers, however all the relevant APIs have been updated and both old and new Codec implementations are supported.
@PapaCharlie (Contributor)

Hey everyone, circling back on this after actually implementing a lot of this. Here's the interface that we landed on:

package encoding

type CodecV2 interface {
	Marshal(v any) (out mem.BufferSlice, err error)
	Unmarshal(data mem.BufferSlice, v any) error
	Name() string
}

...

package mem

type BufferSlice []*Buffer

type Buffer struct {
	data []byte
	refs *atomic.Int32
	free func([]byte)
}

These Buffers will be used to read off of the wire, and they come from a shared buffer pool. Once the CodecV2 is done with the buffers given to it by Unmarshal, it can call Free to return them to the server's pool. It can hold on to those buffers as long as it wants (e.g. to reference them directly instead of making a copy). Conversely, the Buffers coming out of Marshal can be drawn from the same backing pool if needed (there are some improved buffer pool implementations in the mem package). Let us know what you think and if this works for you!
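
As a hedged sketch of how a codec might use this (the pool field, the mem.NewBuffer constructor, and the BufferPool method set below are assumptions for illustration, not the final mem API), the Marshal side could encode straight into a pooled slice and hand ownership to gRPC, which frees it once the message is written:

// Sketch only: a proto-based CodecV2 Marshal that borrows its output buffer
// from a pool. The Buffer handed to gRPC is freed (recycled) by the transport
// after the bytes hit the wire.
type protoCodecV2 struct {
	pool mem.BufferPool // assumed: Get(length int) *[]byte / Put(*[]byte)
}

func (c *protoCodecV2) Marshal(v any) (mem.BufferSlice, error) {
	m, ok := v.(proto.Message)
	if !ok {
		return nil, fmt.Errorf("failed to marshal, message is %T, want proto.Message", v)
	}
	buf := c.pool.Get(proto.Size(m)) // pooled backing slice
	data, err := proto.MarshalOptions{}.MarshalAppend((*buf)[:0], m)
	if err != nil {
		c.pool.Put(buf)
		return nil, err
	}
	*buf = data
	// mem.NewBuffer is assumed here: it wraps the pooled slice so that Free
	// returns it to c.pool once all references are released.
	return mem.BufferSlice{mem.NewBuffer(buf, c.pool)}, nil
}

func (c *protoCodecV2) Name() string { return "proto" }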

@PapaCharlie (Contributor) commented Apr 26, 2024

Hey @vmg, what are your thoughts on this? I got some thumbs up from the other folks on this issue, but since you maintain vtprotobuf, your code generator will likely be able to make the best use of this.

@vmg (Contributor) commented Apr 27, 2024

This seems sensible to me! To be fair, there may be a sharp corner hidden here which is not obvious by looking at the API. My homeboy @frouioui is going to try your CodecV2 branch next week to attempt a Vitess optimization which we've been planning for a while. I think he'll have more meaningful feedback very soon.

@GiedriusS commented May 31, 2024

@vmg maybe there are some updates? We are also very interested in this (https://github.com/thanos-io/thanos).

@jzelinskie commented Aug 22, 2024

I took a stab at implementing a vtprotobuf Codec V2 now that #7356 was merged.

Does this look right? authzed/spicedb@main...jzelinskie:spicedb:grpc-codecv2

@vmg (Contributor) commented Aug 23, 2024

@jzelinskie: I'm not quite sure! Could you please open a PR on the upstream repository and we can review and discuss it?

@coxley (Contributor) commented Aug 29, 2024

Are there benchmarks for the buffer recycling available?

I'm consistently getting better results with the original codec. I can't tell if my tests are subtly wrong, or if go test -bench is simply a poor way to accurately assess pooling.

Full code in gist: https://gist.github.com/coxley/3c3eab255f5a34cf43c33491f3c21703

With CodecV2:

goos: darwin
goarch: arm64
pkg: github.com/TriggerMail/go-common/grpc/codec
cpu: Apple M1 Pro
BenchmarkCodec/vtproto/sm-10            10056465               109.5 ns/op           136 B/op          5 allocs/op
BenchmarkCodec/vtproto/md-10             1217408               981.4 ns/op          7367 B/op          7 allocs/op
BenchmarkCodec/vtproto/lg-10              200335              5954 ns/op           65814 B/op          7 allocs/op
BenchmarkCodec/vanilla/sm-10             5247548               230.3 ns/op           120 B/op          5 allocs/op
BenchmarkCodec/vanilla/md-10              715630              1600 ns/op            7350 B/op          7 allocs/op
BenchmarkCodec/vanilla/lg-10              109792             11488 ns/op           65806 B/op          7 allocs/op
PASS
ok      github.com/TriggerMail/go-common/grpc/codec     9.382s

# With 3 internal cycles within each bench run, to see if it helps avoid GC clears between runs
➜ go test -bench=. -benchmem -run=Benchmark
goos: darwin
goarch: arm64
pkg: github.com/TriggerMail/go-common/grpc/codec
cpu: Apple M1 Pro
BenchmarkCodec/vtproto/sm-10             3449646               331.6 ns/op           408 B/op         15 allocs/op
BenchmarkCodec/vtproto/md-10              377755              5488 ns/op           22099 B/op         21 allocs/op
BenchmarkCodec/vtproto/lg-10               60810             21783 ns/op          197443 B/op         21 allocs/op
BenchmarkCodec/vanilla/sm-10             1732674               693.8 ns/op           360 B/op         15 allocs/op
BenchmarkCodec/vanilla/md-10              235479              4828 ns/op           22052 B/op         21 allocs/op
BenchmarkCodec/vanilla/lg-10               37563             33797 ns/op          197418 B/op         21 allocs/op
PASS
ok      github.com/TriggerMail/go-common/grpc/codec     10.418s

With Old:

➜ go test -bench=. -benchmem -run=Benchmark
goos: darwin
goarch: arm64
pkg: github.com/TriggerMail/go-common/grpc/codec
cpu: Apple M1 Pro
BenchmarkCodec/vtproto/sm-10            20209755                59.47 ns/op           96 B/op          3 allocs/op
BenchmarkCodec/vtproto/md-10             2388817               592.6 ns/op          4432 B/op          3 allocs/op
BenchmarkCodec/vtproto/lg-10              218416              5324 ns/op           65616 B/op          3 allocs/op
BenchmarkCodec/vanilla/sm-10             8624634               139.6 ns/op            80 B/op          3 allocs/op
BenchmarkCodec/vanilla/md-10             1295725               902.0 ns/op          4416 B/op          3 allocs/op
BenchmarkCodec/vanilla/lg-10              116025             10272 ns/op           65600 B/op          3 allocs/op
PASS
ok      github.com/TriggerMail/go-common/grpc/codec     11.756s

# With 3 internal cycles within each bench run, to see if it helps avoid GC clears between runs
➜ go test -bench=. -benchmem -run=Benchmark
goos: darwin
goarch: arm64
pkg: github.com/TriggerMail/go-common/grpc/codec
cpu: Apple M1 Pro
BenchmarkCodec/vtproto/sm-10             6015910               184.0 ns/op           288 B/op          9 allocs/op
BenchmarkCodec/vtproto/md-10              660069              1721 ns/op           13296 B/op          9 allocs/op
BenchmarkCodec/vtproto/lg-10               74308             16128 ns/op          196848 B/op          9 allocs/op
BenchmarkCodec/vanilla/sm-10             2839554               421.6 ns/op           240 B/op          9 allocs/op
BenchmarkCodec/vanilla/md-10              415316              2700 ns/op           13248 B/op          9 allocs/op
BenchmarkCodec/vanilla/lg-10               40531             29349 ns/op          196800 B/op          9 allocs/op
PASS
ok      github.com/TriggerMail/go-common/grpc/codec     8.772s

@coxley (Contributor) commented Aug 29, 2024

Aha, found it. There's an edge case when len(mem.BufferSlice) == 1 that causes values to never get sent back to the pool. Will submit a PR.

I spoke a little too soon; this is expected. But something still seems off: the benchmark is similarly inflated in new vs. old when using bufconn end-to-end.

@coxley (Contributor) commented Aug 29, 2024

Addressed in #7571

@eshitachandwani added the Area: RPC Features label on Sep 4, 2024