Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trace payload chunking #840

Merged
merged 24 commits into from
Apr 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
2dc844e
Trace payload chunking
marcotc Oct 1, 2019
78942f3
Address review comments
marcotc Oct 17, 2019
62b5fd3
Add Transport layer
marcotc Oct 17, 2019
c5369cd
Better error message when trace is too large
marcotc Oct 22, 2019
e252a01
Merge branch 'master' into feat/subdivide-payloads
marcotc Dec 19, 2019
80480f7
Merge branch 'master' into feat/subdivide-payloads
marcotc Dec 20, 2019
b94aef8
Merge branch 'master' into feat/subdivide-payloads
marcotc Jan 21, 2020
1484b2f
Merge branch 'master' into feat/subdivide-payloads
marcotc Apr 24, 2020
c28c768
Merge branch 'feat/subdivide-payloads' of github.com:DataDog/dd-trace…
marcotc Apr 24, 2020
db963d8
Move trace_count to Traces::Parcel
marcotc Apr 24, 2020
a16940b
Extract Chunker
marcotc Apr 24, 2020
74c3924
Sacrifice for the linting gods ☠️
marcotc Apr 24, 2020
11e122f
Remove require for deleted file
marcotc Apr 24, 2020
af5e690
Use lazy stream for chunking and refactor Transport::IO to accommodat…
marcotc Apr 27, 2020
c8e107c
Fix typo
marcotc Apr 27, 2020
7ecf8e4
Add health metrics
marcotc Apr 27, 2020
abc49c3
Adapt benchmark tests
marcotc Apr 27, 2020
6eccad1
Batch chunk size health metric reporting
marcotc Apr 27, 2020
31935c9
Fix typo in encoder test name
marcotc Apr 27, 2020
15b2548
Fix bad JRuby method surfaced by test refactoring
marcotc Apr 27, 2020
d141ca2
Remove unnecessary lazy call for recursive send_traces
marcotc Apr 28, 2020
23c694e
Remove max_size tag from metrics
marcotc Apr 28, 2020
d57498e
Create separate object for EncodedParcel
marcotc Apr 28, 2020
d630fc2
Restore Traces::Request
marcotc Apr 29, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions lib/ddtrace/chunker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
require 'json'
require 'msgpack'

module Datadog
# Chunks list of elements into batches
module Chunker
module_function

# Chunks a list into batches of at most +max_chunk_size+ elements each.
#
# An exception can occur if a single element is too large. That single
# element will be returned in its own chunk. You have to verify by yourself
# when such elements are returned.
#
# @param list [Enumerable] list of elements
# @param max_chunk_size [Numeric] maximum acceptable chunk size
# @return [Enumerable] lazy list of chunks
def chunk_by_size(list, max_chunk_size)
chunk_agg = 0
list.slice_before do |elem|
size = elem.size
delner marked this conversation as resolved.
Show resolved Hide resolved
chunk_agg += size
if chunk_agg > max_chunk_size
# Can't fit element in current chunk, start a new one.
chunk_agg = size
true
else
# Add to current chunk
false
end
end
end
end
end
2 changes: 2 additions & 0 deletions lib/ddtrace/diagnostics/health.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class Metrics < ::Datadog::Metrics
count :queue_accepted_lengths, Ext::Diagnostics::Health::Metrics::METRIC_QUEUE_ACCEPTED_LENGTHS
count :queue_dropped, Ext::Diagnostics::Health::Metrics::METRIC_QUEUE_DROPPED
count :traces_filtered, Ext::Diagnostics::Health::Metrics::METRIC_TRACES_FILTERED
count :transport_trace_too_large, Ext::Diagnostics::Health::Metrics::METRIC_TRANSPORT_TRACE_TOO_LARGE
count :transport_chunked, Ext::Diagnostics::Health::Metrics::METRIC_TRANSPORT_CHUNKED
count :writer_cpu_time, Ext::Diagnostics::Health::Metrics::METRIC_WRITER_CPU_TIME

gauge :queue_length, Ext::Diagnostics::Health::Metrics::METRIC_QUEUE_LENGTH
Expand Down
52 changes: 13 additions & 39 deletions lib/ddtrace/encoding.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,12 @@ def content_type
raise NotImplementedError
end

