From 0d5bd34c447991225d7ab6eaf59a4570bb1163d0 Mon Sep 17 00:00:00 2001 From: Sam Xie Date: Mon, 17 Aug 2020 11:05:49 +0800 Subject: [PATCH] Remove service name as a parameter of Sarama instrumentation (#221) * Remove service name as a parameter of Sarama instrumentation * Replace `WithTracer` with `WithTracerProvider` * Update CHANGELOG * Fix CHANGELOG & comments Co-authored-by: Tyler Yahn --- CHANGELOG.md | 6 ++- .../github.com/Shopify/sarama/consumer.go | 16 +++--- .../Shopify/sarama/consumer_group.go | 4 +- .../Shopify/sarama/consumer_test.go | 25 +++++----- .../github.com/Shopify/sarama/dispatcher.go | 1 - .../sarama/example/consumer/consumer.go | 2 +- .../sarama/example/producer/producer.go | 2 +- .../github.com/Shopify/sarama/option.go | 30 ++++++------ .../github.com/Shopify/sarama/option_test.go | 38 +++++--------- .../github.com/Shopify/sarama/producer.go | 9 ++-- .../Shopify/sarama/producer_test.go | 49 ++++++++++--------- 11 files changed, 85 insertions(+), 97 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a2cc4e46aa8..f3263276fc9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,11 +14,13 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Changed -- Switch to use common top-level module `SemVersion()` when creating versioned tracer in `bradfitz/gomemcache`. +- Remove service name as a parameter of Sarama instrumentation. (#221) +- Replace `WithTracer` with `WithTracerProvider` in Sarama instrumentation. (#221) +- Switch to use common top-level module `SemVersion()` when creating versioned tracer in `bradfitz/gomemcache`. (#226) ### Fixed -- Update dependabot configuration to include newly added `bradfitz/gomemcache` package. +- Update dependabot configuration to include newly added `bradfitz/gomemcache` package. (#226) ## [0.10.1] - 2020-08-13 diff --git a/instrumentation/github.com/Shopify/sarama/consumer.go b/instrumentation/github.com/Shopify/sarama/consumer.go index 5be0c5dff41..ce386929ca2 100644 --- a/instrumentation/github.com/Shopify/sarama/consumer.go +++ b/instrumentation/github.com/Shopify/sarama/consumer.go @@ -31,8 +31,8 @@ func (pc *partitionConsumer) Messages() <-chan *sarama.ConsumerMessage { // WrapPartitionConsumer wraps a sarama.PartitionConsumer causing each received // message to be traced. -func WrapPartitionConsumer(serviceName string, pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer { - cfg := newConfig(serviceName, opts...) +func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer { + cfg := newConfig(opts...) dispatcher := newConsumerMessagesDispatcherWrapper(pc, cfg) go dispatcher.Run() @@ -46,8 +46,7 @@ func WrapPartitionConsumer(serviceName string, pc sarama.PartitionConsumer, opts type consumer struct { sarama.Consumer - serviceName string - opts []Option + opts []Option } // ConsumePartition invokes Consumer.ConsumePartition and wraps the resulting @@ -57,15 +56,14 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) if err != nil { return nil, err } - return WrapPartitionConsumer(c.serviceName, pc, c.opts...), nil + return WrapPartitionConsumer(pc, c.opts...), nil } // WrapConsumer wraps a sarama.Consumer wrapping any PartitionConsumer created // via Consumer.ConsumePartition. -func WrapConsumer(serviceName string, c sarama.Consumer, opts ...Option) sarama.Consumer { +func WrapConsumer(c sarama.Consumer, opts ...Option) sarama.Consumer { return &consumer{ - Consumer: c, - serviceName: serviceName, - opts: opts, + Consumer: c, + opts: opts, } } diff --git a/instrumentation/github.com/Shopify/sarama/consumer_group.go b/instrumentation/github.com/Shopify/sarama/consumer_group.go index 3f125eb5a02..2f429470baa 100644 --- a/instrumentation/github.com/Shopify/sarama/consumer_group.go +++ b/instrumentation/github.com/Shopify/sarama/consumer_group.go @@ -40,8 +40,8 @@ func (h *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, // WrapConsumerGroupHandler wraps a sarama.ConsumerGroupHandler causing each received // message to be traced. -func WrapConsumerGroupHandler(serviceName string, handler sarama.ConsumerGroupHandler, opts ...Option) sarama.ConsumerGroupHandler { - cfg := newConfig(serviceName, opts...) +func WrapConsumerGroupHandler(handler sarama.ConsumerGroupHandler, opts ...Option) sarama.ConsumerGroupHandler { + cfg := newConfig(opts...) return &consumerGroupHandler{ ConsumerGroupHandler: handler, diff --git a/instrumentation/github.com/Shopify/sarama/consumer_test.go b/instrumentation/github.com/Shopify/sarama/consumer_test.go index 78d9fba8d3b..c45a6ce3767 100644 --- a/instrumentation/github.com/Shopify/sarama/consumer_test.go +++ b/instrumentation/github.com/Shopify/sarama/consumer_test.go @@ -34,8 +34,7 @@ import ( ) const ( - serviceName = "test-service-name" - topic = "test-topic" + topic = "test-topic" ) var ( @@ -43,8 +42,8 @@ var ( ) func TestWrapPartitionConsumer(t *testing.T) { - // Mock tracer - mt := mocktracer.NewTracer("kafka") + // Mock provider + provider, mt := newProviderAndTracer() // Mock partition consumer controller consumer := mocks.NewConsumer(t, sarama.NewConfig()) @@ -54,21 +53,21 @@ func TestWrapPartitionConsumer(t *testing.T) { partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0) require.NoError(t, err) - partitionConsumer = WrapPartitionConsumer(serviceName, partitionConsumer, WithTracer(mt)) + partitionConsumer = WrapPartitionConsumer(partitionConsumer, WithTraceProvider(provider)) consumeAndCheck(t, mt, mockPartitionConsumer, partitionConsumer) } func TestWrapConsumer(t *testing.T) { - // Mock tracer - mt := mocktracer.NewTracer("kafka") + // Mock provider + provider, mt := newProviderAndTracer() // Mock partition consumer controller mockConsumer := mocks.NewConsumer(t, sarama.NewConfig()) mockPartitionConsumer := mockConsumer.ExpectConsumePartition(topic, 0, 0) // Wrap consumer - consumer := WrapConsumer(serviceName, mockConsumer, WithTracer(mt)) + consumer := WrapConsumer(mockConsumer, WithTraceProvider(provider)) // Create partition consumer partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0) @@ -107,7 +106,6 @@ func consumeAndCheck(t *testing.T, mt *mocktracer.Tracer, mockPartitionConsumer }{ { kvList: []kv.KeyValue{ - standard.ServiceNameKey.String(serviceName), standard.MessagingSystemKey.String("kafka"), standard.MessagingDestinationKindKeyTopic, standard.MessagingDestinationKey.String("test-topic"), @@ -121,7 +119,6 @@ func consumeAndCheck(t *testing.T, mt *mocktracer.Tracer, mockPartitionConsumer }, { kvList: []kv.KeyValue{ - standard.ServiceNameKey.String(serviceName), standard.MessagingSystemKey.String("kafka"), standard.MessagingDestinationKindKeyTopic, standard.MessagingDestinationKey.String("test-topic"), @@ -159,7 +156,7 @@ func TestConsumerConsumePartitionWithError(t *testing.T) { mockConsumer := mocks.NewConsumer(t, sarama.NewConfig()) mockConsumer.ExpectConsumePartition(topic, 0, 0) - consumer := WrapConsumer(serviceName, mockConsumer) + consumer := WrapConsumer(mockConsumer) _, err := consumer.ConsumePartition(topic, 0, 0) assert.NoError(t, err) // Consume twice @@ -168,12 +165,12 @@ func TestConsumerConsumePartitionWithError(t *testing.T) { } func BenchmarkWrapPartitionConsumer(b *testing.B) { - // Mock tracer - mt := mocktracer.NewTracer("kafka") + // Mock provider + provider, _ := newProviderAndTracer() mockPartitionConsumer, partitionConsumer := createMockPartitionConsumer(b) - partitionConsumer = WrapPartitionConsumer(serviceName, partitionConsumer, WithTracer(mt)) + partitionConsumer = WrapPartitionConsumer(partitionConsumer, WithTraceProvider(provider)) message := sarama.ConsumerMessage{Key: []byte("foo")} b.ReportAllocs() diff --git a/instrumentation/github.com/Shopify/sarama/dispatcher.go b/instrumentation/github.com/Shopify/sarama/dispatcher.go index 8f7854508c5..e613ecb605a 100644 --- a/instrumentation/github.com/Shopify/sarama/dispatcher.go +++ b/instrumentation/github.com/Shopify/sarama/dispatcher.go @@ -61,7 +61,6 @@ func (w *consumerMessagesDispatcherWrapper) Run() { // Create a span. attrs := []kv.KeyValue{ - standard.ServiceNameKey.String(w.cfg.ServiceName), standard.MessagingSystemKey.String("kafka"), standard.MessagingDestinationKindKeyTopic, standard.MessagingDestinationKey.String(msg.Topic), diff --git a/instrumentation/github.com/Shopify/sarama/example/consumer/consumer.go b/instrumentation/github.com/Shopify/sarama/example/consumer/consumer.go index f84cbb19bfb..9f512a92308 100644 --- a/instrumentation/github.com/Shopify/sarama/example/consumer/consumer.go +++ b/instrumentation/github.com/Shopify/sarama/example/consumer/consumer.go @@ -57,7 +57,7 @@ func main() { func startConsumerGroup(brokerList []string) { consumerGroupHandler := Consumer{} // Wrap instrumentation - handler := saramatrace.WrapConsumerGroupHandler("example-consumer", &consumerGroupHandler) + handler := saramatrace.WrapConsumerGroupHandler(&consumerGroupHandler) config := sarama.NewConfig() config.Version = sarama.V2_5_0_0 diff --git a/instrumentation/github.com/Shopify/sarama/example/producer/producer.go b/instrumentation/github.com/Shopify/sarama/example/producer/producer.go index 656572797f7..17544b8611d 100644 --- a/instrumentation/github.com/Shopify/sarama/example/producer/producer.go +++ b/instrumentation/github.com/Shopify/sarama/example/producer/producer.go @@ -90,7 +90,7 @@ func newAccessLogProducer(brokerList []string) sarama.AsyncProducer { } // Wrap instrumentation - producer = saramatrace.WrapAsyncProducer("example-producer", config, producer) + producer = saramatrace.WrapAsyncProducer(config, producer) // We will log to STDOUT if we're not able to produce messages. go func() { diff --git a/instrumentation/github.com/Shopify/sarama/option.go b/instrumentation/github.com/Shopify/sarama/option.go index 14c9811f638..f45b7779d52 100644 --- a/instrumentation/github.com/Shopify/sarama/option.go +++ b/instrumentation/github.com/Shopify/sarama/option.go @@ -28,33 +28,35 @@ const ( ) type config struct { - ServiceName string - Tracer trace.Tracer - Propagators otelpropagation.Propagators + TraceProvider trace.Provider + Propagators otelpropagation.Propagators + + Tracer trace.Tracer } // newConfig returns a config with all Options set. -func newConfig(serviceName string, opts ...Option) config { - cfg := config{Propagators: global.Propagators(), ServiceName: serviceName} +func newConfig(opts ...Option) config { + cfg := config{ + Propagators: global.Propagators(), + TraceProvider: global.TraceProvider(), + } for _, opt := range opts { opt(&cfg) } - if cfg.Tracer == nil { - cfg.Tracer = global.Tracer(defaultTracerName) - } + + cfg.Tracer = cfg.TraceProvider.Tracer(defaultTracerName) + return cfg } // Option specifies instrumentation configuration options. type Option func(*config) -// WithTracer specifies a tracer to use for creating spans. If none is -// specified, a tracer named -// "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama" -// from the global provider is used. -func WithTracer(tracer trace.Tracer) Option { +// WithTraceProvider specifies a trace provider to use for creating a tracer for spans. +// If none is specified, the global provider is used. +func WithTraceProvider(provider trace.Provider) Option { return func(cfg *config) { - cfg.Tracer = tracer + cfg.TraceProvider = provider } } diff --git a/instrumentation/github.com/Shopify/sarama/option_test.go b/instrumentation/github.com/Shopify/sarama/option_test.go index fa17279ed0c..d66532abf30 100644 --- a/instrumentation/github.com/Shopify/sarama/option_test.go +++ b/instrumentation/github.com/Shopify/sarama/option_test.go @@ -24,49 +24,37 @@ import ( func TestNewConfig(t *testing.T) { testCases := []struct { - name string - serviceName string - opts []Option - expected config + name string + opts []Option + expected config }{ { - name: "set service name", - serviceName: serviceName, - expected: config{ - ServiceName: serviceName, - Tracer: global.Tracer(defaultTracerName), - Propagators: global.Propagators(), - }, - }, - { - name: "with tracer", - serviceName: serviceName, + name: "with provider", opts: []Option{ - WithTracer(global.Tracer("new")), + WithTraceProvider(global.TraceProvider()), }, expected: config{ - ServiceName: serviceName, - Tracer: global.Tracer("new"), - Propagators: global.Propagators(), + TraceProvider: global.TraceProvider(), + Tracer: global.TraceProvider().Tracer(defaultTracerName), + Propagators: global.Propagators(), }, }, { - name: "with propagators", - serviceName: serviceName, + name: "with propagators", opts: []Option{ WithPropagators(nil), }, expected: config{ - ServiceName: serviceName, - Tracer: global.Tracer(defaultTracerName), - Propagators: nil, + TraceProvider: global.TraceProvider(), + Tracer: global.TraceProvider().Tracer(defaultTracerName), + Propagators: nil, }, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - result := newConfig(tc.serviceName, tc.opts...) + result := newConfig(tc.opts...) assert.Equal(t, tc.expected, result) }) } diff --git a/instrumentation/github.com/Shopify/sarama/producer.go b/instrumentation/github.com/Shopify/sarama/producer.go index 0d68a979bc7..3a7b934318c 100644 --- a/instrumentation/github.com/Shopify/sarama/producer.go +++ b/instrumentation/github.com/Shopify/sarama/producer.go @@ -58,8 +58,8 @@ func (p *syncProducer) SendMessages(msgs []*sarama.ProducerMessage) error { // WrapSyncProducer wraps a sarama.SyncProducer so that all produced messages // are traced. -func WrapSyncProducer(serviceName string, saramaConfig *sarama.Config, producer sarama.SyncProducer, opts ...Option) sarama.SyncProducer { - cfg := newConfig(serviceName, opts...) +func WrapSyncProducer(saramaConfig *sarama.Config, producer sarama.SyncProducer, opts ...Option) sarama.SyncProducer { + cfg := newConfig(opts...) if saramaConfig == nil { saramaConfig = sarama.NewConfig() } @@ -131,8 +131,8 @@ type producerMessageContext struct { // // If `Return.Successes` is false, there is no way to know partition and offset of // the message. -func WrapAsyncProducer(serviceName string, saramaConfig *sarama.Config, p sarama.AsyncProducer, opts ...Option) sarama.AsyncProducer { - cfg := newConfig(serviceName, opts...) +func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts ...Option) sarama.AsyncProducer { + cfg := newConfig(opts...) if saramaConfig == nil { saramaConfig = sarama.NewConfig() } @@ -234,7 +234,6 @@ func startProducerSpan(cfg config, version sarama.KafkaVersion, msg *sarama.Prod // Create a span. attrs := []kv.KeyValue{ - standard.ServiceNameKey.String(cfg.ServiceName), standard.MessagingSystemKey.String("kafka"), standard.MessagingDestinationKindKeyTopic, standard.MessagingDestinationKey.String(msg.Topic), diff --git a/instrumentation/github.com/Shopify/sarama/producer_test.go b/instrumentation/github.com/Shopify/sarama/producer_test.go index 3f1e7c994b4..75d63ace475 100644 --- a/instrumentation/github.com/Shopify/sarama/producer_test.go +++ b/instrumentation/github.com/Shopify/sarama/producer_test.go @@ -33,18 +33,25 @@ import ( mocktracer "go.opentelemetry.io/contrib/internal/trace" ) +func newProviderAndTracer() (*mocktracer.Provider, *mocktracer.Tracer) { + var provider mocktracer.Provider + tracer := provider.Tracer(defaultTracerName) + + return &provider, tracer.(*mocktracer.Tracer) +} + func TestWrapSyncProducer(t *testing.T) { var err error - // Mock tracer - mt := mocktracer.NewTracer("kafka") + // Mock provider + provider, mt := newProviderAndTracer() cfg := newSaramaConfig() // Mock sync producer mockSyncProducer := mocks.NewSyncProducer(t, cfg) // Wrap sync producer - syncProducer := WrapSyncProducer(serviceName, cfg, mockSyncProducer, WithTracer(mt)) + syncProducer := WrapSyncProducer(cfg, mockSyncProducer, WithTraceProvider(provider)) // Create message with span context ctx, _ := mt.Start(context.Background(), "") @@ -59,7 +66,6 @@ func TestWrapSyncProducer(t *testing.T) { }{ { kvList: []kv.KeyValue{ - standard.ServiceNameKey.String(serviceName), standard.MessagingSystemKey.String("kafka"), standard.MessagingDestinationKindKeyTopic, standard.MessagingDestinationKey.String(topic), @@ -71,7 +77,6 @@ func TestWrapSyncProducer(t *testing.T) { }, { kvList: []kv.KeyValue{ - standard.ServiceNameKey.String(serviceName), standard.MessagingSystemKey.String("kafka"), standard.MessagingDestinationKindKeyTopic, standard.MessagingDestinationKey.String(topic), @@ -82,7 +87,6 @@ func TestWrapSyncProducer(t *testing.T) { }, { kvList: []kv.KeyValue{ - standard.ServiceNameKey.String(serviceName), standard.MessagingSystemKey.String("kafka"), standard.MessagingDestinationKindKeyTopic, standard.MessagingDestinationKey.String(topic), @@ -95,7 +99,6 @@ func TestWrapSyncProducer(t *testing.T) { }, { kvList: []kv.KeyValue{ - standard.ServiceNameKey.String(serviceName), standard.MessagingSystemKey.String("kafka"), standard.MessagingDestinationKindKeyTopic, standard.MessagingDestinationKey.String(topic), @@ -158,10 +161,12 @@ func TestWrapAsyncProducer(t *testing.T) { } t.Run("without successes config", func(t *testing.T) { - mt := mocktracer.NewTracer("kafka") + // Mock provider + provider, mt := newProviderAndTracer() + cfg := newSaramaConfig() mockAsyncProducer := mocks.NewAsyncProducer(t, cfg) - ap := WrapAsyncProducer(serviceName, cfg, mockAsyncProducer, WithTracer(mt)) + ap := WrapAsyncProducer(cfg, mockAsyncProducer, WithTraceProvider(provider)) msgList := createMessages(mt) // Send message @@ -183,7 +188,6 @@ func TestWrapAsyncProducer(t *testing.T) { }{ { kvList: []kv.KeyValue{ - standard.ServiceNameKey.String(serviceName), standard.MessagingSystemKey.String("kafka"), standard.MessagingDestinationKindKeyTopic, standard.MessagingDestinationKey.String(topic), @@ -195,7 +199,6 @@ func TestWrapAsyncProducer(t *testing.T) { }, { kvList: []kv.KeyValue{ - standard.ServiceNameKey.String(serviceName), standard.MessagingSystemKey.String("kafka"), standard.MessagingDestinationKindKeyTopic, standard.MessagingDestinationKey.String(topic), @@ -225,14 +228,15 @@ func TestWrapAsyncProducer(t *testing.T) { }) t.Run("with successes config", func(t *testing.T) { - mt := mocktracer.NewTracer("kafka") + // Mock provider + provider, mt := newProviderAndTracer() // Set producer with successes config cfg := newSaramaConfig() cfg.Producer.Return.Successes = true mockAsyncProducer := mocks.NewAsyncProducer(t, cfg) - ap := WrapAsyncProducer(serviceName, cfg, mockAsyncProducer, WithTracer(mt)) + ap := WrapAsyncProducer(cfg, mockAsyncProducer, WithTraceProvider(provider)) msgList := createMessages(mt) // Send message @@ -258,7 +262,6 @@ func TestWrapAsyncProducer(t *testing.T) { }{ { kvList: []kv.KeyValue{ - standard.ServiceNameKey.String(serviceName), standard.MessagingSystemKey.String("kafka"), standard.MessagingDestinationKindKeyTopic, standard.MessagingDestinationKey.String(topic), @@ -270,7 +273,6 @@ func TestWrapAsyncProducer(t *testing.T) { }, { kvList: []kv.KeyValue{ - standard.ServiceNameKey.String(serviceName), standard.MessagingSystemKey.String("kafka"), standard.MessagingDestinationKindKeyTopic, standard.MessagingDestinationKey.String(topic), @@ -304,14 +306,15 @@ func TestWrapAsyncProducer(t *testing.T) { } func TestWrapAsyncProducerError(t *testing.T) { - mt := mocktracer.NewTracer("kafka") + // Mock provider + provider, mt := newProviderAndTracer() // Set producer with successes config cfg := newSaramaConfig() cfg.Producer.Return.Successes = true mockAsyncProducer := mocks.NewAsyncProducer(t, cfg) - ap := WrapAsyncProducer(serviceName, cfg, mockAsyncProducer, WithTracer(mt)) + ap := WrapAsyncProducer(cfg, mockAsyncProducer, WithTraceProvider(provider)) mockAsyncProducer.ExpectInputAndFail(errors.New("test")) ap.Input() <- &sarama.ProducerMessage{Topic: topic, Key: sarama.StringEncoder("foo2")} @@ -337,15 +340,15 @@ func newSaramaConfig() *sarama.Config { } func BenchmarkWrapSyncProducer(b *testing.B) { - // Mock tracer - mt := mocktracer.NewTracer("kafka") + // Mock provider + provider, _ := newProviderAndTracer() cfg := newSaramaConfig() // Mock sync producer mockSyncProducer := mocks.NewSyncProducer(b, cfg) // Wrap sync producer - syncProducer := WrapSyncProducer(serviceName, cfg, mockSyncProducer, WithTracer(mt)) + syncProducer := WrapSyncProducer(cfg, mockSyncProducer, WithTraceProvider(provider)) message := sarama.ProducerMessage{Key: sarama.StringEncoder("foo")} b.ReportAllocs() @@ -378,15 +381,15 @@ func BenchmarkMockSyncProducer(b *testing.B) { } func BenchmarkWrapAsyncProducer(b *testing.B) { - // Mock tracer - mt := mocktracer.NewTracer("kafka") + // Mock provider + provider, _ := newProviderAndTracer() cfg := newSaramaConfig() cfg.Producer.Return.Successes = true mockAsyncProducer := mocks.NewAsyncProducer(b, cfg) // Wrap sync producer - asyncProducer := WrapAsyncProducer(serviceName, cfg, mockAsyncProducer, WithTracer(mt)) + asyncProducer := WrapAsyncProducer(cfg, mockAsyncProducer, WithTraceProvider(provider)) message := sarama.ProducerMessage{Key: sarama.StringEncoder("foo")} b.ReportAllocs()