Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

impl(pubsub): add thread id attribute to publish span #13151

Merged
merged 10 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions google/cloud/internal/opentelemetry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#ifdef GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY
#include <opentelemetry/trace/provider.h>
#include <opentelemetry/trace/span_startoptions.h>
#include <sstream>
#include <string>
#include <thread>
#endif // GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY

namespace google {
Expand Down Expand Up @@ -136,6 +139,12 @@ std::string ToString(opentelemetry::trace::SpanId const& span_id) {
return std::string(span_id_array, kSize);
}

std::string CurrentThreadId() {
std::ostringstream os;
os << std::this_thread::get_id();
alevenberg marked this conversation as resolved.
Show resolved Hide resolved
return std::move(os).str();
}

#endif // GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY

#ifdef GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY
Expand Down
4 changes: 4 additions & 0 deletions google/cloud/internal/opentelemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <opentelemetry/nostd/string_view.h>
#include <opentelemetry/trace/span.h>
#include <opentelemetry/trace/tracer.h>
#include <string>
#endif // GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY
#include <chrono>
#include <functional>
Expand Down Expand Up @@ -216,6 +217,9 @@ std::string ToString(opentelemetry::trace::TraceId const& trace_id);

std::string ToString(opentelemetry::trace::SpanId const& span_id);

/// Gets the current thread id.
std::string CurrentThreadId();
alevenberg marked this conversation as resolved.
Show resolved Hide resolved

#endif // GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY

bool TracingEnabled(Options const& options);
Expand Down
3 changes: 2 additions & 1 deletion google/cloud/pubsub/internal/tracing_message_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ auto MakeParent(Links const& links, Spans const& message_spans) {
static_cast<std::int64_t>(message_spans.size())},
{sc::kCodeFunction, "BatchSink::AsyncPublish"},
{/*sc::kMessagingOperation=*/
"messaging.operation", "publish"}},
"messaging.operation", "publish"},
{sc::kThreadId, internal::CurrentThreadId()}},
/*links*/ std::move(links), options);

// Add metadata to the message spans about the batch sink span.
Expand Down
89 changes: 48 additions & 41 deletions google/cloud/pubsub/internal/tracing_message_batch_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,43 @@ TEST(TracingMessageBatch, Flush) {
SpanHasInstrumentationScope(), SpanKindIsProducer(),
SpanNamed("publish"),
SpanHasAttributes(
OTelAttribute<std::int64_t>(sc::kMessagingBatchMessageCount, 1),
OTelAttribute<std::string>(sc::kCodeFunction,
"BatchSink::AsyncPublish"),
OTelAttribute<std::string>(sc::kMessagingOperation, "publish")),
OTelAttribute<std::int64_t>(sc::kMessagingBatchMessageCount, 1)),
SpanHasLinks(AllOf(LinkHasSpanContext(message_span->GetContext()),
SpanLinkAttributesAre(OTelAttribute<int64_t>(
"messaging.gcp_pubsub.message.link", 0)))))));
}

TEST(TracingMessageBatch, PublishSpanHasAttributes) {
namespace sc = ::opentelemetry::trace::SemanticConventions;
auto span_catcher = InstallSpanCatcher();
auto message_span = MakeSpan("test span");
auto mock = std::make_unique<pubsub_testing::MockMessageBatch>();
EXPECT_CALL(*mock, SaveMessage(_));
EXPECT_CALL(*mock, Flush).WillOnce([] { return [](auto) {}; });
auto message_batch =
MakeTracingMessageBatch(std::move(mock), MakeTestOptions());
auto initial_spans = {message_span};
SaveMessages(initial_spans, message_batch);

auto end_spans = message_batch->Flush();
end_spans(make_ready_future());

auto spans = span_catcher->GetSpans();
EXPECT_THAT(spans,
Contains(AllOf(SpanNamed("publish"),
SpanHasAttributes(OTelAttribute<std::string>(
sc::kThreadId, _)))));
EXPECT_THAT(spans, Contains(AllOf(

SpanNamed("publish"),
SpanHasAttributes(OTelAttribute<std::string>(
sc::kCodeFunction, "BatchSink::AsyncPublish")))));
EXPECT_THAT(spans,
Contains(AllOf(SpanNamed("publish"),
SpanHasAttributes(OTelAttribute<std::string>(
sc::kMessagingOperation, "publish")))));
}

