-
Notifications
You must be signed in to change notification settings - Fork 170
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor(active_job): Use ActiveSupport instead of patches
- Loading branch information
1 parent
d89b68e
commit 9599f18
Showing
8 changed files
with
245 additions
and
204 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
96 changes: 0 additions & 96 deletions
96
...n/active_job/lib/opentelemetry/instrumentation/active_job/patches/active_job_callbacks.rb
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
175 changes: 175 additions & 0 deletions
175
instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/subscriber.rb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
# frozen_string_literal: true | ||
|
||
require 'active_support/subscriber' | ||
|
||
module OpenTelemetry | ||
module Instrumentation | ||
module ActiveJob | ||
# Provides helper methods | ||
# | ||
module AttributeProcessor | ||
def to_otel_semconv_attributes(job) | ||
test_adapters = %w[async inline] | ||
|
||
otel_attributes = { | ||
'code.namespace' => job.class.name, | ||
'messaging.destination_kind' => 'queue', | ||
'messaging.system' => job.class.queue_adapter_name, | ||
'messaging.destination' => job.queue_name, | ||
'messaging.message_id' => job.job_id, | ||
'rails.active_job.execution.counter' => job.executions + 1, | ||
'rails.active_job.provider_job_id' => job.provider_job_id, | ||
'rails.active_job.priority' => job.priority | ||
} | ||
|
||
otel_attributes['net.transport'] = 'inproc' if test_adapters.include?(job.class.queue_adapter_name) | ||
otel_attributes.compact! | ||
|
||
otel_attributes | ||
end | ||
end | ||
|
||
# Default handler to creates internal spans for events | ||
class DefaultHandler | ||
include AttributeProcessor | ||
|
||
def initialize(tracer) | ||
@tracer = tracer | ||
end | ||
|
||
def on_start(name, _id, payload) | ||
span = @tracer.start_span(name, attributes: to_otel_semconv_attributes(payload.fetch(:job))) | ||
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))] | ||
OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire | ||
{ span: span, ctx_tokens: tokens } | ||
end | ||
end | ||
|
||
# Handles enqueue.active_job | ||
class EnqueueHandler | ||
include AttributeProcessor | ||
|
||
def initialize(tracer) | ||
@tracer = tracer | ||
end | ||
|
||
def on_start(name, _id, payload) | ||
span = @tracer.start_span("#{payload.fetch(:job).queue_name} publish", kind: :producer, attributes: to_otel_semconv_attributes(payload.fetch(:job))) | ||
tokens = [OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))] | ||
OpenTelemetry.propagation.inject(payload.fetch(:job).__otel_headers) # This must be transmitted over the wire | ||
{ span: span, ctx_tokens: tokens } | ||
end | ||
end | ||
|
||
# Handles perform.active_job | ||
class PerformHandler | ||
include AttributeProcessor | ||
|
||
def initialize(tracer) | ||
@tracer = tracer | ||
end | ||
|
||
def on_start(name, _id, payload) | ||
tokens = [] | ||
parent_context = OpenTelemetry.propagation.extract(payload.fetch(:job).__otel_headers) | ||
span_context = OpenTelemetry::Trace.current_span(parent_context).context | ||
|
||
propagation_style = ActiveJob::Instrumentation.instance.config[:propagation_style] | ||
if propagation_style == :child | ||
tokens << OpenTelemetry::Context.attach(parent_context) | ||
span = @tracer.start_span( | ||
"#{payload.fetch(:job).queue_name} process", | ||
kind: :consumer, | ||
attributes: to_otel_semconv_attributes(payload.fetch(:job)) | ||
) | ||
else | ||
links = [OpenTelemetry::Trace::Link.new(span_context)] if span_context.valid? && propagation_style == :link | ||
span = @tracer.start_root_span( | ||
"#{payload.fetch(:job).queue_name} process", | ||
kind: :consumer, | ||
attributes: to_otel_semconv_attributes(payload.fetch(:job)), | ||
links: links | ||
) | ||
end | ||
|
||
tokens << OpenTelemetry::Context.attach( | ||
OpenTelemetry::Trace.context_with_span(span) | ||
) | ||
|
||
{ span: span, ctx_tokens: tokens } | ||
end | ||
end | ||
|
||
# Custom subscriber that handles ActiveJob notifications | ||
class Subscriber < ::ActiveSupport::Subscriber | ||
attr_reader :tracer | ||
|
||
def initialize(...) | ||
super | ||
tracer = Instrumentation.instance.tracer | ||
default_handler = DefaultHandler.new(tracer) | ||
@handlers_by_pattern = { | ||
'enqueue.active_job' => EnqueueHandler.new(tracer), | ||
'perform.active_job' => PerformHandler.new(tracer) | ||
} | ||
@handlers_by_pattern.default = default_handler | ||
end | ||
|
||
# The methods below are the events the Subscriber is interested in. | ||
def enqueue_at(...); end | ||
def enqueue(...); end | ||
def enqueue_retry(...); end | ||
def perform_start(...); end | ||
def perform(...); end | ||
def retry_stopped(...); end | ||
def discard(...); end | ||
|
||
def start(name, id, payload) | ||
begin | ||
payload.merge!(__otel: @handlers_by_pattern[name].on_start(name, id, payload)) # The payload is _not_ transmitted over the wire | ||
rescue StandardError => e | ||
OpenTelemetry.handle_error(exception: e) | ||
end | ||
|
||
super | ||
end | ||
|
||
def finish(_name, _id, payload) | ||
begin | ||
otel = payload.delete(:__otel) | ||
span = otel&.fetch(:span) | ||
tokens = otel&.fetch(:ctx_tokens) | ||
exception = payload[:error] | ||
if exception | ||
span&.record_exception(exception) | ||
span&.status = OpenTelemetry::Trace::Status.error | ||
end | ||
rescue StandardError => e | ||
OpenTelemetry.handle_error(exception: e) | ||
end | ||
|
||
super | ||
ensure | ||
begin | ||
span&.finish | ||
rescue StandardError => e | ||
OpenTelemetry.handle_error(exception: e) | ||
end | ||
tokens&.reverse&.each do |token| | ||
OpenTelemetry::Context.detach(token) | ||
rescue StandardError => e | ||
OpenTelemetry.handle_error(exception: e) | ||
end | ||
end | ||
|
||
def self.install | ||
attach_to :active_job | ||
end | ||
|
||
def self.uninstall | ||
detach_from :active_job | ||
end | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.