Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OTLP exporter #152

Merged
merged 13 commits into from
Jul 8, 2020
3 changes: 3 additions & 0 deletions .bazelrc
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# bazel configurations for running tests under sanitizers.
# Based on https://github.com/bazelment/trunk/blob/master/tools/bazel.rc

# Needed by gRPC to build on some platforms.
build --copt -DGRPC_BAZEL_BUILD

# --config=asan : Address Sanitizer.
common:asan --copt -fsanitize=address
common:asan --copt -DADDRESS_SANITIZER
Expand Down
24 changes: 24 additions & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,30 @@ workspace(name = "io_opentelemetry_cpp")

load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")

# Load gRPC dependency
# Note that this dependency needs to be loaded first due to
# https://github.com/bazelbuild/bazel/issues/6664
http_archive(
name = "com_github_grpc_grpc",
strip_prefix = "grpc-1.28.0",
urls = [
"https://github.com/grpc/grpc/archive/v1.28.0.tar.gz",
],
)

load("@com_github_grpc_grpc//bazel:grpc_deps.bzl", "grpc_deps")

grpc_deps()

# Load extra gRPC dependencies due to https://github.com/grpc/grpc/issues/20511
load("@com_github_grpc_grpc//bazel:grpc_extra_deps.bzl", "grpc_extra_deps")

grpc_extra_deps()

load("@upb//bazel:repository_defs.bzl", "bazel_version_repository")

bazel_version_repository(name = "upb_bazel_version")

