From 74f94dfb548633cb5f3d05b21cb69b24d04cd6d3 Mon Sep 17 00:00:00 2001 From: David Elner Date: Thu, 15 Mar 2018 18:34:15 -0400 Subject: [PATCH] Fixed: Writer#sampling_updater callbacks for services producing errors. --- lib/ddtrace/transport.rb | 40 ++++++++++++++----------- lib/ddtrace/writer.rb | 4 +-- spec/ddtrace/writer_spec.rb | 60 +++++++++++++++++++++++-------------- 3 files changed, 63 insertions(+), 41 deletions(-) diff --git a/lib/ddtrace/transport.rb b/lib/ddtrace/transport.rb index 9d54c07770..34b7071917 100644 --- a/lib/ddtrace/transport.rb +++ b/lib/ddtrace/transport.rb @@ -75,11 +75,15 @@ def send(endpoint, data) case endpoint when :services payload = @encoder.encode_services(data) - status_code = post(@api[:services_endpoint], payload) + status_code = post(@api[:services_endpoint], payload) do |response| + process_callback(:services, response) + end when :traces count = data.length payload = @encoder.encode_traces(data) - status_code = post(@api[:traces_endpoint], payload, count) + status_code = post(@api[:traces_endpoint], payload, count) do |response| + process_callback(:traces, response) + end else Datadog::Tracer.log.error("Unsupported endpoint: #{endpoint}") return nil @@ -95,17 +99,21 @@ def send(endpoint, data) # send data to the trace-agent; the method is thread-safe def post(url, data, count = nil) - Datadog::Tracer.log.debug("Sending data from process: #{Process.pid}") - headers = count.nil? ? {} : { TRACE_COUNT_HEADER => count.to_s } - headers = headers.merge(@headers) - request = Net::HTTP::Post.new(url, headers) - request.body = data - - response = Net::HTTP.start(@hostname, @port, read_timeout: TIMEOUT) { |http| http.request(request) } - handle_response(response) - rescue StandardError => e - Datadog::Tracer.log.error(e.message) - 500 + begin + Datadog::Tracer.log.debug("Sending data from process: #{Process.pid}") + headers = count.nil? ? {} : { TRACE_COUNT_HEADER => count.to_s } + headers = headers.merge(@headers) + request = Net::HTTP::Post.new(url, headers) + request.body = data + + response = Net::HTTP.start(@hostname, @port, read_timeout: TIMEOUT) { |http| http.request(request) } + handle_response(response) + rescue StandardError => e + Datadog::Tracer.log.error(e.message) + 500 + end.tap do + yield(response) if block_given? + end end # Downgrade the connection to a compatibility version of the HTTPTransport; @@ -173,8 +181,6 @@ def handle_response(response) @mutex.synchronize { @count_server_error += 1 } end - process_callback(response) - status_code rescue StandardError => e Datadog::Tracer.log.error(e.message) @@ -195,10 +201,10 @@ def stats private - def process_callback(response) + def process_callback(action, response) return unless @response_callback && @response_callback.respond_to?(:call) - @response_callback.call(response, @api) + @response_callback.call(action, response, @api) rescue => e Tracer.log.debug("Error processing callback: #{e}") end diff --git a/lib/ddtrace/writer.rb b/lib/ddtrace/writer.rb index 91ec9c12ff..e041aac394 100644 --- a/lib/ddtrace/writer.rb +++ b/lib/ddtrace/writer.rb @@ -117,8 +117,8 @@ def stats private - def sampling_updater(response, api) - return unless response.is_a?(Net::HTTPOK) + def sampling_updater(action, response, api) + return unless action == :traces && response.is_a?(Net::HTTPOK) if api[:version] == HTTPTransport::V4 service_rates = JSON.parse(response.body) diff --git a/spec/ddtrace/writer_spec.rb b/spec/ddtrace/writer_spec.rb index 54ca23067b..16229416f6 100644 --- a/spec/ddtrace/writer_spec.rb +++ b/spec/ddtrace/writer_spec.rb @@ -74,10 +74,14 @@ let(:response) { { body: body } } let(:body) { 'body' } - shared_examples_for 'an API version' do + shared_examples_for 'a traces API' do context 'that succeeds' do before(:each) do - expect(callback).to receive(:call).with(a_kind_of(Net::HTTPOK), a_kind_of(Hash)) do |response, api| + expect(callback).to receive(:call).with( + :traces, + a_kind_of(Net::HTTPOK), + a_kind_of(Hash) + ) do |action, response, api| expect(api[:version]).to eq(api_version) end end @@ -96,7 +100,11 @@ before(:each) do call_count = 0 - allow(callback).to receive(:call).with(a_kind_of(Net::HTTPResponse), a_kind_of(Hash)) do |response, api| + allow(callback).to receive(:call).with( + :traces, + a_kind_of(Net::HTTPResponse), + a_kind_of(Hash) + ) do |action, response, api| call_count += 1 if call_count == 1 expect(response).to be_a_kind_of(Net::HTTPNotFound) @@ -116,14 +124,14 @@ end context 'API v4' do - it_behaves_like 'an API version' do + it_behaves_like 'a traces API' do let(:api_version) { Datadog::HTTPTransport::V4 } let(:fallback_version) { Datadog::HTTPTransport::V3 } end end context 'API v3' do - it_behaves_like 'an API version' do + it_behaves_like 'a traces API' do let(:api_version) { Datadog::HTTPTransport::V3 } let(:fallback_version) { Datadog::HTTPTransport::V2 } end @@ -133,39 +141,47 @@ end describe '#sampling_updater' do - subject(:result) { writer.send(:sampling_updater, response, api) } - + subject(:result) { writer.send(:sampling_updater, action, response, api) } let(:options) { { priority_sampler: sampler } } let(:sampler) { instance_double(Datadog::PrioritySampler) } + let(:action) { :traces } + let(:response) { double('response') } let(:api) { double('api') } context 'given a response that' do context 'isn\'t OK' do - let(:response) { double('failure response') } + let(:response) { mock_http_request(method: :post, status: 404)[:response] } + it { is_expected.to be nil } + end + + context 'isn\'t a :traces action' do + let(:action) { :services } it { is_expected.to be nil } end context 'is OK' do let(:response) { mock_http_request(method: :post, body: body)[:response] } - context 'and is API v4' do - let(:api) { { version: Datadog::HTTPTransport::V4 } } - let(:service_rates) { { 'service:a,env:test' => 0.1, 'service:b,env:test' => 0.5 } } - let(:body) { service_rates.to_json } + context 'and is a :traces action' do + context 'and is API v4' do + let(:api) { { version: Datadog::HTTPTransport::V4 } } + let(:service_rates) { { 'service:a,env:test' => 0.1, 'service:b,env:test' => 0.5 } } + let(:body) { service_rates.to_json } - it do - expect(sampler).to receive(:update).with(service_rates) - is_expected.to be true + it do + expect(sampler).to receive(:update).with(service_rates) + is_expected.to be true + end end - end - context 'and is API v3' do - let(:api) { { version: Datadog::HTTPTransport::V3 } } - let(:body) { 'OK' } + context 'and is API v3' do + let(:api) { { version: Datadog::HTTPTransport::V3 } } + let(:body) { 'OK' } - it do - expect(sampler).to_not receive(:update) - is_expected.to be false + it do + expect(sampler).to_not receive(:update) + is_expected.to be false + end end end end