Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ActiveSupport::Notifications::Subscriber with Racecar #381

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 12 additions & 23 deletions lib/ddtrace/contrib/racecar/patcher.rb
Original file line number Diff line number Diff line change
@@ -1,24 +1,30 @@
require 'ddtrace/ext/app_types'
require 'ddtrace/contrib/active_support/notifications/subscriber'

module Datadog
module Contrib
module Racecar
# Provides instrumentation for `racecar` through ActiveSupport instrumentation signals
module Patcher
include Base
include ActiveSupport::Notifications::Subscriber

NAME_MESSAGE = 'racecar.message'.freeze
NAME_BATCH = 'racecar.batch'.freeze
register_as :racecar
option :tracer, default: Datadog.tracer
option :service_name, default: 'racecar'

on_subscribe do
subscribe('process_message.racecar', self::NAME_MESSAGE, {}, configuration[:tracer], &method(:process))
subscribe('process_batch.racecar', self::NAME_BATCH, {}, configuration[:tracer], &method(:process))
end

class << self
def patch
return patched? if patched? || !compatible?

::ActiveSupport::Notifications.subscribe('process_batch.racecar', self)
::ActiveSupport::Notifications.subscribe('process_message.racecar', self)

subscribe!
configuration[:tracer].set_service_info(
configuration[:service_name],
'racecar',
Expand All @@ -33,28 +39,17 @@ def patched?
@patched = false
end

def start(event, _, payload)
ensure_clean_context!

name = event[/message/] ? NAME_MESSAGE : NAME_BATCH
span = configuration[:tracer].trace(name)
def process(span, event, _, payload)
span.service = configuration[:service_name]
span.resource = payload[:consumer_class]

span.set_tag('kafka.topic', payload[:topic])
span.set_tag('kafka.consumer', payload[:consumer_class])
span.set_tag('kafka.partition', payload[:partition])
span.set_tag('kafka.offset', payload[:offset]) if payload.key?(:offset)
span.set_tag('kafka.first_offset', payload[:first_offset]) if payload.key?(:first_offset)
span.set_tag('kafka.message_count', payload[:message_count]) if payload.key?(:message_count)
end

def finish(_, _, payload)
current_span = configuration[:tracer].call_context.current_span

return unless current_span

current_span.set_error(payload[:exception_object]) if payload[:exception_object]
current_span.finish
span.set_error(payload[:exception_object]) if payload[:exception_object]
end

private
Expand All @@ -66,12 +61,6 @@ def configuration
def compatible?
defined?(::Racecar) && defined?(::ActiveSupport::Notifications)
end

def ensure_clean_context!
return unless configuration[:tracer].call_context.current_span

configuration[:tracer].provider.context = Context.new
end
end
end
end
Expand Down
8 changes: 8 additions & 0 deletions spec/ddtrace/contrib/racecar/patcher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ def all_spans
Datadog.configure do |c|
c.use :racecar, tracer: tracer
end

# Make sure to update the subscription tracer,
# so we aren't writing to a stale tracer.
if Datadog::Contrib::Racecar::Patcher.patched?
Datadog::Contrib::Racecar::Patcher.subscriptions.each do |subscription|
allow(subscription).to receive(:tracer).and_return(tracer)
end
end
end

describe 'for single message processing' do
Expand Down