# Uses older protobuf version because of
# https://github.com/protocolbuffers/protobuf/issues/7179
http_archive(
Expand Down
24 changes: 24 additions & 0 deletions bazel/opentelemetry_proto.BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package(default_visibility = ["//visibility:public"])

load("@rules_proto//proto:defs.bzl", "proto_library")
load("@com_github_grpc_grpc//bazel:cc_grpc_library.bzl", "cc_grpc_library")

proto_library(
name = "common_proto",
Expand Down Expand Up @@ -58,3 +59,26 @@ cc_proto_library(
name = "trace_proto_cc",
deps = [":trace_proto"],
)

proto_library(
name = "trace_service_proto",
srcs = [
"opentelemetry/proto/collector/trace/v1/trace_service.proto",
],
deps = [
":trace_proto",
],
)

cc_proto_library(
name = "trace_service_proto_cc",
deps = [":trace_service_proto"],
)

cc_grpc_library(
name = "trace_service_grpc_cc",
srcs = [":trace_service_proto"],
grpc_only = True,
deps = [":trace_service_proto_cc"],
generate_mocks = True,
)
30 changes: 30 additions & 0 deletions exporters/otlp/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,26 @@ cc_library(
],
)

cc_library(
name = "otlp_exporter",
srcs = [
'otlp_exporter.cc',
],
hdrs = [
'otlp_exporter.h',
],
deps = [
":recordable",
"//sdk/src/trace",
"@com_github_opentelemetry_proto//:trace_proto_cc",
"@com_github_opentelemetry_proto//:trace_service_proto_cc",

# For gRPC
"@com_github_opentelemetry_proto//:trace_service_grpc_cc",
"@com_github_grpc_grpc//:grpc++",
],
)

cc_test(
name = "recordable_test",
srcs = ["recordable_test.cc"],
Expand All @@ -37,3 +57,13 @@ cc_test(
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "otlp_exporter_test",
srcs = ["otlp_exporter_test.cc"],
deps = [
":otlp_exporter",
"//api",
"@com_google_googletest//:gtest_main",
],
)
77 changes: 77 additions & 0 deletions exporters/otlp/otlp_exporter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#include "otlp_exporter.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace exporter
{
namespace otlp
{

const std::string kCollectorAddress = "localhost:55678";

// ----------------------------- Helper functions ------------------------------

/**
* Add span protobufs contained in recordables to request.
* @param spans the spans to export
* @param request the current request
*/
void PopulateRequest(const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans,
proto::collector::trace::v1::ExportTraceServiceRequest *request)
{
auto resource_span = request->add_resource_spans();
auto instrumentation_lib = resource_span->add_instrumentation_library_spans();

for (auto &recordable : spans)
{
auto rec = std::unique_ptr<Recordable>(static_cast<Recordable *>(recordable.release()));
*instrumentation_lib->add_spans() = std::move(rec->span());
}
}

/**
* Create service stub to communicate with the OpenTelemetry Collector.
*/
std::unique_ptr<proto::collector::trace::v1::TraceService::Stub> MakeServiceStub()
{
auto channel = grpc::CreateChannel(kCollectorAddress, grpc::InsecureChannelCredentials());
return proto::collector::trace::v1::TraceService::NewStub(channel);
}

// -------------------------------- Contructors --------------------------------

OtlpExporter::OtlpExporter() : OtlpExporter(MakeServiceStub()) {}

OtlpExporter::OtlpExporter(
std::unique_ptr<proto::collector::trace::v1::TraceService::StubInterface> stub)
: trace_service_stub_(std::move(stub))
{}

// ----------------------------- Exporter methods ------------------------------

std::unique_ptr<sdk::trace::Recordable> OtlpExporter::MakeRecordable() noexcept
{
return std::unique_ptr<sdk::trace::Recordable>(new Recordable);
}

sdk::trace::ExportResult OtlpExporter::Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans) noexcept
{
proto::collector::trace::v1::ExportTraceServiceRequest request;

PopulateRequest(spans, &request);

grpc::ClientContext context;
proto::collector::trace::v1::ExportTraceServiceResponse response;

grpc::Status status = trace_service_stub_->Export(&context, request, &response);

if (!status.ok())
{
std::cerr << "OTLP trace exporter: Export() failed\n";
nadiaciobanu marked this conversation as resolved.
Show resolved Hide resolved
return sdk::trace::ExportResult::kFailure;
}
return sdk::trace::ExportResult::kSuccess;
}
} // namespace otlp
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
65 changes: 65 additions & 0 deletions exporters/otlp/otlp_exporter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#pragma once

#include "opentelemetry/sdk/trace/exporter.h"
#include "opentelemetry/proto/collector/trace/v1/trace_service.pb.h"
#include "opentelemetry/proto/collector/trace/v1/trace_service.grpc.pb.h"
#include "opentelemetry/proto/trace/v1/trace.pb.h"
#include "recordable.h"

#include <grpcpp/grpcpp.h>
#include <iostream>
nadiaciobanu marked this conversation as resolved.
Show resolved Hide resolved

OPENTELEMETRY_BEGIN_NAMESPACE
namespace exporter
{
namespace otlp
{
/**
* The OTLP exporter exports span data in OpenTelemetry Protocol (OTLP) format.
*/
class OtlpExporter final : public opentelemetry::sdk::trace::SpanExporter
nadiaciobanu marked this conversation as resolved.
Show resolved Hide resolved
{
public:
/**
* Create an OtlpExporter. This constructor initializes a service stub to be
* used for exporting.
*/
OtlpExporter();
nadiaciobanu marked this conversation as resolved.
Show resolved Hide resolved

/**
* Create a span recordable.
* @return a newly initialized Recordable object
*/
std::unique_ptr<sdk::trace::Recordable> MakeRecordable() noexcept override;

/**
* Export a batch of span recordables in OTLP format.
* @param spans a span of unique pointers to span recordables
*/
sdk::trace::ExportResult Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans) noexcept override;

/**
* Shut down the exporter.
* @param timeout an optional timeout, the default timeout of 0 means that no
* timeout is applied.
*/
void Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override {};

private:
// For testing
friend class OtlpExporterTestPeer;

// Store service stub internally. Useful for testing.
std::unique_ptr<proto::collector::trace::v1::TraceService::StubInterface> trace_service_stub_;

/**
* Create an OtlpExporter using the specified service stub.
* Only tests can call this constructor directly.
* @param stub the service stub to be used for exporting
*/
OtlpExporter(std::unique_ptr<proto::collector::trace::v1::TraceService::StubInterface> stub);
};
} // namespace otlp
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
81 changes: 81 additions & 0 deletions exporters/otlp/otlp_exporter_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#include "otlp_exporter.h"
#include "opentelemetry/proto/collector/trace/v1/trace_service_mock.grpc.pb.h"
#include "opentelemetry/sdk/trace/simple_processor.h"
#include "opentelemetry/sdk/trace/tracer_provider.h"
#include "opentelemetry/trace/provider.h"

#include <gtest/gtest.h>

using namespace testing;

OPENTELEMETRY_BEGIN_NAMESPACE
namespace exporter
{
namespace otlp
{

class OtlpExporterTestPeer : public ::testing::Test
{
public:
std::unique_ptr<sdk::trace::SpanExporter> GetExporter(
std::unique_ptr<proto::collector::trace::v1::TraceService::StubInterface> &stub_interface)
{
return std::unique_ptr<sdk::trace::SpanExporter>(new OtlpExporter(std::move(stub_interface)));
}
};

// Call Export() directly
TEST_F(OtlpExporterTestPeer, ExportUnitTest)
{
auto mock_stub = new proto::collector::trace::v1::MockTraceServiceStub();
std::unique_ptr<proto::collector::trace::v1::TraceService::StubInterface> stub_interface(
mock_stub);
auto exporter = GetExporter(stub_interface);

auto recordable_1 = exporter->MakeRecordable();
recordable_1->SetName("Test span 1");
auto recordable_2 = exporter->MakeRecordable();
recordable_2->SetName("Test span 2");

// Test successful RPC
nostd::span<std::unique_ptr<sdk::trace::Recordable>> batch_1(&recordable_1, 1);
EXPECT_CALL(*mock_stub, Export(_, _, _)).Times(Exactly(1)).WillOnce(Return(grpc::Status::OK));
auto result = exporter->Export(batch_1);
EXPECT_EQ(sdk::trace::ExportResult::kSuccess, result);

// Test failed RPC
nostd::span<std::unique_ptr<sdk::trace::Recordable>> batch_2(&recordable_2, 1);
EXPECT_CALL(*mock_stub, Export(_, _, _))
.Times(Exactly(1))
.WillOnce(Return(grpc::Status::CANCELLED));
result = exporter->Export(batch_2);
EXPECT_EQ(sdk::trace::ExportResult::kFailure, result);
}

// Create spans, let processor call Export()
TEST_F(OtlpExporterTestPeer, ExportIntegrationTest)
{
auto mock_stub = new proto::collector::trace::v1::MockTraceServiceStub();
std::unique_ptr<proto::collector::trace::v1::TraceService::StubInterface> stub_interface(
mock_stub);

auto exporter = GetExporter(stub_interface);

auto processor = std::shared_ptr<sdk::trace::SpanProcessor>(
new sdk::trace::SimpleSpanProcessor(std::move(exporter)));
auto provider =
nostd::shared_ptr<trace::TracerProvider>(new sdk::trace::TracerProvider(processor));
auto tracer = provider->GetTracer("test");

EXPECT_CALL(*mock_stub, Export(_, _, _))
.Times(AtLeast(1))
.WillRepeatedly(Return(grpc::Status::OK));

auto parent_span = tracer->StartSpan("Test parent span");
auto child_span = tracer->StartSpan("Test child span");
child_span->End();
parent_span->End();
}
} // namespace otlp
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE