diff --git a/lib/ddtrace/contrib/rails/framework.rb b/lib/ddtrace/contrib/rails/framework.rb index 60896da1cb..01b26085d8 100644 --- a/lib/ddtrace/contrib/rails/framework.rb +++ b/lib/ddtrace/contrib/rails/framework.rb @@ -44,6 +44,7 @@ module Framework default_grape_service: 'grape', template_base_path: 'views/', tracer: Datadog.tracer, + priority_sampling: false, debug: false, trace_agent_hostname: Datadog::Writer::HOSTNAME, trace_agent_port: Datadog::Writer::PORT, @@ -66,7 +67,8 @@ def self.configure(config) # set the address of the trace agent datadog_config[:tracer].configure( hostname: datadog_config[:trace_agent_hostname], - port: datadog_config[:trace_agent_port] + port: datadog_config[:trace_agent_port], + priority_sampling: datadog_config[:priority_sampling] ) # set default tracer tags diff --git a/lib/ddtrace/tracer.rb b/lib/ddtrace/tracer.rb index 4aea9712fd..35b40a1b47 100644 --- a/lib/ddtrace/tracer.rb +++ b/lib/ddtrace/tracer.rb @@ -122,13 +122,15 @@ def configure(options = {}) @priority_sampling = priority_sampling unless priority_sampling.nil? @enabled = enabled unless enabled.nil? - @writer.transport.hostname = hostname unless hostname.nil? - @writer.transport.port = port unless port.nil? @sampler = sampler unless sampler.nil? if priority_sampling @sampler = PrioritySampler.new(base_sampler: @sampler) + @writer = Writer.new(priority_sampler: @sampler) end + + @writer.transport.hostname = hostname unless hostname.nil? + @writer.transport.port = port unless port.nil? end # Set the information about the given service. A valid example is: diff --git a/lib/ddtrace/transport.rb b/lib/ddtrace/transport.rb index b046e5e192..98f1032ec6 100644 --- a/lib/ddtrace/transport.rb +++ b/lib/ddtrace/transport.rb @@ -20,30 +20,35 @@ class HTTPTransport RUBY_INTERPRETER = RUBY_VERSION > '1.9' ? RUBY_ENGINE + '-' + RUBY_PLATFORM : 'ruby-' + RUBY_PLATFORM API = { - 'v0.3' => { + V4 = 'v0.4'.freeze => { + traces_endpoint: '/v0.4/traces'.freeze, + services_endpoint: '/v0.4/services'.freeze, + encoder: Encoding::MsgpackEncoder, + fallback: 'v0.3'.freeze + }.freeze, + V3 = 'v0.3'.freeze => { traces_endpoint: '/v0.3/traces'.freeze, services_endpoint: '/v0.3/services'.freeze, encoder: Encoding::MsgpackEncoder, fallback: 'v0.2'.freeze }.freeze, - 'v0.2' => { + V2 = 'v0.2'.freeze => { traces_endpoint: '/v0.2/traces'.freeze, services_endpoint: '/v0.2/services'.freeze, encoder: Encoding::JSONEncoder }.freeze }.freeze - DEFAULT_API = 'v0.3'.freeze - - private_constant :API, :DEFAULT_API + private_constant :API def initialize(hostname, port, options = {}) - api_version = options.fetch(:api_version, DEFAULT_API) + api_version = options.fetch(:api_version, V3) @hostname = hostname @port = port @api = API.fetch(api_version) @encoder = options[:encoder] || @api[:encoder].new + @response_callback = options[:response_callback] # overwrite the Content-type with the one chosen in the Encoder @headers = options.fetch(:headers, {}) @@ -159,6 +164,8 @@ def handle_response(response) @mutex.synchronize { @count_server_error += 1 } end + process_callback(response) + status_code rescue StandardError => e Datadog::Tracer.log.error(e.message) @@ -176,5 +183,15 @@ def stats } end end + + private + + def process_callback(response) + return unless @response_callback && @response_callback.respond_to?(:call) + + @response_callback.call(response) + rescue => e + Tracer.log.debug("Error processing callback: #{e}") + end end end diff --git a/lib/ddtrace/writer.rb b/lib/ddtrace/writer.rb index c7f0d8bf04..f02dd9d0b7 100644 --- a/lib/ddtrace/writer.rb +++ b/lib/ddtrace/writer.rb @@ -14,9 +14,20 @@ def initialize(options = {}) # writer and transport parameters @buff_size = options.fetch(:buffer_size, 100) @flush_interval = options.fetch(:flush_interval, 1) + transport_options = options.fetch(:transport_options, {}) + + # priority sampling + if options[:priority_sampler] + @priority_sampler = options[:priority_sampler] + transport_options[:api_version] ||= HTTPTransport::V4 + transport_options[:response_callback] ||= method(:sampling_updater) + end # transport and buffers - @transport = options.fetch(:transport, Datadog::HTTPTransport.new(HOSTNAME, PORT)) + @transport = options.fetch(:transport) do + HTTPTransport.new(HOSTNAME, PORT, transport_options) + end + @services = {} # handles the thread creation after an eventual fork @@ -103,5 +114,13 @@ def stats transport: @transport.stats } end + + private + + def sampling_updater(response) + return unless response.is_a?(Net::HTTPOK) + service_rates = JSON.parse(response.body) + @priority_sampler.update(service_rates) + end end end diff --git a/test/writer_test.rb b/test/writer_test.rb new file mode 100644 index 0000000000..47ebbbc4cd --- /dev/null +++ b/test/writer_test.rb @@ -0,0 +1,47 @@ +require 'helper' +require 'ddtrace/transport' +require 'ddtrace/writer' +require 'json' + +module Datadog + class WriterTest < Minitest::Test + def setup + WebMock.enable! + end + + def teardown + WebMock.reset! + WebMock.disable! + end + + def test_sampling_feedback_loop + sampler = Minitest::Mock.new + writer = Writer.new(priority_sampler: sampler) + response_body = { 'service:a,env:test' => 0.1, 'service:b,env:test' => 0.5 }.to_json + v4_endpoint = stub_request(:post, traces_endpoint).to_return(body: response_body) + + sampler.expect(:update, true, [{ 'service:a,env:test' => 0.1, 'service:b,env:test' => 0.5 }]) + writer.send_spans(get_test_traces(1), writer.transport) + assert_requested(v4_endpoint) + sampler.verify + end + + def test_outdated_agent + sampler = Minitest::Mock.new + writer = Writer.new(priority_sampler: sampler) + v4_endpoint = stub_request(:post, traces_endpoint).to_return(status: 404) + v3_endpoint = stub_request(:post, traces_endpoint(HTTPTransport::V3)) + + writer.send_spans(get_test_traces(1), writer.transport) + + assert_requested(v4_endpoint) + assert_requested(v3_endpoint) + end + + private + + def traces_endpoint(api_version = HTTPTransport::V4) + "#{Writer::HOSTNAME}:#{Writer::PORT}/#{api_version}/traces" + end + end +end