Skip to content

Commit

Permalink
Merge pull request #3725 from DataDog/as-events
Browse files Browse the repository at this point in the history
  • Loading branch information
marcotc committed Jun 26, 2024
2 parents 3e515da + 71efe31 commit a13cb0f
Show file tree
Hide file tree
Showing 37 changed files with 181 additions and 120 deletions.
2 changes: 1 addition & 1 deletion lib/datadog/tracing/contrib/action_cable/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def self.included(base)
module ClassMethods
include Contrib::ActionCable::Event::ClassMethods

def subscription(*args)
def subscription(*args, **kwargs)
super.tap do |subscription|
subscription.before_trace { ensure_clean_context! }
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def span_type
Tracing::Metadata::Ext::AppTypes::TYPE_WEB
end

def process(span, _event, _id, payload)
def on_start(span, _event, _id, payload)
channel = payload[:broadcasting] # Channel has high cardinality
span.service = configuration[:service_name] if configuration[:service_name]
span.type = span_type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def span_type
Tracing::Metadata::Ext::AppTypes::TYPE_WEB
end

def process(span, _event, _id, payload)
def on_start(span, _event, _id, payload)
channel_class = payload[:channel_class]
action = payload[:action]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def span_type
Tracing::Metadata::Ext::AppTypes::TYPE_WEB
end

def process(span, _event, _id, payload)
def on_start(span, _event, _id, payload)
channel_class = payload[:channel_class]

span.service = configuration[:service_name] if configuration[:service_name]
Expand Down
10 changes: 4 additions & 6 deletions lib/datadog/tracing/contrib/action_mailer/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ def configuration
Datadog.configuration.tracing[:action_mailer]
end

def process(span, event, _id, payload)
def on_start(span, event, _id, payload)
super

span.type = span_type
span.service = configuration[:service_name] if configuration[:service_name]
span.resource = payload[:mailer]
span.set_tag(Tracing::Metadata::Ext::TAG_COMPONENT, Ext::TAG_COMPONENT)

# Set analytics sample rate
Expand All @@ -39,10 +41,6 @@ def process(span, event, _id, payload)

# Measure service stats
Contrib::Analytics.set_measured(span)

report_if_exception(span, payload)
rescue StandardError => e
Datadog.logger.debug(e.message)
end
end
end
Expand Down
13 changes: 9 additions & 4 deletions lib/datadog/tracing/contrib/action_mailer/events/deliver.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,20 @@ def span_type
Tracing::Metadata::Ext::AppTypes::TYPE_WORKER
end

def process(span, event, _id, payload)
def on_start(span, event, _id, payload)
super

span.type = span_type
span.set_tag(Tracing::Metadata::Ext::TAG_OPERATION, Ext::TAG_OPERATION_DELIVER)
end

def on_finish(span, event, _id, payload)
super

span.resource = payload[:mailer] # Mailer is not available at `on_start`

span.set_tag(Ext::TAG_MAILER, payload[:mailer])
span.set_tag(Ext::TAG_MSG_ID, payload[:message_id])

span.set_tag(Tracing::Metadata::Ext::TAG_OPERATION, Ext::TAG_OPERATION_DELIVER)

# Since email data can contain PII we disable by default
# Some of these fields can be either strings or arrays, so we try to normalize
# https://github.com/rails/rails/blob/18707ab17fa492eb25ad2e8f9818a320dc20b823/actionmailer/lib/action_mailer/base.rb#L742-L754
Expand Down
5 changes: 3 additions & 2 deletions lib/datadog/tracing/contrib/action_mailer/events/process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ def span_type
Tracing::Metadata::Ext::HTTP::TYPE_TEMPLATE
end

def process(span, event, _id, payload)
def on_start(span, event, _id, payload)
super

span.type = span_type
span.resource = payload[:mailer] # Mailer is not available at `on_start`

span.set_tag(Ext::TAG_ACTION, payload[:action])
span.set_tag(Ext::TAG_MAILER, payload[:mailer])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def span_name
Ext::SPAN_RENDER_PARTIAL
end

def process(span, _event, _id, payload)
def on_start(span, _event, _id, payload)
span.service = configuration[:service_name] if configuration[:service_name]
span.type = Tracing::Metadata::Ext::HTTP::TYPE_TEMPLATE

Expand All @@ -41,10 +41,6 @@ def process(span, _event, _id, payload)

# Measure service stats
Contrib::Analytics.set_measured(span)

