Skip to content

client: export types implementing CallOptions for access by interceptors #1902

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions call.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (
//
// All errors returned by Invoke are compatible with the status package.
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
// allow interceptor to see all applicable call options, which means those
// configured as defaults from dial option as well as per-call options
opts = append(cc.dopts.callOptions, opts...)
Copy link

@euroelessar euroelessar Mar 28, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jhump @dfawley this looks like data race in case of concurrent access with non-empty opts ...CallOptions as they override the same elements in cc.dopts.callOptions (given sufficient capacity)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! See #1948


if cc.dopts.unaryInt != nil {
return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
}
Expand Down
198 changes: 143 additions & 55 deletions rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,46 +160,66 @@ type EmptyCallOption struct{}
func (EmptyCallOption) before(*callInfo) error { return nil }
func (EmptyCallOption) after(*callInfo) {}

type beforeCall func(c *callInfo) error

func (o beforeCall) before(c *callInfo) error { return o(c) }
func (o beforeCall) after(c *callInfo) {}

type afterCall func(c *callInfo)

func (o afterCall) before(c *callInfo) error { return nil }
func (o afterCall) after(c *callInfo) { o(c) }

// Header returns a CallOptions that retrieves the header metadata
// for a unary RPC.
func Header(md *metadata.MD) CallOption {
return afterCall(func(c *callInfo) {
if c.stream != nil {
*md, _ = c.stream.Header()
}
})
return HeaderCallOption{HeaderAddr: md}
}

// HeaderCallOption is a CallOption for collecting response header metadata.
// The metadata field will be populated *after* the RPC completes.
// This is an EXPERIMENTAL API.
type HeaderCallOption struct {
HeaderAddr *metadata.MD
}

func (o HeaderCallOption) before(c *callInfo) error { return nil }
func (o HeaderCallOption) after(c *callInfo) {
if c.stream != nil {
*o.HeaderAddr, _ = c.stream.Header()
}
}

// Trailer returns a CallOptions that retrieves the trailer metadata
// for a unary RPC.
func Trailer(md *metadata.MD) CallOption {
return afterCall(func(c *callInfo) {
if c.stream != nil {
*md = c.stream.Trailer()
}
})
return TrailerCallOption{TrailerAddr: md}
}

// TrailerCallOption is a CallOption for collecting response trailer metadata.
// The metadata field will be populated *after* the RPC completes.
// This is an EXPERIMENTAL API.
type TrailerCallOption struct {
TrailerAddr *metadata.MD
}

func (o TrailerCallOption) before(c *callInfo) error { return nil }
func (o TrailerCallOption) after(c *callInfo) {
if c.stream != nil {
*o.TrailerAddr = c.stream.Trailer()
}
}

// Peer returns a CallOption that retrieves peer information for a
// unary RPC.
func Peer(p *peer.Peer) CallOption {
return afterCall(func(c *callInfo) {
if c.stream != nil {
if x, ok := peer.FromContext(c.stream.Context()); ok {
*p = *x
}
return PeerCallOption{PeerAddr: p}
}

// PeerCallOption is a CallOption for collecting the identity of the remote
// peer. The peer field will be populated *after* the RPC completes.
// This is an EXPERIMENTAL API.
type PeerCallOption struct {
PeerAddr *peer.Peer
}

func (o PeerCallOption) before(c *callInfo) error { return nil }
func (o PeerCallOption) after(c *callInfo) {
if c.stream != nil {
if x, ok := peer.FromContext(c.stream.Context()); ok {
*o.PeerAddr = *x
}
})
}
}

// FailFast configures the action to take when an RPC is attempted on broken
Expand All @@ -213,48 +233,97 @@ func Peer(p *peer.Peer) CallOption {
//
// By default, RPCs are "Fail Fast".
func FailFast(failFast bool) CallOption {
return beforeCall(func(c *callInfo) error {
c.failFast = failFast
return nil
})
return FailFastCallOption{FailFast: failFast}
}

// FailFastCallOption is a CallOption for indicating whether an RPC should fail
// fast or not.
// This is an EXPERIMENTAL API.
type FailFastCallOption struct {
FailFast bool
}

func (o FailFastCallOption) before(c *callInfo) error {
c.failFast = o.FailFast
return nil
}
func (o FailFastCallOption) after(c *callInfo) { return }

// MaxCallRecvMsgSize returns a CallOption which sets the maximum message size the client can receive.
func MaxCallRecvMsgSize(s int) CallOption {
return beforeCall(func(o *callInfo) error {
o.maxReceiveMessageSize = &s
return nil
})
return MaxRecvMsgSizeCallOption{MaxRecvMsgSize: s}
}

// MaxRecvMsgSizeCallOption is a CallOption that indicates the maximum message
// size the client can receive.
// This is an EXPERIMENTAL API.
type MaxRecvMsgSizeCallOption struct {
MaxRecvMsgSize int
}

func (o MaxRecvMsgSizeCallOption) before(c *callInfo) error {
c.maxReceiveMessageSize = &o.MaxRecvMsgSize
return nil
}
func (o MaxRecvMsgSizeCallOption) after(c *callInfo) { return }

// MaxCallSendMsgSize returns a CallOption which sets the maximum message size the client can send.
func MaxCallSendMsgSize(s int) CallOption {
return beforeCall(func(o *callInfo) error {
o.maxSendMessageSize = &s
return nil
})
return MaxSendMsgSizeCallOption{MaxSendMsgSize: s}
}

// MaxSendMsgSizeCallOption is a CallOption that indicates the maximum message
// size the client can send.
// This is an EXPERIMENTAL API.
type MaxSendMsgSizeCallOption struct {
MaxSendMsgSize int
}

func (o MaxSendMsgSizeCallOption) before(c *callInfo) error {
c.maxSendMessageSize = &o.MaxSendMsgSize
return nil
}
func (o MaxSendMsgSizeCallOption) after(c *callInfo) { return }

// PerRPCCredentials returns a CallOption that sets credentials.PerRPCCredentials
// for a call.
func PerRPCCredentials(creds credentials.PerRPCCredentials) CallOption {
return beforeCall(func(c *callInfo) error {
c.creds = creds
return nil
})
return PerRPCCredsCallOption{Creds: creds}
}

// PerRPCCredsCallOption is a CallOption that indicates the the per-RPC
// credentials to use for the call.
// This is an EXPERIMENTAL API.
type PerRPCCredsCallOption struct {
Creds credentials.PerRPCCredentials
}

func (o PerRPCCredsCallOption) before(c *callInfo) error {
c.creds = o.Creds
return nil
}
func (o PerRPCCredsCallOption) after(c *callInfo) { return }

// UseCompressor returns a CallOption which sets the compressor used when
// sending the request. If WithCompressor is also set, UseCompressor has
// higher priority.
//
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If WithCompressor is also set, UseCompressor has higher priority.

This raises an interesting point... we have some DialOptions that also can alter per-RPC behavior. Especially WithDefaultCallOptions.

To do this right, we should make sure there are CallOption equivalents for every appropriate DialOption, express them in terms of default CallOptions instead, and then pass those defaults along with the per-call CallOptions to the interceptors.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other dial options (WithMaxMsgSize and WithCodec) use WithDefaultCallOptions under the hood. So exposing these all to the interceptor turned out to be pretty simple.

// This API is EXPERIMENTAL.
func UseCompressor(name string) CallOption {
return beforeCall(func(c *callInfo) error {
c.compressorType = name
return nil
})
return CompressorCallOption{CompressorType: name}
}

// CompressorCallOption is a CallOption that indicates the compressor to use.
// This is an EXPERIMENTAL API.
type CompressorCallOption struct {
CompressorType string
}

func (o CompressorCallOption) before(c *callInfo) error {
c.compressorType = o.CompressorType
return nil
}
func (o CompressorCallOption) after(c *callInfo) { return }

// CallContentSubtype returns a CallOption that will set the content-subtype
// for a call. For example, if content-subtype is "json", the Content-Type over
Expand All @@ -273,12 +342,21 @@ func UseCompressor(name string) CallOption {
// response messages, with the content-subtype set to the given contentSubtype
// here for requests.
func CallContentSubtype(contentSubtype string) CallOption {
contentSubtype = strings.ToLower(contentSubtype)
return beforeCall(func(c *callInfo) error {
c.contentSubtype = contentSubtype
return nil
})
return ContentSubtypeCallOption{ContentSubtype: strings.ToLower(contentSubtype)}
}

// ContentSubtypeCallOption is a CallOption that indicates the content-subtype
// used for marshaling messages.
// This is an EXPERIMENTAL API.
type ContentSubtypeCallOption struct {
ContentSubtype string
}

func (o ContentSubtypeCallOption) before(c *callInfo) error {
c.contentSubtype = o.ContentSubtype
return nil
}
func (o ContentSubtypeCallOption) after(c *callInfo) { return }

// CallCustomCodec returns a CallOption that will set the given Codec to be
// used for all request and response messages for a call. The result of calling
Expand All @@ -293,11 +371,21 @@ func CallContentSubtype(contentSubtype string) CallOption {
// This function is provided for advanced users; prefer to use only
// CallContentSubtype to select a registered codec instead.
func CallCustomCodec(codec Codec) CallOption {
return beforeCall(func(c *callInfo) error {
c.codec = codec
return nil
})
return CustomCodecCallOption{Codec: codec}
}

// CustomCodecCallOption is a CallOption that indicates the codec used for
// marshaling messages.
// This is an EXPERIMENTAL API.
type CustomCodecCallOption struct {
Codec Codec
}

func (o CustomCodecCallOption) before(c *callInfo) error {
c.codec = o.Codec
return nil
}
func (o CustomCodecCallOption) after(c *callInfo) { return }

// The format of the payload: compressed or not?
type payloadFormat uint8
Expand Down
5 changes: 4 additions & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ type ClientStream interface {
// NewStream creates a new Stream for the client side. This is typically
// called by generated code.
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
// allow interceptor to see all applicable call options, which means those
// configured as defaults from dial option as well as per-call options
opts = append(cc.dopts.callOptions, opts...)

if cc.dopts.streamInt != nil {
return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
}
Expand Down Expand Up @@ -137,7 +141,6 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
}()

opts = append(cc.dopts.callOptions, opts...)
for _, o := range opts {
if err := o.before(c); err != nil {
return nil, toRPCErr(err)
Expand Down
Loading