# Encodes a list of traces, expecting a list of items where each items
# is a list of spans. Before dump the string in a serialized format all
# traces are normalized. The traces nesting is not changed.
def encode_traces(traces)
to_send = []
traces.each do |trace|
to_send << trace.map(&:to_hash)
end
encode(to_send)
# Concatenates a list of elements previously encoded by +#encode+.
def join(encoded_elements)
raise NotImplementedError
end

# Defines the underlying format used during traces or services encoding.
# This method must be implemented and should only be used by the internal functions.
# Serializes a single trace into a String suitable for network transmission.
def encode(_)
raise NotImplementedError
end
Expand All @@ -44,34 +37,8 @@ def encode(obj)
JSON.dump(obj)
end

# New version of JSON Encoder that is API compliant.
module V2
extend JSONEncoder

ENCODED_IDS = [
:trace_id,
:span_id,
:parent_id
].freeze

module_function

def encode_traces(traces)
trace_hashes = traces.collect do |trace|
# Convert each trace to hash
trace.map(&:to_hash).tap do |spans|
# Convert IDs to hexadecimal
spans.each do |span|
ENCODED_IDS.each do |id|
span[id] = span[id].to_s(16) if span.key?(id)
end
end
end
end

# Wrap traces & encode them
encode(traces: trace_hashes)
end
def join(encoded_data)
"[#{encoded_data.join(',')}]"
end
end

Expand All @@ -90,6 +57,13 @@ def content_type
def encode(obj)
MessagePack.pack(obj)
end

def join(encoded_data)
packer = MessagePack::Packer.new
packer.write_array_header(encoded_data.size)

(packer.buffer.to_a + encoded_data).join
end
end
end
end
2 changes: 2 additions & 0 deletions lib/ddtrace/ext/diagnostics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ module Metrics
METRIC_QUEUE_SPANS = 'datadog.tracer.queue.spans'.freeze
METRIC_SAMPLING_SERVICE_CACHE_LENGTH = 'datadog.tracer.sampling.service_cache_length'.freeze
METRIC_TRACES_FILTERED = 'datadog.tracer.traces.filtered'.freeze
METRIC_TRANSPORT_CHUNKED = 'datadog.tracer.transport.chunked'.freeze
METRIC_TRANSPORT_TRACE_TOO_LARGE = 'datadog.tracer.transport.trace_too_large'.freeze
METRIC_WRITER_CPU_TIME = 'datadog.tracer.writer.cpu_time'.freeze
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/ddtrace/transport/http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ module HTTP

# Builds a new Transport::HTTP::Client
def new(&block)
Builder.new(&block).to_client
Builder.new(&block).to_transport
end

# Builds a new Transport::HTTP::Client with default settings
Expand Down
4 changes: 4 additions & 0 deletions lib/ddtrace/transport/http/api/instance.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ def initialize(spec, adapter, options = {})
@headers = options.fetch(:headers, {})
end

def encoder
spec.encoder
end

def call(env)
# Add headers to request env, unless empty.
env.headers.merge!(headers) unless headers.empty?
Expand Down
8 changes: 3 additions & 5 deletions lib/ddtrace/transport/http/builder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,11 @@ def default_api=(key)
@default_api = key
end

def to_client
def to_transport
raise NoDefaultApiError if @default_api.nil?

@client ||= Client.new(
to_api_instances,
@default_api
)
# DEV: Should not be specific to traces
Transport::Traces::Transport.new(to_api_instances, @default_api)
end

def to_api_instances
Expand Down
67 changes: 5 additions & 62 deletions lib/ddtrace/transport/http/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,22 @@ module HTTP
class Client
include Transport::HTTP::Statistics

attr_reader \
:apis,
:current_api_id
attr_reader :api

def initialize(apis, current_api_id)
@apis = apis

# Activate initial API
change_api!(current_api_id)
def initialize(api)
@api = api
end

def send_request(request, &block)
# Build request into env
env = build_env(request)

# Get response from API
response = yield(current_api, env)
# Get responses from API
response = yield(api, env)

# Update statistics
update_stats_from_response!(response)

# If API should be downgraded, downgrade and try again.
if downgrade?(response)
downgrade!
response = send_request(request, &block)
end

