Skip to content

Commit

Permalink
Fixed: Thread not restoring clean state after applying Profiling::Ext…
Browse files Browse the repository at this point in the history
…::CThread
  • Loading branch information
delner committed Oct 15, 2020
1 parent 62b5c6a commit 13a81e8
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 55 deletions.
2 changes: 2 additions & 0 deletions lib/ddtrace/profiling/ext/cpu.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ def self.supported?
def self.apply!
return false unless supported?

# Applying CThread to Thread will ensure any new threads
# will provide a thread/clock ID for CPU timing.
require 'ddtrace/profiling/ext/cthread'
::Thread.send(:prepend, Profiling::Ext::CThread)
end
Expand Down
12 changes: 9 additions & 3 deletions lib/ddtrace/profiling/ext/cthread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@ module CThread
attach_function :pthread_getcpuclockid, [:ulong, CClockId], :int

def self.prepended(base)
# Be sure to update the current thread too; as it wouldn't have been set.
::Thread.current.class.send(:prepend, CThread) unless base == ::Thread.current.class
::Thread.current.send(:update_native_ids)
# Threads that have already been created, will not have resolved
# a thread/clock ID. This is because these IDs can only be resolved
# from within the thread's execution context, which we do not control.
#
# We can mitigate this for the current thread via #update_native_ids,
# since we are currently running within its execution context. We cannot
# do this for any other threads that may have been created already.
# (This is why it's important that CThread is applied before anything else runs.)
base.current.send(:update_native_ids) if base.current.is_a?(CThread)
end

attr_reader \
Expand Down
13 changes: 9 additions & 4 deletions spec/ddtrace/profiling/collectors/stack_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
let(:buffer) { instance_double(Datadog::Profiling::Buffer) }
let(:string_table) { Datadog::Utils::StringTable.new }
let(:backtrace_location_cache) { Datadog::Utils::ObjectSet.new }
let(:correlation) { instance_double(Datadog::Correlation::Identifier, trace_id: 0, span_id: 0) }

before do
allow(recorder)
Expand All @@ -27,6 +28,12 @@
.to receive(:cache)
.with(:backtrace_locations)
.and_return(backtrace_location_cache)

if Datadog.respond_to?(:tracer)
allow(Datadog.tracer)
.to receive(:active_correlation)
.and_return(correlation)
end
end

describe '::new' do
Expand Down Expand Up @@ -201,11 +208,9 @@

describe '#collect_thread_event' do
subject(:collect_events) { collector.collect_thread_event(thread, wall_time_interval_ns) }
let(:thread) { instance_double(Thread) }
let(:thread) { double('Thread', backtrace_locations: backtrace) }
let(:wall_time_interval_ns) { double('wall time interval in nanoseconds') }

before { allow(thread).to receive(:backtrace_locations).and_return(backtrace) }

context 'when the backtrace is empty' do
let(:backtrace) { nil }
it { is_expected.to be nil }
Expand Down Expand Up @@ -314,7 +319,7 @@

describe '#get_cpu_time_interval!' do
subject(:get_cpu_time_interval!) { collector.get_cpu_time_interval!(thread) }
let(:thread) { instance_double(Thread) }
let(:thread) { double('Thread') }

context 'when CPU timing is not supported' do
it { is_expected.to be nil }
Expand Down
9 changes: 1 addition & 8 deletions spec/ddtrace/profiling/ext/cpu_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,7 @@
describe '::apply!' do
subject(:apply!) { described_class.apply! }

around do |example|
unmodified_thread_class = ::Thread.dup

example.run

Object.send(:remove_const, :Thread)
Object.const_set('Thread', unmodified_thread_class)
end
before { stub_const('Thread', ::Thread.dup) }

context 'when native CPU time is supported' do
before { skip 'CPU profiling not supported' unless described_class.supported? }
Expand Down
80 changes: 52 additions & 28 deletions spec/ddtrace/profiling/ext/cthread_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,47 @@
let(:block) { proc { loop { sleep(1) } } }

let(:thread_class) do
Thread.send(:prepend, described_class)
Thread
end

# Leave Thread class in a clean state before and after tests
around do |example|
expect(::Thread.ancestors).to_not include(described_class)
unmodified_class = ::Thread.dup

example.run
klass = ::Thread.dup
klass.send(:prepend, described_class)
klass
end

Object.send(:remove_const, :Thread)
Object.const_set('Thread', unmodified_class)
# Leave Thread class in a clean state before and after tests
before do
stub_const('Thread', thread_class)
end

