[PROF-3814] Limit number of threads per sample
Prior to this change, the profiler tried to sample all threads each
time it decided to sample.

This could have a negative impact on the latency of Ruby services with
a lot of threads, because each sampling pass would be quite expensive.

As an illustration, consider the
`benchmarks/profiler_sample_loop` benchmark, which samples 4 threads
with very deep (500+ frame) stacks as fast as possible.

As the number of threads to sample increases, sampling performance
degrades:

| Sampled threads | Iterations per second                               |
|-----------------|-----------------------------------------------------|
| 4 threads       | 390.921  (± 6.1%) i/s -      3.920k in  10.075624s  |
| 16 threads      |  97.037  (± 6.2%) i/s -    972.000  in  10.072298s  |
| 32 threads      |  49.576  (± 4.0%) i/s -    496.000  in  10.038491s  |
| 64 threads      |  24.060  (± 8.3%) i/s -    240.000  in  10.033481s  |

(All numbers from my laptop: i7-1068NG7 + ruby 2.7.4p191 [x86_64-darwin19])
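To get a concrete feel for why a full pass is expensive, the benchmark's shape can be sketched like this (a minimal reconstruction, not the actual `benchmarks/profiler_sample_loop` script; `deep_stack` and `measure_sampling_pass` are illustrative names, with only the thread counts and the ~500-frame depth taken from the text above):

```ruby
require 'benchmark'

# Park each thread at the bottom of a ~500-frame recursive call chain, so that
# walking its backtrace has to traverse a deep stack, like in the benchmark.
def deep_stack(depth, ready, stop)
  if depth.zero?
    ready << true
    stop.pop # block here, keeping the deep stack alive
  else
    deep_stack(depth - 1, ready, stop)
  end
end

# Time one full "sample every thread" pass over thread_count deep-stacked threads.
def measure_sampling_pass(thread_count, depth: 500)
  ready = Queue.new
  stop = Queue.new
  threads = Array.new(thread_count) { Thread.new { deep_stack(depth, ready, stop) } }
  thread_count.times { ready.pop } # wait until every stack is fully built

  seconds = Benchmark.realtime { threads.each(&:backtrace) }

  thread_count.times { stop << true }
  threads.each(&:join)
  seconds
end

puts format('4 threads:  %.6fs per pass', measure_sampling_pass(4))
puts format('64 threads: %.6fs per pass', measure_sampling_pass(64))
```

On hardware like the above, the 64-thread pass should take roughly 16x as long as the 4-thread one, matching the iterations-per-second drop in the table.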

When combined with our dynamic sampling mechanism, if the profiler
starts to take longer to sample, it just samples less often.

BUT it also means sampling can impact service latency: a single
sampling pass may suddenly consume 40ms+ of CPU time (the 64-thread,
500-frame case above, admittedly extreme), which from experience
(#1511, #1522) is not good.

Instead, this PR changes the stack sampler to sample up to a set
maximum of threads (16), randomly selected among all threads.

This means that adding more threads to the system causes very little
degradation:

| Sampled threads | Iterations per second                              |
|-----------------|----------------------------------------------------|
| 16 threads      | 104.859  (± 6.7%) i/s -      1.050k in  10.068781s |
| 32 threads      | 100.957  (± 5.9%) i/s -      1.010k in  10.047250s |
| 64 threads      |  98.098  (± 5.1%) i/s -    981.000  in  10.038244s |
| 256 threads     |  84.740  (± 8.3%) i/s -    848.000  in  10.074037s |

There's still a bit of degradation that I suspect is from pure VM
overhead -- 256+ threads on a single Ruby VM adds up.

Because we pick the threads to sample randomly, we'll eventually sample
all threads -- just not at once.
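That claim is easy to convince yourself of with `Array#sample`, the same Ruby primitive the new `threads_to_sample` helper relies on (a toy simulation, with integers standing in for thread objects):

```ruby
require 'set'

# Simulate sampling passes: each pass randomly picks 16 of 64 "threads".
# Over enough passes, every thread gets picked at least once.
all_threads = (1..64).to_a
max_threads_sampled = 16

seen = Set.new
passes = 0
until seen.size == all_threads.size
  seen.merge(all_threads.sample(max_threads_sampled))
  passes += 1
end

puts "covered all #{all_threads.size} threads in #{passes} passes"
```

Covering all 64 takes at least 4 passes (64 / 16), and in practice not many more, since each pass is an independent random pick.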

Finally, regarding the dynamic sampling mechanism: because the
profiler no longer takes longer to sample, it will sample more often,
and thus over a longer period we should collect roughly the same
samples.
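That interaction can be sketched with a simplified version of the idea (illustrative only -- `compute_wait_time` is an assumed helper name, not the collector's actual code; the constants mirror `DEFAULT_MAX_TIME_USAGE_PCT` and `MIN_INTERVAL` from the diff):

```ruby
# Simplified sketch: keep profiler CPU usage near max_time_usage_pct by
# scaling the wait before the next sample with the cost of the last one.
DEFAULT_MAX_TIME_USAGE_PCT = 2.0 # profiler may use ~2% of CPU time
MIN_INTERVAL = 0.01              # never sample more often than every 10ms

def compute_wait_time(last_sample_seconds, max_time_usage_pct: DEFAULT_MAX_TIME_USAGE_PCT)
  [last_sample_seconds / (max_time_usage_pct / 100.0), MIN_INTERVAL].max
end

puts compute_wait_time(0.001) # a cheap 1ms sample => wait ~50ms (1ms is 2% of 50ms)
puts compute_wait_time(0.040) # an expensive 40ms sample => wait ~2s, far fewer samples
```

Cheaper samples (fewer threads per pass) shorten the wait, so the sampling frequency goes up to compensate.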
ivoanjo committed Sep 23, 2021
1 parent bba0408 commit 7d80be8
Showing 2 changed files with 74 additions and 2 deletions.
24 changes: 23 additions & 1 deletion lib/ddtrace/profiling/collectors/stack.rb
@@ -18,13 +18,15 @@ class Stack < Worker # rubocop:disable Metrics/ClassLength
DEFAULT_MAX_TIME_USAGE_PCT = 2.0
MIN_INTERVAL = 0.01
THREAD_LAST_CPU_TIME_KEY = :datadog_profiler_last_cpu_time
DEFAULT_MAX_THREADS_SAMPLED = 16

attr_reader \
:recorder,
:max_frames,
:trace_identifiers_helper,
:ignore_thread,
:max_time_usage_pct,
:max_threads_sampled,
:thread_api

def initialize(
@@ -33,6 +35,7 @@ def initialize(
trace_identifiers_helper:, # Usually an instance of Datadog::Profiling::TraceIdentifiers::Helper
ignore_thread: nil,
max_time_usage_pct: DEFAULT_MAX_TIME_USAGE_PCT,
max_threads_sampled: DEFAULT_MAX_THREADS_SAMPLED,
thread_api: Thread,
fork_policy: Workers::Async::Thread::FORK_POLICY_RESTART, # Restart in forks by default
interval: MIN_INTERVAL,
@@ -43,6 +46,7 @@
@trace_identifiers_helper = trace_identifiers_helper
@ignore_thread = ignore_thread
@max_time_usage_pct = max_time_usage_pct
@max_threads_sampled = max_threads_sampled
@thread_api = thread_api

# Workers::Async::Thread settings
@@ -100,7 +104,7 @@ def collect_events
@last_wall_time = current_wall_time

# Collect backtraces from each thread
thread_api.list.each do |thread|
threads_to_sample.each do |thread|
next unless thread.alive?
next if ignore_thread.is_a?(Proc) && ignore_thread.call(thread)

@@ -251,6 +255,24 @@ def reset_cpu_time_tracking
thread.thread_variable_set(THREAD_LAST_CPU_TIME_KEY, nil)
end
end

# Whenever there are more than max_threads_sampled active, we only sample a subset of them.
# We do this to avoid impacting the latency of the service being profiled. We want to avoid doing
# a big burst of work all at once (sample everything), and instead do a little work each time
# (sample a bit by bit).
# Because we pick the threads to sample randomly, we'll eventually sample all threads -- just not at once.
# Notice also that this will interact with our dynamic sampling mechanism -- if samples are faster, we take
# them more often, if they are slower, we take them less often -- which again means that over a longer period
# we should take roughly the same samples.
def threads_to_sample
all_threads = thread_api.list

if all_threads.size > max_threads_sampled
all_threads.sample(max_threads_sampled)
else
all_threads
end
end
end
end
end
52 changes: 51 additions & 1 deletion spec/ddtrace/profiling/collectors/stack_spec.rb
@@ -5,6 +5,8 @@
require 'ddtrace/profiling/collectors/stack'
require 'ddtrace/profiling/trace_identifiers/helper'
require 'ddtrace/profiling/recorder'
require 'set'
require 'timeout'

RSpec.describe Datadog::Profiling::Collectors::Stack do
subject(:collector) { described_class.new(recorder, **options) }
Expand Down Expand Up @@ -164,9 +166,10 @@
end

describe '#collect_events' do
let(:options) { { **super(), thread_api: thread_api } }
let(:options) { { **super(), thread_api: thread_api, max_threads_sampled: max_threads_sampled } }
let(:thread_api) { class_double(Thread, current: Thread.current) }
let(:threads) { [Thread.current] }
let(:max_threads_sampled) { 3 }

subject(:collect_events) { collector.collect_events }

@@ -180,6 +183,53 @@
is_expected.to include(kind_of(Datadog::Profiling::Events::StackSample))
end

describe 'max_threads_sampled behavior' do
context 'when the number of threads to be sampled is <= max_threads_sampled' do
let(:threads) { Array.new(max_threads_sampled) { |n| instance_double(Thread, "Thread #{n}", alive?: true) } }

it 'samples all threads' do
sampled_threads = []
expect(collector).to receive(:collect_thread_event).exactly(max_threads_sampled).times do |thread, *_|
sampled_threads << thread
end

result = collect_events

expect(result.size).to be max_threads_sampled
expect(sampled_threads).to eq threads
end
end

context 'when the number of threads to be sampled is > max_threads_sampled' do
let(:threads) { Array.new(max_threads_sampled + 1) { |n| instance_double(Thread, "Thread #{n}", alive?: true) } }

it 'samples exactly max_threads_sampled threads' do
sampled_threads = []
expect(collector).to receive(:collect_thread_event).exactly(max_threads_sampled).times do |thread, *_|
sampled_threads << thread
end

result = collect_events

expect(result.size).to be max_threads_sampled
expect(threads).to include(*sampled_threads)
end

it 'eventually samples all threads' do
sampled_threads = Set.new
allow(collector).to receive(:collect_thread_event) { |thread, *_| sampled_threads << thread }

begin
Timeout.timeout(1) { collector.collect_events while sampled_threads.size != threads.size }
rescue Timeout::Error
raise 'Failed to eventually sample all threads in the time given'
end

expect(threads).to contain_exactly(*sampled_threads.to_a)
end
end
end

context 'when the thread' do
let(:thread) { instance_double(Thread, alive?: alive?) }
let(:threads) { [thread] }
