Skip to content

Commit 919b126

Browse files
committed
Reuse HTTP connection between trace flushes
1 parent 60f71fc commit 919b126

File tree

12 files changed

+126
-38
lines changed

12 files changed

+126
-38
lines changed

lib/datadog/tracing/writer.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ def stop
9595
def stop_worker
9696
@stopped = true
9797

98+
@transport.stop
99+
98100
return if @worker.nil?
99101

100102
@worker.stop

lib/ddtrace/transport/http/adapters/net.rb

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,24 @@ module Datadog
77
module Transport
88
module HTTP
99
module Adapters
10-
# Adapter for Net::HTTP
10+
# Adapter for Net::HTTP.
11+
# This adapter reuses a long-running HTTP connection with the target URL
12+
# when possible.
13+
# It is required to invoke {#close} to ensure the connection is closed.
1114
class Net
1215
attr_reader \
1316
:hostname,
1417
:port,
1518
:timeout,
1619
:ssl
1720

18-
# in seconds
21+
# In seconds.
1922
DEFAULT_TIMEOUT = 30
2023

24+
# If this many seconds have passed since the last request, create a new TCP connection.
25+
# In seconds.
26+
KEEP_ALIVE_TIMEOUT = 60
27+
2128
# @deprecated Positional parameters are deprecated. Use named parameters instead.
2229
def initialize(hostname = nil, port = nil, **options)
2330
@hostname = hostname || options.fetch(:hostname)
@@ -39,12 +46,17 @@ def open(&block)
3946
# DEV Initializing +Net::HTTP+ directly help us avoid expensive
4047
# options processing done in +Net::HTTP.start+:
4148
# https://github.com/ruby/ruby/blob/b2d96abb42abbe2e01f010ffc9ac51f0f9a50002/lib/net/http.rb#L614-L618
42-
req = ::Net::HTTP.new(hostname, port, nil)
49+
@req ||= begin
50+
req = ::Net::HTTP.new(hostname, port, nil)
51+
52+
req.use_ssl = ssl
53+
req.open_timeout = req.read_timeout = timeout
54+
req.keep_alive_timeout = KEEP_ALIVE_TIMEOUT
4355

44-
req.use_ssl = ssl
45-
req.open_timeout = req.read_timeout = timeout
56+
req.start # Calling #start without a block creates a long-lived connection
57+
end
4658

47-
req.start(&block)
59+
yield @req
4860
end
4961

5062
def call(env)
@@ -82,6 +94,14 @@ def url
8294
"http://#{hostname}:#{port}?timeout=#{timeout}"
8395
end
8496

97+
def close
98+
# Clean up HTTP socket
99+
if @req && @req.started?
100+
@req.finish
101+
@req = nil
102+
end
103+
end
104+
85105
# Raised when called with an unknown HTTP method
86106
class UnknownHTTPMethod < StandardError
87107
attr_reader :verb

lib/ddtrace/transport/http/client.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ def send_request(request, &block)
4848
def build_env(request)
4949
Env.new(request)
5050
end
51+
52+
def close
53+
@api.adapter.close if @api.adapter.respond_to?(:close)
54+
end
5155
end
5256
end
5357
end

lib/ddtrace/transport/io/traces.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ def send_traces(traces)
4343
Traces::Response.new(result)
4444
end]
4545
end
46+
47+
def stop
48+
# There are no special resources to clean up
49+
end
4650
end
4751

4852
# Encoder for IO-specific trace encoding

lib/ddtrace/transport/traces.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ class Transport
119119
def initialize(apis, default_api)
120120
@apis = apis
121121
@default_api = default_api
122+
@client = nil
122123

123124
change_api!(default_api)
124125
end
@@ -165,6 +166,10 @@ def current_api
165166
apis[@current_api_id]
166167
end
167168

169+
def stop
170+
@client.close if @client
171+
end
172+
168173
private
169174

170175
def downgrade?(response)
@@ -183,6 +188,8 @@ def downgrade!
183188
def change_api!(api_id)
184189
raise UnknownApiVersionError, api_id unless apis.key?(api_id)
185190