# Kill any spawned threads
after { thread.kill if instance_variable_defined?(:@thread_started) && @thread_started }

shared_context 'with main thread' do
let(:thread_class) { ::Thread }

def on_main_thread
# Patch thread in a fork so we don't modify the original Thread class
expect_in_fork do
thread_class.send(:prepend, described_class)
yield
end

# Ensure the patch didn't leak out of the fork.
expect(::Thread).to_not be_a_kind_of(described_class)
end
end

describe 'prepend' do
let(:thread_class) { ::Thread.dup }

before { allow(thread_class).to receive(:current).and_return(thread) }

it 'sets native thread IDs on current thread' do
expect(thread_class.current).to have_attributes(
clock_id: kind_of(Integer),
native_thread_id: kind_of(Integer),
cpu_time: kind_of(Float)
)
# Skip verification because the thread will not have been patched with the method yet
without_partial_double_verification do
expect(thread).to receive(:update_native_ids)
thread_class.send(:prepend, described_class)
end
end
end

Expand All @@ -64,12 +80,12 @@
context 'when forked' do
it 'returns a new native thread ID' do
# Get main thread native ID
original_native_thread_id = thread_class.current.native_thread_id
original_native_thread_id = thread.native_thread_id

expect_in_fork do
# Expect main thread native ID to not change
expect(thread_class.current.native_thread_id).to be_a_kind_of(Integer)
expect(thread_class.current.native_thread_id).to eq(original_native_thread_id)
expect(thread.native_thread_id).to be_a_kind_of(Integer)
expect(thread.native_thread_id).to eq(original_native_thread_id)
end
end
end
Expand All @@ -82,15 +98,19 @@
it { is_expected.to be_a_kind_of(Integer) }

context 'main thread' do
include_context 'with main thread'

context 'when forked' do
it 'returns a new clock ID' do
# Get main thread clock ID
original_clock_id = thread_class.current.clock_id

expect_in_fork do
# Expect main thread clock ID to change (to match fork's main thread)
expect(thread_class.current.clock_id).to be_a_kind_of(Integer)
expect(thread_class.current.clock_id).to_not eq(original_clock_id)
on_main_thread do
# Get main thread clock ID
original_clock_id = thread_class.current.clock_id

expect_in_fork do
# Expect main thread clock ID to change (to match fork's main thread)
expect(thread_class.current.clock_id).to be_a_kind_of(Integer)
expect(thread_class.current.clock_id).to_not eq(original_clock_id)
end
end
end
end
Expand Down Expand Up @@ -147,12 +167,16 @@
end

context 'main thread' do
include_context 'with main thread'

context 'when forked' do
it 'returns a CPU time' do
expect(thread_class.current.cpu_time).to be_a_kind_of(Float)

expect_in_fork do
on_main_thread do
expect(thread_class.current.cpu_time).to be_a_kind_of(Float)

expect_in_fork do
expect(thread_class.current.cpu_time).to be_a_kind_of(Float)
end
end
end
end
Expand Down
25 changes: 13 additions & 12 deletions spec/ddtrace/profiling/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,24 @@

module ProfilingFeatureHelpers
RSpec.shared_context 'with profiling extensions' do
around do |example|
unmodified_thread_class = ::Thread.dup
unmodified_process_class = ::Process.dup
unmodified_kernel_class = ::Kernel.dup
before do
stub_const('Thread', ::Thread.dup)
stub_const('Process', ::Process.dup)
stub_const('Kernel', ::Kernel.dup)

# Setup profiling to add
require 'ddtrace/profiling/tasks/setup'
Datadog::Profiling::Tasks::Setup.new.run
end

example.run
def profiling_on_main_thread
# Patch thread in a fork so we don't modify the original Thread class
expect_in_fork do
::Thread.send(:prepend, Datadog::Profiling::Ext::CThread)
yield
end

Object.send(:remove_const, :Thread)
Object.const_set('Thread', unmodified_thread_class)
Object.send(:remove_const, :Process)
Object.const_set('Process', unmodified_process_class)
Object.send(:remove_const, :Kernel)
Object.const_set('Kernel', unmodified_kernel_class)
# Ensure the patch didn't leak out of the fork.
expect(::Thread).to_not be_a_kind_of(Datadog::Profiling::Ext::CThread)
end
end
end
Expand Down

0 comments on commit 13a81e8

Please sign in to comment.