Skip to content

Commit

Permalink
[receiver/jaegerreceiver] Remove jaeger remote sampling from receiver (
Browse files Browse the repository at this point in the history
…#14163)

Signed-off-by: Benedikt Bongartz <bongartz@klimlive.de>
  • Loading branch information
frzifus authored Sep 16, 2022
1 parent 8b2ea48 commit 685c8b8
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 347 deletions.
42 changes: 4 additions & 38 deletions receiver/jaegerreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ package jaegerreceiver // import "github.com/open-telemetry/opentelemetry-collec

import (
"context"
"sync"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer"
"go.uber.org/zap"
)

const (
Expand All @@ -40,11 +38,10 @@ const (
protoThriftCompact = "thrift_compact"

// Default endpoints to bind to.
defaultGRPCBindEndpoint = "0.0.0.0:14250"
defaultHTTPBindEndpoint = "0.0.0.0:14268"
defaultThriftCompactBindEndpoint = "0.0.0.0:6831"
defaultThriftBinaryBindEndpoint = "0.0.0.0:6832"
defaultAgentRemoteSamplingHTTPEndpoint = "0.0.0.0:5778"
defaultGRPCBindEndpoint = "0.0.0.0:14250"
defaultHTTPBindEndpoint = "0.0.0.0:14268"
defaultThriftCompactBindEndpoint = "0.0.0.0:6831"
defaultThriftBinaryBindEndpoint = "0.0.0.0:6832"
)

// NewFactory creates a new Jaeger receiver factory.
Expand Down Expand Up @@ -94,7 +91,6 @@ func createTracesReceiver(
// Error handling for the conversion is done in the Validate function from the Config object itself.

rCfg := cfg.(*Config)
remoteSamplingConfig := rCfg.RemoteSampling

var config configuration
// Set ports
Expand All @@ -114,36 +110,6 @@ func createTracesReceiver(
config.AgentCompactThrift = *rCfg.ThriftCompact
}

if remoteSamplingConfig != nil {
logSamplingDeprecation(set.Logger)

config.RemoteSamplingClientSettings = remoteSamplingConfig.GRPCClientSettings
if config.RemoteSamplingClientSettings.Endpoint == "" {
config.RemoteSamplingClientSettings.Endpoint = defaultGRPCBindEndpoint
}

config.AgentHTTPEndpoint = remoteSamplingConfig.HostEndpoint
if config.AgentHTTPEndpoint == "" {
config.AgentHTTPEndpoint = defaultAgentRemoteSamplingHTTPEndpoint
}

// strategies are served over grpc so if grpc is not enabled and strategies are present return an error
if len(remoteSamplingConfig.StrategyFile) != 0 {
config.RemoteSamplingStrategyFile = remoteSamplingConfig.StrategyFile
config.RemoteSamplingStrategyFileReloadInterval = remoteSamplingConfig.StrategyFileReloadInterval
}
}

// Create the receiver.
return newJaegerReceiver(rCfg.ID(), &config, nextConsumer, set), nil
}

var once sync.Once

func logSamplingDeprecation(logger *zap.Logger) {
once.Do(func() {
logger.Warn(
"Jaeger remote sampling support is deprecated and will be removed in release v0.61.0. Use the jaegerremotesampling extension instead.",
)
})
}
69 changes: 0 additions & 69 deletions receiver/jaegerreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,72 +187,3 @@ func TestCreateInvalidThriftCompactEndpoint(t *testing.T) {
assert.NoError(t, err, "unexpected error creating receiver")
assert.Equal(t, defaultThriftCompactBindEndpoint, r.(*jReceiver).config.AgentCompactThrift.Endpoint, "thrift port should be default")
}

func TestDefaultAgentRemoteSamplingEndpointAndPort(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
rCfg := cfg.(*Config)

rCfg.Protocols.ThriftCompact = &ProtocolUDP{
Endpoint: defaultThriftCompactBindEndpoint,
}
rCfg.RemoteSampling = &RemoteSamplingConfig{}
set := componenttest.NewNopReceiverCreateSettings()
r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil)

assert.NoError(t, err, "create trace receiver should not error")
assert.Equal(t, defaultGRPCBindEndpoint, r.(*jReceiver).config.RemoteSamplingClientSettings.Endpoint)
assert.Equal(t, defaultAgentRemoteSamplingHTTPEndpoint, r.(*jReceiver).config.AgentHTTPEndpoint, "agent http port should be default")
}

func TestAgentRemoteSamplingEndpoint(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
rCfg := cfg.(*Config)

endpoint := "localhost:1234"
rCfg.Protocols.ThriftCompact = &ProtocolUDP{
Endpoint: defaultThriftCompactBindEndpoint,
}
rCfg.RemoteSampling = &RemoteSamplingConfig{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: endpoint,
},
}
set := componenttest.NewNopReceiverCreateSettings()
r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil)

