Skip to content

Commit

Permalink
[extension/jaegerremotesampling] remote sampling gRPC support #6694 (#…
Browse files Browse the repository at this point in the history
…12788)

* jaegerremotesampling: set transport in grpc default config

Signed-off-by: Benedikt Bongartz <bongartz@klimlive.de>

* jaegerremotesampling: first grpc server draft

Signed-off-by: Benedikt Bongartz <bongartz@klimlive.de>

* jaegerremotesampling: register grpc server

Signed-off-by: Benedikt Bongartz <bongartz@klimlive.de>

* unreleased: add changelog

Signed-off-by: Benedikt Bongartz <bongartz@klimlive.de>
  • Loading branch information
frzifus committed Aug 8, 2022
1 parent 69a961d commit 55cd915
Show file tree
Hide file tree
Showing 9 changed files with 307 additions and 26 deletions.
10 changes: 8 additions & 2 deletions extension/jaegerremotesampling/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ func TestLoadConfig(t *testing.T) {
&Config{
ExtensionSettings: config.NewExtensionSettings(config.NewComponentID(typeStr)),
HTTPServerSettings: &confighttp.HTTPServerSettings{Endpoint: ":5778"},
GRPCServerSettings: &configgrpc.GRPCServerSettings{NetAddr: confignet.NetAddr{Endpoint: ":14250"}},
GRPCServerSettings: &configgrpc.GRPCServerSettings{NetAddr: confignet.NetAddr{
Endpoint: ":14250",
Transport: "tcp",
}},
Source: Source{
Remote: &configgrpc.GRPCClientSettings{
Endpoint: "jaeger-collector:14250",
Expand All @@ -59,7 +62,10 @@ func TestLoadConfig(t *testing.T) {
&Config{
ExtensionSettings: config.NewExtensionSettings(config.NewComponentIDWithName(typeStr, "1")),
HTTPServerSettings: &confighttp.HTTPServerSettings{Endpoint: ":5778"},
GRPCServerSettings: &configgrpc.GRPCServerSettings{NetAddr: confignet.NetAddr{Endpoint: ":14250"}},
GRPCServerSettings: &configgrpc.GRPCServerSettings{NetAddr: confignet.NetAddr{
Endpoint: ":14250",
Transport: "tcp",
}},
Source: Source{
ReloadInterval: time.Second,
File: "/etc/otelcol/sampling_strategies.json",
Expand Down
31 changes: 25 additions & 6 deletions extension/jaegerremotesampling/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type jrsExtension struct {
telemetry component.TelemetrySettings

httpServer component.Component
grpcServer component.Component
samplingStore strategystore.StrategyStore

closers []func() error
Expand Down Expand Up @@ -91,21 +92,39 @@ func (jrse *jrsExtension) Start(ctx context.Context, host component.Host) error
return fmt.Errorf("error while creating the HTTP server: %w", err)
}
jrse.httpServer = httpServer
// then we start our own server interfaces, starting with the HTTP one
if err := jrse.httpServer.Start(ctx, host); err != nil {
return fmt.Errorf("error while starting the HTTP server: %w", err)
}
}

// then we start our own server interfaces, starting with the HTTP one
err := jrse.httpServer.Start(ctx, host)
if err != nil {
return fmt.Errorf("error while starting the HTTP server: %w", err)
if jrse.cfg.GRPCServerSettings != nil {
grpcServer, err := internal.NewGRPC(jrse.telemetry, *jrse.cfg.GRPCServerSettings, jrse.samplingStore)
if err != nil {
return fmt.Errorf("error while creating the gRPC server: %w", err)
}
jrse.grpcServer = grpcServer
// start our gRPC server interface
if err := jrse.grpcServer.Start(ctx, host); err != nil {
return fmt.Errorf("error while starting the gRPC server: %w", err)
}
}

return nil
}

func (jrse *jrsExtension) Shutdown(ctx context.Context) error {
// we probably don't want to break whenever an error occurs, we want to continue and close the other resources
if err := jrse.httpServer.Shutdown(ctx); err != nil {
jrse.telemetry.Logger.Error("error while shutting down the HTTP server", zap.Error(err))
if jrse.httpServer != nil {
if err := jrse.httpServer.Shutdown(ctx); err != nil {
jrse.telemetry.Logger.Error("error while shutting down the HTTP server", zap.Error(err))
}
}

if jrse.grpcServer != nil {
if err := jrse.grpcServer.Shutdown(ctx); err != nil {
jrse.telemetry.Logger.Error("error while shutting down the gRPC server", zap.Error(err))
}
}

for _, closer := range jrse.closers {
Expand Down
3 changes: 2 additions & 1 deletion extension/jaegerremotesampling/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ func createDefaultConfig() config.Extension {
},
GRPCServerSettings: &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: ":14250",
Endpoint: ":14250",
Transport: "tcp",
},
},
Source: Source{},
Expand Down
5 changes: 4 additions & 1 deletion extension/jaegerremotesampling/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ func TestCreateDefaultConfig(t *testing.T) {
expected := &Config{
ExtensionSettings: config.NewExtensionSettings(config.NewComponentID(typeStr)),
HTTPServerSettings: &confighttp.HTTPServerSettings{Endpoint: ":5778"},
GRPCServerSettings: &configgrpc.GRPCServerSettings{NetAddr: confignet.NetAddr{Endpoint: ":14250"}},
GRPCServerSettings: &configgrpc.GRPCServerSettings{NetAddr: confignet.NetAddr{
Endpoint: ":14250",
Transport: "tcp",
}},
}

// test
Expand Down
120 changes: 120 additions & 0 deletions extension/jaegerremotesampling/internal/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/jaegerremotesampling/internal"

import (
"context"
"errors"
"fmt"
"net"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
)

var _ component.Component = (*SamplingGRPCServer)(nil)

var errGRPCServerNotRunning = errors.New("gRPC server is not running")

type grpcServer interface {
Serve(lis net.Listener) error
GracefulStop()
Stop()
}

// NewGRPC returns a new sampling gRPC Server.
func NewGRPC(
telemetry component.TelemetrySettings,
settings configgrpc.GRPCServerSettings,
strategyStore strategystore.StrategyStore,
) (*SamplingGRPCServer, error) {
if strategyStore == nil {
return nil, errMissingStrategyStore
}

return &SamplingGRPCServer{
telemetry: telemetry,
settings: settings,
strategyStore: strategyStore,
}, nil
}

// SamplingGRPCServer implements component.Component to make the life cycle easy to manage.
type SamplingGRPCServer struct {
telemetry component.TelemetrySettings
settings configgrpc.GRPCServerSettings
strategyStore strategystore.StrategyStore

grpcServer grpcServer
}

func (s *SamplingGRPCServer) Start(_ context.Context, host component.Host) error {
opts, err := s.settings.ToServerOption(host, s.telemetry)
if err != nil {
return err
}
server := grpc.NewServer(opts...)
reflection.Register(server)
s.grpcServer = server

api_v2.RegisterSamplingManagerServer(server, sampling.NewGRPCHandler(s.strategyStore))

healthServer := health.NewServer()
healthServer.SetServingStatus("jaeger.api_v2.SamplingManager", grpc_health_v1.HealthCheckResponse_SERVING)
grpc_health_v1.RegisterHealthServer(server, healthServer)

listener, err := s.settings.ToListener()
if err != nil {
return fmt.Errorf("failed to listen on gRPC port: %w", err)
}

go func() {
if err := s.grpcServer.Serve(listener); err != nil {
s.telemetry.Logger.Error("could not launch gRPC service", zap.Error(err))
}
}()

return nil
}

// Shutdown tries to terminate connections gracefully as long as the passed context is valid.
func (s *SamplingGRPCServer) Shutdown(ctx context.Context) error {
if s.grpcServer == nil {
return errGRPCServerNotRunning
}

ch := make(chan struct{})
go func() {
s.grpcServer.GracefulStop()
ch <- struct{}{}
}()

select {
case <-ctx.Done():
s.grpcServer.Stop()
case <-ch:
}

return nil
}
103 changes: 103 additions & 0 deletions extension/jaegerremotesampling/internal/grpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"context"
"net"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confignet"
)

func TestMissingClientConfigManagerGRPC(t *testing.T) {
s, err := NewGRPC(componenttest.NewNopTelemetrySettings(), configgrpc.GRPCServerSettings{}, nil)
assert.Equal(t, errMissingStrategyStore, err)
assert.Nil(t, s)
}

func TestStartAndStopGRPC(t *testing.T) {
// prepare
srvSettings := configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: ":0",
Transport: "tcp",
},
}
s, err := NewGRPC(componenttest.NewNopTelemetrySettings(), srvSettings, &mockCfgMgr{})
require.NoError(t, err)
require.NotNil(t, s)

// test
assert.NoError(t, s.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, s.Shutdown(context.Background()))
}

func TestSamplingGRPCServer_Shutdown(t *testing.T) {
tt := []struct {
name string
grpcServer grpcServer
timeout time.Duration
expect error
}{
{
name: "graceful stop is successful without delay",
grpcServer: &grpcServerMock{},
timeout: time.Minute,
},
{
name: "graceful stop is successful with delay",
grpcServer: &grpcServerMock{
timeToGracefulStop: 5 * time.Second,
},
timeout: time.Minute,
},
{
name: "context timed out",
grpcServer: &grpcServerMock{
timeToGracefulStop: time.Minute,
},
timeout: 5 * time.Second,
},
{
name: "grpc server not started",
timeout: time.Minute,
expect: errGRPCServerNotRunning,
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
srv := &SamplingGRPCServer{grpcServer: tc.grpcServer}
ctx, cancel := context.WithTimeout(context.Background(), tc.timeout)
defer cancel()
assert.Equal(t, tc.expect, srv.Shutdown(ctx))
})
}
}

type grpcServerMock struct {
timeToGracefulStop time.Duration
}

func (g *grpcServerMock) Serve(lis net.Listener) error { return nil }
func (g *grpcServerMock) Stop() {}
func (g *grpcServerMock) GracefulStop() { time.Sleep(g.timeToGracefulStop) }
18 changes: 2 additions & 16 deletions extension/jaegerremotesampling/internal/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,13 @@ import (
"go.opentelemetry.io/collector/config/confighttp"
)

func TestMissingClientConfigManager(t *testing.T) {
// test
func TestMissingClientConfigManagerHTTP(t *testing.T) {
s, err := NewHTTP(componenttest.NewNopTelemetrySettings(), confighttp.HTTPServerSettings{}, nil)

// verify
assert.Equal(t, errMissingStrategyStore, err)
assert.Nil(t, s)
}

func TestStartAndStop(t *testing.T) {
func TestStartAndStopHTTP(t *testing.T) {
// prepare
srvSettings := confighttp.HTTPServerSettings{
Endpoint: ":0",
Expand Down Expand Up @@ -139,14 +136,3 @@ func TestErrorFromClientConfigManager(t *testing.T) {
body, _ := io.ReadAll(rw.Body)
assert.Contains(t, string(body), "failed to get sampling strategy for service")
}

type mockCfgMgr struct {
getSamplingStrategyFunc func(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error)
}

func (m *mockCfgMgr) GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) {
if m.getSamplingStrategyFunc != nil {
return m.getSamplingStrategyFunc(ctx, serviceName)
}
return sampling.NewSamplingStrategyResponse(), nil
}
32 changes: 32 additions & 0 deletions extension/jaegerremotesampling/internal/internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"context"

"github.com/jaegertracing/jaeger/thrift-gen/sampling"
)

type mockCfgMgr struct {
getSamplingStrategyFunc func(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error)
}

func (m *mockCfgMgr) GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) {
if m.getSamplingStrategyFunc != nil {
return m.getSamplingStrategyFunc(ctx, serviceName)
}
return sampling.NewSamplingStrategyResponse(), nil
}
Loading

0 comments on commit 55cd915

Please sign in to comment.