Trace payload chunking #840

Merged: 24 commits into master from feat/subdivide-payloads on Apr 29, 2020

Commits
2dc844e
Trace payload chunking
marcotc Oct 1, 2019
78942f3
Address review comments
marcotc Oct 17, 2019
62b5fd3
Add Transport layer
marcotc Oct 17, 2019
c5369cd
Better error message when trace is too large
marcotc Oct 22, 2019
e252a01
Merge branch 'master' into feat/subdivide-payloads
marcotc Dec 19, 2019
80480f7
Merge branch 'master' into feat/subdivide-payloads
marcotc Dec 20, 2019
b94aef8
Merge branch 'master' into feat/subdivide-payloads
marcotc Jan 21, 2020
1484b2f
Merge branch 'master' into feat/subdivide-payloads
marcotc Apr 24, 2020
c28c768
Merge branch 'feat/subdivide-payloads' of github.com:DataDog/dd-trace…
marcotc Apr 24, 2020
db963d8
Move trace_count to Traces::Parcel
marcotc Apr 24, 2020
a16940b
Extract Chunker
marcotc Apr 24, 2020
74c3924
Sacrifice for the linting gods ☠️
marcotc Apr 24, 2020
11e122f
Remove require for deleted file
marcotc Apr 24, 2020
af5e690
Use lazy stream for chunking and refactor Transport::IO to accommodat…
marcotc Apr 27, 2020
c8e107c
Fix typo
marcotc Apr 27, 2020
7ecf8e4
Add health metrics
marcotc Apr 27, 2020
abc49c3
Adapt benchmark tests
marcotc Apr 27, 2020
6eccad1
Batch chunk size health metric reporting
marcotc Apr 27, 2020
31935c9
Fix typo in encoder test name
marcotc Apr 27, 2020
15b2548
Fix bad JRuby method surfaced by test refactoring
marcotc Apr 27, 2020
d141ca2
Remove unnecessary lazy call for recursive send_traces
marcotc Apr 28, 2020
23c694e
Remove max_size tag from metrics
marcotc Apr 28, 2020
d57498e
Create separate object for EncodedParcel
marcotc Apr 28, 2020
d630fc2
Restore Traces::Request
marcotc Apr 29, 2020
102 changes: 89 additions & 13 deletions lib/ddtrace/encoding.rb
@@ -10,19 +10,84 @@ def content_type
raise NotImplementedError
end

# Encodes a list of traces, expecting a list of items where each item
# is a list of spans. Before dumping the string in a serialized format, all
# traces are normalized. The trace nesting is not changed.
def encode_traces(traces)
to_send = []
# The trace agent limits payload size to 10 MiB (as of agent v6.14.1):
# https://github.com/DataDog/datadog-agent/blob/6.14.1/pkg/trace/api/api.go#L46
#
# This value is set lower than the hard limit, in case transport overhead pushes
# the payload size over the limit.
DEFAULT_MAX_PAYLOAD_SIZE = 9 * 1024 * 1024

# Encodes a list of traces in batches, expecting a list of items where each item
# is a list of spans.
# A serialized batch payload will not exceed +max_size+.
# Single traces larger than +max_size+ will be discarded.
# Before serializing, all traces are normalized. Trace nesting is not changed.
#
# @param traces [Array<Trace>] list of traces
# @param max_size [Integer] maximum acceptable payload size, in bytes
# @yield [encoded_batch, batch_size] block invoked for every serialized batch of traces
# @yieldparam encoded_batch [String] serialized batch of traces, ready to be transmitted
# @yieldparam batch_size [Integer] number of traces serialized in this batch
# @return [Array] list of return values from the provided block
def encode_traces(traces, max_size: DEFAULT_MAX_PAYLOAD_SIZE)
# Captures all return values from the provided block
returns = []

encoded_batch = []
batch_size = 0
traces.each do |trace|
to_send << trace.map(&:to_hash)
encoded_trace = encode_one(trace, max_size)

next unless encoded_trace

if encoded_trace.size + batch_size > max_size
# Can't fit trace in current batch
# TODO: Datadog::Debug::HealthMetrics.increment('tracer.encoder.batch.chunked')

# Flush current batch
returns << yield(join(encoded_batch), encoded_batch.size)
# TODO: Datadog::Debug::HealthMetrics.increment('tracer.encoder.batch.yield')

# Create new batch
encoded_batch = []
batch_size = 0
end

encoded_batch << encoded_trace
batch_size += encoded_trace.size
end

unless encoded_batch.empty?
returns << yield(join(encoded_batch), encoded_batch.size)
# TODO: Datadog::Debug::HealthMetrics.increment('tracer.encoder.batch.yield')
end

returns
end

private

def encode_one(trace, max_size)
encoded = encode(trace.map(&:to_hash))

# TODO: Datadog::Debug::HealthMetrics.increment('tracer.encoder.trace.encode')
if encoded.size > max_size
# This single trace is too large, we can't flush it
# TODO should I add `trace` to the message? It will definitely be very large!
# TODO: Datadog::Debug::HealthMetrics.increment('tracer.encoder.trace.max_size_exceed')
Datadog::Tracer.log.debug('Trace payload too large')
return nil
end
encode(to_send)

encoded
end

# Concatenates a list of traces previously encoded by +#encode+.
def join(encoded_traces)
raise NotImplementedError
end

# Defines the underlying format used during traces or services encoding.
# This method must be implemented and should only be used by the internal functions.
# Serializes a single trace into a String suitable for network transmission.
def encode(_)
raise NotImplementedError
end
@@ -40,8 +105,12 @@ def content_type
CONTENT_TYPE
end

def encode(obj)
JSON.dump(obj)
def encode(trace)
JSON.dump(trace)
end

def join(encoded_traces)
"[#{encoded_traces.join(',')}]"
end
end

@@ -57,8 +126,15 @@ def content_type
CONTENT_TYPE
end

def encode(obj)
MessagePack.pack(obj)
def encode(trace)
MessagePack.pack(trace)
end

def join(encoded_traces)
packer = MessagePack::Packer.new
packer.write_array_header(encoded_traces.size)

(packer.to_a + encoded_traces).join
end
end
end
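
For reference, a minimal usage sketch of the new batching API (hypothetical caller code, not part of this diff; `traces` and `transport` are assumed to exist):

```ruby
require 'ddtrace/encoding'

encoder = Datadog::Encoding::MsgpackEncoder

# Each yielded payload is one serialized batch no larger than max_size;
# the block's return values are collected and returned as an array.
results = encoder.encode_traces(traces, max_size: 1024 * 1024) do |payload, count|
  puts "flushing #{count} traces in #{payload.bytesize} bytes"
  transport.post(payload) # assumed transport call
end
```

The MessagePack `join` works because a msgpack array is simply an array header declaring the element count followed by the raw encoded elements, so pre-encoded traces can be concatenated after the header without re-packing.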
12 changes: 6 additions & 6 deletions lib/ddtrace/transport/http/client.rb
@@ -23,19 +23,19 @@ def send_request(request, &block)
# Build request into env
env = build_env(request)

# Get response from API
response = yield(current_api, env)
# Get responses from API
responses = yield(current_api, env)
Contributor:

Is there a way to call the client multiple times for each request in the batch instead of changing the client to do batching itself? I think it's unwise to make the client handle multiple requests simultaneously as it will greatly increase the complexity of the transport code.

It will also make this batching feature more brittle and tightly coupled to how HTTP works instead of being agnostic to the means of transport, which will make it difficult (if not impossible) to adopt new means of transport in the future.

marcotc (Member Author):

We need to know what encoder we are using in order to break down the traces being flushed into multiple chunks.

The client is currently responsible for such information, in the form of client.current_api.spec.traces.encoder.
Also, when downgrading, the encoder might change. Downgrading is currently handled by the client.

I tried to prototype a different approach just now, moving the chunking logic as far up the call chain as I believe it makes sense: feat/subdivide-payloads...tmp-feat/subdivide-payloads

I still don't like this one, too many layers are mixed together.

The main issue is that chunking, in a perfect scenario, would be done before we start calling the client. But the fact that we need the encoder, which is 2 levels down (inside the current Spec instance), and that the encoder can change if we need to downgrade the API, seems to make it quite tricky.

Next, I'm going to try to move current_api into the transport instance, and handle API versioning there, including the downgrading logic.

I'll report back on those results.
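
A rough sketch of that direction (hypothetical structure and names, not code from this PR; `downgrade?`/`downgrade!` are left abstract):

```ruby
# The transport owns the API map and the downgrade decision; the client
# only ever performs single requests.
class Transport
  def initialize(apis, current_api_id, client)
    @apis = apis                   # e.g. { v4: api_v4, v3: api_v3 }
    @current_api_id = current_api_id
    @client = client
  end

  def current_api
    @apis.fetch(@current_api_id)
  end

  def send_traces(traces)
    responses = current_api.encoder.encode_traces(traces) do |payload, count|
      @client.send_request(payload, count)
    end

    if responses.any? { |r| downgrade?(r) }
      downgrade! # e.g. switch @current_api_id from v4 to v3; may change the encoder
      return send_traces(traces)
    end

    responses
  end
end
```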

Contributor:

Okay, that context is helpful, thanks for that explanation.

The existing design had certain assumptions about encoding, which is why it was buried lower in the transport: it was considered a detail of the current API, which I think still holds true to a great extent.

I think it brings up some legitimate questions about how the design could change to accommodate batching though. Some possible paradigms I can think of might be:

  1. Expose the encoder, wrap the client with some kind of Batcher, then have the batcher encapsulate this logic entirely, and use the client to drive individual requests. Batching could be its own module Batching that can be composed into the existing HTTP::Client (sketched below).
  2. Assert that encoding requests is a detail of the API and that it's acceptable for the API to split requests on the client's behalf. Consequently, you'd make the API spec responsible for batching and splitting one large request into smaller ones (which is what I think you were effectively doing).

There might be more ways of handling this, but the key difference between these is basically that option 1 is explicit in Client usage (one request, one response) while option 2 is auto-magic: "don't worry about the details, we'll figure it out."

Personally I'm in favor of number 1, because it keeps the responsibilities of the API/Client as small as possible (less complexity), and doesn't get us into weird scenarios where we have to handle a request that was forked into multiple requests in code that isn't concerned with batching (e.g. Client#send_request). Instead, we can keep all this batching code (hopefully) in a neat little module that knows how to deal with multiple requests and extends the capability of the Client in a compartmentalized way. (We could even go a step further to extract the "retry" functionality into a similar module for consistency, something I might want to undertake anyways.)

Let me know your thoughts or if you have some alternative paradigms to suggest!
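
For illustration, a minimal sketch of option 1 (hypothetical names, not code from this PR; `build_trace_request` is an assumed helper):

```ruby
# Option 1 sketch: batching as its own module, composed into the HTTP
# client, so Client#send_request stays strictly one request, one response.
module Batching
  # Splits traces into encoded batches and drives one request per batch.
  def send_batched_traces(traces, encoder, max_size:)
    encoder.encode_traces(traces, max_size: max_size) do |payload, count|
      send_request(build_trace_request(payload, count)) # assumed helper
    end
  end
end

# Composed into the existing client:
# Datadog::Transport::HTTP::Client.include(Batching)
```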


# Update statistics
update_stats_from_response!(response)
responses.each { |r| update_stats_from_response!(r) }

# If API should be downgraded, downgrade and try again.
if downgrade?(response)
if responses.find { |r| downgrade?(r) }
downgrade!
response = send_request(request, &block)
responses = send_request(request, &block)
end

response
responses
rescue StandardError => e
message = "Internal error during HTTP transport request. Cause: #{e.message} Location: #{e.backtrace.first}"

21 changes: 17 additions & 4 deletions lib/ddtrace/transport/http/traces.rb
@@ -17,6 +17,7 @@ class Response
def initialize(http_response, options = {})
super(http_response)
@service_rates = options.fetch(:service_rates, nil)
@trace_count = options.fetch(:trace_count, 0)
end
end

@@ -103,18 +104,30 @@ def service_rates?
end

def call(env, &block)
encoder.encode_traces(env.request.parcel.data) do |encoded_data, count|
# Ensure no data is leaked between requests.
# We have to perform this copy before we start modifying headers and body.
new_env = env.dup

process_batch(new_env, encoded_data, count) { |e| super(e, &block) }
end
end

private

def process_batch(env, encoded_data, count)
# Add trace count header
env.headers[HEADER_TRACE_COUNT] = env.request.parcel.count.to_s
env.headers[HEADER_TRACE_COUNT] = count.to_s

# Encode body & type
env.headers[HEADER_CONTENT_TYPE] = encoder.content_type
env.body = env.request.parcel.encode_with(encoder)
env.body = encoded_data

# Query for response
http_response = super(env, &block)
http_response = yield env

# Process the response
response_options = {}.tap do |options|
response_options = { trace_count: count }.tap do |options|
# Parse service rates, if configured to do so.
if service_rates? && !http_response.payload.to_s.empty?
body = JSON.parse(http_response.payload)
6 changes: 1 addition & 5 deletions lib/ddtrace/transport/traces.rb
@@ -11,10 +11,6 @@ class Parcel
def count
data.length
end

def encode_with(encoder)
encoder.encode_traces(data)
end
end

# Traces request
@@ -26,7 +22,7 @@ def initialize(traces)

# Traces response
module Response
attr_reader :service_rates
attr_reader :service_rates, :trace_count
end
end
end
25 changes: 15 additions & 10 deletions lib/ddtrace/writer.rb
@@ -78,20 +78,19 @@ def send_spans(traces, transport)
# Inject hostname if configured to do so
inject_hostname!(traces) if Datadog.configuration.report_hostname

# Send traces and get a response.
response = transport.send_traces(traces)
# Send traces and get responses
responses = transport.send_traces(traces)

unless response.internal_error?
@traces_flushed += traces.length unless response.server_error?

# Update priority sampler
unless priority_sampler.nil? || response.service_rates.nil?
priority_sampler.update(response.service_rates)
end
# Tally up successful flushes
responses.reject { |x| x.internal_error? || x.server_error? }.each do |response|
@traces_flushed += response.trace_count
end

# Update priority sampler
update_priority_sampler(responses.last)
Member:

What if there are no successful responses?

Member:

I see update_priority_sampler might be handling this?

marcotc (Member Author):

You are correct, this is just a matter of extracting a somewhat complex logic block into its own method.


# Return if server error occurred.
!response.server_error?
!responses.find(&:server_error?)
end

def send_runtime_metrics
@@ -153,5 +152,11 @@ def inject_hostname!(traces)
end
end
end

def update_priority_sampler(response)
return unless response && !response.internal_error? && priority_sampler && response.service_rates

priority_sampler.update(response.service_rates)
end
end
end
87 changes: 87 additions & 0 deletions spec/ddtrace/encoding_spec.rb
@@ -0,0 +1,87 @@
require 'spec_helper'
require 'spec/support/language_helpers'

require 'ddtrace/encoding'

RSpec.describe Datadog::Encoding do
subject(:encode) { encoder.method(:encode_traces) }

let(:block) { proc { block_response } }
let(:block_response) { double('response') }

context 'Base encoder' do
let(:encoder) { Class.new { include Datadog::Encoding::Encoder }.new }

let(:traces) { get_test_traces(3) }

before do
allow(encoder).to receive(:encode).with(traces[0].map(&:to_hash)).and_return('blob1')
allow(encoder).to receive(:encode).with(traces[1].map(&:to_hash)).and_return('blob2')
allow(encoder).to receive(:encode).with(traces[2].map(&:to_hash)).and_return('blob3')
allow(encoder).to receive(:join) { |arr| arr.join(',') }
end

it do
expect { |b| encode.call(traces, &b) }.to yield_with_args('blob1,blob2,blob3', 3)
end

it 'returns yielded block returns' do
expect(encode.call(traces, &block)).to eq([block_response])
end

context 'with large batch of traces' do
let(:max_size) { 10 }

it do
expect { |b| encode.call(traces, max_size: max_size, &b) }
.to yield_successive_args(['blob1,blob2', 2], ['blob3', 1])
end

it 'returns yielded block returns' do
expect(encode.call(traces, max_size: max_size, &block)).to eq([block_response, block_response])
end
end

context 'with individual traces too large' do
let(:max_size) { 4 }

it do
expect { |b| encode.call(traces, max_size: max_size, &b) }.not_to yield_control
end
end
end

context 'Msgpack encoding' do
let(:encoder) { Datadog::Encoding::MsgpackEncoder }
let(:traces) { get_test_traces(2) }

it do
expect(encode.call(traces) do |encoded, size|
expect(size).to eq(2)

items = MessagePack.unpack(encoded)
expect(items.size).to eq(2)
expect(items.first).to eq(traces.first.map(&:to_hash).map(&:stringify_keys))

block_response
end).to eq([block_response])
end
end

context 'JSON encoding' do
let(:encoder) { Datadog::Encoding::JSONEncoder }
let(:traces) { get_test_traces(2) }

it do
expect(encode.call(traces) do |encoded, size|
expect(size).to eq(2)

items = JSON.parse(encoded)
expect(items.size).to eq(2)
expect(items.first).to eq(traces.first.map(&:to_hash).map(&:stringify_keys))

block_response
end).to eq([block_response])
end
end
end