191+
@client.close if @client
192+
186193
@current_api_id = api_id
187194
@client = HTTP::Client.new(current_api)
188195
end

spec/datadog/integration_spec.rb

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
require 'datadog/opentracer'
55
require 'datadog/statsd'
66

7+
require_relative '../support/system_helper'
8+
79
RSpec.describe 'Datadog integration' do
810
context 'graceful shutdown', :integration do
911
before do
@@ -42,27 +44,15 @@ def thread_count
4244
end
4345

4446
context 'for file descriptors' do
45-
def open_file_descriptors
46-
# Unix-specific way to get the current process' open file descriptors and the files (if any) they correspond to
47-
Dir['/dev/fd/*'].each_with_object({}) do |fd, hash|
48-
hash[fd] =
49-
begin
50-
File.realpath(fd)
51-
rescue SystemCallError # This can fail due to... reasons, and we only want it for debugging so let's ignore
52-
nil
53-
end
54-
end
55-
end
56-
5747
it 'closes tracer file descriptors (known flaky test)' do
58-
before_open_file_descriptors = open_file_descriptors
48+
before_open_file_descriptors = SystemHelper.open_fds
5949

6050
start_tracer
6151
wait_for_tracer_sent
6252

6353
shutdown
6454

65-
after_open_file_descriptors = open_file_descriptors
55+
after_open_file_descriptors = SystemHelper.open_fds
6656

6757
expect(after_open_file_descriptors.size)
6858
.to(

spec/datadog/tracing/integration_spec.rb

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# typed: ignore
22

33
require 'spec_helper'
4+
require_relative '../../support/system_helper'
45

56
require 'datadog/statsd'
67

@@ -45,8 +46,21 @@
4546
it { expect(stats).to include(traces_flushed: 0) }
4647
end
4748

49+
before do
50+
WebMock.disable!
51+
Datadog.configuration.reset!
52+
end
53+
4854
after { tracer.shutdown! }
4955

56+
def wait_for_flush(stat, num = 1)
57+
test_repeat.times do
58+
break if tracer.writer.stats[stat] >= num
59+
60+
sleep(0.1)
61+
end
62+
end
63+
5064
describe 'agent receives span' do
5165
include_context 'agent-based test'
5266

@@ -57,14 +71,6 @@ def create_trace
5771
end
5872
end
5973

60-
def wait_for_flush(stat, num = 1)
61-
test_repeat.times do
62-
break if tracer.writer.stats[stat] >= num
63-
64-
sleep(0.1)
65-
end
66-
end
67-
6874
def agent_receives_span_step1
6975
stats = tracer.writer.stats
7076
expect(stats[:traces_flushed]).to eq(0)
@@ -143,24 +149,32 @@ def agent_receives_span_step3(previous_success)
143149
describe 'agent receives short span' do
144150
include_context 'agent-based test'
145151

146-
before do
152+
def trace
147153
tracer.trace('my.short.op') do |span|
148154
@span = span
149155
span.service = 'my.service'
150156
end
151157

152-
@first_shutdown = tracer.shutdown!
158+
wait_for_flush(:traces_flushed)
153159
end
154160

155161
let(:stats) { tracer.writer.stats }
156162

157163
it do
158-
expect(@first_shutdown).to be true
164+
trace
165+
159166
expect(@span.finished?).to be true
160167
expect(stats[:services_flushed]).to be_nil
161168
end
162169

163-
it_behaves_like 'flushed trace'
170+
it 'reuses the same HTTP connection' do
171+
trace
172+
expect { trace }.to_not(change { SystemHelper.open_fds })
173+
end
174+
175+
it_behaves_like 'flushed trace' do
176+
before { trace }
177+
end
164178
end
165179

166180
describe 'rule sampler' do
@@ -319,6 +333,8 @@ def agent_receives_span_step3(previous_success)
319333
function.call(traces)
320334
end
321335

336+
WebMock.enable!
337+
322338
trace # Run test subject
323339
tracer.shutdown! # Ensure trace is flushed, so we can read writer statistics
324340
end
@@ -347,6 +363,7 @@ def agent_receives_span_step3(previous_success)
347363
end
348364

349365
after do
366+
WebMock.disable!
350367
Datadog.configuration.tracing.sampling.reset!
351368
end
352369

@@ -468,6 +485,8 @@ def sample!(trace)
468485
span.service = 'my.service'
469486
end
470487

488+
wait_for_flush(:traces_flushed)
489+
471490
threads = Array.new(10) do
472491
Thread.new { tracer.shutdown! }
473492
end

spec/datadog/tracing/writer_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
subject(:writer) { described_class.new(options) }
2323

2424
let(:options) { { transport: transport } }
25-
let(:transport) { instance_double(Datadog::Transport::Traces::Transport) }
25+
let(:transport) { instance_double(Datadog::Transport::Traces::Transport, stop: nil) }
2626

2727
describe 'behavior' do
2828
describe '#initialize' do

spec/ddtrace/transport/http/adapters/net_spec.rb

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@
2626

2727
allow(http_connection).to receive(:open_timeout=).with(adapter.timeout)
2828
allow(http_connection).to receive(:read_timeout=).with(adapter.timeout)
29+
allow(http_connection).to receive(:keep_alive_timeout=).with(60)
2930
allow(http_connection).to receive(:use_ssl=).with(adapter.ssl)
3031

31-
allow(http_connection).to receive(:start).and_yield(http_connection)
32+
allow(http_connection).to receive(:start).and_return(http_connection)
3233
end
3334
end
3435

@@ -186,6 +187,29 @@
186187

187188
it { is_expected.to eq('http://local.test:345?timeout=7') }
188189
end
190+
191+
describe '#close' do
192+
include_context 'HTTP connection stub'
193+
194+
before do
195+
allow(http_connection).to receive(:started?).and_return(true)
196+
197+
adapter.open {}
198+
end
199+
200+
it 'closes connection' do
201+
expect(http_connection).to receive(:finish).once
202+
203+
adapter.close
204+
end
205+
206+
it 'close can be called multiple times' do
207+
expect(http_connection).to receive(:finish).once
208+
209+
adapter.close
210+
adapter.close
211+
end
212+
end
189213
end
190214

191215
RSpec.describe Datadog::Transport::HTTP::Adapters::Net::Response do

spec/ddtrace/transport/traces_spec.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,8 @@
132132
].with_fallbacks(v2: :v1)
133133
end
134134

