Skip to content

Ensure traces sent before process ends #195

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Oct 2, 2017
14 changes: 14 additions & 0 deletions lib/ddtrace/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ def initialize(max_size)

@mutex = Mutex.new()
@traces = []
@closed = false
end

# Add a new ``trace`` in the local queue. This method doesn't block the execution
# even if the buffer is full. In that case, a random trace is discarded.
def push(trace)
return if @closed
@mutex.synchronize do
len = @traces.length
if len < @max_size || @max_size <= 0
Expand Down Expand Up @@ -48,5 +50,17 @@ def pop
return traces
end
end

# Flag the buffer as closed. The flag is written while holding the buffer
# mutex; once it is set, +push+ returns early instead of enqueuing.
def close
  @mutex.synchronize { @closed = true }
end

# Tell whether the buffer has been closed.
#
# The flag is read while holding the buffer mutex, mirroring how +close+
# writes it. Returns the current value of +@closed+.
def closed?
  # Mutex only provides the American spelling +synchronize+; the previous
  # +synchronise+ call raised NoMethodError on every invocation.
  @mutex.synchronize { @closed }
end
end
end
17 changes: 17 additions & 0 deletions lib/ddtrace/tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,23 @@ def self.debug_logging
log.level == Logger::DEBUG
end

# Shorthand that calls the `shutdown!` method of a registered worker.
# It's useful to ensure that the Trace Buffer is properly flushed before
# shutting down the application.
#
# For instance:
#
# tracer.trace('operation_name', service: 'rake_tasks') do |span|
# span.set_tag('task.name', 'script')
# end
#
# tracer.shutdown!
#
# Delegates to the writer's worker +shutdown!+ and returns its result.
# Does nothing (returns nil) when the tracer is disabled or when the writer
# has no worker.
def shutdown!
  return unless @enabled

  worker = @writer.worker
  return if worker.nil?

  worker.shutdown!
end

# Return the current active \Context for this traced execution. This method is
# automatically called when calling Tracer.trace or Tracer.start_span,
# but it can be used in the application code during manual instrumentation.
Expand Down
23 changes: 23 additions & 0 deletions lib/ddtrace/workers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,18 @@ module Workers
# will perform a task at regular intervals. The thread can be stopped
# with the +stop()+ method and can start with the +start()+ method.
class AsyncTransport
DEFAULT_TIMEOUT = 5

attr_reader :trace_buffer, :service_buffer, :shutting_down

def initialize(transport, buff_size, trace_task, service_task, interval)
@trace_task = trace_task
@service_task = service_task
@flush_interval = interval
@trace_buffer = TraceBuffer.new(buff_size)
@service_buffer = TraceBuffer.new(buff_size)
@transport = transport
@shutting_down = false

@worker = nil
@run = false
Expand Down Expand Up @@ -71,6 +76,24 @@ def stop
@run = false
end

# Closes all available queues and waits for the trace and service buffer to flush
# Returns false straight away when another shutdown is already in progress;
# otherwise closes both buffers, waits (at most DEFAULT_TIMEOUT seconds) for
# them to report empty, stops and joins the worker, and returns true.
def shutdown!
# A shutdown is already running: signal that this call did nothing.
return false if @shutting_down
@shutting_down = true
# Close both buffers before waiting for them to drain.
@trace_buffer.close
@service_buffer.close
# Short fixed pause before the polling loop further down.
sleep(0.1)
# NOTE(review): the text from here down to the `timeout_time` assignment looks
# like an inline pull-request review discussion captured with the diff, not
# part of the method body.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need any sleep at all? if we close both buffers and we stop the worker, technically we should just join no? looking here:

while @run
callback_traces
callback_services
sleep(@flush_interval) if @run
end

both callback_traces (and callback_services):

  • empties the buffer
  • sends the buffers

So if we join we simply wait that the flushing thread exits the while, so for sure buffers are empty AND sent. The join has a timeout of 5 seconds, that is x5 times our flushing interval so it's reasonable high to ensure that traces are flushed.

Am I missing something?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ryplo since this approach works for Resque, we can merge the PR and make a new one to improve that code block.

# Poll every 0.05s until both buffers report empty or the deadline
# (now + DEFAULT_TIMEOUT seconds) has passed.
timeout_time = Time.now + DEFAULT_TIMEOUT
while (!@trace_buffer.empty? || !@service_buffer.empty?) && Time.now <= timeout_time
sleep(0.05)
Datadog::Tracer.log.debug('Waiting for the buffers to clear before exiting')
end
# Ask the worker loop to end, then wait for the worker thread.
stop
join
# Clear the guard flag so a later shutdown! call is not rejected.
@shutting_down = false
true
end

# Block until executor shutdown is complete or until timeout seconds have passed.
def join
@worker.join(5)
Expand Down
2 changes: 1 addition & 1 deletion lib/ddtrace/writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
module Datadog
# Traces and services writer that periodically sends data to the trace-agent
class Writer
attr_reader :transport
attr_reader :transport, :worker

