Skip to content

Commit

Permalink
Remove service name as a parameter of Sarama instrumentation (#221)
Browse files Browse the repository at this point in the history
* Remove service name as a parameter of Sarama instrumentation

* Replace `WithTracer` with `WithTracerProvider`

* Update CHANGELOG

* Fix CHANGELOG & comments

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
  • Loading branch information
XSAM and MrAlias authored Aug 17, 2020
1 parent 1df6921 commit 0d5bd34
Show file tree
Hide file tree
Showing 11 changed files with 85 additions and 97 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

### Changed

- Switch to use common top-level module `SemVersion()` when creating versioned tracer in `bradfitz/gomemcache`.
- Remove service name as a parameter of Sarama instrumentation. (#221)
- Replace `WithTracer` with `WithTracerProvider` in Sarama instrumentation. (#221)
- Switch to use common top-level module `SemVersion()` when creating versioned tracer in `bradfitz/gomemcache`. (#226)

### Fixed

- Update dependabot configuration to include newly added `bradfitz/gomemcache` package.
- Update dependabot configuration to include newly added `bradfitz/gomemcache` package. (#226)

## [0.10.1] - 2020-08-13

Expand Down
16 changes: 7 additions & 9 deletions instrumentation/github.com/Shopify/sarama/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func (pc *partitionConsumer) Messages() <-chan *sarama.ConsumerMessage {

// WrapPartitionConsumer wraps a sarama.PartitionConsumer causing each received
// message to be traced.
func WrapPartitionConsumer(serviceName string, pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer {
cfg := newConfig(serviceName, opts...)
func WrapPartitionConsumer(pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer {
cfg := newConfig(opts...)

dispatcher := newConsumerMessagesDispatcherWrapper(pc, cfg)
go dispatcher.Run()
Expand All @@ -46,8 +46,7 @@ func WrapPartitionConsumer(serviceName string, pc sarama.PartitionConsumer, opts
type consumer struct {
sarama.Consumer

serviceName string
opts []Option
opts []Option
}

// ConsumePartition invokes Consumer.ConsumePartition and wraps the resulting
Expand All @@ -57,15 +56,14 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
if err != nil {
return nil, err
}
return WrapPartitionConsumer(c.serviceName, pc, c.opts...), nil
return WrapPartitionConsumer(pc, c.opts...), nil
}

// WrapConsumer wraps a sarama.Consumer wrapping any PartitionConsumer created
// via Consumer.ConsumePartition.
func WrapConsumer(serviceName string, c sarama.Consumer, opts ...Option) sarama.Consumer {
func WrapConsumer(c sarama.Consumer, opts ...Option) sarama.Consumer {
return &consumer{
Consumer: c,
serviceName: serviceName,
opts: opts,
Consumer: c,
opts: opts,
}
}
4 changes: 2 additions & 2 deletions instrumentation/github.com/Shopify/sarama/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func (h *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession,

// WrapConsumerGroupHandler wraps a sarama.ConsumerGroupHandler causing each received
// message to be traced.
func WrapConsumerGroupHandler(serviceName string, handler sarama.ConsumerGroupHandler, opts ...Option) sarama.ConsumerGroupHandler {
cfg := newConfig(serviceName, opts...)
func WrapConsumerGroupHandler(handler sarama.ConsumerGroupHandler, opts ...Option) sarama.ConsumerGroupHandler {
cfg := newConfig(opts...)

return &consumerGroupHandler{
ConsumerGroupHandler: handler,
Expand Down
25 changes: 11 additions & 14 deletions instrumentation/github.com/Shopify/sarama/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,16 @@ import (
)

const (
serviceName = "test-service-name"
topic = "test-topic"
topic = "test-topic"
)

var (
propagators = global.Propagators()
)

func TestWrapPartitionConsumer(t *testing.T) {
// Mock tracer
mt := mocktracer.NewTracer("kafka")
// Mock provider
provider, mt := newProviderAndTracer()

// Mock partition consumer controller
consumer := mocks.NewConsumer(t, sarama.NewConfig())
Expand All @@ -54,21 +53,21 @@ func TestWrapPartitionConsumer(t *testing.T) {
partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0)
require.NoError(t, err)

partitionConsumer = WrapPartitionConsumer(serviceName, partitionConsumer, WithTracer(mt))
partitionConsumer = WrapPartitionConsumer(partitionConsumer, WithTraceProvider(provider))

consumeAndCheck(t, mt, mockPartitionConsumer, partitionConsumer)
}

func TestWrapConsumer(t *testing.T) {
// Mock tracer
mt := mocktracer.NewTracer("kafka")
// Mock provider
provider, mt := newProviderAndTracer()

// Mock partition consumer controller
mockConsumer := mocks.NewConsumer(t, sarama.NewConfig())
mockPartitionConsumer := mockConsumer.ExpectConsumePartition(topic, 0, 0)

// Wrap consumer
consumer := WrapConsumer(serviceName, mockConsumer, WithTracer(mt))
consumer := WrapConsumer(mockConsumer, WithTraceProvider(provider))

// Create partition consumer
partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0)
Expand Down Expand Up @@ -107,7 +106,6 @@ func consumeAndCheck(t *testing.T, mt *mocktracer.Tracer, mockPartitionConsumer
}{
{
kvList: []kv.KeyValue{
standard.ServiceNameKey.String(serviceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String("test-topic"),
Expand All @@ -121,7 +119,6 @@ func consumeAndCheck(t *testing.T, mt *mocktracer.Tracer, mockPartitionConsumer
},
{
kvList: []kv.KeyValue{
standard.ServiceNameKey.String(serviceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String("test-topic"),
Expand Down Expand Up @@ -159,7 +156,7 @@ func TestConsumerConsumePartitionWithError(t *testing.T) {
mockConsumer := mocks.NewConsumer(t, sarama.NewConfig())
mockConsumer.ExpectConsumePartition(topic, 0, 0)

consumer := WrapConsumer(serviceName, mockConsumer)
consumer := WrapConsumer(mockConsumer)
_, err := consumer.ConsumePartition(topic, 0, 0)
assert.NoError(t, err)
// Consume twice
Expand All @@ -168,12 +165,12 @@ func TestConsumerConsumePartitionWithError(t *testing.T) {
}

func BenchmarkWrapPartitionConsumer(b *testing.B) {
// Mock tracer
mt := mocktracer.NewTracer("kafka")
// Mock provider
provider, _ := newProviderAndTracer()

mockPartitionConsumer, partitionConsumer := createMockPartitionConsumer(b)

partitionConsumer = WrapPartitionConsumer(serviceName, partitionConsumer, WithTracer(mt))
partitionConsumer = WrapPartitionConsumer(partitionConsumer, WithTraceProvider(provider))
message := sarama.ConsumerMessage{Key: []byte("foo")}

b.ReportAllocs()
Expand Down
1 change: 0 additions & 1 deletion instrumentation/github.com/Shopify/sarama/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func (w *consumerMessagesDispatcherWrapper) Run() {

// Create a span.
attrs := []kv.KeyValue{
standard.ServiceNameKey.String(w.cfg.ServiceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String(msg.Topic),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func main() {
func startConsumerGroup(brokerList []string) {
consumerGroupHandler := Consumer{}
// Wrap instrumentation
handler := saramatrace.WrapConsumerGroupHandler("example-consumer", &consumerGroupHandler)
handler := saramatrace.WrapConsumerGroupHandler(&consumerGroupHandler)

config := sarama.NewConfig()
config.Version = sarama.V2_5_0_0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {
}

// Wrap instrumentation
producer = saramatrace.WrapAsyncProducer("example-producer", config, producer)
producer = saramatrace.WrapAsyncProducer(config, producer)

// We will log to STDOUT if we're not able to produce messages.
go func() {
Expand Down
30 changes: 16 additions & 14 deletions instrumentation/github.com/Shopify/sarama/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,33 +28,35 @@ const (
)

type config struct {
ServiceName string
Tracer trace.Tracer
Propagators otelpropagation.Propagators
TraceProvider trace.Provider
Propagators otelpropagation.Propagators

Tracer trace.Tracer
}

// newConfig returns a config with all Options set.
func newConfig(serviceName string, opts ...Option) config {
cfg := config{Propagators: global.Propagators(), ServiceName: serviceName}
func newConfig(opts ...Option) config {
cfg := config{
Propagators: global.Propagators(),
TraceProvider: global.TraceProvider(),
}
for _, opt := range opts {
opt(&cfg)
}
if cfg.Tracer == nil {
cfg.Tracer = global.Tracer(defaultTracerName)
}

cfg.Tracer = cfg.TraceProvider.Tracer(defaultTracerName)

return cfg
}

// Option specifies instrumentation configuration options.
type Option func(*config)

// WithTracer specifies a tracer to use for creating spans. If none is
// specified, a tracer named
// "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama"
// from the global provider is used.
func WithTracer(tracer trace.Tracer) Option {
// WithTraceProvider specifies a trace provider to use for creating a tracer for spans.
// If none is specified, the global provider is used.
func WithTraceProvider(provider trace.Provider) Option {
return func(cfg *config) {
cfg.Tracer = tracer
cfg.TraceProvider = provider
}
}

Expand Down
38 changes: 13 additions & 25 deletions instrumentation/github.com/Shopify/sarama/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,49 +24,37 @@ import (

func TestNewConfig(t *testing.T) {
testCases := []struct {
name string
serviceName string
opts []Option
expected config
name string
opts []Option
expected config
}{
{
name: "set service name",
serviceName: serviceName,
expected: config{
ServiceName: serviceName,
Tracer: global.Tracer(defaultTracerName),
Propagators: global.Propagators(),
},
},
{
name: "with tracer",
serviceName: serviceName,
name: "with provider",
opts: []Option{
WithTracer(global.Tracer("new")),
WithTraceProvider(global.TraceProvider()),
},
expected: config{
ServiceName: serviceName,
Tracer: global.Tracer("new"),
Propagators: global.Propagators(),
TraceProvider: global.TraceProvider(),
Tracer: global.TraceProvider().Tracer(defaultTracerName),
Propagators: global.Propagators(),
},
},
{
name: "with propagators",
serviceName: serviceName,
name: "with propagators",
opts: []Option{
WithPropagators(nil),
},
expected: config{
ServiceName: serviceName,
Tracer: global.Tracer(defaultTracerName),
Propagators: nil,
TraceProvider: global.TraceProvider(),
Tracer: global.TraceProvider().Tracer(defaultTracerName),
Propagators: nil,
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := newConfig(tc.serviceName, tc.opts...)
result := newConfig(tc.opts...)
assert.Equal(t, tc.expected, result)
})
}
Expand Down
9 changes: 4 additions & 5 deletions instrumentation/github.com/Shopify/sarama/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func (p *syncProducer) SendMessages(msgs []*sarama.ProducerMessage) error {

// WrapSyncProducer wraps a sarama.SyncProducer so that all produced messages
// are traced.
func WrapSyncProducer(serviceName string, saramaConfig *sarama.Config, producer sarama.SyncProducer, opts ...Option) sarama.SyncProducer {
cfg := newConfig(serviceName, opts...)
func WrapSyncProducer(saramaConfig *sarama.Config, producer sarama.SyncProducer, opts ...Option) sarama.SyncProducer {
cfg := newConfig(opts...)
if saramaConfig == nil {
saramaConfig = sarama.NewConfig()
}
Expand Down Expand Up @@ -131,8 +131,8 @@ type producerMessageContext struct {
//
// If `Return.Successes` is false, there is no way to know partition and offset of
// the message.
func WrapAsyncProducer(serviceName string, saramaConfig *sarama.Config, p sarama.AsyncProducer, opts ...Option) sarama.AsyncProducer {
cfg := newConfig(serviceName, opts...)
func WrapAsyncProducer(saramaConfig *sarama.Config, p sarama.AsyncProducer, opts ...Option) sarama.AsyncProducer {
cfg := newConfig(opts...)
if saramaConfig == nil {
saramaConfig = sarama.NewConfig()
}
Expand Down Expand Up @@ -234,7 +234,6 @@ func startProducerSpan(cfg config, version sarama.KafkaVersion, msg *sarama.Prod

// Create a span.
attrs := []kv.KeyValue{
standard.ServiceNameKey.String(cfg.ServiceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String(msg.Topic),
Expand Down
Loading

0 comments on commit 0d5bd34

Please sign in to comment.