assert.NoError(t, err, "create trace receiver should not error")
assert.Equal(t, endpoint, r.(*jReceiver).config.RemoteSamplingClientSettings.Endpoint)
assert.Equal(t, defaultAgentRemoteSamplingHTTPEndpoint, r.(*jReceiver).config.AgentHTTPEndpoint, "agent http port should be default")
}

func TestRemoteSamplingConfigPropagation(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
rCfg := cfg.(*Config)

hostEndpoint := "localhost:5778"
endpoint := "localhost:1234"
strategyFile := "strategies.json"
rCfg.Protocols.GRPC = &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: defaultGRPCBindEndpoint,
Transport: "tcp",
},
}
rCfg.RemoteSampling = &RemoteSamplingConfig{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: endpoint,
},
HostEndpoint: hostEndpoint,
StrategyFile: strategyFile,
}
set := componenttest.NewNopReceiverCreateSettings()
r, err := factory.CreateTracesReceiver(context.Background(), set, cfg, nil)

assert.NoError(t, err, "create trace receiver should not error")
assert.Equal(t, endpoint, r.(*jReceiver).config.RemoteSamplingClientSettings.Endpoint)
assert.Equal(t, hostEndpoint, r.(*jReceiver).config.AgentHTTPEndpoint, "agent http port should be configured value")
assert.Equal(t, strategyFile, r.(*jReceiver).config.RemoteSamplingStrategyFile)
}
2 changes: 1 addition & 1 deletion receiver/jaegerreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ require (
go.opentelemetry.io/collector/pdata v0.60.0
go.opentelemetry.io/collector/semconv v0.60.0
go.uber.org/multierr v1.8.0
go.uber.org/zap v1.23.0
google.golang.org/grpc v1.49.0
)

