Skip to content

Commit 354311a

Browse files
committed
Reuse HTTP connection between trace flushes
1 parent 7b39ffe commit 354311a

File tree

14 files changed

+141
-41
lines changed

14 files changed

+141
-41
lines changed

lib/datadog/tracing/writer.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ def stop
9595
def stop_worker
9696
@stopped = true
9797

98+
@transport.stop
99+
98100
return if @worker.nil?
99101

100102
@worker.stop

lib/ddtrace/transport/http/adapters/net.rb

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,13 @@ class Net
1515
:timeout,
1616
:ssl
1717

18-
# in seconds
18+
# In seconds.
1919
DEFAULT_TIMEOUT = 30
2020

21+
# If this many seconds have passed since the last request, create a new TCP connection.
22+
# In seconds.
23+
KEEP_ALIVE_TIMEOUT = 60
24+
2125
# @deprecated Positional parameters are deprecated. Use named parameters instead.
2226
def initialize(hostname = nil, port = nil, **options)
2327
@hostname = hostname || options.fetch(:hostname)
@@ -39,12 +43,17 @@ def open(&block)
3943
# DEV Initializing +Net::HTTP+ directly help us avoid expensive
4044
# options processing done in +Net::HTTP.start+:
4145
# https://github.com/ruby/ruby/blob/b2d96abb42abbe2e01f010ffc9ac51f0f9a50002/lib/net/http.rb#L614-L618
42-
req = ::Net::HTTP.new(hostname, port, nil)
46+
@req ||= begin
47+
req = ::Net::HTTP.new(hostname, port, nil)
48+
49+
req.use_ssl = ssl
50+
req.open_timeout = req.read_timeout = timeout
51+
req.keep_alive_timeout = KEEP_ALIVE_TIMEOUT
4352

44-
req.use_ssl = ssl
45-
req.open_timeout = req.read_timeout = timeout
53+
req.start # Calling #start without a block creates a long-lived connection
54+
end
4655

47-
req.start(&block)
56+
yield @req
4857
end
4958

5059
def call(env)
@@ -82,6 +91,14 @@ def url
8291
"http://#{hostname}:#{port}?timeout=#{timeout}"
8392
end
8493

94+
def close
95+
# Clean up HTTP socket
96+
if @req && @req.started?
97+
@req.finish
98+
@req = nil
99+
end
100+
end
101+
85102
# Raised when called with an unknown HTTP method
86103
class UnknownHTTPMethod < StandardError
87104
attr_reader :verb

lib/ddtrace/transport/http/client.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ def send_request(request, &block)
4848
def build_env(request)
4949
Env.new(request)
5050
end
51+
52+
def close
53+
@api.adapter.close if @api.adapter.respond_to?(:close)
54+
end
5155
end
5256
end
5357
end

lib/ddtrace/transport/io/traces.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ def send_traces(traces)
4343
Traces::Response.new(result)
4444
end]
4545
end
46+
47+
def stop; end
4648
end
4749

4850
# Encoder for IO-specific trace encoding

lib/ddtrace/transport/traces.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,10 @@ def current_api
165165
apis[@current_api_id]
166166
end
167167

168+
def stop
169+
@client.close if @client
170+
end
171+
168172
private
169173

170174
def downgrade?(response)
@@ -183,6 +187,8 @@ def downgrade!
183187
def change_api!(api_id)
184188
raise UnknownApiVersionError, api_id unless apis.key?(api_id)
185189

190+
@client.close if @client
191+
186192
@current_api_id = api_id
187193
@client = HTTP::Client.new(current_api)
188194
end

spec/datadog/integration_spec.rb

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
require 'datadog/opentracer'
55
require 'datadog/statsd'
66

7+
require_relative '../support/system_helper'
8+
79
RSpec.describe 'Datadog integration' do
810
context 'graceful shutdown', :integration do
911
before do
@@ -42,27 +44,15 @@ def thread_count
4244
end
4345