TEST(TracingMessageBatch, FlushOnlyIncludeSampledLink) {
namespace sc = ::opentelemetry::trace::SemanticConventions;
// Create span before the span catcher so it is not sampled.
Expand Down Expand Up @@ -178,10 +206,7 @@ TEST(TracingMessageBatch, FlushOnlyIncludeSampledLink) {
SpanHasInstrumentationScope(), SpanKindIsProducer(),
SpanNamed("publish"),
SpanHasAttributes(
OTelAttribute<std::int64_t>(sc::kMessagingBatchMessageCount, 2),
OTelAttribute<std::string>(sc::kCodeFunction,
"BatchSink::AsyncPublish"),
OTelAttribute<std::string>(sc::kMessagingOperation, "publish")),
OTelAttribute<std::int64_t>(sc::kMessagingBatchMessageCount, 2)),
SpanLinksAre(AllOf(LinkHasSpanContext(message_span->GetContext()),
SpanLinkAttributesAre(OTelAttribute<int64_t>(
"messaging.gcp_pubsub.message.link", 0)))))));
Expand Down Expand Up @@ -213,10 +238,7 @@ TEST(TracingMessageBatch, FlushSmallBatch) {
SpanHasInstrumentationScope(), SpanKindIsProducer(),
SpanNamed("publish"),
SpanHasAttributes(
OTelAttribute<std::int64_t>(sc::kMessagingBatchMessageCount, 2),
OTelAttribute<std::string>(sc::kCodeFunction,
"BatchSink::AsyncPublish"),
OTelAttribute<std::string>(sc::kMessagingOperation, "publish")),
OTelAttribute<std::int64_t>(sc::kMessagingBatchMessageCount, 2)),
SpanHasLinks(AllOf(LinkHasSpanContext(message_span1->GetContext()),
SpanLinkAttributesAre(OTelAttribute<int64_t>(
"messaging.gcp_pubsub.message.link", 0))),
Expand Down Expand Up @@ -244,16 +266,11 @@ TEST(TracingMessageBatch, FlushBatchWithOtelLimit) {
auto spans = span_catcher->GetSpans();
EXPECT_THAT(
spans,
Contains(AllOf(
SpanHasInstrumentationScope(), SpanKindIsProducer(),
SpanNamed("publish"),
SpanHasAttributes(
OTelAttribute<std::int64_t>(sc::kMessagingBatchMessageCount,
kDefaultMaxLinks),
OTelAttribute<std::string>(sc::kCodeFunction,
"BatchSink::AsyncPublish"),
OTelAttribute<std::string>(sc::kMessagingOperation, "publish")),
SpanLinksSizeIs(128))));
Contains(AllOf(SpanHasInstrumentationScope(), SpanKindIsProducer(),
SpanNamed("publish"),
SpanHasAttributes(OTelAttribute<std::int64_t>(
sc::kMessagingBatchMessageCount, kDefaultMaxLinks)),
SpanLinksSizeIs(128))));
}