HOSTNAME = 'localhost'.freeze
PORT = '8126'.freeze
Expand Down
16 changes: 16 additions & 0 deletions test/buffer_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,20 @@ def test_trace_buffer_pop
assert_includes(output_traces, input_traces[0])
assert_includes(output_traces, input_traces[1])
end

# Once closed, the trace buffer must ignore further pushes while still
# returning the traces that were queued before the close.
def test_closed_trace_buffer
  buffer = Datadog::TraceBuffer.new(4)
  (1..4).each { |trace| buffer.push(trace) }
  buffer.close

  # these pushes happen after the close and must be dropped
  buffer.push(5)
  buffer.push(6)

  out = buffer.pop
  # expected value first, so a failure reports expected/actual the right way round
  assert_equal(4, out.length)
  refute_includes(out, 5)
  refute_includes(out, 6)
end
end
72 changes: 72 additions & 0 deletions test/integration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,58 @@ def agent_receives_span_step3(tracer, previous_success)
assert_equal(0, stats[:transport][:internal_error])
end

# Helper: registers service info, records one short span, shuts the tracer
# down right after, then checks the writer stats report exactly one flushed
# trace, one flushed service and no transport errors.
def agent_receives_short_span(tracer)
  tracer.set_service_info('my.service', 'rails', 'web')

  short_span = tracer.start_span('my.short.op')
  short_span.service = 'my.service'
  short_span.finish

  shutdown_result = tracer.shutdown!
  stats = tracer.writer.stats

  assert(shutdown_result, 'should have run through the shutdown method')
  assert(short_span.finished?, 'span did not finish')

  assert_equal(1, stats[:traces_flushed], 'wrong number of traces flushed')
  assert_equal(1, stats[:services_flushed], 'wrong number of services flushed')

  transport_stats = stats[:transport]
  assert_equal(0, transport_stats[:client_error])
  assert_equal(0, transport_stats[:server_error])
  assert_equal(0, transport_stats[:internal_error])
end

# Helper: records one short span, then calls tracer.shutdown! from two threads
# and asserts that the first call completes (truthy result) while the second,
# started once the worker reports shutting_down, returns false. Finally checks
# the writer stats: one trace flushed, one service flushed, no transport errors.
def shutdown_exec_only_once(tracer)
tracer.set_service_info('my.service', 'rails', 'web')
span = tracer.start_span('my.short.op')
span.service = 'my.service'
span.finish()
# declared up front so the thread blocks below assign to these locals
first_shutdown = nil
second_shutdown = nil
# first shutdown runs in its own thread so a second call can overlap with it
worker = Thread.new() do
first_shutdown = tracer.shutdown!
end

# poll (at most 100 x 0.01s) until the worker exposes shutting_down as truthy
100.times do
break if tracer.writer.worker.shutting_down # wait until first shutdown is halfway through execution
sleep(0.01)
end

# second, overlapping shutdown call
second_worker = Thread.new() do
second_shutdown = tracer.shutdown!
end

# wait for each thread, up to 2 seconds apiece
worker.join(2)
second_worker.join(2)

stats = tracer.writer.stats
assert(first_shutdown, 'should have run through the shutdown method')
assert_equal(false, second_shutdown, 'should not have executed shutdown')
assert_equal(true, span.finished?, 'span did not finish')
assert_equal(1, stats[:traces_flushed], 'wrong number of traces flushed')
assert_equal(1, stats[:services_flushed], 'wrong number of services flushed')
assert_equal(0, stats[:transport][:client_error])
assert_equal(0, stats[:transport][:server_error])
assert_equal(0, stats[:transport][:internal_error])
end

def test_agent_receives_span
# test that the agent really receives the spans
# this test can be long since it waits internal buffers flush
Expand All @@ -78,4 +130,24 @@ def test_agent_receives_span
success = agent_receives_span_step2(tracer)
agent_receives_span_step3(tracer, success)
end

# Integration test: a span finished right before tracer.shutdown! must still
# be flushed to the agent.
def test_short_span
  # Requires a running agent, and the test does not apply to the Java
  # threading model: BOTH conditions must hold for the test to run. The former
  # `||` let it run on MRI without an agent and on JRuby when the variable was
  # set.
  skip unless ENV['TEST_DATADOG_INTEGRATION'] && RUBY_PLATFORM != 'java'

  tracer = Datadog::Tracer.new
  tracer.configure(enabled: true, hostname: '127.0.0.1', port: '8126')

  agent_receives_short_span(tracer)
end

# Integration test: two overlapping tracer.shutdown! calls must result in the
# shutdown sequence executing only once.
def test_shutdown_exec_once
  # Requires a running agent, and the test does not apply to the Java
  # threading model: BOTH conditions must hold for the test to run. The former
  # `||` let it run on MRI without an agent and on JRuby when the variable was
  # set.
  skip unless ENV['TEST_DATADOG_INTEGRATION'] && RUBY_PLATFORM != 'java'

  tracer = Datadog::Tracer.new
  tracer.configure(enabled: true, hostname: '127.0.0.1', port: '8126')

  shutdown_exec_only_once(tracer)
end
end