Skip to content

Commit

Permalink
Add feedback loop to sampler
Browse files Browse the repository at this point in the history
  • Loading branch information
p-lambert committed Nov 16, 2017
1 parent 148cb8b commit f73cc15
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 10 deletions.
4 changes: 3 additions & 1 deletion lib/ddtrace/contrib/rails/framework.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ module Framework
default_grape_service: 'grape',
template_base_path: 'views/',
tracer: Datadog.tracer,
priority_sampling: false,
debug: false,
trace_agent_hostname: Datadog::Writer::HOSTNAME,
trace_agent_port: Datadog::Writer::PORT,
Expand All @@ -66,7 +67,8 @@ def self.configure(config)
# set the address of the trace agent
datadog_config[:tracer].configure(
hostname: datadog_config[:trace_agent_hostname],
port: datadog_config[:trace_agent_port]
port: datadog_config[:trace_agent_port],
priority_sampling: datadog_config[:priority_sampling]
)

# set default tracer tags
Expand Down
6 changes: 4 additions & 2 deletions lib/ddtrace/tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,15 @@ def configure(options = {})

@priority_sampling = priority_sampling unless priority_sampling.nil?
@enabled = enabled unless enabled.nil?
@writer.transport.hostname = hostname unless hostname.nil?
@writer.transport.port = port unless port.nil?
@sampler = sampler unless sampler.nil?

if priority_sampling
@sampler = PrioritySampler.new(base_sampler: @sampler)
@writer = Writer.new(priority_sampler: @sampler)
end

@writer.transport.hostname = hostname unless hostname.nil?
@writer.transport.port = port unless port.nil?
end

# Set the information about the given service. A valid example is:
Expand Down
29 changes: 23 additions & 6 deletions lib/ddtrace/transport.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,35 @@ class HTTPTransport
RUBY_INTERPRETER = RUBY_VERSION > '1.9' ? RUBY_ENGINE + '-' + RUBY_PLATFORM : 'ruby-' + RUBY_PLATFORM

API = {
'v0.3' => {
V4 = 'v0.4'.freeze => {
traces_endpoint: '/v0.4/traces'.freeze,
services_endpoint: '/v0.4/services'.freeze,
encoder: Encoding::MsgpackEncoder,
fallback: 'v0.3'.freeze
}.freeze,
V3 = 'v0.3'.freeze => {
traces_endpoint: '/v0.3/traces'.freeze,
services_endpoint: '/v0.3/services'.freeze,
encoder: Encoding::MsgpackEncoder,
fallback: 'v0.2'.freeze
}.freeze,
'v0.2' => {
V2 = 'v0.2'.freeze => {
traces_endpoint: '/v0.2/traces'.freeze,
services_endpoint: '/v0.2/services'.freeze,
encoder: Encoding::JSONEncoder
}.freeze
}.freeze

DEFAULT_API = 'v0.3'.freeze

private_constant :API, :DEFAULT_API
private_constant :API

def initialize(hostname, port, options = {})
api_version = options.fetch(:api_version, DEFAULT_API)
api_version = options.fetch(:api_version, V3)

@hostname = hostname
@port = port
@api = API.fetch(api_version)
@encoder = options[:encoder] || @api[:encoder].new
@response_callback = options[:response_callback]

# overwrite the Content-type with the one chosen in the Encoder
@headers = options.fetch(:headers, {})
Expand Down Expand Up @@ -159,6 +164,8 @@ def handle_response(response)
@mutex.synchronize { @count_server_error += 1 }
end

process_callback(response)

status_code
rescue StandardError => e
Datadog::Tracer.log.error(e.message)
Expand All @@ -176,5 +183,15 @@ def stats
}
end
end

private

def process_callback(response)
return unless @response_callback && @response_callback.respond_to?(:call)

@response_callback.call(response)
rescue => e
Tracer.log.debug("Error processing callback: #{e}")
end
end
end
21 changes: 20 additions & 1 deletion lib/ddtrace/writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,20 @@ def initialize(options = {})
# writer and transport parameters
@buff_size = options.fetch(:buffer_size, 100)
@flush_interval = options.fetch(:flush_interval, 1)
transport_options = options.fetch(:transport_options, {})

# priority sampling
if options[:priority_sampler]
@priority_sampler = options[:priority_sampler]
transport_options[:api_version] ||= HTTPTransport::V4
transport_options[:response_callback] ||= method(:sampling_updater)
end

# transport and buffers
@transport = options.fetch(:transport, Datadog::HTTPTransport.new(HOSTNAME, PORT))
@transport = options.fetch(:transport) do
HTTPTransport.new(HOSTNAME, PORT, transport_options)
end

@services = {}

# handles the thread creation after an eventual fork
Expand Down Expand Up @@ -103,5 +114,13 @@ def stats
transport: @transport.stats
}
end

private

def sampling_updater(response)
return unless response.is_a?(Net::HTTPOK)
service_rates = JSON.parse(response.body)
@priority_sampler.update(service_rates)
end
end
end
38 changes: 38 additions & 0 deletions test/writer_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
require 'helper'
require 'ddtrace/transport'
require 'ddtrace/writer'
require 'json'

module Datadog
class WriterTest < Minitest::Test
def test_sampling_feedback_loop
sampler = Minitest::Mock.new
writer = Writer.new(priority_sampler: sampler)
response_body = { 'service_a' => 0.1, 'service_b' => 0.5 }.to_json
v4_endpoint = stub_request(:post, traces_endpoint).to_return(body: response_body)

sampler.expect(:update, true, [{ 'service_a' => 0.1, 'service_b' => 0.5 }])
writer.send_spans(get_test_traces(1), writer.transport)
assert_requested(v4_endpoint)
sampler.verify
end

def test_outdated_agent
sampler = Minitest::Mock.new
writer = Writer.new(priority_sampler: sampler)
v4_endpoint = stub_request(:post, traces_endpoint).to_return(status: 404)
v3_endpoint = stub_request(:post, traces_endpoint(HTTPTransport::V3))

writer.send_spans(get_test_traces(1), writer.transport)

assert_requested(v4_endpoint)
assert_requested(v3_endpoint)
end

private

def traces_endpoint(api_version = HTTPTransport::V4)
"#{Writer::HOSTNAME}:#{Writer::PORT}/#{api_version}/traces"
end
end
end

0 comments on commit f73cc15

Please sign in to comment.