TEST(TracingMessageBatch, FlushLargeBatch) {
Expand All @@ -275,16 +292,10 @@ TEST(TracingMessageBatch, FlushLargeBatch) {
end_spans(make_ready_future());

auto spans = span_catcher->GetSpans();
EXPECT_THAT(
spans,
Contains(AllOf(
SpanNamed("publish"),
SpanHasAttributes(OTelAttribute<std::int64_t>(
sc::kMessagingBatchMessageCount, batch_size),
OTelAttribute<std::string>(
sc::kCodeFunction, "BatchSink::AsyncPublish"),
OTelAttribute<std::string>(sc::kMessagingOperation,
"publish")))));
EXPECT_THAT(spans, Contains(AllOf(
SpanNamed("publish"),
SpanHasAttributes(OTelAttribute<std::int64_t>(
sc::kMessagingBatchMessageCount, batch_size)))));
EXPECT_THAT(spans, Contains(AllOf(SpanNamed("publish #0"), SpanKindIsClient(),
SpanLinksSizeIs(kDefaultMaxLinks))));
EXPECT_THAT(spans, Contains(AllOf(SpanNamed("publish #1"), SpanKindIsClient(),
Expand All @@ -311,15 +322,11 @@ TEST(TracingMessageBatch, FlushBatchWithCustomLimit) {
end_spans(make_ready_future());

auto spans = span_catcher->GetSpans();
EXPECT_THAT(
spans,
Contains(AllOf(SpanHasInstrumentationScope(), SpanKindIsProducer(),
SpanNamed("publish"),
SpanHasAttributes(
OTelAttribute<std::int64_t>(
sc::kMessagingBatchMessageCount, kBatchSize),
OTelAttribute<std::string>(
sc::kCodeFunction, "BatchSink::AsyncPublish")))));
EXPECT_THAT(spans, Contains(AllOf(
SpanHasInstrumentationScope(), SpanKindIsProducer(),
SpanNamed("publish"),
SpanHasAttributes(OTelAttribute<std::int64_t>(
sc::kMessagingBatchMessageCount, kBatchSize)))));
EXPECT_THAT(spans, Contains(AllOf(SpanNamed("publish #0"), SpanKindIsClient(),
SpanLinksSizeIs(kMaxLinks))));
EXPECT_THAT(spans, Contains(AllOf(SpanNamed("publish #1"), SpanKindIsClient(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
#include "google/cloud/internal/opentelemetry.h"
#include <opentelemetry/trace/semantic_conventions.h>
#include <cstdint>
#include <sstream>
#include <string>
#include <thread>
#include <utility>

namespace google {
Expand All @@ -31,12 +29,6 @@ namespace {

namespace sc = ::opentelemetry::trace::SemanticConventions;

std::string CurrentThreadId() {
std::ostringstream os;
alevenberg marked this conversation as resolved.
Show resolved Hide resolved
os << std::this_thread::get_id();
return std::move(os).str();
}

class AsyncReaderConnectionTracing
: public storage_experimental::AsyncReaderConnection {
public:
Expand All @@ -47,9 +39,10 @@ class AsyncReaderConnectionTracing

void Cancel() override {
auto scope = opentelemetry::trace::Scope(span_);
span_->AddEvent("gl-cpp.cancel", {
{sc::kThreadId, CurrentThreadId()},
});
span_->AddEvent("gl-cpp.cancel",
{
{sc::kThreadId, internal::CurrentThreadId()},
});
return impl_->Cancel();
}

Expand All @@ -63,7 +56,7 @@ class AsyncReaderConnectionTracing
{
{sc::kMessageType, "RECEIVED"},
{sc::kMessageId, count},
{sc::kThreadId, CurrentThreadId()},
{sc::kThreadId, internal::CurrentThreadId()},
});
return internal::EndSpan(*span, absl::get<Status>(std::move(r)));
}
Expand All @@ -72,7 +65,7 @@ class AsyncReaderConnectionTracing
{
{sc::kMessageType, "RECEIVED"},
{sc::kMessageId, count},
{sc::kThreadId, CurrentThreadId()},
{sc::kThreadId, internal::CurrentThreadId()},
{"message.starting_offset", payload.offset()},
});
return r;
Expand Down
54 changes: 25 additions & 29 deletions google/cloud/storage/internal/async/writer_connection_tracing.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
#include "google/cloud/internal/opentelemetry.h"
#include <opentelemetry/trace/semantic_conventions.h>
#include <cstdint>
#include <sstream>
#include <string>
#include <thread>
#include <utility>

namespace google {
Expand All @@ -30,12 +28,6 @@ namespace {

namespace sc = ::opentelemetry::trace::SemanticConventions;

std::string CurrentThreadId() {
std::ostringstream os;
alevenberg marked this conversation as resolved.
Show resolved Hide resolved
os << std::this_thread::get_id();
return std::move(os).str();
}

class AsyncWriterConnectionTracing
: public storage_experimental::AsyncWriterConnection {
public:
Expand All @@ -46,9 +38,10 @@ class AsyncWriterConnectionTracing

void Cancel() override {
auto scope = opentelemetry::trace::Scope(span_);
span_->AddEvent("gl-cpp.cancel", {
{sc::kThreadId, CurrentThreadId()},
});
span_->AddEvent("gl-cpp.cancel",
{
{sc::kThreadId, internal::CurrentThreadId()},
});
return impl_->Cancel();
}

Expand All @@ -68,12 +61,13 @@ class AsyncWriterConnectionTracing
auto size = static_cast<std::uint64_t>(p.size());
return impl_->Write(std::move(p))
.then([count = ++sent_count_, span = span_, size](auto f) {
span->AddEvent("gl-cpp.write", {
{sc::kMessageType, "SENT"},
{sc::kMessageId, count},
{sc::kThreadId, CurrentThreadId()},
{"gl-cpp.size", size},
});
span->AddEvent("gl-cpp.write",
{
{sc::kMessageType, "SENT"},
{sc::kMessageId, count},
{sc::kThreadId, internal::CurrentThreadId()},
{"gl-cpp.size", size},
});
auto status = f.get();
if (!status.ok()) return internal::EndSpan(*span, std::move(status));
return status;
Expand All @@ -90,7 +84,7 @@ class AsyncWriterConnectionTracing
{
{sc::kMessageType, "SENT"},
{sc::kMessageId, count},
{sc::kThreadId, CurrentThreadId()},
{sc::kThreadId, internal::CurrentThreadId()},
{"gl-cpp.size", size},
});
return internal::EndSpan(*span, f.get());
Expand All @@ -102,12 +96,13 @@ class AsyncWriterConnectionTracing
auto size = static_cast<std::uint64_t>(p.size());
return impl_->Flush(std::move(p))
.then([count = ++sent_count_, span = span_, size](auto f) {
span->AddEvent("gl-cpp.flush", {
{sc::kMessageType, "SENT"},
{sc::kMessageId, count},
{sc::kThreadId, CurrentThreadId()},
{"gl-cpp.size", size},
});
span->AddEvent("gl-cpp.flush",
{
{sc::kMessageType, "SENT"},
{sc::kMessageId, count},
{sc::kThreadId, internal::CurrentThreadId()},
{"gl-cpp.size", size},
});
auto status = f.get();
if (!status.ok()) return internal::EndSpan(*span, std::move(status));
return status;
Expand All @@ -117,11 +112,12 @@ class AsyncWriterConnectionTracing
future<StatusOr<std::int64_t>> Query() override {
internal::OTelScope scope(span_);
return impl_->Query().then([count = ++recv_count_, span = span_](auto f) {
span->AddEvent("gl-cpp.query", {
{sc::kMessageType, "RECEIVE"},
{sc::kMessageId, count},
{sc::kThreadId, CurrentThreadId()},
});
span->AddEvent("gl-cpp.query",
{
{sc::kMessageType, "RECEIVE"},
{sc::kMessageId, count},
{sc::kThreadId, internal::CurrentThreadId()},
});
auto response = f.get();
if (!response) return internal::EndSpan(*span, std::move(response));
return response;
Expand Down