135-
let(:api_v1) { instance_double(Datadog::Transport::HTTP::API::Instance, 'v1', encoder: encoder_v1) }
136-
let(:api_v2) { instance_double(Datadog::Transport::HTTP::API::Instance, 'v2', encoder: encoder_v2) }
135+
let(:api_v1) { instance_double(Datadog::Transport::HTTP::API::Instance, 'v1', encoder: encoder_v1, adapter: nil) }
136+
let(:api_v2) { instance_double(Datadog::Transport::HTTP::API::Instance, 'v2', encoder: encoder_v2, adapter: nil) }
137137
let(:encoder_v1) { instance_double(Datadog::Core::Encoding::Encoder, content_type: 'text/plain') }
138138
let(:encoder_v2) { instance_double(Datadog::Core::Encoding::Encoder, content_type: 'text/csv') }
139139
end
@@ -162,8 +162,8 @@
162162
let(:lazy_chunks) { chunks.lazy }
163163

164164
let(:request) { instance_double(Datadog::Transport::Traces::Request) }
165-
let(:client_v2) { instance_double(Datadog::Transport::HTTP::Client) }
166-
let(:client_v1) { instance_double(Datadog::Transport::HTTP::Client) }
165+
let(:client_v2) { instance_double(Datadog::Transport::HTTP::Client, close: nil) }
166+
let(:client_v1) { instance_double(Datadog::Transport::HTTP::Client, close: nil) }
167167

168168
let(:chunker) { instance_double(Datadog::Transport::Traces::Chunker, max_size: 1) }
169169

0 commit comments

Comments
 (0)