4446
context 'for file descriptors' do
45-
def open_file_descriptors
46-
# Unix-specific way to get the current process' open file descriptors and the files (if any) they correspond to
47-
Dir['/dev/fd/*'].each_with_object({}) do |fd, hash|
48-
hash[fd] =
49-
begin
50-
File.realpath(fd)
51-
rescue SystemCallError # This can fail due to... reasons, and we only want it for debugging so let's ignore
52-
nil
53-
end
54-
end
55-
end
56-
5747
it 'closes tracer file descriptors (known flaky test)' do
58-
before_open_file_descriptors = open_file_descriptors
48+
before_open_file_descriptors = SystemHelper.open_fds
5949

6050
start_tracer
6151
wait_for_tracer_sent
6252

6353
shutdown
6454

65-
after_open_file_descriptors = open_file_descriptors
55+
after_open_file_descriptors = SystemHelper.open_fds
6656

6757
expect(after_open_file_descriptors.size)
6858
.to(

spec/datadog/tracing/integration_spec.rb

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# typed: ignore
22

33
require 'spec_helper'
4+
require_relative '../../support/system_helper'
45

56
require 'datadog/statsd'
67

@@ -45,8 +46,17 @@
4546
it { expect(stats).to include(traces_flushed: 0) }
4647
end
4748

49+
before do
50+
WebMock.disable!
51+
Datadog.configuration.reset!
52+
end
53+
4854
after { tracer.shutdown! }
4955

56+
def wait_for_flush(num = 1)
57+
try_wait_until { tracer.writer.stats[:traces_flushed] >= num }
58+
end
59+
5060
describe 'agent receives span' do
5161
include_context 'agent-based test'
5262

@@ -57,14 +67,6 @@ def create_trace
5767
end
5868
end
5969

60-
def wait_for_flush(stat, num = 1)
61-
test_repeat.times do
62-
break if tracer.writer.stats[stat] >= num
63-
64-
sleep(0.1)
65-
end
66-
end
67-
6870
def agent_receives_span_step1
6971
stats = tracer.writer.stats
7072
expect(stats[:traces_flushed]).to eq(0)
@@ -78,7 +80,7 @@ def agent_receives_span_step2
7880
create_trace
7981

8082
# Timeout after 3 seconds, waiting for 1 flush
81-
wait_for_flush(:traces_flushed)
83+
wait_for_flush
8284

8385
stats = tracer.writer.stats
8486
expect(stats[:traces_flushed]).to eq(1)
@@ -95,8 +97,8 @@ def agent_receives_span_step2
9597
def agent_receives_span_step3(previous_success)
9698
create_trace
9799

98-
# Timeout after 3 seconds, waiting for another flush
99-
wait_for_flush(:traces_flushed, 2)
100+
# Timeout after 3 seconds, waiting for second flush
101+
wait_for_flush(2)
100102

101103
stats = tracer.writer.stats
102104
expect(stats[:traces_flushed]).to eq(2)
@@ -143,24 +145,32 @@ def agent_receives_span_step3(previous_success)
143145
describe 'agent receives short span' do
144146
include_context 'agent-based test'
145147

146-
before do
148+
def trace
147149
tracer.trace('my.short.op') do |span|
148150
@span = span
149151
span.service = 'my.service'
150152
end
151153

152-
@first_shutdown = tracer.shutdown!
154+
force_synchronous_flush(tracer.writer.worker)
153155
end
154156

155157
let(:stats) { tracer.writer.stats }
156158

157159
it do
158-
expect(@first_shutdown).to be true
160+
trace
161+
159162
expect(@span.finished?).to be true
160163
expect(stats[:services_flushed]).to be_nil
161164
end
162165

163-
it_behaves_like 'flushed trace'
166+
it 'reuses the same HTTP connection' do
167+
trace
168+
expect { trace }.to_not(change { SystemHelper.open_fds })
169+
end
170+
171+
it_behaves_like 'flushed trace' do
172+
before { trace }
173+
end
164174
end
165175

166176
describe 'rule sampler' do
@@ -319,6 +329,8 @@ def agent_receives_span_step3(previous_success)
319329
function.call(traces)
320330
end
321331

332+
WebMock.enable!
333+
322334
trace # Run test subject
323335
tracer.shutdown! # Ensure trace is flushed, so we can read writer statistics
324336
end
@@ -347,6 +359,7 @@ def agent_receives_span_step3(previous_success)
347359
end
348360

349361
after do
362+
WebMock.disable!
350363
Datadog.configuration.tracing.sampling.reset!
351364
end
352365

@@ -468,6 +481,8 @@ def sample!(trace)
468481
span.service = 'my.service'
469482
end
470483

484+
wait_for_flush
485+
471486
threads = Array.new(10) do
472487
Thread.new { tracer.shutdown! }
473488
end

spec/datadog/tracing/writer_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
subject(:writer) { described_class.new(options) }
2323

2424
let(:options) { { transport: transport } }
25-
let(:transport) { instance_double(Datadog::Transport::Traces::Transport) }
25+
let(:transport) { instance_double(Datadog::Transport::Traces::Transport, stop: nil) }
2626

2727
describe 'behavior' do
2828
describe '#initialize' do

spec/ddtrace/transport/http/adapters/net_spec.rb

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@
2626

2727
allow(http_connection).to receive(:open_timeout=).with(adapter.timeout)
2828
allow(http_connection).to receive(:read_timeout=).with(adapter.timeout)
29+
allow(http_connection).to receive(:keep_alive_timeout=).with(60)
2930
allow(http_connection).to receive(:use_ssl=).with(adapter.ssl)
3031

31-
allow(http_connection).to receive(:start).and_yield(http_connection)
32+
allow(http_connection).to receive(:start).and_return(http_connection)
3233
end
3334
end
3435

@@ -186,6 +187,29 @@
186187

187188
it { is_expected.to eq('http://local.test:345?timeout=7') }
188189
end
190+
191+
describe '#close' do
192+
include_context 'HTTP connection stub'
193+
194+
before do
195+
allow(http_connection).to receive(:started?).and_return(true)
196+
197+
adapter.open {}
198+
end
199+
200+
it 'closes connection' do
201+
expect(http_connection).to receive(:finish).once
202+
203+
adapter.close
204+
end
205+
206+
it 'close can be called multiple times' do
207+
expect(http_connection).to receive(:finish).once
208+
209+
adapter.close
210+
adapter.close
211+
end
212+
end
189213
end
190214

191215
RSpec.describe Datadog::Transport::HTTP::Adapters::Net::Response do

spec/ddtrace/transport/traces_spec.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,8 @@
132132
].with_fallbacks(v2: :v1)
133133
end
134134

