Skip to content

Commit

Permalink
Reuse HTTP connection between trace flushes
Browse files Browse the repository at this point in the history
  • Loading branch information
marcotc committed Oct 6, 2022
1 parent 7cbd3c4 commit a139144
Show file tree
Hide file tree
Showing 14 changed files with 130 additions and 40 deletions.
2 changes: 2 additions & 0 deletions lib/datadog/tracing/writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ def stop
def stop_worker
@stopped = true

@transport.stop

return if @worker.nil?

@worker.stop
Expand Down
28 changes: 21 additions & 7 deletions lib/ddtrace/transport/http/adapters/net.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@ class Net
:timeout,
:ssl

# in seconds
# In seconds.
DEFAULT_TIMEOUT = 30

# If this many seconds have passed since the last request, create a new TCP connection.
# In seconds.
KEEP_ALIVE_TIMEOUT = 60

# @deprecated Positional parameters are deprecated. Use named parameters instead.
def initialize(hostname = nil, port = nil, **options)
@hostname = hostname || options.fetch(:hostname)
Expand All @@ -39,12 +43,17 @@ def open(&block)
# DEV Initializing +Net::HTTP+ directly help us avoid expensive
# options processing done in +Net::HTTP.start+:
# https://github.com/ruby/ruby/blob/b2d96abb42abbe2e01f010ffc9ac51f0f9a50002/lib/net/http.rb#L614-L618
req = ::Net::HTTP.new(hostname, port, nil)
@req ||= begin
req = ::Net::HTTP.new(hostname, port, nil)

req.use_ssl = ssl
req.open_timeout = req.read_timeout = timeout
req.keep_alive_timeout = KEEP_ALIVE_TIMEOUT

req.use_ssl = ssl
req.open_timeout = req.read_timeout = timeout
req.start
end

req.start(&block)
yield @req
end

def call(env)
Expand All @@ -56,8 +65,6 @@ def call(env)
end

def post(env)
post = nil

if env.form.nil? || env.form.empty?
post = ::Net::HTTP::Post.new(env.path, env.headers)
post.body = env.body
Expand All @@ -82,6 +89,13 @@ def url
"http://#{hostname}:#{port}?timeout=#{timeout}"
end

def close
if @req && @req.started?
@req.finish
@req = nil
end
end

# Raised when called with an unknown HTTP method
class UnknownHTTPMethod < StandardError
attr_reader :verb
Expand Down
4 changes: 4 additions & 0 deletions lib/ddtrace/transport/http/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ def send_request(request, &block)
def build_env(request)
Env.new(request)
end

def close
@api.adapter.close if @api.adapter.respond_to?(:close)
end
end
end
end
Expand Down
2 changes: 2 additions & 0 deletions lib/ddtrace/transport/io/traces.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ def send_traces(traces)
Traces::Response.new(result)
end]
end

def stop; end
end

# Encoder for IO-specific trace encoding
Expand Down
6 changes: 6 additions & 0 deletions lib/ddtrace/transport/traces.rb
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ def current_api
apis[@current_api_id]
end

def stop
@client.close if @client
end

private

def downgrade?(response)
Expand All @@ -181,6 +185,8 @@ def downgrade!
def change_api!(api_id)
raise UnknownApiVersionError, api_id unless apis.key?(api_id)

@client.close if @client

@current_api_id = api_id
@client = HTTP::Client.new(current_api)
end
Expand Down
16 changes: 2 additions & 14 deletions spec/datadog/integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,27 +42,15 @@ def thread_count
end

context 'for file descriptors' do
def open_file_descriptors
# Unix-specific way to get the current process' open file descriptors and the files (if any) they correspond to
Dir['/dev/fd/*'].each_with_object({}) do |fd, hash|
hash[fd] =
begin
File.realpath(fd)
rescue SystemCallError # This can fail due to... reasons, and we only want it for debugging so let's ignore
nil
end
end
end

