From 55cd915b9b9dbf78079bd1f88a49acde3fc53172 Mon Sep 17 00:00:00 2001 From: Ben B Date: Mon, 8 Aug 2022 17:02:49 +0200 Subject: [PATCH] [extension/jaegerremotesampling] remote sampling gRPC support #6694 (#12788) * jaegerremotesampling: set transport in grpc default config Signed-off-by: Benedikt Bongartz * jaegerremotesampling: first grpc server draft Signed-off-by: Benedikt Bongartz * jaegerremotesampling: register grpc server Signed-off-by: Benedikt Bongartz * unreleased: add changelog Signed-off-by: Benedikt Bongartz --- extension/jaegerremotesampling/config_test.go | 10 +- extension/jaegerremotesampling/extension.go | 31 ++++- extension/jaegerremotesampling/factory.go | 3 +- .../jaegerremotesampling/factory_test.go | 5 +- .../jaegerremotesampling/internal/grpc.go | 120 ++++++++++++++++++ .../internal/grpc_test.go | 103 +++++++++++++++ .../internal/http_test.go | 18 +-- .../internal/internal_test.go | 32 +++++ ...er_remote_sampling_grpc_support_#6694.yaml | 11 ++ 9 files changed, 307 insertions(+), 26 deletions(-) create mode 100644 extension/jaegerremotesampling/internal/grpc.go create mode 100644 extension/jaegerremotesampling/internal/grpc_test.go create mode 100644 extension/jaegerremotesampling/internal/internal_test.go create mode 100755 unreleased/feat_jaeger_remote_sampling_grpc_support_#6694.yaml diff --git a/extension/jaegerremotesampling/config_test.go b/extension/jaegerremotesampling/config_test.go index 05de408ed69e..e89956fe768c 100644 --- a/extension/jaegerremotesampling/config_test.go +++ b/extension/jaegerremotesampling/config_test.go @@ -45,7 +45,10 @@ func TestLoadConfig(t *testing.T) { &Config{ ExtensionSettings: config.NewExtensionSettings(config.NewComponentID(typeStr)), HTTPServerSettings: &confighttp.HTTPServerSettings{Endpoint: ":5778"}, - GRPCServerSettings: &configgrpc.GRPCServerSettings{NetAddr: confignet.NetAddr{Endpoint: ":14250"}}, + GRPCServerSettings: &configgrpc.GRPCServerSettings{NetAddr: confignet.NetAddr{ + Endpoint: ":14250", + Transport: "tcp", + }}, Source: Source{ Remote: &configgrpc.GRPCClientSettings{ Endpoint: "jaeger-collector:14250", @@ -59,7 +62,10 @@ func TestLoadConfig(t *testing.T) { &Config{ ExtensionSettings: config.NewExtensionSettings(config.NewComponentIDWithName(typeStr, "1")), HTTPServerSettings: &confighttp.HTTPServerSettings{Endpoint: ":5778"}, - GRPCServerSettings: &configgrpc.GRPCServerSettings{NetAddr: confignet.NetAddr{Endpoint: ":14250"}}, + GRPCServerSettings: &configgrpc.GRPCServerSettings{NetAddr: confignet.NetAddr{ + Endpoint: ":14250", + Transport: "tcp", + }}, Source: Source{ ReloadInterval: time.Second, File: "/etc/otelcol/sampling_strategies.json", diff --git a/extension/jaegerremotesampling/extension.go b/extension/jaegerremotesampling/extension.go index ed6069900f8f..5595575c8f18 100644 --- a/extension/jaegerremotesampling/extension.go +++ b/extension/jaegerremotesampling/extension.go @@ -35,6 +35,7 @@ type jrsExtension struct { telemetry component.TelemetrySettings httpServer component.Component + grpcServer component.Component samplingStore strategystore.StrategyStore closers []func() error @@ -91,12 +92,22 @@ func (jrse *jrsExtension) Start(ctx context.Context, host component.Host) error return fmt.Errorf("error while creating the HTTP server: %w", err) } jrse.httpServer = httpServer + // then we start our own server interfaces, starting with the HTTP one + if err := jrse.httpServer.Start(ctx, host); err != nil { + return fmt.Errorf("error while starting the HTTP server: %w", err) + } } - // then we start our own server interfaces, starting with the HTTP one - err := jrse.httpServer.Start(ctx, host) - if err != nil { - return fmt.Errorf("error while starting the HTTP server: %w", err) + if jrse.cfg.GRPCServerSettings != nil { + grpcServer, err := internal.NewGRPC(jrse.telemetry, *jrse.cfg.GRPCServerSettings, jrse.samplingStore) + if err != nil { + return fmt.Errorf("error while creating the gRPC server: %w", err) + } + jrse.grpcServer = grpcServer + // start our gRPC server interface + if err := jrse.grpcServer.Start(ctx, host); err != nil { + return fmt.Errorf("error while starting the gRPC server: %w", err) + } } return nil @@ -104,8 +115,16 @@ func (jrse *jrsExtension) Start(ctx context.Context, host component.Host) error func (jrse *jrsExtension) Shutdown(ctx context.Context) error { // we probably don't want to break whenever an error occurs, we want to continue and close the other resources - if err := jrse.httpServer.Shutdown(ctx); err != nil { - jrse.telemetry.Logger.Error("error while shutting down the HTTP server", zap.Error(err)) + if jrse.httpServer != nil { + if err := jrse.httpServer.Shutdown(ctx); err != nil { + jrse.telemetry.Logger.Error("error while shutting down the HTTP server", zap.Error(err)) + } + } + + if jrse.grpcServer != nil { + if err := jrse.grpcServer.Shutdown(ctx); err != nil { + jrse.telemetry.Logger.Error("error while shutting down the gRPC server", zap.Error(err)) + } } for _, closer := range jrse.closers { diff --git a/extension/jaegerremotesampling/factory.go b/extension/jaegerremotesampling/factory.go index dbc6d3a96c15..fe70c9b801f4 100644 --- a/extension/jaegerremotesampling/factory.go +++ b/extension/jaegerremotesampling/factory.go @@ -45,7 +45,8 @@ func createDefaultConfig() config.Extension { }, GRPCServerSettings: &configgrpc.GRPCServerSettings{ NetAddr: confignet.NetAddr{ - Endpoint: ":14250", + Endpoint: ":14250", + Transport: "tcp", }, }, Source: Source{}, diff --git a/extension/jaegerremotesampling/factory_test.go b/extension/jaegerremotesampling/factory_test.go index 645427199872..e9aadd4e45af 100644 --- a/extension/jaegerremotesampling/factory_test.go +++ b/extension/jaegerremotesampling/factory_test.go @@ -32,7 +32,10 @@ func TestCreateDefaultConfig(t *testing.T) { expected := &Config{ ExtensionSettings: config.NewExtensionSettings(config.NewComponentID(typeStr)), HTTPServerSettings: &confighttp.HTTPServerSettings{Endpoint: ":5778"}, - GRPCServerSettings: &configgrpc.GRPCServerSettings{NetAddr: confignet.NetAddr{Endpoint: ":14250"}}, + GRPCServerSettings: &configgrpc.GRPCServerSettings{NetAddr: confignet.NetAddr{ + Endpoint: ":14250", + Transport: "tcp", + }}, } // test diff --git a/extension/jaegerremotesampling/internal/grpc.go b/extension/jaegerremotesampling/internal/grpc.go new file mode 100644 index 000000000000..03449eeffa0e --- /dev/null +++ b/extension/jaegerremotesampling/internal/grpc.go @@ -0,0 +1,120 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal" + +import ( + "context" + "errors" + "fmt" + "net" + + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling" + "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore" + "github.com/jaegertracing/jaeger/proto-gen/api_v2" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/reflection" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configgrpc" +) + +var _ component.Component = (*SamplingGRPCServer)(nil) + +var errGRPCServerNotRunning = errors.New("gRPC server is not running") + +type grpcServer interface { + Serve(lis net.Listener) error + GracefulStop() + Stop() +} + +// NewGRPC returns a new sampling gRPC Server. +func NewGRPC( + telemetry component.TelemetrySettings, + settings configgrpc.GRPCServerSettings, + strategyStore strategystore.StrategyStore, +) (*SamplingGRPCServer, error) { + if strategyStore == nil { + return nil, errMissingStrategyStore + } + + return &SamplingGRPCServer{ + telemetry: telemetry, + settings: settings, + strategyStore: strategyStore, + }, nil +} + +// SamplingGRPCServer implements component.Component to make the life cycle easy to manage. +type SamplingGRPCServer struct { + telemetry component.TelemetrySettings + settings configgrpc.GRPCServerSettings + strategyStore strategystore.StrategyStore + + grpcServer grpcServer +} + +func (s *SamplingGRPCServer) Start(_ context.Context, host component.Host) error { + opts, err := s.settings.ToServerOption(host, s.telemetry) + if err != nil { + return err + } + server := grpc.NewServer(opts...) + reflection.Register(server) + s.grpcServer = server + + api_v2.RegisterSamplingManagerServer(server, sampling.NewGRPCHandler(s.strategyStore)) + + healthServer := health.NewServer() + healthServer.SetServingStatus("jaeger.api_v2.SamplingManager", grpc_health_v1.HealthCheckResponse_SERVING) + grpc_health_v1.RegisterHealthServer(server, healthServer) + + listener, err := s.settings.ToListener() + if err != nil { + return fmt.Errorf("failed to listen on gRPC port: %w", err) + } + + go func() { + if err := s.grpcServer.Serve(listener); err != nil { + s.telemetry.Logger.Error("could not launch gRPC service", zap.Error(err)) + } + }() + + return nil +} + +// Shutdown tries to terminate connections gracefully as long as the passed context is valid. +func (s *SamplingGRPCServer) Shutdown(ctx context.Context) error { + if s.grpcServer == nil { + return errGRPCServerNotRunning + } + + ch := make(chan struct{}) + go func() { + s.grpcServer.GracefulStop() + ch <- struct{}{} + }() + + select { + case <-ctx.Done(): + s.grpcServer.Stop() + case <-ch: + } + + return nil +} diff --git a/extension/jaegerremotesampling/internal/grpc_test.go b/extension/jaegerremotesampling/internal/grpc_test.go new file mode 100644 index 000000000000..ae6fe78f7d91 --- /dev/null +++ b/extension/jaegerremotesampling/internal/grpc_test.go @@ -0,0 +1,103 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + "net" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confignet" +) + +func TestMissingClientConfigManagerGRPC(t *testing.T) { + s, err := NewGRPC(componenttest.NewNopTelemetrySettings(), configgrpc.GRPCServerSettings{}, nil) + assert.Equal(t, errMissingStrategyStore, err) + assert.Nil(t, s) +} + +func TestStartAndStopGRPC(t *testing.T) { + // prepare + srvSettings := configgrpc.GRPCServerSettings{ + NetAddr: confignet.NetAddr{ + Endpoint: ":0", + Transport: "tcp", + }, + } + s, err := NewGRPC(componenttest.NewNopTelemetrySettings(), srvSettings, &mockCfgMgr{}) + require.NoError(t, err) + require.NotNil(t, s) + + // test + assert.NoError(t, s.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, s.Shutdown(context.Background())) +} + +func TestSamplingGRPCServer_Shutdown(t *testing.T) { + tt := []struct { + name string + grpcServer grpcServer + timeout time.Duration + expect error + }{ + { + name: "graceful stop is successful without delay", + grpcServer: &grpcServerMock{}, + timeout: time.Minute, + }, + { + name: "graceful stop is successful with delay", + grpcServer: &grpcServerMock{ + timeToGracefulStop: 5 * time.Second, + }, + timeout: time.Minute, + }, + { + name: "context timed out", + grpcServer: &grpcServerMock{ + timeToGracefulStop: time.Minute, + }, + timeout: 5 * time.Second, + }, + { + name: "grpc server not started", + timeout: time.Minute, + expect: errGRPCServerNotRunning, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + srv := &SamplingGRPCServer{grpcServer: tc.grpcServer} + ctx, cancel := context.WithTimeout(context.Background(), tc.timeout) + defer cancel() + assert.Equal(t, tc.expect, srv.Shutdown(ctx)) + }) + } +} + +type grpcServerMock struct { + timeToGracefulStop time.Duration +} + +func (g *grpcServerMock) Serve(lis net.Listener) error { return nil } +func (g *grpcServerMock) Stop() {} +func (g *grpcServerMock) GracefulStop() { time.Sleep(g.timeToGracefulStop) } diff --git a/extension/jaegerremotesampling/internal/http_test.go b/extension/jaegerremotesampling/internal/http_test.go index 0a9107b50526..b6dae877f4fc 100644 --- a/extension/jaegerremotesampling/internal/http_test.go +++ b/extension/jaegerremotesampling/internal/http_test.go @@ -31,16 +31,13 @@ import ( "go.opentelemetry.io/collector/config/confighttp" ) -func TestMissingClientConfigManager(t *testing.T) { - // test +func TestMissingClientConfigManagerHTTP(t *testing.T) { s, err := NewHTTP(componenttest.NewNopTelemetrySettings(), confighttp.HTTPServerSettings{}, nil) - - // verify assert.Equal(t, errMissingStrategyStore, err) assert.Nil(t, s) } -func TestStartAndStop(t *testing.T) { +func TestStartAndStopHTTP(t *testing.T) { // prepare srvSettings := confighttp.HTTPServerSettings{ Endpoint: ":0", @@ -139,14 +136,3 @@ func TestErrorFromClientConfigManager(t *testing.T) { body, _ := io.ReadAll(rw.Body) assert.Contains(t, string(body), "failed to get sampling strategy for service") } - -type mockCfgMgr struct { - getSamplingStrategyFunc func(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) -} - -func (m *mockCfgMgr) GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) { - if m.getSamplingStrategyFunc != nil { - return m.getSamplingStrategyFunc(ctx, serviceName) - } - return sampling.NewSamplingStrategyResponse(), nil -} diff --git a/extension/jaegerremotesampling/internal/internal_test.go b/extension/jaegerremotesampling/internal/internal_test.go new file mode 100644 index 000000000000..e29edcc9649f --- /dev/null +++ b/extension/jaegerremotesampling/internal/internal_test.go @@ -0,0 +1,32 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + + "github.com/jaegertracing/jaeger/thrift-gen/sampling" +) + +type mockCfgMgr struct { + getSamplingStrategyFunc func(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) +} + +func (m *mockCfgMgr) GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) { + if m.getSamplingStrategyFunc != nil { + return m.getSamplingStrategyFunc(ctx, serviceName) + } + return sampling.NewSamplingStrategyResponse(), nil +} diff --git a/unreleased/feat_jaeger_remote_sampling_grpc_support_#6694.yaml b/unreleased/feat_jaeger_remote_sampling_grpc_support_#6694.yaml new file mode 100755 index 000000000000..f387cc36fe5f --- /dev/null +++ b/unreleased/feat_jaeger_remote_sampling_grpc_support_#6694.yaml @@ -0,0 +1,11 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: jaegerremotesampling + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: extend jaegerremotesampling with remote sampling via gRPC + +# One or more tracking issues related to the change +issues: [6694]