Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(active-job): Propagate context between enqueue and perform #1132

Merged
merged 1 commit into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ def start_span(name, _id, payload)
job = payload.fetch(:job)
span_name = span_name(job, EVENT_NAME)
span = tracer.start_span(span_name, kind: :producer, attributes: @mapper.call(payload))
token = OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))
OpenTelemetry.propagation.inject(job.__otel_headers) # This must be transmitted over the wire
{ span: span, ctx_token: OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span)) }
{ span: span, ctx_token: token }
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def start_span(name, _id, payload)
# TODO: Refactor into a propagation strategy
propagation_style = @config[:propagation_style]
if propagation_style == :child
span = tracer.start_span(span_name, kind: :consumer, attributes: @mapper.call(payload))
span = tracer.start_span(span_name, with_parent: parent_context, kind: :consumer, attributes: @mapper.call(payload))
else
span_context = OpenTelemetry::Trace.current_span(parent_context).context
links = [OpenTelemetry::Trace::Link.new(span_context)] if span_context.valid? && propagation_style == :link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,18 @@
_(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id)
end

it 'does not interfere with an outer span' do
instrumentation.tracer.in_span('outer span') do
TestJob.perform_later
end

_(publish_span.trace_id).wont_equal(process_span.trace_id)

_(process_span.total_recorded_links).must_equal(1)
_(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id)
_(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id)
end

it 'propagates baggage' do
ctx = OpenTelemetry::Baggage.set_value('testing_baggage', 'it_worked')
OpenTelemetry::Context.with_current(ctx) do
Expand All @@ -190,6 +202,30 @@

_(process_span.attributes['success']).must_equal(true)
end

describe 'with an async queue adapter' do
before do
begin
ActiveJob::Base.queue_adapter.shutdown
rescue StandardError
nil
end

singleton_class.include ActiveJob::TestHelper
ActiveJob::Base.queue_adapter = :test
end

it 'creates span links in separate traces' do
TestJob.perform_later
perform_enqueued_jobs

_(publish_span.trace_id).wont_equal(process_span.trace_id)

_(process_span.total_recorded_links).must_equal(1)
_(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id)
_(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id)
end
end
end

describe 'when configured to do parent/child spans' do
Expand All @@ -204,6 +240,17 @@
_(process_span.parent_span_id).must_equal(publish_span.span_id)
end

it 'does not interfere with an outer span' do
instrumentation.tracer.in_span('outer span') do
TestJob.perform_later
end

_(process_span.total_recorded_links).must_equal(0)

_(publish_span.trace_id).must_equal(process_span.trace_id)
_(process_span.parent_span_id).must_equal(publish_span.span_id)
end

it 'propagates baggage' do
ctx = OpenTelemetry::Baggage.set_value('testing_baggage', 'it_worked')
OpenTelemetry::Context.with_current(ctx) do
Expand All @@ -215,6 +262,29 @@
_(process_span.parent_span_id).must_equal(publish_span.span_id)
_(process_span.attributes['success']).must_equal(true)
end

describe 'with an async queue adapter' do
before do
begin
ActiveJob::Base.queue_adapter.shutdown
rescue StandardError
nil
end

singleton_class.include ActiveJob::TestHelper
ActiveJob::Base.queue_adapter = :test
end

it 'creates a parent/child relationship' do
TestJob.perform_later
perform_enqueued_jobs

_(process_span.total_recorded_links).must_equal(0)

_(publish_span.trace_id).must_equal(process_span.trace_id)
_(process_span.parent_span_id).must_equal(publish_span.span_id)
end
end
end

describe 'when explicitly configure for no propagation' do
Expand Down
Loading