Skip to content

Commit 045040d

Browse files
author
Rachel Lo
committed
Allows trace buffer to be closed. Makes sure it is closed before worker is shut down. Adds tests for these changes
1 parent dfd679a commit 045040d

File tree

5 files changed

+98
-5
lines changed

5 files changed

+98
-5
lines changed

lib/ddtrace/buffer.rb

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@ def initialize(max_size)
1010

1111
@mutex = Mutex.new()
1212
@traces = []
13+
@closed = false
1314
end
1415

1516
# Add a new ``trace`` in the local queue. This method doesn't block the execution
1617
# even if the buffer is full. In that case, a random trace is discarded.
1718
def push(trace)
19+
return if @closed
1820
@mutex.synchronize do
1921
len = @traces.length
2022
if len < @max_size || @max_size <= 0
@@ -48,5 +50,17 @@ def pop
4850
return traces
4951
end
5052
end
53+
54+
def close
55+
@mutex.synchronize do
56+
@closed = true
57+
end
58+
end
59+
60+
def closed?
61+
@mutex.synchronise do
62+
return @closed
63+
end
64+
end
5165
end
5266
end

lib/ddtrace/tracer.rb

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ module Datadog
1818
# of these function calls and sub-requests would be encapsulated within a single trace.
1919
# rubocop:disable Metrics/ClassLength
2020
class Tracer
21-
DEFAULT_TIMEOUT = 5
22-
2321
attr_reader :writer, :sampler, :services, :tags, :provider
2422
attr_accessor :enabled
2523
attr_writer :default_service
@@ -59,8 +57,8 @@ def self.debug_logging
5957
log.level == Logger::DEBUG
6058
end
6159

62-
def self.shutdown!
63-
return if !@enabled? || @writer.worker.nil?
60+
def shutdown!
61+
return if !@enabled || @writer.worker.nil?
6462
@writer.worker.shutdown!
6563
end
6664

lib/ddtrace/workers.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ module Workers
99
# will perform a task at regular intervals. The thread can be stopped
1010
# with the +stop()+ method and can start with the +start()+ method.
1111
class AsyncTransport
12+
DEFAULT_TIMEOUT = 5
13+
1214
attr_reader :trace_buffer, :service_buffer, :shutting_down
1315

1416
def initialize(span_interval, service_interval, transport, buff_size, trace_task, service_task)
@@ -91,8 +93,10 @@ def stop
9193
end
9294

9395
def shutdown!
94-
return if @shutting_down
96+
return if @shutting_down || (@trace_buffer.empty? && @service_buffer.empty?)
9597
@shutting_down = true
98+
@trace_buffer.close
99+
@service_buffer.close
96100
sleep(0.1)
97101
timeout_time = Time.now + DEFAULT_TIMEOUT
98102
while (!@trace_buffer.empty? || !@service_buffer.empty?) && Time.now <= timeout_time

test/buffer_test.rb

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,4 +69,20 @@ def test_trace_buffer_pop
6969
assert_includes(output_traces, input_traces[0])
7070
assert_includes(output_traces, input_traces[1])
7171
end
72+
73+
def test_closed_trace_buffer
74+
# the trace buffer should not accept anymore traces when closed
75+
buffer = Datadog::TraceBuffer.new(4)
76+
buffer.push(1)
77+
buffer.push(2)
78+
buffer.push(3)
79+
buffer.push(4)
80+
buffer.close
81+
buffer.push(5)
82+
buffer.push(6)
83+
out = buffer.pop
84+
assert_equal(out.length, 4)
85+
assert(!out.include?(5))
86+
assert(!out.include?(6))
87+
end
7288
end

test/integration_test.rb

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,49 @@ def agent_receives_span_step3(tracer, previous_success)
6666
assert_equal(0, stats[:transport][:internal_error])
6767
end
6868

69+
def agent_receives_short_span(tracer)
70+
tracer.set_service_info('my.service', 'rails', 'web')
71+
span = tracer.start_span('my.short.op')
72+
span.service = 'my.service'
73+
span.finish()
74+
75+
tracer.shutdown!
76+
77+
stats = tracer.writer.stats
78+
assert(span.finished?, 'span did not finish')
79+
assert_equal(1, stats[:traces_flushed], 'wrong number of traces flushed')
80+
assert_equal(1, stats[:services_flushed], 'wrong number of services flushed')
81+
assert_equal(0, stats[:transport][:client_error])
82+
assert_equal(0, stats[:transport][:server_error])
83+
assert_equal(0, stats[:transport][:internal_error])
84+
end
85+
86+
def shutdown_exec_only_once(tracer)
87+
tracer.set_service_info('my.service', 'rails', 'web')
88+
span = tracer.start_span('my.short.op')
89+
span.service = 'my.service'
90+
span.finish()
91+
92+
first_point = Time.now.utc
93+
tracer.shutdown!
94+
second_point = Time.now.utc
95+
tracer.shutdown!
96+
third_point = Time.now.utc
97+
98+
first_shutdown = second_point - first_point
99+
second_shutdown = third_point - second_point
100+
101+
stats = tracer.writer.stats
102+
assert(first_shutdown >= 0.1, 'should have executed shutdown')
103+
assert_equal(true, second_shutdown < 0.1, 'should not have executed second shutdown')
104+
assert_equal(true, span.finished?, 'span did not finish')
105+
assert_equal(1, stats[:traces_flushed], 'wrong number of traces flushed')
106+
assert_equal(1, stats[:services_flushed], 'wrong number of services flushed')
107+
assert_equal(0, stats[:transport][:client_error])
108+
assert_equal(0, stats[:transport][:server_error])
109+
assert_equal(0, stats[:transport][:internal_error])
110+
end
111+
69112
def test_agent_receives_span
70113
# test that the agent really receives the spans
71114
# this test can be long since it waits internal buffers flush
@@ -78,4 +121,22 @@ def test_agent_receives_span
78121
success = agent_receives_span_step2(tracer)
79122
agent_receives_span_step3(tracer, success)
80123
end
124+
125+
def test_short_span
126+
skip unless ENV['TEST_DATADOG_INTEGRATION'] # requires a running agent
127+
128+
tracer = Datadog::Tracer.new
129+
tracer.configure(enabled: true, hostname: '127.0.0.1', port: '8126')
130+
131+
agent_receives_short_span(tracer)
132+
end
133+
134+
def test_shutdown_exec_once
135+
skip unless ENV['TEST_DATADOG_INTEGRATION'] # requires a running agent
136+
137+
tracer = Datadog::Tracer.new
138+
tracer.configure(enabled: true, hostname: '127.0.0.1', port: '8126')
139+
140+
shutdown_exec_only_once(tracer)
141+
end
81142
end

0 commit comments

Comments
 (0)