response
rescue StandardError => e
message = "Internal error during HTTP transport request. Cause: #{e.message} Location: #{e.backtrace.first}"
Expand All @@ -55,52 +44,6 @@ def send_request(request, &block)
def build_env(request)
Env.new(request)
end

def downgrade?(response)
return false unless apis.fallbacks.key?(current_api_id)
response.not_found? || response.unsupported?
end

def current_api
apis[current_api_id]
end

def change_api!(api_id)
raise UnknownApiVersionError, api_id unless apis.key?(api_id)
@current_api_id = api_id
end

def downgrade!
downgrade_api_id = apis.fallbacks[current_api_id]
raise NoDowngradeAvailableError, current_api_id if downgrade_api_id.nil?
change_api!(downgrade_api_id)
end

# Raised when configured with an unknown API version
class UnknownApiVersionError < StandardError
attr_reader :version

def initialize(version)
@version = version
end

def message
"No matching transport API for version #{version}!"
end
end

# Raised when configured with an unknown API version
class NoDowngradeAvailableError < StandardError
attr_reader :version

def initialize(version)
@version = version
end

def message
"No downgrade from transport API version #{version} is available!"
end
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/ddtrace/transport/http/response.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def initialize(http_response)
@http_response = http_response
end

def_delegators :@http_response, *Transport::Response.instance_methods
def_delegators :@http_response, *Datadog::Transport::Response.instance_methods

def code
@http_response.respond_to?(:code) ? @http_response.code : nil
Expand Down
2 changes: 1 addition & 1 deletion lib/ddtrace/transport/http/statistics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module HTTP
# Tracks statistics for HTTP transports
module Statistics
def self.included(base)
base.send(:include, Transport::Statistics)
base.send(:include, Datadog::Transport::Statistics)
base.send(:include, InstanceMethods)
end

Expand Down
17 changes: 10 additions & 7 deletions lib/ddtrace/transport/http/traces.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,18 @@ module Traces
# Response from HTTP transport for traces
class Response
include HTTP::Response
include Transport::Traces::Response
include Datadog::Transport::Traces::Response

def initialize(http_response, options = {})
super(http_response)
@service_rates = options.fetch(:service_rates, nil)
@trace_count = options.fetch(:trace_count, 0)
end
end

# Extensions for HTTP client
module Client
def send_traces(traces)
request = Transport::Traces::Request.new(traces)

def send_payload(request)
send_request(request) do |api, env|
api.send_traces(env)
end
Expand All @@ -45,6 +44,10 @@ def send_traces(env, &block)
traces.call(env, &block)
end

def encoder
traces.encoder
end

# Raised when traces sent but no traces endpoint is defined
class NoTraceEndpointDefinedError < StandardError
attr_reader :spec
Expand Down Expand Up @@ -104,17 +107,17 @@ def service_rates?

def call(env, &block)
# Add trace count header
env.headers[HEADER_TRACE_COUNT] = env.request.parcel.count.to_s
env.headers[HEADER_TRACE_COUNT] = env.request.parcel.trace_count.to_s

# Encode body & type
env.headers[HEADER_CONTENT_TYPE] = encoder.content_type
env.body = env.request.parcel.encode_with(encoder)
env.body = env.request.parcel.data

# Query for response
http_response = super(env, &block)

# Process the response
response_options = {}.tap do |options|
response_options = { trace_count: env.request.parcel.trace_count }.tap do |options|
# Parse service rates, if configured to do so.
if service_rates? && !http_response.payload.to_s.empty?
body = JSON.parse(http_response.payload)
Expand Down
2 changes: 1 addition & 1 deletion lib/ddtrace/transport/io.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def new(out, encoder)
def default(options = {})
new(
options.fetch(:out, STDOUT),
options.fetch(:encoder, Encoding::JSONEncoder::V2)
options.fetch(:encoder, Encoding::JSONEncoder)
)
end
end
Expand Down
4 changes: 3 additions & 1 deletion lib/ddtrace/transport/io/response.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ module IO
# Response from HTTP transport for traces
class Response
include Transport::Response
include Transport::Traces::Response

attr_reader \
:result

def initialize(result)
def initialize(result, trace_count = 1)
@result = result
@trace_count = trace_count
end

def ok?
Expand Down
Loading