Skip to content

Commit

Permalink
tracing: Add support for sending data in Zipkin v2 format (#6985)
Browse files Browse the repository at this point in the history
Description: This patch supports sending a list of spans as JSON v2 and protobuf message over HTTP to Zipkin collector. [Sending protobuf](https://github.com/openzipkin/zipkin-api/blob/0.2.1/zipkin.proto) is considered to be more efficient than JSON, even compared to the v2's JSON (openzipkin/zipkin#2589 (comment)). This is an effort to rework #6798.

The approach is by serializing the v1 model to both v2 JSON and protobuf.

Risk Level: Low, since the default is still HTTP-JSON v1 based on https://github.com/openzipkin/zipkin-api/blob/0.2.2/zipkin-api.yaml.
Testing: Unit testing, manual integration test with real Zipkin collector server.
Docs Changes: Added
Release Notes: Added
Fixes: #4839

Signed-off-by: Dhi Aurrahman <dio@tetrate.io>
Signed-off-by: José Carlos Chávez <jcchavezs@gmail.com>
  • Loading branch information
dio authored Aug 30, 2019
1 parent 29f199c commit 6c6e18e
Show file tree
Hide file tree
Showing 24 changed files with 925 additions and 233 deletions.
25 changes: 25 additions & 0 deletions api/bazel/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ def api_dependencies():
locations = REPOSITORY_LOCATIONS,
build_file_content = KAFKASOURCE_BUILD_CONTENT,
)
envoy_http_archive(
name = "com_github_openzipkin_zipkinapi",
locations = REPOSITORY_LOCATIONS,
build_file_content = ZIPKINAPI_BUILD_CONTENT,
)

GOGOPROTO_BUILD_CONTENT = """
load("@com_google_protobuf//:protobuf.bzl", "cc_proto_library", "py_proto_library")
Expand Down Expand Up @@ -153,3 +158,23 @@ filegroup(
)
"""

ZIPKINAPI_BUILD_CONTENT = """
load("@envoy_api//bazel:api_build_system.bzl", "api_proto_library", "api_go_proto_library")
load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
api_proto_library(
name = "zipkin",
srcs = [
"zipkin-jsonv2.proto",
"zipkin.proto",
],
visibility = ["//visibility:public"],
)
api_go_proto_library(
name = "zipkin",
proto = ":zipkin",
)
"""
8 changes: 8 additions & 0 deletions api/bazel/repository_locations.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ KAFKA_SOURCE_SHA = "ae7a1696c0a0302b43c5b21e515c37e6ecd365941f68a510a7e442eebddf
UDPA_GIT_SHA = "4cbdcb9931ca743a915a7c5fda51b2ee793ed157" # Aug 22, 2019
UDPA_SHA256 = "6291d0c0e3a4d5f08057ea7a00ed0b0ec3dd4e5a3b1cf20f803774680b5a806f"

ZIPKINAPI_RELEASE = "0.2.2" # Aug 23, 2019
ZIPKINAPI_SHA256 = "688c4fe170821dd589f36ec45aaadc03a618a40283bc1f97da8fa11686fc816b"

REPOSITORY_LOCATIONS = dict(
bazel_skylib = dict(
sha256 = BAZEL_SKYLIB_SHA256,
Expand Down Expand Up @@ -62,4 +65,9 @@ REPOSITORY_LOCATIONS = dict(
strip_prefix = "kafka-2.2.0-rc2/clients/src/main/resources/common/message",
urls = ["https://github.com/apache/kafka/archive/2.2.0-rc2.zip"],
),
com_github_openzipkin_zipkinapi = dict(
sha256 = ZIPKINAPI_SHA256,
strip_prefix = "zipkin-api-" + ZIPKINAPI_RELEASE,
urls = ["https://github.com/openzipkin/zipkin-api/archive/" + ZIPKINAPI_RELEASE + ".tar.gz"],
),
)
28 changes: 27 additions & 1 deletion api/envoy/config/trace/v2/trace.proto
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ message LightstepConfig {
string access_token_file = 2 [(validate.rules).string.min_bytes = 1];
}

// Configuration for the Zipkin tracer.
message ZipkinConfig {
// The cluster manager cluster that hosts the Zipkin collectors. Note that the
// Zipkin cluster must be defined in the :ref:`Bootstrap static cluster
Expand All @@ -80,9 +81,34 @@ message ZipkinConfig {
// trace instance. The default value is false, which will result in a 64 bit trace id being used.
bool trace_id_128bit = 3;

// Determines whether client and server spans will shared the same span id.
// Determines whether client and server spans will share the same span context.
// The default value is true.
google.protobuf.BoolValue shared_span_context = 4;

// Available Zipkin collector endpoint versions.
enum CollectorEndpointVersion {
// Zipkin API v1, JSON over HTTP.
// [#comment: The default implementation of Zipkin client before this field is added was only v1
// and the way user configure this was by not explicitly specifying the version. Consequently,
// before this is added, the corresponding Zipkin collector expected to receive v1 payload.
// Hence the motivation of adding HTTP_JSON_V1 as the default is to avoid a breaking change when
// user upgrading Envoy with this change. Furthermore, we also immediately deprecate this field,
// since in Zipkin realm this v1 version is considered to be not preferable anymore.]
HTTP_JSON_V1 = 0 [deprecated = true];

// Zipkin API v2, JSON over HTTP.
HTTP_JSON = 1;

// Zipkin API v2, protobuf over HTTP.
HTTP_PROTO = 2;

// [#not-implemented-hide:]
GRPC = 3;
}

// Determines the selected collector endpoint version. By default, the ``HTTP_JSON_V1`` will be
// used.
CollectorEndpointVersion collector_endpoint_version = 5;
}

// DynamicOtConfig is used to dynamically load a tracer from a shared library
Expand Down
3 changes: 3 additions & 0 deletions docs/root/intro/deprecated.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ Version 1.12.0 (pending)
and `present_match` fields.
* The :option:`--allow-unknown-fields` command-line option,
use :option:`--allow-unknown-static-fields` instead.
* The use of HTTP_JSON_V1 :ref:`Zipkin collector endpoint version
<envoy_api_field_config.trace.v2.ZipkinConfig.collector_endpoint_version>` or not explicitly
specifying it is deprecated, use HTTP_JSON or HTTP_PROTO instead.

Version 1.11.0 (July 11, 2019)
==============================
Expand Down
1 change: 1 addition & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Version history
* router: added :ref:`rq_retry_skipped_request_not_complete <config_http_filters_router_stats>` counter stat to router stats.
* router check tool: add coverage reporting & enforcement.
* router check tool: add comprehensive coverage reporting.
* tracing: added support to the Zipkin reporter for sending list of spans as Zipkin JSON v2 and protobuf message over HTTP.
* router check tool: add deprecated field check.
* tls: added verification of IP address SAN fields in certificates against configured SANs in the
certificate validation context.
Expand Down
1 change: 1 addition & 0 deletions source/common/http/headers.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ class HeaderValues {
const std::string GrpcWebText{"application/grpc-web-text"};
const std::string GrpcWebTextProto{"application/grpc-web-text+proto"};
const std::string Json{"application/json"};
const std::string Protobuf{"application/x-protobuf"};
const std::string FormUrlEncoded{"application/x-www-form-urlencoded"};
} ContentTypeValues;

Expand Down
1 change: 1 addition & 0 deletions source/extensions/tracers/zipkin/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ envoy_cc_library(
"//source/common/singleton:const_singleton",
"//source/common/tracing:http_tracer_lib",
"//source/extensions/tracers:well_known_names",
"@com_github_openzipkin_zipkinapi//:zipkin_cc",
],
)

Expand Down
217 changes: 203 additions & 14 deletions source/extensions/tracers/zipkin/span_buffer.cc
Original file line number Diff line number Diff line change
@@ -1,35 +1,224 @@
#include "extensions/tracers/zipkin/span_buffer.h"

#include "common/protobuf/protobuf.h"

#include "extensions/tracers/zipkin/util.h"
#include "extensions/tracers/zipkin/zipkin_core_constants.h"

#include "absl/strings/str_join.h"

namespace Envoy {
namespace Extensions {
namespace Tracers {
namespace Zipkin {

// TODO(fabolive): Need to avoid the copy to improve performance.
bool SpanBuffer::addSpan(const Span& span) {
if (span_buffer_.size() == span_buffer_.capacity()) {
// Buffer full
SpanBuffer::SpanBuffer(
const envoy::config::trace::v2::ZipkinConfig::CollectorEndpointVersion& version,
const bool shared_span_context)
: serializer_{makeSerializer(version, shared_span_context)} {}

SpanBuffer::SpanBuffer(
const envoy::config::trace::v2::ZipkinConfig::CollectorEndpointVersion& version,
const bool shared_span_context, uint64_t size)
: serializer_{makeSerializer(version, shared_span_context)} {
allocateBuffer(size);
}

bool SpanBuffer::addSpan(Span&& span) {
const auto& annotations = span.annotations();
if (span_buffer_.size() == span_buffer_.capacity() || annotations.empty() ||
annotations.end() ==
std::find_if(annotations.begin(), annotations.end(), [](const auto& annotation) {
return annotation.value() == ZipkinCoreConstants::get().CLIENT_SEND ||
annotation.value() == ZipkinCoreConstants::get().SERVER_RECV;
})) {

// Buffer full or invalid span.
return false;
}

span_buffer_.push_back(std::move(span));

return true;
}

std::string SpanBuffer::toStringifiedJsonArray() {
std::string stringified_json_array = "[";
SerializerPtr SpanBuffer::makeSerializer(
const envoy::config::trace::v2::ZipkinConfig::CollectorEndpointVersion& version,
const bool shared_span_context) {
switch (version) {
case envoy::config::trace::v2::ZipkinConfig::HTTP_JSON_V1:
return std::make_unique<JsonV1Serializer>();
case envoy::config::trace::v2::ZipkinConfig::HTTP_JSON:
return std::make_unique<JsonV2Serializer>(shared_span_context);
case envoy::config::trace::v2::ZipkinConfig::HTTP_PROTO:
return std::make_unique<ProtobufSerializer>(shared_span_context);
default:
NOT_REACHED_GCOVR_EXCL_LINE;
}
}

std::string JsonV1Serializer::serialize(const std::vector<Span>& zipkin_spans) {
const std::string serialized_elements =
absl::StrJoin(zipkin_spans, ",", [](std::string* element, Span zipkin_span) {
absl::StrAppend(element, zipkin_span.toJson());
});
return absl::StrCat("[", serialized_elements, "]");
}

JsonV2Serializer::JsonV2Serializer(const bool shared_span_context)
: shared_span_context_{shared_span_context} {}

std::string JsonV2Serializer::serialize(const std::vector<Span>& zipkin_spans) {
const std::string serialized_elements =
absl::StrJoin(zipkin_spans, ",", [this](std::string* out, const Span& zipkin_span) {
absl::StrAppend(out,
absl::StrJoin(toListOfSpans(zipkin_span), ",",
[](std::string* element, const zipkin::jsonv2::Span& span) {
std::string entry;
Protobuf::util::MessageToJsonString(span, &entry);
absl::StrAppend(element, entry);
}));
});
return absl::StrCat("[", serialized_elements, "]");
}

if (pendingSpans()) {
stringified_json_array += span_buffer_[0].toJson();
const uint64_t size = span_buffer_.size();
for (uint64_t i = 1; i < size; i++) {
stringified_json_array += ",";
stringified_json_array += span_buffer_[i].toJson();
const std::vector<zipkin::jsonv2::Span>
JsonV2Serializer::toListOfSpans(const Span& zipkin_span) const {
std::vector<zipkin::jsonv2::Span> spans;
spans.reserve(zipkin_span.annotations().size());
for (const auto& annotation : zipkin_span.annotations()) {
zipkin::jsonv2::Span span;

if (annotation.value() == ZipkinCoreConstants::get().CLIENT_SEND) {
span.set_kind(ZipkinCoreConstants::get().KIND_CLIENT);
} else if (annotation.value() == ZipkinCoreConstants::get().SERVER_RECV) {
span.set_shared(shared_span_context_ && zipkin_span.annotations().size() > 1);
span.set_kind(ZipkinCoreConstants::get().KIND_SERVER);
} else {
continue;
}

if (annotation.isSetEndpoint()) {
span.set_timestamp(annotation.timestamp());
span.mutable_local_endpoint()->MergeFrom(toProtoEndpoint(annotation.endpoint()));
}

span.set_trace_id(zipkin_span.traceIdAsHexString());
if (zipkin_span.isSetParentId()) {
span.set_parent_id(zipkin_span.parentIdAsHexString());
}

span.set_id(zipkin_span.idAsHexString());
span.set_name(zipkin_span.name());

if (zipkin_span.isSetDuration()) {
span.set_duration(zipkin_span.duration());
}

auto& tags = *span.mutable_tags();
for (const auto& binary_annotation : zipkin_span.binaryAnnotations()) {
tags[binary_annotation.key()] = binary_annotation.value();
}

spans.push_back(std::move(span));
}
return spans;
}

const zipkin::jsonv2::Endpoint
JsonV2Serializer::toProtoEndpoint(const Endpoint& zipkin_endpoint) const {
zipkin::jsonv2::Endpoint endpoint;
Network::Address::InstanceConstSharedPtr address = zipkin_endpoint.address();
if (address) {
if (address->ip()->version() == Network::Address::IpVersion::v4) {
endpoint.set_ipv4(address->ip()->addressAsString());
} else {
endpoint.set_ipv6(address->ip()->addressAsString());
}
endpoint.set_port(address->ip()->port());
}

const std::string& service_name = zipkin_endpoint.serviceName();
if (!service_name.empty()) {
endpoint.set_service_name(service_name);
}

return endpoint;
}

ProtobufSerializer::ProtobufSerializer(const bool shared_span_context)
: shared_span_context_{shared_span_context} {}

std::string ProtobufSerializer::serialize(const std::vector<Span>& zipkin_spans) {
zipkin::proto3::ListOfSpans spans;
for (const Span& zipkin_span : zipkin_spans) {
spans.MergeFrom(toListOfSpans(zipkin_span));
}
std::string serialized;
spans.SerializeToString(&serialized);
return serialized;
}

const zipkin::proto3::ListOfSpans ProtobufSerializer::toListOfSpans(const Span& zipkin_span) const {
zipkin::proto3::ListOfSpans spans;
for (const auto& annotation : zipkin_span.annotations()) {
zipkin::proto3::Span span;
if (annotation.value() == ZipkinCoreConstants::get().CLIENT_SEND) {
span.set_kind(zipkin::proto3::Span::CLIENT);
} else if (annotation.value() == ZipkinCoreConstants::get().SERVER_RECV) {
span.set_shared(shared_span_context_ && zipkin_span.annotations().size() > 1);
span.set_kind(zipkin::proto3::Span::SERVER);
} else {
continue;
}

if (annotation.isSetEndpoint()) {
span.set_timestamp(annotation.timestamp());
span.mutable_local_endpoint()->MergeFrom(toProtoEndpoint(annotation.endpoint()));
}

span.set_trace_id(zipkin_span.traceIdAsByteString());
if (zipkin_span.isSetParentId()) {
span.set_parent_id(zipkin_span.parentIdAsByteString());
}

span.set_id(zipkin_span.idAsByteString());
span.set_name(zipkin_span.name());

if (zipkin_span.isSetDuration()) {
span.set_duration(zipkin_span.duration());
}

auto& tags = *span.mutable_tags();
for (const auto& binary_annotation : zipkin_span.binaryAnnotations()) {
tags[binary_annotation.key()] = binary_annotation.value();
}

auto* mutable_span = spans.add_spans();
mutable_span->MergeFrom(span);
}
return spans;
}

const zipkin::proto3::Endpoint
ProtobufSerializer::toProtoEndpoint(const Endpoint& zipkin_endpoint) const {
zipkin::proto3::Endpoint endpoint;
Network::Address::InstanceConstSharedPtr address = zipkin_endpoint.address();
if (address) {
if (address->ip()->version() == Network::Address::IpVersion::v4) {
endpoint.set_ipv4(Util::toByteString(address->ip()->ipv4()->address()));
} else {
endpoint.set_ipv6(Util::toByteString(address->ip()->ipv6()->address()));
}
endpoint.set_port(address->ip()->port());
}

const std::string& service_name = zipkin_endpoint.serviceName();
if (!service_name.empty()) {
endpoint.set_service_name(service_name);
}
stringified_json_array += "]";

return stringified_json_array;
return endpoint;
}

} // namespace Zipkin
Expand Down
Loading

0 comments on commit 6c6e18e

Please sign in to comment.