record_exception(span, payload)
rescue StandardError => e
Datadog.logger.debug(e.message)
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def span_name
Ext::SPAN_RENDER_TEMPLATE
end

def process(span, _event, _id, payload)
def on_start(span, _event, _id, payload)
span.service = configuration[:service_name] if configuration[:service_name]
span.type = Tracing::Metadata::Ext::HTTP::TYPE_TEMPLATE

Expand Down
2 changes: 1 addition & 1 deletion lib/datadog/tracing/contrib/active_job/events/discard.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def span_name
Ext::SPAN_DISCARD
end

def process(span, event, _id, payload)
def on_start(span, event, _id, payload)
span.name = span_name
span.service = configuration[:service_name] if configuration[:service_name]
span.resource = payload[:job].class.name
Expand Down
2 changes: 1 addition & 1 deletion lib/datadog/tracing/contrib/active_job/events/enqueue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def span_name
Ext::SPAN_ENQUEUE
end

def process(span, event, _id, payload)
def on_start(span, event, _id, payload)
span.name = span_name
span.service = configuration[:service_name] if configuration[:service_name]
span.resource = payload[:job].class.name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def span_name
Ext::SPAN_ENQUEUE
end

def process(span, event, _id, payload)
def on_start(span, event, _id, payload)
span.name = span_name
span.service = configuration[:service_name] if configuration[:service_name]
span.resource = payload[:job].class.name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def span_name
Ext::SPAN_ENQUEUE_RETRY
end

def process(span, event, _id, payload)
def on_start(span, event, _id, payload)
span.name = span_name
span.service = configuration[:service_name] if configuration[:service_name]
span.resource = payload[:job].class.name
Expand Down
2 changes: 1 addition & 1 deletion lib/datadog/tracing/contrib/active_job/events/perform.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def span_name
Ext::SPAN_PERFORM
end

def process(span, event, _id, payload)
def on_start(span, event, _id, payload)
span.name = span_name
span.service = configuration[:service_name] if configuration[:service_name]
span.resource = payload[:job].class.name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def span_name
Ext::SPAN_RETRY_STOPPED
end

def process(span, event, _id, payload)
def on_start(span, event, _id, payload)
span.name = span_name
span.service = configuration[:service_name] if configuration[:service_name]
span.resource = payload[:job].class.name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def span_name
Ext::SPAN_RENDER
end

def process(span, _event, _id, payload)
def on_start(span, _event, _id, payload)
span.set_tag(Tracing::Metadata::Ext::TAG_OPERATION, Ext::TAG_OPERATION_RENDER)

set_common_tags(span, payload)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def span_name
Ext::SPAN_SERIALIZE
end

def process(span, _event, _id, payload)
def on_start(span, _event, _id, payload)
span.set_tag(Tracing::Metadata::Ext::TAG_OPERATION, Ext::TAG_OPERATION_SERIALIZE)

set_common_tags(span, payload)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def span_name
Ext::SPAN_INSTANTIATION
end

def process(span, event, _id, payload)
def on_start(span, event, _id, payload)
span.resource = payload.fetch(:class_name)
span.type = Ext::SPAN_TYPE_INSTANTIATION
span.set_tag(Tracing::Metadata::Ext::TAG_COMPONENT, Ext::TAG_COMPONENT)
Expand Down
2 changes: 1 addition & 1 deletion lib/datadog/tracing/contrib/active_record/events/sql.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def span_name
Ext::SPAN_SQL
end

def process(span, event, _id, payload)
def on_start(span, event, _id, payload)
config = Utils.connection_config(payload[:connection], payload[:connection_id])
settings = Datadog.configuration.tracing[:active_record, config]
adapter_name = Contrib::Utils::Database.normalize_vendor(config[:adapter])
Expand Down
29 changes: 23 additions & 6 deletions lib/datadog/tracing/contrib/active_support/notifications/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,23 @@ def subscribe! # rubocop:disable Lint/UselessMethodDefinition
super
end

def subscription(span_name = nil, options = nil)
def subscription(span_name = nil, span_options = nil, on_start: nil, on_finish: nil)
super(
span_name || self.span_name,
options || span_options,
&method(:process)
span_options || self.span_options,
on_start: on_start,
on_finish: on_finish
)
end

