Skip to content

Commit 1a610f7

Browse files
ryploEmanuele Palazzetti
authored andcommitted
[core] ensure traces are sent before process ends (#195)
* Adds shutdown function to sleep, check for non empty buffers, repeat * Adds check to make sure shutdown only called once * Moves shutdown method and flag to the worker * Allows trace buffer to be closed. Makes sure it is closed before worker is shut down * Adds comments to the tracer and worker shutdown methods * Shutdown method tells worker thread to stop, and waits for it to finish before exiting * Fixes check for shutdown method so that traces send when buffer is cleared but the traces are still being sent. Updates test to use a thread to make sure shutdown is not called more than once
1 parent 3d45158 commit 1a610f7

File tree

6 files changed

+143
-1
lines changed

6 files changed

+143
-1
lines changed

lib/ddtrace/buffer.rb

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@ def initialize(max_size)
1010

1111
@mutex = Mutex.new()
1212
@traces = []
13+
@closed = false
1314
end
1415

1516
# Add a new ``trace`` in the local queue. This method doesn't block the execution
1617
# even if the buffer is full. In that case, a random trace is discarded.
1718
def push(trace)
19+
return if @closed
1820
@mutex.synchronize do
1921
len = @traces.length
2022
if len < @max_size || @max_size <= 0
@@ -48,5 +50,17 @@ def pop
4850
return traces
4951
end
5052
end
53+
54+
def close
55+
@mutex.synchronize do
56+
@closed = true
57+
end
58+
end
59+
60+
def closed?
61+
@mutex.synchronise do
62+
return @closed
63+
end
64+
end
5165
end
5266
end

lib/ddtrace/tracer.rb

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,23 @@ def self.debug_logging
5757
log.level == Logger::DEBUG
5858
end
5959

60+
# Shorthand that calls the `shutdown!` method of a registered worker.
61+
# It's useful to ensure that the Trace Buffer is properly flushed before
62+
# shutting down the application.
63+
#
64+
# For instance:
65+
#
66+
# tracer.trace('operation_name', service='rake_tasks') do |span|
67+
# span.set_tag('task.name', 'script')
68+
# end
69+
#
70+
# tracer.shutdown!
71+
#
72+
def shutdown!
73+
return if !@enabled || @writer.worker.nil?
74+
@writer.worker.shutdown!
75+
end
76+
6077
# Return the current active \Context for this traced execution. This method is
6178
# automatically called when calling Tracer.trace or Tracer.start_span,
6279
# but it can be used in the application code during manual instrumentation.

lib/ddtrace/workers.rb

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,18 @@ module Workers
99
# will perform a task at regular intervals. The thread can be stopped
1010
# with the +stop()+ method and can start with the +start()+ method.
1111
class AsyncTransport
12+
DEFAULT_TIMEOUT = 5
13+
14+
attr_reader :trace_buffer, :service_buffer, :shutting_down
15+
1216
def initialize(transport, buff_size, trace_task, service_task, interval)
1317
@trace_task = trace_task
1418
@service_task = service_task
1519
@flush_interval = interval
1620
@trace_buffer = TraceBuffer.new(buff_size)
1721
@service_buffer = TraceBuffer.new(buff_size)
1822
@transport = transport
23+
@shutting_down = false
1924

2025
@worker = nil
2126
@run = false
@@ -71,6 +76,24 @@ def stop
7176
@run = false
7277
end
7378

79+
# Closes all available queues and waits for the trace and service buffer to flush
80+
def shutdown!
81+
return false if @shutting_down
82+
@shutting_down = true
83+
@trace_buffer.close
84+
@service_buffer.close
85+
sleep(0.1)
86+
timeout_time = Time.now + DEFAULT_TIMEOUT
87+
while (!@trace_buffer.empty? || !@service_buffer.empty?) && Time.now <= timeout_time
88+
sleep(0.05)
89+
Datadog::Tracer.log.debug('Waiting for the buffers to clear before exiting')
90+
end
91+
stop
92+
join
93+
@shutting_down = false
94+
true
95+
end
96+
7497
# Block until executor shutdown is complete or until timeout seconds have passed.
7598
def join
7699
@worker.join(5)

lib/ddtrace/writer.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
module Datadog
66
# Traces and services writer that periodically sends data to the trace-agent
77
class Writer
8-
attr_reader :transport
8+
attr_reader :transport, :worker
99

1010
HOSTNAME = 'localhost'.freeze
1111
PORT = '8126'.freeze

test/buffer_test.rb

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,4 +69,20 @@ def test_trace_buffer_pop
6969
assert_includes(output_traces, input_traces[0])
7070
assert_includes(output_traces, input_traces[1])
7171
end
72+
73+
def test_closed_trace_buffer
74+
# the trace buffer should not accept anymore traces when closed
75+
buffer = Datadog::TraceBuffer.new(4)
76+
buffer.push(1)
77+
buffer.push(2)
78+
buffer.push(3)
79+
buffer.push(4)
80+
buffer.close
81+
buffer.push(5)
82+
buffer.push(6)
83+
out = buffer.pop
84+
assert_equal(out.length, 4)
85+
assert(!out.include?(5))
86+
assert(!out.include?(6))
87+
end
7288
end

test/integration_test.rb

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,58 @@ def agent_receives_span_step3(tracer, previous_success)
6666
assert_equal(0, stats[:transport][:internal_error])
6767
end
6868

69+
def agent_receives_short_span(tracer)
70+
tracer.set_service_info('my.service', 'rails', 'web')
71+
span = tracer.start_span('my.short.op')
72+
span.service = 'my.service'
73+
span.finish()
74+
75+
first_shutdown = tracer.shutdown!
76+
77+
stats = tracer.writer.stats
78+
assert(first_shutdown, 'should have run through the shutdown method')
79+
assert(span.finished?, 'span did not finish')
80+
assert_equal(1, stats[:traces_flushed], 'wrong number of traces flushed')
81+
assert_equal(1, stats[:services_flushed], 'wrong number of services flushed')
82+
assert_equal(0, stats[:transport][:client_error])
83+
assert_equal(0, stats[:transport][:server_error])
84+
assert_equal(0, stats[:transport][:internal_error])
85+
end
86+
87+
def shutdown_exec_only_once(tracer)
88+
tracer.set_service_info('my.service', 'rails', 'web')
89+
span = tracer.start_span('my.short.op')
90+
span.service = 'my.service'
91+
span.finish()
92+
first_shutdown = nil
93+
second_shutdown = nil
94+
worker = Thread.new() do
95+
first_shutdown = tracer.shutdown!
96+
end
97+
98+
100.times do
99+
break if tracer.writer.worker.shutting_down # wait until first shutdown is halfway through execution
100+
sleep(0.01)
101+
end
102+
103+
second_worker = Thread.new() do
104+
second_shutdown = tracer.shutdown!
105+
end
106+
107+
worker.join(2)
108+
second_worker.join(2)
109+
110+
stats = tracer.writer.stats
111+
assert(first_shutdown, 'should have run through the shutdown method')
112+
assert_equal(false, second_shutdown, 'should not have executed shutdown')
113+
assert_equal(true, span.finished?, 'span did not finish')
114+
assert_equal(1, stats[:traces_flushed], 'wrong number of traces flushed')
115+
assert_equal(1, stats[:services_flushed], 'wrong number of services flushed')
116+
assert_equal(0, stats[:transport][:client_error])
117+
assert_equal(0, stats[:transport][:server_error])
118+
assert_equal(0, stats[:transport][:internal_error])
119+
end
120+
69121
def test_agent_receives_span
70122
# test that the agent really receives the spans
71123
# this test can be long since it waits internal buffers flush
@@ -78,4 +130,24 @@ def test_agent_receives_span
78130
success = agent_receives_span_step2(tracer)
79131
agent_receives_span_step3(tracer, success)
80132
end
133+
134+
def test_short_span
135+
skip unless ENV['TEST_DATADOG_INTEGRATION'] || RUBY_PLATFORM != 'java'
136+
# requires a running agent, and test does not apply to Java threading model
137+
138+
tracer = Datadog::Tracer.new
139+
tracer.configure(enabled: true, hostname: '127.0.0.1', port: '8126')
140+
141+
agent_receives_short_span(tracer)
142+
end
143+
144+
def test_shutdown_exec_once
145+
skip unless ENV['TEST_DATADOG_INTEGRATION'] || RUBY_PLATFORM != 'java'
146+
# requires a running agent, and test does not apply to Java threading model
147+
148+
tracer = Datadog::Tracer.new
149+
tracer.configure(enabled: true, hostname: '127.0.0.1', port: '8126')
150+
151+
shutdown_exec_only_once(tracer)
152+
end
81153
end

0 commit comments

Comments
 (0)