Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[configgrpc] wrap gRPC client/server options in extensible interface #11069

Merged
28 changes: 28 additions & 0 deletions .chloggen/9480-configgrpc-option-wrapper.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'deprecation'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: configgrpc

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Deprecate `ClientConfig.ToClientConn`/`ServerConfig.ToServer` in favor of `ToClientConnWithOptions`/`ToServerWithOptions`"

# One or more tracking issues or pull requests related to the change
issues: [9480]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Users providing a grpc.DialOption/grpc.ServerOption should now wrap them into
a generic option with `WithGrpcDialOption`/`WithGrpcServerOption`.


# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
117 changes: 106 additions & 11 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,16 +226,58 @@ func (gcs *ClientConfig) isSchemeHTTPS() bool {
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use grpc.WithBlock() dial option.
func (gcs *ClientConfig) ToClientConn(ctx context.Context, host component.Host, settings component.TelemetrySettings, extraOpts ...grpc.DialOption) (*grpc.ClientConn, error) {
opts, err := gcs.toDialOptions(ctx, host, settings)
//
// Deprecated: [v0.110.0] If providing a [grpc.DialOption], use [ClientConfig.ToClientConnWithOptions]
// with [WithGrpcDialOption] instead.
func (gcs *ClientConfig) ToClientConn(
ctx context.Context,
host component.Host,
settings component.TelemetrySettings,
grpcOpts ...grpc.DialOption,
) (*grpc.ClientConn, error) {
var extraOpts []ToClientConnOption
for _, grpcOpt := range grpcOpts {
extraOpts = append(extraOpts, WithGrpcDialOption(grpcOpt))
}
return gcs.ToClientConnWithOptions(ctx, host, settings, extraOpts...)
}

// ToClientConnOption is a sealed interface wrapping options for [ClientConfig.ToClientConnWithOptions].
type ToClientConnOption interface {
jade-guiton-dd marked this conversation as resolved.
Show resolved Hide resolved
isToClientConnOption()
}

type grpcDialOptionWrapper struct {
opt grpc.DialOption
}

// WithGrpcDialOption wraps a [grpc.DialOption] into a [ToClientConnOption].
func WithGrpcDialOption(opt grpc.DialOption) ToClientConnOption {
jade-guiton-dd marked this conversation as resolved.
Show resolved Hide resolved
return grpcDialOptionWrapper{opt: opt}
}
func (grpcDialOptionWrapper) isToClientConnOption() {}

// ToClientConnWithOptions is the same as [ClientConfig.ToClientConn], but uses the [ToClientConnOption] interface for options.
// This method will eventually replace [ClientConfig.ToClientConn].
func (gcs *ClientConfig) ToClientConnWithOptions(
ctx context.Context,
host component.Host,
settings component.TelemetrySettings,
extraOpts ...ToClientConnOption,
) (*grpc.ClientConn, error) {
grpcOpts, err := gcs.getGrpcDialOptions(ctx, host, settings, extraOpts)
if err != nil {
return nil, err
}
opts = append(opts, extraOpts...)
return grpc.NewClient(gcs.sanitizedEndpoint(), opts...)
return grpc.NewClient(gcs.sanitizedEndpoint(), grpcOpts...)
}

func (gcs *ClientConfig) toDialOptions(ctx context.Context, host component.Host, settings component.TelemetrySettings) ([]grpc.DialOption, error) {
func (gcs *ClientConfig) getGrpcDialOptions(
ctx context.Context,
host component.Host,
settings component.TelemetrySettings,
extraOpts []ToClientConnOption,
) ([]grpc.DialOption, error) {
var opts []grpc.DialOption
if gcs.Compression.IsCompressed() {
cp, err := getGRPCCompressionName(gcs.Compression)
Expand Down Expand Up @@ -312,6 +354,12 @@ func (gcs *ClientConfig) toDialOptions(ctx context.Context, host component.Host,
// Enable OpenTelemetry observability plugin.
opts = append(opts, grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelOpts...)))

for _, opt := range extraOpts {
if wrapper, ok := opt.(grpcDialOptionWrapper); ok {
opts = append(opts, wrapper.opt)
}
}

return opts, nil
}

Expand All @@ -335,17 +383,58 @@ func (gss *ServerConfig) Validate() error {
return nil
}

// ToServer returns a grpc.Server for the configuration
func (gss *ServerConfig) ToServer(_ context.Context, host component.Host, settings component.TelemetrySettings, extraOpts ...grpc.ServerOption) (*grpc.Server, error) {
opts, err := gss.toServerOption(host, settings)
// ToServer returns a [grpc.Server] for the configuration
//
// Deprecated: [v0.110.0] If providing a [grpc.ServerOption], use [ServerConfig.ToServerWithOptions]
// with [WithGrpcServerOption] instead.
func (gss *ServerConfig) ToServer(
ctx context.Context,
host component.Host,
settings component.TelemetrySettings,
grpcOpts ...grpc.ServerOption,
) (*grpc.Server, error) {
var extraOpts []ToServerOption
for _, grpcOpt := range grpcOpts {
extraOpts = append(extraOpts, WithGrpcServerOption(grpcOpt))
}
return gss.ToServerWithOptions(ctx, host, settings, extraOpts...)
}

// ToServerOption is a sealed interface wrapping options for [ServerConfig.ToServerWithOptions].
type ToServerOption interface {
jade-guiton-dd marked this conversation as resolved.
Show resolved Hide resolved
isToServerOption()
}

type grpcServerOptionWrapper struct {
opt grpc.ServerOption
}

// WithGrpcServerOption wraps a [grpc.ServerOption] into a [ToServerOption].
func WithGrpcServerOption(opt grpc.ServerOption) ToServerOption {
jade-guiton-dd marked this conversation as resolved.
Show resolved Hide resolved
return grpcServerOptionWrapper{opt: opt}
}
func (grpcServerOptionWrapper) isToServerOption() {}

// ToServerWithOptions is the same as [ServerConfig.ToServer], but uses the [ToServerOption] interface for options.
// This method will eventually replace [ServerConfig.ToServer].
func (gss *ServerConfig) ToServerWithOptions(
_ context.Context,
host component.Host,
settings component.TelemetrySettings,
extraOpts ...ToServerOption,
) (*grpc.Server, error) {
grpcOpts, err := gss.getGrpcServerOptions(host, settings, extraOpts)
if err != nil {
return nil, err
}
opts = append(opts, extraOpts...)
return grpc.NewServer(opts...), nil
return grpc.NewServer(grpcOpts...), nil
}

func (gss *ServerConfig) toServerOption(host component.Host, settings component.TelemetrySettings) ([]grpc.ServerOption, error) {
func (gss *ServerConfig) getGrpcServerOptions(
host component.Host,
settings component.TelemetrySettings,
extraOpts []ToServerOption,
) ([]grpc.ServerOption, error) {
switch gss.NetAddr.Transport {
case confignet.TransportTypeTCP, confignet.TransportTypeTCP4, confignet.TransportTypeTCP6, confignet.TransportTypeUDP, confignet.TransportTypeUDP4, confignet.TransportTypeUDP6:
internal.WarnOnUnspecifiedHost(settings.Logger, gss.NetAddr.Endpoint)
Expand Down Expand Up @@ -435,6 +524,12 @@ func (gss *ServerConfig) toServerOption(host component.Host, settings component.

opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler(otelOpts...)), grpc.ChainUnaryInterceptor(uInterceptors...), grpc.ChainStreamInterceptor(sInterceptors...))

for _, opt := range extraOpts {
if wrapper, ok := opt.(grpcServerOptionWrapper); ok {
opts = append(opts, wrapper.opt)
}
}

return opts, nil
}

Expand Down
51 changes: 45 additions & 6 deletions config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,33 @@ func TestDefaultGrpcClientSettings(t *testing.T) {
Insecure: true,
},
}
opts, err := gcs.toDialOptions(context.Background(), componenttest.NewNopHost(), tt.TelemetrySettings())
opts, err := gcs.getGrpcDialOptions(context.Background(), componenttest.NewNopHost(), tt.TelemetrySettings(), []ToClientConnOption{})
require.NoError(t, err)
assert.Len(t, opts, 2)
}

func TestGrpcClientExtraOption(t *testing.T) {
tt, err := componenttest.SetupTelemetry(componentID)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

gcs := &ClientConfig{
TLSSetting: configtls.ClientConfig{
Insecure: true,
},
}
extraOpt := grpc.WithUserAgent("test-agent")
opts, err := gcs.getGrpcDialOptions(
context.Background(),
componenttest.NewNopHost(),
tt.TelemetrySettings(),
[]ToClientConnOption{WithGrpcDialOption(extraOpt)},
)
require.NoError(t, err)
assert.Len(t, opts, 3)
assert.Equal(t, opts[2], extraOpt)
}

func TestAllGrpcClientSettings(t *testing.T) {
tt, err := componenttest.SetupTelemetry(componentID)
require.NoError(t, err)
Expand Down Expand Up @@ -231,7 +253,7 @@ func TestAllGrpcClientSettings(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
opts, err := test.settings.toDialOptions(context.Background(), test.host, tt.TelemetrySettings())
opts, err := test.settings.getGrpcDialOptions(context.Background(), test.host, tt.TelemetrySettings(), []ToClientConnOption{})
require.NoError(t, err)
assert.Len(t, opts, 9)
})
Expand All @@ -244,11 +266,28 @@ func TestDefaultGrpcServerSettings(t *testing.T) {
Endpoint: "0.0.0.0:1234",
},
}
opts, err := gss.toServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
opts, err := gss.getGrpcServerOptions(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToServerOption{})
require.NoError(t, err)
assert.Len(t, opts, 3)
}

func TestGrpcServerExtraOption(t *testing.T) {
gss := &ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: "0.0.0.0:1234",
},
}
extraOpt := grpc.ConnectionTimeout(1_000_000_000)
opts, err := gss.getGrpcServerOptions(
componenttest.NewNopHost(),
componenttest.NewNopTelemetrySettings(),
[]ToServerOption{WithGrpcServerOption(extraOpt)},
)
require.NoError(t, err)
assert.Len(t, opts, 4)
assert.Equal(t, opts[3], extraOpt)
}