Expand Down Expand Up @@ -72,6 +71,7 @@ require (
go.opentelemetry.io/otel/metric v0.31.0 // indirect
go.opentelemetry.io/otel/trace v1.9.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/zap v1.23.0 // indirect
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect
golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664 // indirect
golang.org/x/text v0.3.7 // indirect
Expand Down
26 changes: 4 additions & 22 deletions receiver/jaegerreceiver/jaeger_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/ptrace"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
Expand Down Expand Up @@ -140,20 +138,14 @@ func (*mockSamplingHandler) GetSamplingStrategy(context.Context, *api_v2.Samplin
}

func TestJaegerHTTP(t *testing.T) {
s, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
s, _ := initializeGRPCTestServer(t, func(s *grpc.Server) {
api_v2.RegisterSamplingManagerServer(s, &mockSamplingHandler{})
})
defer s.GracefulStop()

endpoint := testutil.GetAvailableLocalAddress(t)
config := &configuration{
AgentHTTPEndpoint: endpoint,
RemoteSamplingClientSettings: configgrpc.GRPCClientSettings{
Endpoint: addr.String(),
TLSSetting: configtls.TLSClientSetting{
Insecure: true,
},
},
}
set := componenttest.NewNopReceiverCreateSettings()
jr := newJaegerReceiver(jaegerAgent, config, nil, set)
Expand All @@ -174,20 +166,10 @@ func TestJaegerHTTP(t *testing.T) {
resp, err := http.Get(fmt.Sprintf("http://%s/sampling?service=test", endpoint))
assert.NoError(t, err, "should not have failed to make request")
if resp != nil {
assert.Equal(t, 200, resp.StatusCode, "should have returned 200")
}

resp, err = http.Get(fmt.Sprintf("http://%s/sampling?service=test", endpoint))
assert.NoError(t, err, "should not have failed to make request")
if resp != nil {
assert.Equal(t, 200, resp.StatusCode, "should have returned 200")
}

resp, err = http.Get(fmt.Sprintf("http://%s/baggageRestrictions?service=test", endpoint))
assert.NoError(t, err, "should not have failed to make request")
if resp != nil {
assert.Equal(t, 200, resp.StatusCode, "should have returned 200")
assert.Equal(t, 500, resp.StatusCode, "should have returned 200")
return
}
t.Fail()
}

func testJaegerAgent(t *testing.T, agentEndpoint string, receiverConfig *configuration) {
Expand Down
77 changes: 19 additions & 58 deletions receiver/jaegerreceiver/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,17 @@ import (
"mime"
"net/http"
"sync"
"time"

apacheThrift "github.com/apache/thrift/lib/go/thrift"
"github.com/gorilla/mux"
"github.com/jaegertracing/jaeger/cmd/agent/app/configmanager"
jSamplingConfig "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager/grpc"
"github.com/jaegertracing/jaeger/cmd/agent/app/httpserver"
"github.com/jaegertracing/jaeger/cmd/agent/app/processors"
"github.com/jaegertracing/jaeger/cmd/agent/app/servers"
"github.com/jaegertracing/jaeger/cmd/agent/app/servers/thriftudp"
"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
collectorSampling "github.com/jaegertracing/jaeger/cmd/collector/app/sampling"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/metrics"
staticStrategyStore "github.com/jaegertracing/jaeger/plugin/sampling/strategystore/static"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/thrift-gen/agent"
"github.com/jaegertracing/jaeger/thrift-gen/baggage"
Expand All @@ -51,7 +47,6 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/obsreport"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/grpc"

jaegertranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
Expand All @@ -63,12 +58,9 @@ type configuration struct {
CollectorHTTPSettings confighttp.HTTPServerSettings
CollectorGRPCServerSettings configgrpc.GRPCServerSettings

AgentCompactThrift ProtocolUDP
AgentBinaryThrift ProtocolUDP
AgentHTTPEndpoint string
RemoteSamplingClientSettings configgrpc.GRPCClientSettings
RemoteSamplingStrategyFile string
RemoteSamplingStrategyFileReloadInterval time.Duration
AgentCompactThrift ProtocolUDP
AgentBinaryThrift ProtocolUDP
AgentHTTPEndpoint string
}

// Receiver type is used to receive spans that were originally intended to be sent to Jaeger.
Expand All @@ -82,9 +74,8 @@ type jReceiver struct {
grpc *grpc.Server
collectorServer *http.Server

agentSamplingManager *jSamplingConfig.SamplingManager
agentProcessors []processors.Processor
agentServer *http.Server
agentProcessors []processors.Processor
agentServer *http.Server

goroutines sync.WaitGroup

Expand Down Expand Up @@ -183,7 +174,19 @@ func consumeTraces(ctx context.Context, batch *jaeger.Batch, consumer consumer.T

var _ agent.Agent = (*agentHandler)(nil)
var _ api_v2.CollectorServiceServer = (*jReceiver)(nil)
var _ configmanager.ClientConfigManager = (*jReceiver)(nil)
var _ configmanager.ClientConfigManager = (*notImplementedConfigManager)(nil)

var errNotImplemented = fmt.Errorf("not implemented")

type notImplementedConfigManager struct{}

func (notImplementedConfigManager) GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) {
return nil, errNotImplemented
}

func (notImplementedConfigManager) GetBaggageRestrictions(ctx context.Context, serviceName string) ([]*baggage.BaggageRestriction, error) {
return nil, errNotImplemented
}

type agentHandler struct {
nextConsumer consumer.Traces
Expand All @@ -204,21 +207,6 @@ func (h *agentHandler) EmitBatch(ctx context.Context, batch *jaeger.Batch) error
return err
}

func (jr *jReceiver) GetSamplingStrategy(ctx context.Context, serviceName string) (*sampling.SamplingStrategyResponse, error) {
return jr.agentSamplingManager.GetSamplingStrategy(ctx, serviceName)
}

func (jr *jReceiver) GetBaggageRestrictions(ctx context.Context, serviceName string) ([]*baggage.BaggageRestriction, error) {
br, err := jr.agentSamplingManager.GetBaggageRestrictions(ctx, serviceName)
if err != nil {
// Baggage restrictions are not yet implemented - refer to - https://github.com/jaegertracing/jaeger/issues/373
// As of today, GetBaggageRestrictions() always returns an error.
// However, we `return nil, nil` here in order to serve a valid `200 OK` response.
return nil, nil
}
return br, nil
}

func (jr *jReceiver) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
ctx = jr.grpcObsrecv.StartTracesOp(ctx)

Expand Down Expand Up @@ -283,25 +271,8 @@ func (jr *jReceiver) startAgent(host component.Host) error {
}(processor)
}

// Start upstream grpc client before serving sampling endpoints over HTTP
if jr.config.RemoteSamplingClientSettings.Endpoint != "" {
grpcOpts, err := jr.config.RemoteSamplingClientSettings.ToDialOptions(host, jr.settings.TelemetrySettings)
if err != nil {
jr.settings.Logger.Error("Error creating grpc dial options for remote sampling endpoint", zap.Error(err))
return err
}
jr.config.RemoteSamplingClientSettings.SanitizedEndpoint()
conn, err := grpc.Dial(jr.config.RemoteSamplingClientSettings.Endpoint, grpcOpts...)
if err != nil {
jr.settings.Logger.Error("Error creating grpc connection to jaeger remote sampling endpoint", zap.String("endpoint", jr.config.RemoteSamplingClientSettings.Endpoint))
return err
}

jr.agentSamplingManager = jSamplingConfig.NewConfigManager(conn)
}

if jr.config.AgentHTTPEndpoint != "" {
jr.agentServer = httpserver.NewHTTPServer(jr.config.AgentHTTPEndpoint, jr, metrics.NullFactory, jr.settings.Logger)
jr.agentServer = httpserver.NewHTTPServer(jr.config.AgentHTTPEndpoint, &notImplementedConfigManager{}, metrics.NullFactory, jr.settings.Logger)

jr.goroutines.Add(1)
go func() {
Expand Down Expand Up @@ -434,16 +405,6 @@ func (jr *jReceiver) startCollector(host component.Host) error {

api_v2.RegisterCollectorServiceServer(jr.grpc, jr)

// init and register sampling strategy store
ss, err := staticStrategyStore.NewStrategyStore(staticStrategyStore.Options{
StrategiesFile: jr.config.RemoteSamplingStrategyFile,
ReloadInterval: jr.config.RemoteSamplingStrategyFileReloadInterval,
}, jr.settings.Logger)
if err != nil {
return fmt.Errorf("failed to create collector strategy store: %w", err)
}
api_v2.RegisterSamplingManagerServer(jr.grpc, collectorSampling.NewGRPCHandler(ss))

jr.goroutines.Add(1)
go func() {
defer jr.goroutines.Done()
Expand Down
Loading

0 comments on commit 685c8b8

Please sign in to comment.