Skip to content

Commit

Permalink
Racecar integration to use ActiveSupport::Notifications::Subscriber (#…
Browse files Browse the repository at this point in the history
…381)

Refactoring Racecar integration after #380
  • Loading branch information
delner authored and Emanuele Palazzetti committed Mar 26, 2018
1 parent 9afa4a3 commit 7828c76
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 23 deletions.
35 changes: 12 additions & 23 deletions lib/ddtrace/contrib/racecar/patcher.rb
Original file line number Diff line number Diff line change
@@ -1,24 +1,30 @@
require 'ddtrace/ext/app_types'
require 'ddtrace/contrib/active_support/notifications/subscriber'

module Datadog
module Contrib
module Racecar
# Provides instrumentation for `racecar` through ActiveSupport instrumentation signals
module Patcher
include Base
include ActiveSupport::Notifications::Subscriber

NAME_MESSAGE = 'racecar.message'.freeze
NAME_BATCH = 'racecar.batch'.freeze
register_as :racecar
option :tracer, default: Datadog.tracer
option :service_name, default: 'racecar'

on_subscribe do
subscribe('process_message.racecar', self::NAME_MESSAGE, {}, configuration[:tracer], &method(:process))
subscribe('process_batch.racecar', self::NAME_BATCH, {}, configuration[:tracer], &method(:process))
end

class << self
def patch
return patched? if patched? || !compatible?

::ActiveSupport::Notifications.subscribe('process_batch.racecar', self)
::ActiveSupport::Notifications.subscribe('process_message.racecar', self)

subscribe!
configuration[:tracer].set_service_info(
configuration[:service_name],
'racecar',
Expand All @@ -33,28 +39,17 @@ def patched?
@patched = false
end

def start(event, _, payload)
ensure_clean_context!

name = event[/message/] ? NAME_MESSAGE : NAME_BATCH
span = configuration[:tracer].trace(name)
def process(span, event, _, payload)
span.service = configuration[:service_name]
span.resource = payload[:consumer_class]

span.set_tag('kafka.topic', payload[:topic])
span.set_tag('kafka.consumer', payload[:consumer_class])
span.set_tag('kafka.partition', payload[:partition])
span.set_tag('kafka.offset', payload[:offset]) if payload.key?(:offset)
span.set_tag('kafka.first_offset', payload[:first_offset]) if payload.key?(:first_offset)
span.set_tag('kafka.message_count', payload[:message_count]) if payload.key?(:message_count)
end

def finish(_, _, payload)
current_span = configuration[:tracer].call_context.current_span

return unless current_span

current_span.set_error(payload[:exception_object]) if payload[:exception_object]
current_span.finish
span.set_error(payload[:exception_object]) if payload[:exception_object]
end

private
Expand All @@ -66,12 +61,6 @@ def configuration
def compatible?
defined?(::Racecar) && defined?(::ActiveSupport::Notifications)
end

def ensure_clean_context!
return unless configuration[:tracer].call_context.current_span

configuration[:tracer].provider.context = Context.new
end
end
end
end
Expand Down
8 changes: 8 additions & 0 deletions spec/ddtrace/contrib/racecar/patcher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ def all_spans
Datadog.configure do |c|
c.use :racecar, tracer: tracer
end

# Make sure to update the subscription tracer,
# so we aren't writing to a stale tracer.
if Datadog::Contrib::Racecar::Patcher.patched?
Datadog::Contrib::Racecar::Patcher.subscriptions.each do |subscription|
allow(subscription).to receive(:tracer).and_return(tracer)
end
end
end

describe 'for single message processing' do
Expand Down

0 comments on commit 7828c76

Please sign in to comment.