From 72d8d17308b107fb690ac080851e12ffbaf6d15b Mon Sep 17 00:00:00 2001 From: Marco Costa Date: Wed, 18 Sep 2024 15:34:52 -0700 Subject: [PATCH] wip --- Gemfile | 2 +- .../tracing/contrib/aws/instrumentation.rb | 35 +++- .../tracing/contrib/aws/service/base.rb | 14 +- .../tracing/contrib/aws/service/sns.rb | 9 +- .../tracing/contrib/aws/service/sqs.rb | 23 +-- lib/datadog/tracing/trace_digest.rb | 52 ++++++ .../contrib/aws/instrumentation_spec.rb | 12 +- .../contrib/aws/service/shared_examples.rb | 165 ++++++++++++++++++ .../tracing/contrib/aws/service/sns_spec.rb | 103 +---------- .../tracing/contrib/aws/service/sqs_spec.rb | 13 ++ spec/datadog/tracing/trace_digest_spec.rb | 51 ++++++ 11 files changed, 347 insertions(+), 132 deletions(-) create mode 100644 spec/datadog/tracing/contrib/aws/service/shared_examples.rb diff --git a/Gemfile b/Gemfile index 3621701aa3..8461f0ff66 100644 --- a/Gemfile +++ b/Gemfile @@ -101,4 +101,4 @@ end # TODO: Remove this once the issue is resolved: https://github.com/ffi/ffi/issues/1107 gem 'ffi', '~> 1.16.3', require: false -gem 'aws-sdk' +# gem 'aws-sdk' diff --git a/lib/datadog/tracing/contrib/aws/instrumentation.rb b/lib/datadog/tracing/contrib/aws/instrumentation.rb index b7fa905748..119e8e20ee 100644 --- a/lib/datadog/tracing/contrib/aws/instrumentation.rb +++ b/lib/datadog/tracing/contrib/aws/instrumentation.rb @@ -18,24 +18,45 @@ def add_handlers(handlers, _) # Generates Spans for all interactions with AWS class Handler < Seahorse::Client::Handler + # Some services contain trace propagation information (e.g. SQS) that affect what active trace + # we'll use for the AWS span. + # But because this information is only available after the request is made, we need to make the AWS + # request first, then create the trace and span with correct distributed trace parenting. def call(context) - Tracing.trace(Ext::SPAN_COMMAND) do |span, trace| - @handler.call(context).tap do - annotate!(span, trace, ParsedContext.new(context)) - end + config = configuration + + # Find the AWS service instrumentation + parsed_context = ParsedContext.new(context) + aws_service = parsed_context.safely(:resource).split('.')[0] + handler = Datadog::Tracing::Contrib::Aws::SERVICE_HANDLERS[aws_service] + + # Execute handler stack, to ensure we have the response object before the trace and span are created + start_time = Core::Utils::Time.now.utc # Save the start time as the span creation is delayed + begin + response = @handler.call(context) + rescue Exception => e + # Catch exception to reraise it inside the trace block, to ensure the span has correct error information + end + + Tracing.trace(Ext::SPAN_COMMAND, start_time: start_time) do |span, trace| + handler.before_span(config, context, response) if handler + + annotate!(config, span, trace, parsed_context, aws_service) + + raise e if e end + + response end private # rubocop:disable Metrics/AbcSize - def annotate!(span, trace, context) - config = configuration + def annotate!(config, span, trace, context, aws_service) span.service = config[:service_name] span.type = Tracing::Metadata::Ext::HTTP::TYPE_OUTBOUND span.name = Ext::SPAN_COMMAND span.resource = context.safely(:resource) - aws_service = span.resource.split('.')[0] span.set_tag(Ext::TAG_AWS_SERVICE, aws_service) params = context.safely(:params) if (handler = Datadog::Tracing::Contrib::Aws::SERVICE_HANDLERS[aws_service]) diff --git a/lib/datadog/tracing/contrib/aws/service/base.rb b/lib/datadog/tracing/contrib/aws/service/base.rb index bb0617863b..c0fbe788bb 100644 --- a/lib/datadog/tracing/contrib/aws/service/base.rb +++ b/lib/datadog/tracing/contrib/aws/service/base.rb @@ -7,17 +7,25 @@ module Aws module Service # Base class for all AWS service-specific tag handlers. class Base + def before_span(config, context, response); end def process(config, trace, context); end def add_tags(span, params); end MESSAGE_ATTRIBUTES_LIMIT = 10 # Can't set more than 10 message attributes - def extract_propagation!(context) - message_attributes = context.params[:message_attributes] + # Extract the `_datadog` message attribute and decode its JSON content. + def extract_propagation!(response, data_type) + messages = response.data.messages + + return unless (message = messages[0]) + + message_attributes = message.message_attributes return unless message_attributes && (datadog = message_attributes['_datadog']) - Tracing.continue_trace!(Contrib.extract(datadog)) + if ((data = datadog[data_type]) && (parsed_data = JSON.parse(data))) + Tracing.continue_trace!(Contrib.extract(parsed_data)) + end end def inject_propagation(trace, params, data_type) diff --git a/lib/datadog/tracing/contrib/aws/service/sns.rb b/lib/datadog/tracing/contrib/aws/service/sns.rb index 3baddee688..39c6bbc052 100644 --- a/lib/datadog/tracing/contrib/aws/service/sns.rb +++ b/lib/datadog/tracing/contrib/aws/service/sns.rb @@ -12,16 +12,15 @@ module Aws module Service # SNS tag handlers. class SNS < Base + PROPAGATION_DATATYPE = 'Binary' + def process(config, trace, context) return unless config[:propagation] case context.operation when :publish - inject_propagation(trace, context.params, 'Binary') - when :publish_batch - context.params[:publish_batch_request_entries].each do |entry| - inject_propagation(trace, entry, 'Binary') - end + inject_propagation(trace, context.params, PROPAGATION_DATATYPE) + # TODO: when :publish_batch # Future support for batch publishing end end diff --git a/lib/datadog/tracing/contrib/aws/service/sqs.rb b/lib/datadog/tracing/contrib/aws/service/sqs.rb index dcb99d9321..4b7c103bb5 100644 --- a/lib/datadog/tracing/contrib/aws/service/sqs.rb +++ b/lib/datadog/tracing/contrib/aws/service/sqs.rb @@ -10,14 +10,12 @@ module Aws module Service # SQS tag handlers. class SQS < Base - def before_span(config, context) - # DEV: Because we only support tracing propagation today, having separate `propagation and `propagation_style` - # options seems redundant. But when the DSM propagation is introduced, it's possible for `propagation` to be - # enable and `propagation_style` to disable, while DSM propagation is still enabled, as its data is not - # directly related to tracing parentage. - if config[:propagation] && config[:parentage_style] == 'distributed' && context.operation == :receive_message - extract_propagation!(context) - end + DATATYPE = 'String' + def before_span(config, context, response) + return unless context.operation == :receive_message && config[:propagation] + + # Parent the current trace based on distributed message attributes + extract_propagation!(response, 'string_value') if config[:parentage_style] == 'distributed' end def process(config, trace, context) @@ -25,13 +23,8 @@ def process(config, trace, context) case context.operation when :send_message - inject_propagation(trace, context, 'String') - when :send_message_batch - if config[:batch_propagation] - inject_propagation(trace, context, 'String') - else - inject_propagation(trace, context, 'String') - end + inject_propagation(trace, context.params, 'String') + # TODO when :send_message_batch # Future support for batch sending end end diff --git a/lib/datadog/tracing/trace_digest.rb b/lib/datadog/tracing/trace_digest.rb index 790013e56e..0bfa876125 100644 --- a/lib/datadog/tracing/trace_digest.rb +++ b/lib/datadog/tracing/trace_digest.rb @@ -180,6 +180,58 @@ def merge(field_value_pairs) }.merge!(field_value_pairs) ) end + + def ==(other) + self.class == other.class && + span_id == other.span_id && + span_name == other.span_name && + span_resource == other.span_resource && + span_service == other.span_service && + span_type == other.span_type && + trace_distributed_tags == other.trace_distributed_tags && + trace_hostname == other.trace_hostname && + trace_id == other.trace_id && + trace_name == other.trace_name && + trace_origin == other.trace_origin && + trace_process_id == other.trace_process_id && + trace_resource == other.trace_resource && + trace_runtime_id == other.trace_runtime_id && + trace_sampling_priority == other.trace_sampling_priority && + trace_service == other.trace_service && + trace_distributed_id == other.trace_distributed_id && + trace_flags == other.trace_flags && + trace_state == other.trace_state && + trace_state_unknown_fields == other.trace_state_unknown_fields && + span_remote == other.span_remote + end + + alias eql? == + + def hash + [ + self.class, + span_id, + span_name, + span_resource, + span_service, + span_type, + trace_distributed_tags, + trace_hostname, + trace_id, + trace_name, + trace_origin, + trace_process_id, + trace_resource, + trace_runtime_id, + trace_sampling_priority, + trace_service, + trace_distributed_id, + trace_flags, + trace_state, + trace_state_unknown_fields, + span_remote + ].hash + end end end end diff --git a/spec/datadog/tracing/contrib/aws/instrumentation_spec.rb b/spec/datadog/tracing/contrib/aws/instrumentation_spec.rb index 093135948c..ab72c7ae1c 100644 --- a/spec/datadog/tracing/contrib/aws/instrumentation_spec.rb +++ b/spec/datadog/tracing/contrib/aws/instrumentation_spec.rb @@ -397,8 +397,16 @@ let(:responses) do { receive_message: { - messages: [] - } } + messages: [ + message_attributes: { + '_datadog' => { + string_value: 'String', + data_type: 'String' + } + } + ] + } + } end it 'generates a span' do diff --git a/spec/datadog/tracing/contrib/aws/service/shared_examples.rb b/spec/datadog/tracing/contrib/aws/service/shared_examples.rb new file mode 100644 index 0000000000..be0f556324 --- /dev/null +++ b/spec/datadog/tracing/contrib/aws/service/shared_examples.rb @@ -0,0 +1,165 @@ +require 'datadog/tracing/contrib/aws/parsed_context' + +require 'aws-sdk-sqs' + +RSpec.shared_examples 'injects AWS attribute propagation' do + subject(:inject_propagation) { service.process(config, trace, context) } + + let(:config) { { propagation: true } } + let(:trace) { Datadog::Tracing::TraceOperation.new(id: trace_id, parent_span_id: span_id) } + let(:context) { instance_double(Datadog::Tracing::Contrib::Aws::ParsedContext, params: params, operation: operation) } + let(:params) { {} } + let(:trace_id) { 1 } + let(:span_id) { 2 } + + before { Datadog.configure { |c| c.tracing.instrument :aws } } + + context 'without preexisting message attributes' do + it 'adds a propagation attribute' do + inject_propagation + expect(params[:message_attributes]).to eq( + '_datadog' => { + binary_value: + '{"x-datadog-trace-id":"1","x-datadog-parent-id":"2",' \ + '"traceparent":"00-00000000000000000000000000000001-0000000000000002-00",' \ + '"tracestate":"dd=p:0000000000000002"}', + data_type: data_type + } + ) + end + end + + context 'with existing message attributes' do + let(:params) { { message_attributes: message_attributes } } + let(:message_attributes) { { 'existing' => { data_type: 'Number', string_value: 1 } } } + + it 'adds a propagation attribute' do + expect { inject_propagation }.to change { message_attributes.keys }.from(['existing']).to(['existing', '_datadog']) + end + end + + context 'with 10 message attributes already set' do + let(:params) { { message_attributes: message_attributes } } + let(:message_attributes) do + Array.new(10) do |i| + ["attr#{i}", { data_type: 'Number', string_value: i }] + end.to_h + end + + it 'does not add a propagation attribute' do + expect { inject_propagation }.to_not(change { params }) + end + end + + context 'disabled' do + let(:config) { { propagation: false } } + + it 'does not add a propagation attribute' do + expect { inject_propagation }.to_not(change { params }) + end + end +end + +RSpec.shared_examples 'extract AWS attribute propagation' do + subject(:extract_propagation) { service.before_span(config, context, response) } + + let(:config) { { propagation: true, parentage_style: parentage_style } } + let(:parentage_style) { 'distributed' } + let(:trace) do + Datadog::Tracing::TraceOperation.new( + id: 1, + parent_span_id: 2, + remote_parent: true, + trace_state: 'unrelated=state', + sampling_priority: 0 + ) + end + let(:context) { instance_double(Datadog::Tracing::Contrib::Aws::ParsedContext, operation: operation) } + let(:response) do + result = Aws::SQS::Types::ReceiveMessageResult.new(messages: messages) + Seahorse::Client::Response.new(data: result) + end + let(:messages) { [] } + + before { Datadog.configure { |c| c.tracing.instrument :aws } } + + context 'without message attributes' do + context 'without an active trace' do + it 'does not create trace' do + extract_propagation + expect(Datadog::Tracing.active_trace).to be_nil + end + end + + context 'with an active trace' do + before { Datadog::Tracing.continue_trace!(trace.to_digest) } + + it 'does not change active trace' do + extract_propagation + expect(Datadog::Tracing.active_trace.to_digest).to eq(trace.to_digest) + end + end + end + + context 'with message attributes' do + let(:messages) { [message] } + let(:message) { Aws::SQS::Types::Message.new(message_attributes: message_attributes) } + let(:message_attributes) { { '_datadog' => attribute } } + let(:attribute) { Aws::SQS::Types::MessageAttributeValue.new( + string_value: + '{"traceparent":"00-00000000000000000000000000000001-0000000000000002-00",' \ + '"tracestate":"dd=p:0000000000000002,unrelated=state"}', + data_type: data_type + ) } + + context 'without an active trace' do + it 'creates trace' do + extract_propagation + expect(Datadog::Tracing.active_trace.to_digest).to eq(trace.to_digest) + end + end + + context 'with an active trace' do + it 'overrides the existing trace' do + existing_trace = Datadog::Tracing.continue_trace!(nil) + expect { extract_propagation }.to( + change { Datadog::Tracing.active_trace.to_digest }.from(existing_trace.to_digest).to(trace.to_digest) + ) + end + end + + context 'with a local parentage style' do + let(:parentage_style) { 'local' } + + it 'does not create a remote trace' do + extract_propagation + expect(Datadog::Tracing.active_trace).to be_nil + end + end + + context 'with multiple messages' do + let(:messages) { [message, other_message] } + let(:other_message) { Aws::SQS::Types::Message.new(message_attributes: other_message_attributes) } + let(:other_message_attributes) { { '_datadog' => other_attribute } } + let(:other_attribute) { Aws::SQS::Types::MessageAttributeValue.new( + string_value: + '{"traceparent":"00-00000000000000000000000000000008-0000000000000009-00",' \ + '"tracestate":"dd=p:0000000000000009,oops=not-this-one"}', + data_type: data_type + ) } + + it 'extracts the first message attributes' do + extract_propagation + expect(Datadog::Tracing.active_trace.to_digest).to eq(trace.to_digest) + end + end + end + + context 'disabled' do + let(:config) { { propagation: false } } + + it 'does not add a propagation attribute' do + expect { extract_propagation }.to_not(change { params }) + end + end +end \ No newline at end of file diff --git a/spec/datadog/tracing/contrib/aws/service/sns_spec.rb b/spec/datadog/tracing/contrib/aws/service/sns_spec.rb index 519582df89..c1a3b45442 100644 --- a/spec/datadog/tracing/contrib/aws/service/sns_spec.rb +++ b/spec/datadog/tracing/contrib/aws/service/sns_spec.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true require 'datadog/tracing/contrib/aws/service/sns' -require 'datadog/tracing/contrib/aws/parsed_context' +require_relative 'shared_examples' RSpec.describe Datadog::Tracing::Contrib::Aws::Service::SNS do let(:span) { instance_double('Span') } @@ -39,104 +39,9 @@ end end - shared_examples 'injects attribute propagation' do - subject(:inject_propagation) { service.process(config, trace, context) } - - let(:config) { { propagation: true } } - let(:trace) { Datadog::Tracing::TraceOperation.new(id: trace_id, parent_span_id: span_id) } - let(:context) { instance_double(Datadog::Tracing::Contrib::Aws::ParsedContext, params: params, operation: operation) } - let(:params) { {} } - let(:operation) { :publish } - let(:trace_id) { 1 } - let(:span_id) { 2 } - - before { Datadog.configure { |c| c.tracing.instrument :aws } } - - context 'without preexisting message attributes' do - it 'adds a propagation attribute' do - inject_propagation - expect(params[:message_attributes]).to eq( - '_datadog' => { - binary_value: - '{"x-datadog-trace-id":"1","x-datadog-parent-id":"2",' \ - '"traceparent":"00-00000000000000000000000000000001-0000000000000002-00",' \ - '"tracestate":"dd=p:0000000000000002"}', - data_type: 'Binary' - } - ) - end - end - - context 'with existing message attributes' do - let(:params) { { message_attributes: message_attributes } } - let(:message_attributes) { { 'existing' => { data_type: 'String', string_value: 'value' } } } - - it 'adds a propagation attribute' do - expect { inject_propagation }.to change { message_attributes.keys }.from(['existing']).to(['existing', '_datadog']) - end - end - - context 'with 10 message attributes already set' do - let(:params) { { message_attributes: message_attributes } } - let(:message_attributes) do - Array.new(10) do |i| - ["attr#{i}", { data_type: 'Number', string_value: i }] - end.to_h - end - - it 'does not add a propagation attribute' do - expect { inject_propagation }.to_not(change { params }) - end - end - - context 'disabled' do - let(:config) { { propagation: false } } - - it 'does not add a propagation attribute' do - expect { inject_propagation }.to_not(change { params }) - end - end - - context 'with batch publishing' do - let(:operation) { :publish_batch } - - # resp = client.publish_batch({ - # topic_arn: "topicARN", # required - # publish_batch_request_entries: [ # required - # { - # id: "String", # required - # message: "message", # required - # subject: "subject", - # message_structure: "messageStructure", - # message_attributes: { - # "String" => { - # data_type: "String", # required - # string_value: "String", - # binary_value: "data", - # }, - # }, - # message_deduplication_id: "String", - # message_group_id: "String", - # }, - # ], - # }) - - it 'adds a propagation attribute' do - inject_propagation - expect(params[:message_attributes]).to eq( - '_datadog' => { - binary_value: - '{"x-datadog-trace-id":"1","x-datadog-parent-id":"2",' \ - '"traceparent":"00-00000000000000000000000000000001-0000000000000002-00",' \ - '"tracestate":"dd=p:0000000000000002"}', - data_type: 'Binary' - } - ) - end - end - end - - it_behaves_like 'injects attribute propagation' do + it_behaves_like 'injects AWS attribute propagation' do let(:service) { sns } + let(:operation) { :publish } + let(:data_type) { 'Binary' } end end diff --git a/spec/datadog/tracing/contrib/aws/service/sqs_spec.rb b/spec/datadog/tracing/contrib/aws/service/sqs_spec.rb index fe8dac8300..895ffd1ec3 100644 --- a/spec/datadog/tracing/contrib/aws/service/sqs_spec.rb +++ b/spec/datadog/tracing/contrib/aws/service/sqs_spec.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require 'datadog/tracing/contrib/aws/service/sqs' +require_relative 'shared_examples' RSpec.describe Datadog::Tracing::Contrib::Aws::Service::SQS do let(:span) { instance_double('Span') } @@ -31,4 +32,16 @@ expect(span).to have_received(:set_tag).with(Datadog::Tracing::Contrib::Aws::Ext::TAG_QUEUE_NAME, 'AnotherQueueName') end end + + it_behaves_like 'injects AWS attribute propagation' do + let(:service) { sqs } + let(:operation) { :send_message } + let(:data_type) { 'String' } + end + + it_behaves_like 'extract AWS attribute propagation' do + let(:service) { sqs } + let(:operation) { :receive_message } + let(:data_type) { 'String' } + end end diff --git a/spec/datadog/tracing/trace_digest_spec.rb b/spec/datadog/tracing/trace_digest_spec.rb index 3303b71378..d919511e86 100644 --- a/spec/datadog/tracing/trace_digest_spec.rb +++ b/spec/datadog/tracing/trace_digest_spec.rb @@ -191,4 +191,55 @@ it { is_expected.to be_frozen } end + + describe '#==' do + let(:other) { described_class.new(**other_options) } + let(:other_options) { { span_id: 123 } } + + context 'when equal' do + let(:other_options) { options } + + it { is_expected.to eq(other) } + end + + context 'when not equal' do + let(:other_options) { { span_name: 'new span' } } + + it { is_expected.not_to eq(other) } + end + end + + describe '#eql?' do + let(:other) { described_class.new(**other_options) } + let(:other_options) { { span_id: 123 } } + + context 'when equal' do + let(:other_options) { options } + + it { is_expected.to eql(other) } + end + + context 'when not equal' do + let(:other_options) { { span_name: 'new span' } } + + it { is_expected.not_to eql(other) } + end + end + + describe '#hash' do + let(:other) { described_class.new(**other_options) } + let(:other_options) { { span_id: 123 } } + + context 'when equal' do + let(:other_options) { options } + + it { expect(trace_digest.hash).to eq(other.hash) } + end + + context 'when not equal' do + let(:other_options) { { span_name: 'new span' } } + + it { expect(trace_digest.hash).not_to eq(other.hash) } + end + end end