it 'closes tracer file descriptors (known flaky test)' do
before_open_file_descriptors = open_file_descriptors
before_open_file_descriptors = ObjectSpaceHelper.open_file_descriptors

start_tracer
wait_for_tracer_sent

shutdown

after_open_file_descriptors = open_file_descriptors
after_open_file_descriptors = ObjectSpaceHelper.open_file_descriptors

expect(after_open_file_descriptors.size)
.to(
Expand Down
42 changes: 30 additions & 12 deletions spec/datadog/tracing/integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,21 @@
it { expect(stats).to include(traces_flushed: 0) }
end

before do
WebMock.disable!
Datadog.configuration.reset!
end

after { tracer.shutdown! }

def wait_for_flush(stat = :traces_flushed, num = 1)
test_repeat.times do
break if tracer.writer.stats[stat] >= num

sleep(0.1)
end
end

describe 'agent receives span' do
include_context 'agent-based test'

Expand All @@ -57,14 +70,6 @@ def create_trace
end
end

def wait_for_flush(stat, num = 1)
test_repeat.times do
break if tracer.writer.stats[stat] >= num

sleep(0.1)
end
end

def agent_receives_span_step1
stats = tracer.writer.stats
expect(stats[:traces_flushed]).to eq(0)
Expand Down Expand Up @@ -143,24 +148,32 @@ def agent_receives_span_step3(previous_success)
describe 'agent receives short span' do
include_context 'agent-based test'

before do
def trace
tracer.trace('my.short.op') do |span|
@span = span
span.service = 'my.service'
end

@first_shutdown = tracer.shutdown!
force_synchronous_flush(tracer.writer.worker)
end

let(:stats) { tracer.writer.stats }

it do
expect(@first_shutdown).to be true
trace

expect(@span.finished?).to be true
expect(stats[:services_flushed]).to be_nil
end

it_behaves_like 'flushed trace'
it 'reuses the same HTTP connection' do
expect { trace }.to change { ObjectSpaceHelper.open_file_descriptors.size }.by(1)
expect { trace }.to_not(change { ObjectSpaceHelper.open_file_descriptors })
end

it_behaves_like 'flushed trace' do
before { trace }
end
end

describe 'rule sampler' do
Expand Down Expand Up @@ -319,6 +332,8 @@ def agent_receives_span_step3(previous_success)
function.call(traces)
end

WebMock.enable!

trace # Run test subject
tracer.shutdown! # Ensure trace is flushed, so we can read writer statistics
end
Expand Down Expand Up @@ -347,6 +362,7 @@ def agent_receives_span_step3(previous_success)
end

after do
WebMock.disable!
Datadog.configuration.tracing.sampling.reset!
end

Expand Down Expand Up @@ -468,6 +484,8 @@ def sample!(trace)
span.service = 'my.service'
end

wait_for_flush

threads = Array.new(10) do
Thread.new { tracer.shutdown! }
end
Expand Down
2 changes: 1 addition & 1 deletion spec/datadog/tracing/writer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
subject(:writer) { described_class.new(options) }

let(:options) { { transport: transport } }
let(:transport) { instance_double(Datadog::Transport::Traces::Transport) }
let(:transport) { instance_double(Datadog::Transport::Traces::Transport, stop: nil) }

describe 'behavior' do
describe '#initialize' do
Expand Down
26 changes: 25 additions & 1 deletion spec/ddtrace/transport/http/adapters/net_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@

allow(http_connection).to receive(:open_timeout=).with(adapter.timeout)
allow(http_connection).to receive(:read_timeout=).with(adapter.timeout)
allow(http_connection).to receive(:keep_alive_timeout=).with(60)
allow(http_connection).to receive(:use_ssl=).with(adapter.ssl)

allow(http_connection).to receive(:start).and_yield(http_connection)
allow(http_connection).to receive(:start).and_return(http_connection)
end
end

Expand Down Expand Up @@ -186,6 +187,29 @@

it { is_expected.to eq('http://local.test:345?timeout=7') }
end

describe '#close' do
include_context 'HTTP connection stub'

before do
allow(http_connection).to receive(:started?).and_return(true)

adapter.open {}
end

it 'closes connection' do
expect(http_connection).to receive(:finish).once

adapter.close
end

it 'close can be called multiple times' do
expect(http_connection).to receive(:finish).once

adapter.close
adapter.close
end
end
end

RSpec.describe Datadog::Transport::HTTP::Adapters::Net::Response do
Expand Down
8 changes: 4 additions & 4 deletions spec/ddtrace/transport/traces_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@
].with_fallbacks(v2: :v1)
end

