Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions benchmarks/tracing_http_transport.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# frozen_string_literal: true
# typed: false

# Used to quickly run benchmark under RSpec as part of the usual test suite, to validate it didn't bitrot
VALIDATE_BENCHMARK_MODE = ENV['VALIDATE_BENCHMARK'] == 'true'

return unless __FILE__ == $PROGRAM_NAME || VALIDATE_BENCHMARK_MODE

require 'benchmark/ips'
require 'ddtrace'
require 'webrick'
require_relative 'dogstatsd_reporter'

class TracingHttpTransportBenchmark
def initialize
@port = Datadog::Transport::HTTP::DO_NOT_USE_ENVIRONMENT_AGENT_SETTINGS.port
@transport = Datadog::Transport::HTTP.default
@spans = test_traces(50)
end

def start_fake_webserver
ready_queue = Queue.new

require 'webrick'

server = WEBrick::HTTPServer.new(
Port: @port,
StartCallback: -> { ready_queue.push(1) }
)
server_proc = proc do |req, res|
res.body = '{}'
end

server.mount_proc('/', &server_proc)
Thread.new { server.start }
ready_queue.pop
end

# Return some test traces
def test_traces(n, service: 'test-app', resource: '/traces', type: 'web')
traces = []

n.times do
trace_op = Datadog::Tracing::TraceOperation.new

trace_op.measure('client.testing', service: service, resource: resource, type: type) do
trace_op.measure('client.testing', service: service, resource: resource, type: type) do
end
end

traces << trace_op.flush!
end

traces
end

def run_benchmark
Benchmark.ips do |x|
benchmark_time = VALIDATE_BENCHMARK_MODE ? { time: 0.01, warmup: 0 } : { time: 70, warmup: 2 }
x.config(**benchmark_time, suite: report_to_dogstatsd_if_enabled_via_environment_variable(benchmark_name: 'tracing_http_transport'))

x.report("http_transport #{ENV['CONFIG']}") do
run_once
end

x.save! 'tmp/tracing-http-transport-results.json' unless VALIDATE_BENCHMARK_MODE
x.compare!
end
end

def run_forever
while true
100.times { run_once }
print '.'
end
end

def run_once
success = @transport.send_traces(@spans)

raise('Unexpected: Export failed') unless success
end
end

puts "Current pid is #{Process.pid}"

TracingHttpTransportBenchmark.new.instance_exec do
if ARGV.include?('--forever')
run_forever
else
run_benchmark
end
end
2 changes: 2 additions & 0 deletions lib/datadog/tracing/writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ def stop
def stop_worker
@stopped = true

@transport.stop

return if @worker.nil?

@worker.stop
Expand Down
39 changes: 30 additions & 9 deletions lib/ddtrace/transport/http/adapters/net.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,27 @@ module Datadog
module Transport
module HTTP
module Adapters
# Adapter for Net::HTTP
# Adapter for Net::HTTP.
# This adapter reuses a long-running HTTP connection with the target URL
# when possible.
# It is required to invoke {#close} to ensure the connection is closed.
class Net
attr_reader \
:hostname,
:port,
:timeout,
:ssl

# in seconds
# In seconds.
DEFAULT_TIMEOUT = 30

# If this many seconds have passed since the last request, create a new TCP connection.
# This should match the trace agent timeout, as the agent will close its side of the connect after this
# much time has passed:
# https://github.com/DataDog/datadog-agent/blob/ef47fb8c411938e672eb29f3cc5ad79b00133f02/pkg/trace/api/api.go#L150
# We subtract 1 second from the agent timeout, to avoid being just slightly too late.
KEEP_ALIVE_TIMEOUT = 4 # In seconds.

# @deprecated Positional parameters are deprecated. Use named parameters instead.
def initialize(hostname = nil, port = nil, **options)
@hostname = hostname || options.fetch(:hostname)
Expand All @@ -39,12 +49,17 @@ def open(&block)
# DEV Initializing +Net::HTTP+ directly help us avoid expensive
# options processing done in +Net::HTTP.start+:
# https://github.com/ruby/ruby/blob/b2d96abb42abbe2e01f010ffc9ac51f0f9a50002/lib/net/http.rb#L614-L618
req = ::Net::HTTP.new(hostname, port, nil)
@req ||= begin
req = ::Net::HTTP.new(hostname, port, nil)

