Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trace payload chunking #840

Merged
merged 24 commits into from
Apr 29, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
2dc844e
Trace payload chunking
marcotc Oct 1, 2019
78942f3
Address review comments
marcotc Oct 17, 2019
62b5fd3
Add Transport layer
marcotc Oct 17, 2019
c5369cd
Better error message when trace is too large
marcotc Oct 22, 2019
e252a01
Merge branch 'master' into feat/subdivide-payloads
marcotc Dec 19, 2019
80480f7
Merge branch 'master' into feat/subdivide-payloads
marcotc Dec 20, 2019
b94aef8
Merge branch 'master' into feat/subdivide-payloads
marcotc Jan 21, 2020
1484b2f
Merge branch 'master' into feat/subdivide-payloads
marcotc Apr 24, 2020
c28c768
Merge branch 'feat/subdivide-payloads' of github.com:DataDog/dd-trace…
marcotc Apr 24, 2020
db963d8
Move trace_count to Traces::Parcel
marcotc Apr 24, 2020
a16940b
Extract Chunker
marcotc Apr 24, 2020
74c3924
Sacrifice for the linting gods ☠️
marcotc Apr 24, 2020
11e122f
Remove require for deleted file
marcotc Apr 24, 2020
af5e690
Use lazy stream for chunking and refactor Transport::IO to accommodat…
marcotc Apr 27, 2020
c8e107c
Fix typo
marcotc Apr 27, 2020
7ecf8e4
Add health metrics
marcotc Apr 27, 2020
abc49c3
Adapt benchmark tests
marcotc Apr 27, 2020
6eccad1
Batch chunk size health metric reporting
marcotc Apr 27, 2020
31935c9
Fix typo in encoder test name
marcotc Apr 27, 2020
15b2548
Fix bad JRuby method surfaced by test refactoring
marcotc Apr 27, 2020
d141ca2
Remove unnecessary lazy call for recursive send_traces
marcotc Apr 28, 2020
23c694e
Remove max_size tag from metrics
marcotc Apr 28, 2020
d57498e
Create separate object for EncodedParcel
marcotc Apr 28, 2020
d630fc2
Restore Traces::Request
marcotc Apr 29, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 86 additions & 13 deletions lib/ddtrace/encoding.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,81 @@ def content_type
raise NotImplementedError
end

# Encodes a list of traces, expecting a list of items where each items
# is a list of spans. Before dump the string in a serialized format all
# traces are normalized. The traces nesting is not changed.
def encode_traces(traces)
to_send = []
# Trace agent limit payload size of 10 MiB (since agent v5.11.0):
# https://github.com/DataDog/datadog-agent/blob/6.14.1/pkg/trace/api/api.go#L46
#
# We set the value to a conservative 5 MiB, in case network speed is slow.
DEFAULT_MAX_PAYLOAD_SIZE = 5 * 1024 * 1024

# Encodes a list of traces in batches, expecting a list of items where each items
# is a list of spans.
# A serialized batch payload will not exceed +max_size+.
# Single traces larger than +max_size+ will be discarded.
# Before serializing, all traces are normalized. Trace nesting is not changed.
#
# @param traces [Array<Trace>] list of traces
# @param max_size [String] maximum acceptable payload size
# @yield [encoded_batch, batch_size] block invoked for every serialized batch of traces
# @yieldparam encoded_batch [String] serialized batch of traces, ready to be transmitted
# @yieldparam batch_size [Integer] number of traces serialized in this batch
# @return concatenated list of return values from the provided block
def encode_traces(traces, max_size: DEFAULT_MAX_PAYLOAD_SIZE)
# Captures all return values from the provided block
returns = []

encoded_batch = []
batch_size = 0
traces.each do |trace|
to_send << trace.map(&:to_hash)
encoded_trace = encode_one(trace, max_size)

next unless encoded_trace

if encoded_trace.size + batch_size > max_size
# Can't fit trace in current batch
# TODO Datadog::Debug::HealthMetrics.increment('tracer.encoder.batch.chunked')

# Flush current batch
returns << yield(join(encoded_batch), encoded_batch.size)
# TODO: Datadog::Debug::HealthMetrics.increment('tracer.encoder.batch.yield')

# Create new batch
encoded_batch = []
batch_size = 0
end

encoded_batch << encoded_trace
batch_size += encoded_trace.size
end

unless encoded_batch.empty?
returns << yield(join(encoded_batch), encoded_batch.size)
# TODO: Datadog::Debug::HealthMetrics.increment('tracer.encoder.batch.yield')
end

returns
end

private

def encode_one(trace, max_size)
encoded = encode(trace.map(&:to_hash))

# TODO: Datadog::Debug::HealthMetrics.increment('tracer.encoder.trace.encode')
if encoded.size > max_size
# This single trace is too large, we can't flush it
Datadog::Tracer.log.debug { "Trace payload too large: #{trace.map(&:to_hash)}" }
return nil
end
encode(to_send)

encoded
end

# Concatenates a list of traces previously encoded by +#encode+.
def join(encoded_traces)
raise NotImplementedError
end

# Defines the underlying format used during traces or services encoding.
# This method must be implemented and should only be used by the internal functions.
# Serializes a single trace into a String suitable for network transmission.
def encode(_)
raise NotImplementedError
end
Expand All @@ -40,8 +102,12 @@ def content_type
CONTENT_TYPE
end

def encode(obj)
JSON.dump(obj)
def encode(trace)
JSON.dump(trace)
end

def join(encoded_traces)
"[#{encoded_traces.join(',')}]"
end
end