def subscribe(pattern = nil, span_name = nil, options = nil)
def subscribe(pattern = nil, span_name = nil, span_options = nil)
if supported?
super(
pattern || event_name,
span_name || self.span_name,
options || span_options,
&method(:process)
span_options || self.span_options,
on_start: method(:on_start),
on_finish: method(:on_finish)
)
end
end
Expand All @@ -62,6 +64,21 @@ def payload_exception(payload)
payload[:exception_object] ||
payload[:exception] # Fallback for ActiveSupport < 5.0
end

def on_start(_span, _event, _id, _payload); end

def on_finish(span, _event, _id, payload)
record_exception(span, payload)
end

def record_exception(span, payload)
if payload[:exception_object]
span.set_error(payload[:exception_object])
elsif payload[:exception]
# Fallback for ActiveSupport < 5.0
span.set_error(payload[:exception])
end
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@ def subscribe!
end

# Creates a subscription and immediately activates it.
def subscribe(pattern, span_name, options = {}, &block)
subscription(span_name, options, &block).tap do |subscription|
def subscribe(pattern, span_name, span_options = {}, on_start: nil, on_finish: nil)
subscription(span_name, span_options, on_start: on_start, on_finish: on_finish).tap do |subscription|
subscription.subscribe(pattern)
end
end

# Creates a subscription without activating it.
# Subscription is added to the inheriting class' list of subscriptions.
def subscription(span_name, options = {}, &block)
Subscription.new(span_name, options, &block).tap do |subscription|
def subscription(span_name, span_options = {}, on_start: nil, on_finish: nil)
Subscription.new(span_name, span_options, on_start: on_start, on_finish: on_finish).tap do |subscription|
subscriptions << subscription
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,21 @@ module Notifications
class Subscription
attr_accessor \
:span_name,
:options
:span_options

def initialize(span_name, options, &block)
raise ArgumentError, 'Must be given a block!' unless block
# @param span_name [String] the operation name for the span
# @param span_options [Hash] span_options to pass during span creation
# @param on_start [Proc] a block to run when the event is fired,
# might not include all required information in the `payload` argument.
# @param on_finish [Proc] a block to run when the event has finished processing,
# possibly including more information in the `payload` argument.
def initialize(span_name, span_options, on_start: nil, on_finish: nil)
raise ArgumentError, 'Must be given either on_start or on_finish' unless on_start || on_finish

@span_name = span_name
@options = options
@handler = Handler.new(&block)
@span_options = span_options
@on_start = Handler.new(on_start)
@on_finish = Handler.new(on_finish)
@callbacks = Callbacks.new
end

Expand Down Expand Up @@ -69,19 +76,24 @@ def unsubscribe_all
protected

attr_reader \
:handler,
:on_start,
:on_finish,
:callbacks

def start_span(name, id, payload, start = nil)
# Run callbacks
callbacks.run(name, :before_trace, id, payload, start)

# Start a trace
Tracing.trace(@span_name, **@options).tap do |span|
# Start span if time is provided
span.start(start) unless start.nil?
payload[:datadog_span] = span
end
span = Tracing.trace(@span_name, **@span_options)

# Start span if time is provided
span.start(start) unless start.nil?
payload[:datadog_span] = span

on_start.run(span, name, id, payload)

span
end

def finish_span(name, id, payload, finish = nil)
Expand All @@ -90,7 +102,7 @@ def finish_span(name, id, payload, finish = nil)
return nil if span.nil?

# Run handler for event
handler.run(span, name, id, payload)
on_finish.run(span, name, id, payload)

# Finish the span
span.finish(finish)
Expand All @@ -109,21 +121,17 @@ def subscribers
class Handler
attr_reader :block

def initialize(&block)
def initialize(block)
@block = block
end

def run(span, name, id, payload)
run!(span, name, id, payload)
@block.call(span, name, id, payload) if @block
rescue StandardError => e
Datadog.logger.debug(
"ActiveSupport::Notifications handler for '#{name}' failed: #{e.class.name} #{e.message}"
)
end

def run!(*args)
@block.call(*args)
end
end

# Wrapper for subscription callbacks
Expand Down
2 changes: 1 addition & 1 deletion lib/datadog/tracing/contrib/kafka/consumer_event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module Contrib
module Kafka
# Defines basic behaviors for an event for a consumer.
module ConsumerEvent
def process(span, _event, _id, payload)
def on_start(span, _event, _id, payload)
super

span.set_tag(Ext::TAG_GROUP, payload[:group_id])
Expand Down
Loading

0 comments on commit a13cb0f

Please sign in to comment.