Skip to content

Implemented batching in wilson uploader #1955

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 16, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 167 additions & 50 deletions ydb/library/actors/wilson/wilson_uploader.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "wilson_uploader.h"

#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/actors/core/log.h>
Expand All @@ -7,7 +8,9 @@
#include <library/cpp/string_utils/url/url.h>
#include <util/stream/file.h>
#include <util/string/hex.h>

#include <chrono>
#include <queue>

namespace NWilson {

Expand All @@ -18,11 +21,83 @@ namespace NWilson {

namespace {

struct TSpan {
TMonotonic ExpirationTimestamp;
NTraceProto::Span Span;
size_t Size;
};

class TBatch {
private:
ui64 MaxSpansInBatch;
ui64 MaxBytesInBatch;

NServiceProto::ExportTraceServiceRequest Request;
NTraceProto::ScopeSpans* ScopeSpans;
ui64 SizeBytes = 0;
TMonotonic ExpirationTimestamp = TMonotonic::Zero();

public:
struct TData {
NServiceProto::ExportTraceServiceRequest Request;
ui64 SizeBytes;
ui64 SizeSpans;
TMonotonic ExpirationTimestamp;
};

TBatch(ui64 maxSpansInBatch, ui64 maxBytesInBatch, TString serviceName)
: MaxSpansInBatch(maxSpansInBatch)
, MaxBytesInBatch(maxBytesInBatch)
{
auto *rspan = Request.add_resource_spans();
auto *serviceNameAttr = rspan->mutable_resource()->add_attributes();
serviceNameAttr->set_key("service.name");
serviceNameAttr->mutable_value()->set_string_value(std::move(serviceName));
ScopeSpans = rspan->add_scope_spans();
}

size_t SizeSpans() const {
return ScopeSpans->spansSize();
}

bool IsEmpty() const {
return SizeSpans() == 0;
}

bool Add(TSpan& span) {
if (SizeBytes + span.Size > MaxBytesInBatch || SizeSpans() == MaxSpansInBatch) {
return false;
}
SizeBytes += span.Size;
span.Span.Swap(ScopeSpans->add_spans());
ExpirationTimestamp = span.ExpirationTimestamp;
return true;
}

TData Complete() && {
return TData {
.Request = std::move(Request),
.SizeBytes = SizeBytes,
.SizeSpans = SizeSpans(),
.ExpirationTimestamp = ExpirationTimestamp,
};
}
};

class TWilsonUploader
: public TActorBootstrapped<TWilsonUploader>
{
static constexpr size_t WILSON_SERVICE_ID = 430;

ui64 MaxPendingSpanBytes = 100'000'000;
ui64 MaxSpansInBatch = 150;
ui64 MaxBytesInBatch = 20'000'000;
TDuration MaxBatchAccumulation = TDuration::Seconds(1);
ui32 MaxSpansPerSecond = 10;
TDuration MaxSpanTimeInQueue = TDuration::Seconds(60);

bool WakeupScheduled = false;

TString CollectorUrl;
TString ServiceName;

Expand All @@ -36,26 +111,20 @@ namespace NWilson {
NServiceProto::ExportTraceServiceResponse Response;
grpc::Status Status;

struct TSpanQueueItem {
TMonotonic ExpirationTimestamp;
NTraceProto::Span Span;
ui32 Size;
};

std::deque<TSpanQueueItem> Spans;
ui64 SpansSize = 0;
TBatch CurrentBatch;
std::queue<TBatch::TData> BatchQueue;
ui64 SpansSizeBytes = 0;
TMonotonic NextSendTimestamp;
ui32 MaxSpansAtOnce = 25;
ui32 MaxSpansPerSecond = 10;
TDuration MaxSpanTimeInQueue = TDuration::Seconds(60);

bool WakeupScheduled = false;
bool BatchCompletionScheduled = false;
TMonotonic NextBatchCompletion;

public:
TWilsonUploader(WilsonUploaderParams params)
: CollectorUrl(std::move(params.CollectorUrl))
, ServiceName(std::move(params.ServiceName))
, GrpcSigner(std::move(params.GrpcSigner))
, CurrentBatch(MaxSpansInBatch, MaxBytesInBatch, ServiceName)
{}

~TWilsonUploader() {
Expand Down Expand Up @@ -87,28 +156,69 @@ namespace NWilson {
}

void Handle(TEvWilson::TPtr ev) {
if (SpansSize >= 100'000'000) {
if (SpansSizeBytes >= MaxPendingSpanBytes) {
LOG_ERROR_S(*TlsActivationContext, WILSON_SERVICE_ID, "dropped span due to overflow");
} else {
const TMonotonic expirationTimestamp = TActivationContext::Monotonic() + MaxSpanTimeInQueue;
const TMonotonic now = TActivationContext::Monotonic();
const TMonotonic expirationTimestamp = now + MaxSpanTimeInQueue;
auto& span = ev->Get()->Span;
const ui32 size = span.ByteSizeLong();
Spans.push_back(TSpanQueueItem{expirationTimestamp, std::move(span), size});
SpansSize += size;
if (size > MaxBytesInBatch) {
ALOG_ERROR(WILSON_SERVICE_ID, "dropped span of size " << size << ", which exceeds max batch size " << MaxBytesInBatch);
return;
}
TSpan spanItem {
.ExpirationTimestamp = expirationTimestamp,
.Span = std::move(span),
.Size = size,
};
SpansSizeBytes += size;
if (CurrentBatch.IsEmpty()) {
ScheduleBatchCompletion(now);
}
if (CurrentBatch.Add(spanItem)) {
return;
}
CompleteCurrentBatch();
TryMakeProgress();
Y_ABORT_UNLESS(CurrentBatch.Add(spanItem), "failed to add span to empty batch");
ScheduleBatchCompletion(now);
}
}

void ScheduleBatchCompletionEvent() {
Y_ABORT_UNLESS(!BatchCompletionScheduled);
auto cookie = NextBatchCompletion.GetValue();
TActivationContext::Schedule(NextBatchCompletion, new IEventHandle(TEvents::TSystem::Wakeup, 0, SelfId(), {}, nullptr, cookie));
ALOG_TRACE(WILSON_SERVICE_ID, "scheduling batch completion w/ cookie=" << cookie);
BatchCompletionScheduled = true;
}

void ScheduleBatchCompletion(TMonotonic now) {
NextBatchCompletion = now + MaxBatchAccumulation;
if (!BatchCompletionScheduled) {
ScheduleBatchCompletionEvent();
}
}

void CompleteCurrentBatch() {
if (CurrentBatch.IsEmpty()) {
return;
}
BatchQueue.push(std::move(CurrentBatch).Complete());
CurrentBatch = TBatch(MaxSpansInBatch, MaxBytesInBatch, ServiceName);
}

void TryToSend() {
const TMonotonic now = TActivationContext::Monotonic();

ui32 numSpansDropped = 0;
while (!Spans.empty()) {
const TSpanQueueItem& item = Spans.front();
while (!BatchQueue.empty()) {
const TBatch::TData& item = BatchQueue.front();
if (item.ExpirationTimestamp <= now) {
SpansSize -= item.Size;
Spans.pop_front();
++numSpansDropped;
SpansSizeBytes -= item.SizeBytes;
numSpansDropped += item.SizeSpans;
BatchQueue.pop();
} else {
break;
}
Expand All @@ -119,42 +229,36 @@ namespace NWilson {
"dropped " << numSpansDropped << " span(s) due to expiration");
}

if (Context || Spans.empty()) {
if (Context || BatchQueue.empty()) {
return;
} else if (now < NextSendTimestamp) {
ScheduleWakeup(NextSendTimestamp);
return;
}

NServiceProto::ExportTraceServiceRequest request;
auto *rspan = request.add_resource_spans();
auto *serviceNameAttr = rspan->mutable_resource()->add_attributes();
serviceNameAttr->set_key("service.name");
serviceNameAttr->mutable_value()->set_string_value(ServiceName);
auto *sspan = rspan->add_scope_spans();

NextSendTimestamp = now;
for (ui32 i = 0; i < MaxSpansAtOnce && !Spans.empty(); ++i, Spans.pop_front()) {
auto& item = Spans.front();
auto& s = item.Span;

LOG_DEBUG_S(*TlsActivationContext, WILSON_SERVICE_ID, "exporting span"
<< " TraceId# " << HexEncode(s.trace_id())
<< " SpanId# " << HexEncode(s.span_id())
<< " ParentSpanId# " << HexEncode(s.parent_span_id())
<< " Name# " << s.name());

SpansSize -= item.Size;
s.Swap(sspan->add_spans());
NextSendTimestamp += TDuration::MicroSeconds(1'000'000 / MaxSpansPerSecond);

TBatch::TData batch = std::move(BatchQueue.front());
BatchQueue.pop();

ALOG_DEBUG(WILSON_SERVICE_ID, "exporting batch of " << batch.SizeSpans << " spans, total spans size: " << batch.SizeBytes);
Y_ABORT_UNLESS(batch.Request.resource_spansSize() == 1 && batch.Request.resource_spans(0).scope_spansSize() == 1);
for (const auto& span : batch.Request.resource_spans(0).scope_spans(0).spans()) {
ALOG_DEBUG(WILSON_SERVICE_ID, "exporting span"
<< " TraceId# " << HexEncode(span.trace_id())
<< " SpanId# " << HexEncode(span.span_id())
<< " ParentSpanId# " << HexEncode(span.parent_span_id())
<< " Name# " << span.name());
}

NextSendTimestamp = now + TDuration::MicroSeconds((batch.SizeSpans * 1'000'000) / MaxSpansPerSecond);
SpansSizeBytes -= batch.SizeBytes;

ScheduleWakeup(NextSendTimestamp);
Context = std::make_unique<grpc::ClientContext>();
if (GrpcSigner) {
GrpcSigner->SignClientContext(*Context);
}
Reader = Stub->AsyncExport(Context.get(), std::move(request), &CQ);
Reader = Stub->AsyncExport(Context.get(), std::move(batch.Request), &CQ);
Reader->Finish(&Response, &Status, nullptr);
}

Expand All @@ -179,15 +283,28 @@ namespace NWilson {
template<typename T>
void ScheduleWakeup(T&& deadline) {
if (!WakeupScheduled) {
TActivationContext::Schedule(deadline, new IEventHandle(TEvents::TSystem::Wakeup, 0, SelfId(), {},
nullptr, 0));
TActivationContext::Schedule(deadline,
new IEventHandle(TEvents::TSystem::Wakeup, 0,
SelfId(), {}, nullptr, 0));
WakeupScheduled = true;
}
}

void HandleWakeup() {
Y_ABORT_UNLESS(WakeupScheduled);
WakeupScheduled = false;
void HandleWakeup(TEvents::TEvWakeup::TPtr& ev) {
const auto cookie = ev->Cookie;
ALOG_TRACE(WILSON_SERVICE_ID, "wakeup received w/ cookie=" << cookie);
if (cookie == 0) {
Y_ABORT_UNLESS(WakeupScheduled);
WakeupScheduled = false;
} else {
Y_ABORT_UNLESS(BatchCompletionScheduled);
BatchCompletionScheduled = false;
if (cookie == NextBatchCompletion.GetValue()) {
CompleteCurrentBatch();
} else {
ScheduleBatchCompletionEvent();
}
}
TryMakeProgress();
}

Expand All @@ -198,7 +315,7 @@ namespace NWilson {

STRICT_STFUNC(StateWork,
hFunc(TEvWilson, Handle);
cFunc(TEvents::TSystem::Wakeup, HandleWakeup);
hFunc(TEvents::TEvWakeup, HandleWakeup);
);

STRICT_STFUNC(StateBroken,
Expand Down