func TestGrpcServerValidate(t *testing.T) {
tests := []struct {
gss *ServerConfig
Expand Down Expand Up @@ -329,7 +368,7 @@ func TestAllGrpcServerSettingsExceptAuth(t *testing.T) {
},
},
}
opts, err := gss.toServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
opts, err := gss.getGrpcServerOptions(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings(), []ToServerOption{})
require.NoError(t, err)
assert.Len(t, opts, 10)
}
Expand Down Expand Up @@ -488,7 +527,7 @@ func TestUseSecure(t *testing.T) {
TLSSetting: configtls.ClientConfig{},
Keepalive: nil,
}
dialOpts, err := gcs.toDialOptions(context.Background(), componenttest.NewNopHost(), tt.TelemetrySettings())
dialOpts, err := gcs.getGrpcDialOptions(context.Background(), componenttest.NewNopHost(), tt.TelemetrySettings(), []ToClientConnOption{})
require.NoError(t, err)
assert.Len(t, dialOpts, 2)
}
Expand Down Expand Up @@ -540,7 +579,7 @@ func TestGRPCServerWarning(t *testing.T) {
logger, observed := observer.New(zap.DebugLevel)
set.Logger = zap.New(logger)

opts, err := test.settings.toServerOption(componenttest.NewNopHost(), set)
opts, err := test.settings.getGrpcServerOptions(componenttest.NewNopHost(), set, []ToServerOption{})
require.NoError(t, err)
require.NotNil(t, opts)
_ = grpc.NewServer(opts...)
Expand Down
4 changes: 3 additions & 1 deletion exporter/otlpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
Expand Down Expand Up @@ -58,7 +59,8 @@ func newExporter(cfg component.Config, set exporter.Settings) *baseExporter {
// start actually creates the gRPC connection. The client construction is deferred till this point as this
// is the only place we get hold of Extensions which are required to construct auth round tripper.
func (e *baseExporter) start(ctx context.Context, host component.Host) (err error) {
if e.clientConn, err = e.config.ClientConfig.ToClientConn(ctx, host, e.settings, grpc.WithUserAgent(e.userAgent)); err != nil {
agentOpt := configgrpc.WithGrpcDialOption(grpc.WithUserAgent(e.userAgent))
if e.clientConn, err = e.config.ClientConfig.ToClientConnWithOptions(ctx, host, e.settings, agentOpt); err != nil {
return err
}
e.traceExporter = ptraceotlp.NewGRPCClient(e.clientConn)
Expand Down
2 changes: 1 addition & 1 deletion receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (r *otlpReceiver) startGRPCServer(host component.Host) error {
}

var err error
if r.serverGRPC, err = r.cfg.GRPC.ToServer(context.Background(), host, r.settings.TelemetrySettings); err != nil {
if r.serverGRPC, err = r.cfg.GRPC.ToServerWithOptions(context.Background(), host, r.settings.TelemetrySettings); err != nil {
return err
}

Expand Down
Loading