135-
let(:api_v1) { instance_double(Datadog::Transport::HTTP::API::Instance, 'v1', encoder: encoder_v1) }
136-
let(:api_v2) { instance_double(Datadog::Transport::HTTP::API::Instance, 'v2', encoder: encoder_v2) }
135+
let(:api_v1) { instance_double(Datadog::Transport::HTTP::API::Instance, 'v1', encoder: encoder_v1, adapter: nil) }
136+
let(:api_v2) { instance_double(Datadog::Transport::HTTP::API::Instance, 'v2', encoder: encoder_v2, adapter: nil) }
137137
let(:encoder_v1) { instance_double(Datadog::Core::Encoding::Encoder, content_type: 'text/plain') }
138138
let(:encoder_v2) { instance_double(Datadog::Core::Encoding::Encoder, content_type: 'text/csv') }
139139
end
@@ -162,8 +162,8 @@
162162
let(:lazy_chunks) { chunks.lazy }
163163

164164
let(:request) { instance_double(Datadog::Transport::Traces::Request) }
165-
let(:client_v2) { instance_double(Datadog::Transport::HTTP::Client) }
166-
let(:client_v1) { instance_double(Datadog::Transport::HTTP::Client) }
165+
let(:client_v2) { instance_double(Datadog::Transport::HTTP::Client, close: nil) }
166+
let(:client_v1) { instance_double(Datadog::Transport::HTTP::Client, close: nil) }
167167

168168
let(:chunker) { instance_double(Datadog::Transport::Traces::Chunker, max_size: 1) }
169169

0 commit comments

Comments
 (0)