let(:api_v1) { instance_double(Datadog::Transport::HTTP::API::Instance, 'v1', encoder: encoder_v1) }
let(:api_v2) { instance_double(Datadog::Transport::HTTP::API::Instance, 'v2', encoder: encoder_v2) }
let(:api_v1) { instance_double(Datadog::Transport::HTTP::API::Instance, 'v1', encoder: encoder_v1, adapter: nil) }
let(:api_v2) { instance_double(Datadog::Transport::HTTP::API::Instance, 'v2', encoder: encoder_v2, adapter: nil) }
let(:encoder_v1) { instance_double(Datadog::Core::Encoding::Encoder, content_type: 'text/plain') }
let(:encoder_v2) { instance_double(Datadog::Core::Encoding::Encoder, content_type: 'text/csv') }
end
Expand Down Expand Up @@ -162,8 +162,8 @@
let(:lazy_chunks) { chunks.lazy }

let(:request) { instance_double(Datadog::Transport::Traces::Request) }
let(:client_v2) { instance_double(Datadog::Transport::HTTP::Client) }
let(:client_v1) { instance_double(Datadog::Transport::HTTP::Client) }
let(:client_v2) { instance_double(Datadog::Transport::HTTP::Client, close: nil) }
let(:client_v1) { instance_double(Datadog::Transport::HTTP::Client, close: nil) }

let(:chunker) { instance_double(Datadog::Transport::Traces::Chunker, max_size: 1) }

Expand Down
2 changes: 2 additions & 0 deletions spec/support/faux_transport.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ def send_traces(*)
)
)]
end

def stop; end
end
2 changes: 2 additions & 0 deletions spec/support/faux_writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,6 @@ def spans(action = :clear)
end
end
end

def stop; end
end
13 changes: 12 additions & 1 deletion spec/support/object_space_helper.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# typed: true
# typed: false

require 'objspace'

Expand All @@ -17,5 +17,16 @@ def estimate_bytesize(object)
sum + ::ObjectSpace.memsize_of(object.instance_variable_get(var))
end
end

def open_file_descriptors
ObjectSpace.each_object(::IO).to_a.sort_by(&:object_id).select do |io|
begin
!io.closed?
rescue IOError
# Rescue for "uninitialized stream" errors.
false
end
end
end
end
end
17 changes: 17 additions & 0 deletions spec/support/tracer_helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'datadog/tracing/tracer'
require 'datadog/tracing/trace_operation'
require 'support/faux_writer'
require 'support/synchronization_helpers'

module TracerHelpers
# Return a test tracer instance with a faux writer.
Expand Down Expand Up @@ -110,4 +111,20 @@ def tracer_shutdown!

without_warnings { Datadog.send(:reset!) }
end

# Wakes up the main worker thread loop, forcing a flush.
# Returns when all traces are flushed from the internal buffer.
#
# @param [Datadog::Tracing::Workers::AsyncTransport] async_transport
def force_synchronous_flush(async_transport, attempts: 50, backoff: 0.1)
async_transport.instance_exec do
@mutex.synchronize do
@shutdown.signal
end

SynchronizationHelpers.try_wait_until(attempts: attempts, backoff: backoff) do
@trace_buffer.empty?
end
end
end
end

0 comments on commit a139144

Please sign in to comment.