req.use_ssl = ssl
req.open_timeout = req.read_timeout = timeout
req.keep_alive_timeout = KEEP_ALIVE_TIMEOUT

req.use_ssl = ssl
req.open_timeout = req.read_timeout = timeout
req.start # Calling #start without a block creates a long-lived connection
end

req.start(&block)
yield @req
end

def call(env)
Expand All @@ -56,13 +71,11 @@ def call(env)
end

def post(env)
post = nil

if env.form.nil? || env.form.empty?
post = ::Net::HTTP::Post.new(env.path, env.headers)
post.body = env.body
else
post = ::Datadog::Core::Vendor::Net::HTTP::Post::Multipart.new(
post = Core::Vendor::Net::HTTP::Post::Multipart.new(
env.path,
env.form,
env.headers
Expand All @@ -82,6 +95,14 @@ def url
"http://#{hostname}:#{port}?timeout=#{timeout}"
end

def close
# Clean up HTTP socket
if @req && @req.started?
@req.finish
@req = nil
end
end

# Raised when called with an unknown HTTP method
class UnknownHTTPMethod < StandardError
attr_reader :verb
Expand Down
4 changes: 4 additions & 0 deletions lib/ddtrace/transport/http/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ def send_request(request, &block)
def build_env(request)
Env.new(request)
end

def close
@api.adapter.close if @api.adapter.respond_to?(:close)
end
end
end
end
Expand Down
4 changes: 4 additions & 0 deletions lib/ddtrace/transport/io/traces.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ def send_traces(traces)
Traces::Response.new(result)
end]
end

def stop
# There are no special resources to clean up
end
end

# Encoder for IO-specific trace encoding
Expand Down
9 changes: 9 additions & 0 deletions lib/ddtrace/transport/traces.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,15 @@ def encode_trace(encoder, trace)
# This class initializes the HTTP client, breaks down large
# batches of traces into smaller chunks and handles
# API version downgrade handshake.
#
# DEV: We should add retry logic to the transport, as transient failures are common in a real network environment
class Transport
attr_reader :client, :apis, :default_api, :current_api_id

def initialize(apis, default_api)
@apis = apis
@default_api = default_api
@client = nil

change_api!(default_api)
end
Expand Down Expand Up @@ -165,6 +168,10 @@ def current_api
apis[@current_api_id]
end

def stop
@client.close if @client
end

private

def downgrade?(response)
Expand All @@ -183,6 +190,8 @@ def downgrade!
def change_api!(api_id)
raise UnknownApiVersionError, api_id unless apis.key?(api_id)

@client.close if @client

@current_api_id = api_id
@client = HTTP::Client.new(current_api)
end
Expand Down
18 changes: 4 additions & 14 deletions spec/datadog/integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
require 'datadog/opentracer'
require 'datadog/statsd'

require_relative '../support/system_helper'

RSpec.describe 'Datadog integration' do
context 'graceful shutdown', :integration do
before do
Expand Down Expand Up @@ -42,27 +44,15 @@ def thread_count
end

context 'for file descriptors' do
def open_file_descriptors
# Unix-specific way to get the current process' open file descriptors and the files (if any) they correspond to
Dir['/dev/fd/*'].each_with_object({}) do |fd, hash|
hash[fd] =
begin
File.realpath(fd)
rescue SystemCallError # This can fail due to... reasons, and we only want it for debugging so let's ignore
nil
end
end
end
Comment on lines -45 to -55
Copy link
Member Author

@marcotc marcotc Oct 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to a shared spec helper, now that checking for leaked FDs is needed in multiple places.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided not to use this approach for HTTP testing, but I feel like this refactor is still good.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should move this out of this PR.


it 'closes tracer file descriptors (known flaky test)' do
before_open_file_descriptors = open_file_descriptors
before_open_file_descriptors = SystemHelper.open_fds

start_tracer
wait_for_tracer_sent

shutdown

after_open_file_descriptors = open_file_descriptors
after_open_file_descriptors = SystemHelper.open_fds

expect(after_open_file_descriptors.size)
.to(
Expand Down
Loading