diff --git a/lib/ddtrace/contrib/racecar/patcher.rb b/lib/ddtrace/contrib/racecar/patcher.rb index 5da2a48953..a26d128501 100644 --- a/lib/ddtrace/contrib/racecar/patcher.rb +++ b/lib/ddtrace/contrib/racecar/patcher.rb @@ -1,4 +1,5 @@ require 'ddtrace/ext/app_types' +require 'ddtrace/contrib/active_support/notifications/subscriber' module Datadog module Contrib @@ -6,19 +7,24 @@ module Racecar # Provides instrumentation for `racecar` through ActiveSupport instrumentation signals module Patcher include Base + include ActiveSupport::Notifications::Subscriber + NAME_MESSAGE = 'racecar.message'.freeze NAME_BATCH = 'racecar.batch'.freeze register_as :racecar option :tracer, default: Datadog.tracer option :service_name, default: 'racecar' + on_subscribe do + subscribe('process_message.racecar', self::NAME_MESSAGE, {}, configuration[:tracer], &method(:process)) + subscribe('process_batch.racecar', self::NAME_BATCH, {}, configuration[:tracer], &method(:process)) + end + class << self def patch return patched? if patched? || !compatible? - ::ActiveSupport::Notifications.subscribe('process_batch.racecar', self) - ::ActiveSupport::Notifications.subscribe('process_message.racecar', self) - + subscribe! configuration[:tracer].set_service_info( configuration[:service_name], 'racecar', @@ -33,28 +39,17 @@ def patched? @patched = false end - def start(event, _, payload) - ensure_clean_context! - - name = event[/message/] ? NAME_MESSAGE : NAME_BATCH - span = configuration[:tracer].trace(name) + def process(span, event, _, payload) span.service = configuration[:service_name] span.resource = payload[:consumer_class] + span.set_tag('kafka.topic', payload[:topic]) span.set_tag('kafka.consumer', payload[:consumer_class]) span.set_tag('kafka.partition', payload[:partition]) span.set_tag('kafka.offset', payload[:offset]) if payload.key?(:offset) span.set_tag('kafka.first_offset', payload[:first_offset]) if payload.key?(:first_offset) span.set_tag('kafka.message_count', payload[:message_count]) if payload.key?(:message_count) - end - - def finish(_, _, payload) - current_span = configuration[:tracer].call_context.current_span - - return unless current_span - - current_span.set_error(payload[:exception_object]) if payload[:exception_object] - current_span.finish + span.set_error(payload[:exception_object]) if payload[:exception_object] end private @@ -66,12 +61,6 @@ def configuration def compatible? defined?(::Racecar) && defined?(::ActiveSupport::Notifications) end - - def ensure_clean_context! - return unless configuration[:tracer].call_context.current_span - - configuration[:tracer].provider.context = Context.new - end end end end diff --git a/spec/ddtrace/contrib/racecar/patcher_spec.rb b/spec/ddtrace/contrib/racecar/patcher_spec.rb index bba4ca1c5f..7e219836f9 100644 --- a/spec/ddtrace/contrib/racecar/patcher_spec.rb +++ b/spec/ddtrace/contrib/racecar/patcher_spec.rb @@ -17,6 +17,14 @@ def all_spans Datadog.configure do |c| c.use :racecar, tracer: tracer end + + # Make sure to update the subscription tracer, + # so we aren't writing to a stale tracer. + if Datadog::Contrib::Racecar::Patcher.patched? + Datadog::Contrib::Racecar::Patcher.subscriptions.each do |subscription| + allow(subscription).to receive(:tracer).and_return(tracer) + end + end end describe 'for single message processing' do