Expand All @@ -57,8 +123,15 @@ def content_type
CONTENT_TYPE
end

def encode(obj)
MessagePack.pack(obj)
def encode(trace)
MessagePack.pack(trace)
end

def join(encoded_traces)
packer = MessagePack::Packer.new
packer.write_array_header(encoded_traces.size)
brettlangdon marked this conversation as resolved.
Show resolved Hide resolved

(packer.to_a + encoded_traces).join
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/ddtrace/transport/http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ module HTTP

# Builds a new Transport::HTTP::Client
def new(&block)
Builder.new(&block).to_client
Builder.new(&block).to_transport
end

# Builds a new Transport::HTTP::Client with default settings
Expand Down
4 changes: 4 additions & 0 deletions lib/ddtrace/transport/http/api/instance.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ def initialize(spec, adapter, options = {})
@headers = options.fetch(:headers, {})
end

def encoder
spec.encoder
end

def call(env)
# Add headers to request env, unless empty.
env.headers.merge!(headers) unless headers.empty?
Expand Down
8 changes: 3 additions & 5 deletions lib/ddtrace/transport/http/builder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
require 'ddtrace/transport/http/api/map'
require 'ddtrace/transport/http/api/instance'
require 'ddtrace/transport/http/client'
require 'ddtrace/transport/http/transport'

module Datadog
module Transport
Expand Down Expand Up @@ -70,13 +71,10 @@ def default_api=(key)
@default_api = key
end

def to_client
def to_transport
raise NoDefaultApiError if @default_api.nil?

@client ||= Client.new(
to_api_instances,
@default_api
)
HTTP::Transport.new(to_api_instances, @default_api)
end

def to_api_instances
Expand Down
67 changes: 5 additions & 62 deletions lib/ddtrace/transport/http/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,22 @@ module HTTP
class Client
include Transport::Statistics

attr_reader \
:apis,
:current_api_id
attr_reader :api

def initialize(apis, current_api_id)
@apis = apis

# Activate initial API
change_api!(current_api_id)
def initialize(api)
@api = api
end

def send_request(request, &block)
# Build request into env
env = build_env(request)

# Get response from API
response = yield(current_api, env)
# Get responses from API
response = yield(api, env)

# Update statistics
update_stats_from_response!(response)

# If API should be downgraded, downgrade and try again.
if downgrade?(response)
downgrade!
response = send_request(request, &block)
end

response
rescue StandardError => e
message = "Internal error during HTTP transport request. Cause: #{e.message} Location: #{e.backtrace.first}"
Expand All @@ -56,52 +45,6 @@ def send_request(request, &block)
def build_env(request)
Env.new(request)
end

def downgrade?(response)
return false unless apis.fallbacks.key?(current_api_id)
response.not_found? || response.unsupported?
end

def current_api
apis[current_api_id]
end

def change_api!(api_id)
raise UnknownApiVersionError, api_id unless apis.key?(api_id)
@current_api_id = api_id
end

def downgrade!
downgrade_api_id = apis.fallbacks[current_api_id]
raise NoDowngradeAvailableError, current_api_id if downgrade_api_id.nil?
change_api!(downgrade_api_id)
end

# Raised when configured with an unknown API version
class UnknownApiVersionError < StandardError
attr_reader :version

def initialize(version)
@version = version
end

def message
"No matching transport API for version #{version}!"
end
end

# Raised when configured with an unknown API version
class NoDowngradeAvailableError < StandardError
attr_reader :version

def initialize(version)
@version = version
end

def message
"No downgrade from transport API version #{version} is available!"
end
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/ddtrace/transport/http/response.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def initialize(http_response)
@http_response = http_response
end

def_delegators :@http_response, *Transport::Response.instance_methods
def_delegators :@http_response, *Datadog::Transport::Response.instance_methods
end
end
end
Expand Down
17 changes: 10 additions & 7 deletions lib/ddtrace/transport/http/traces.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,18 @@ module Traces
# Response from HTTP transport for traces
class Response
include HTTP::Response
include Transport::Traces::Response
include Datadog::Transport::Traces::Response

def initialize(http_response, options = {})
super(http_response)
@service_rates = options.fetch(:service_rates, nil)
@trace_count = options.fetch(:trace_count, 0)
end
end

# Extensions for HTTP client
module Client
def send_traces(traces)
request = Transport::Traces::Request.new(traces)

def send_payload(request)
send_request(request) do |api, env|
api.send_traces(env)
end
Expand All @@ -45,6 +44,10 @@ def send_traces(env, &block)
traces.call(env, &block)
end

def encoder
traces.encoder
end

# Raised when traces sent but no traces endpoint is defined
class NoTraceEndpointDefinedError < StandardError
attr_reader :spec
Expand Down Expand Up @@ -104,17 +107,17 @@ def service_rates?

def call(env, &block)
# Add trace count header
env.headers[HEADER_TRACE_COUNT] = env.request.parcel.count.to_s
env.headers[HEADER_TRACE_COUNT] = env.request.trace_count.to_s

# Encode body & type
env.headers[HEADER_CONTENT_TYPE] = encoder.content_type
env.body = env.request.parcel.encode_with(encoder)
env.body = env.request.parcel.data

# Query for response
http_response = super(env, &block)

# Process the response
response_options = {}.tap do |options|
response_options = { trace_count: env.request.trace_count }.tap do |options|
# Parse service rates, if configured to do so.
if service_rates? && !http_response.payload.to_s.empty?
body = JSON.parse(http_response.payload)
Expand Down
Loading