Skip to content

Commit 978f9ba

Browse files
authored
Merge 0dd8f2c into 96288e5
2 parents 96288e5 + 0dd8f2c commit 978f9ba

File tree

1 file changed

+140
-48
lines changed

1 file changed

+140
-48
lines changed

ydb/library/actors/wilson/wilson_uploader.cpp

Lines changed: 140 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "wilson_uploader.h"
2+
23
#include <ydb/library/actors/core/actor_bootstrapped.h>
34
#include <ydb/library/actors/core/hfunc.h>
45
#include <ydb/library/actors/core/log.h>
@@ -7,7 +8,9 @@
78
#include <library/cpp/string_utils/url/url.h>
89
#include <util/stream/file.h>
910
#include <util/string/hex.h>
11+
1012
#include <chrono>
13+
#include <queue>
1114

1215
namespace NWilson {
1316

@@ -18,11 +21,82 @@ namespace NWilson {
1821

1922
namespace {
2023

24+
struct TSpanQueueItem {
25+
TMonotonic ExpirationTimestamp;
26+
NTraceProto::Span Span;
27+
size_t Size;
28+
};
29+
30+
class TBatch {
31+
private:
32+
ui64 MaxSpansInBatch;
33+
ui64 MaxBytesInBatch;
34+
35+
NServiceProto::ExportTraceServiceRequest Request;
36+
NTraceProto::ScopeSpans* ScopeSpans;
37+
ui64 SizeBytes = 0;
38+
TMonotonic ExpirationTimestamp = TMonotonic::Zero();
39+
40+
public:
41+
struct TData {
42+
NServiceProto::ExportTraceServiceRequest Request;
43+
ui64 SizeBytes;
44+
ui64 SizeSpans;
45+
TMonotonic ExpirationTimestamp;
46+
};
47+
48+
TBatch(ui64 maxSpansInBatch, ui64 maxBytesInBatch, TString serviceName)
49+
: MaxSpansInBatch(maxSpansInBatch)
50+
, MaxBytesInBatch(maxBytesInBatch)
51+
{
52+
auto *rspan = Request.add_resource_spans();
53+
auto *serviceNameAttr = rspan->mutable_resource()->add_attributes();
54+
serviceNameAttr->set_key("service.name");
55+
serviceNameAttr->mutable_value()->set_string_value(std::move(serviceName));
56+
ScopeSpans = rspan->add_scope_spans();
57+
}
58+
59+
size_t SizeSpans() const {
60+
return ScopeSpans->spansSize();
61+
}
62+
63+
bool IsEmpty() const {
64+
return SizeSpans() == 0;
65+
}
66+
67+
bool Add(TSpanQueueItem& span) {
68+
if (SizeBytes + span.Size > MaxBytesInBatch || SizeSpans() == MaxSpansInBatch) {
69+
return false;
70+
}
71+
SizeBytes += span.Size;
72+
span.Span.Swap(ScopeSpans->Addspans());
73+
ExpirationTimestamp = span.ExpirationTimestamp;
74+
return true;
75+
}
76+
77+
TData Complete() && {
78+
return TData {
79+
.Request = std::move(Request),
80+
.SizeBytes = SizeBytes,
81+
.SizeSpans = SizeSpans(),
82+
.ExpirationTimestamp = ExpirationTimestamp,
83+
};
84+
}
85+
};
86+
2187
class TWilsonUploader
2288
: public TActorBootstrapped<TWilsonUploader>
2389
{
2490
static constexpr size_t WILSON_SERVICE_ID = 430;
2591

92+
ui64 MaxSpansInBatch = 150;
93+
ui64 MaxBytesInBatch = 20'000'000;
94+
ui64 MaxBatchAccumulationMilliseconds = 1'000;
95+
ui32 MaxSpansPerSecond = 10;
96+
TDuration MaxSpanTimeInQueue = TDuration::Seconds(60);
97+
98+
bool WakeupScheduled = false;
99+
26100
TString CollectorUrl;
27101
TString ServiceName;
28102

@@ -36,26 +110,18 @@ namespace NWilson {
36110
NServiceProto::ExportTraceServiceResponse Response;
37111
grpc::Status Status;
38112

39-
struct TSpanQueueItem {
40-
TMonotonic ExpirationTimestamp;
41-
NTraceProto::Span Span;
42-
ui32 Size;
43-
};
44-
45-
std::deque<TSpanQueueItem> Spans;
113+
TBatch CurrentBatch;
114+
std::queue<TBatch::TData> BatchQueue;
46115
ui64 SpansSize = 0;
47116
TMonotonic NextSendTimestamp;
48-
ui32 MaxSpansAtOnce = 25;
49-
ui32 MaxSpansPerSecond = 10;
50-
TDuration MaxSpanTimeInQueue = TDuration::Seconds(60);
51-
52-
bool WakeupScheduled = false;
117+
ui64 SendBatchId = 1;
53118

54119
public:
55120
TWilsonUploader(WilsonUploaderParams params)
56121
: CollectorUrl(std::move(params.CollectorUrl))
57122
, ServiceName(std::move(params.ServiceName))
58123
, GrpcSigner(std::move(params.GrpcSigner))
124+
, CurrentBatch(MaxSpansInBatch, MaxBytesInBatch, ServiceName)
59125
{}
60126

61127
~TWilsonUploader() {
@@ -90,25 +156,52 @@ namespace NWilson {
90156
if (SpansSize >= 100'000'000) {
91157
LOG_ERROR_S(*TlsActivationContext, WILSON_SERVICE_ID, "dropped span due to overflow");
92158
} else {
93-
const TMonotonic expirationTimestamp = TActivationContext::Monotonic() + MaxSpanTimeInQueue;
159+
const TMonotonic now = TActivationContext::Monotonic();
160+
const TMonotonic expirationTimestamp = now + MaxSpanTimeInQueue;
94161
auto& span = ev->Get()->Span;
95162
const ui32 size = span.ByteSizeLong();
96-
Spans.push_back(TSpanQueueItem{expirationTimestamp, std::move(span), size});
163+
if (size > MaxBytesInBatch) {
164+
ALOG_ERROR(WILSON_SERVICE_ID, "dropped span of size " << size << ", which exceeds max batch size " << MaxBytesInBatch);
165+
return;
166+
}
167+
TSpanQueueItem spanItem{expirationTimestamp, std::move(span), size};
97168
SpansSize += size;
169+
if (CurrentBatch.IsEmpty()) {
170+
ScheduleBatchCompletion(now);
171+
}
172+
if (CurrentBatch.Add(spanItem)) {
173+
return;
174+
}
175+
CompleteCurrentBatch();
98176
TryMakeProgress();
177+
Y_ABORT_UNLESS(CurrentBatch.Add(spanItem), "failed to add span to empty batch");
178+
ScheduleBatchCompletion(now);
99179
}
100180
}
101181

182+
void ScheduleBatchCompletion(TMonotonic now) {
183+
TMonotonic completionTime = now + TDuration::MilliSeconds(MaxBatchAccumulationMilliseconds);
184+
TActivationContext::Schedule(completionTime,
185+
new IEventHandle(SelfId(), {}, new TEvents::TEvWakeup(SendBatchId)));
186+
187+
}
188+
189+
void CompleteCurrentBatch() {
190+
BatchQueue.push(std::move(CurrentBatch).Complete());
191+
CurrentBatch = TBatch(MaxSpansInBatch, MaxBytesInBatch, ServiceName);
192+
++SendBatchId;
193+
}
194+
102195
void TryToSend() {
103196
const TMonotonic now = TActivationContext::Monotonic();
104197

105198
ui32 numSpansDropped = 0;
106-
while (!Spans.empty()) {
107-
const TSpanQueueItem& item = Spans.front();
199+
while (!BatchQueue.empty()) {
200+
const TBatch::TData& item = BatchQueue.front();
108201
if (item.ExpirationTimestamp <= now) {
109-
SpansSize -= item.Size;
110-
Spans.pop_front();
111-
++numSpansDropped;
202+
SpansSize -= item.SizeBytes;
203+
numSpansDropped += item.SizeSpans;
204+
BatchQueue.pop();
112205
} else {
113206
break;
114207
}
@@ -119,42 +212,36 @@ namespace NWilson {
119212
"dropped " << numSpansDropped << " span(s) due to expiration");
120213
}
121214

122-
if (Context || Spans.empty()) {
215+
if (Context || BatchQueue.empty()) {
123216
return;
124217
} else if (now < NextSendTimestamp) {
125218
ScheduleWakeup(NextSendTimestamp);
126219
return;
127220
}
128221

129-
NServiceProto::ExportTraceServiceRequest request;
130-
auto *rspan = request.add_resource_spans();
131-
auto *serviceNameAttr = rspan->mutable_resource()->add_attributes();
132-
serviceNameAttr->set_key("service.name");
133-
serviceNameAttr->mutable_value()->set_string_value(ServiceName);
134-
auto *sspan = rspan->add_scope_spans();
135-
136-
NextSendTimestamp = now;
137-
for (ui32 i = 0; i < MaxSpansAtOnce && !Spans.empty(); ++i, Spans.pop_front()) {
138-
auto& item = Spans.front();
139-
auto& s = item.Span;
140-
141-
LOG_DEBUG_S(*TlsActivationContext, WILSON_SERVICE_ID, "exporting span"
142-
<< " TraceId# " << HexEncode(s.trace_id())
143-
<< " SpanId# " << HexEncode(s.span_id())
144-
<< " ParentSpanId# " << HexEncode(s.parent_span_id())
145-
<< " Name# " << s.name());
146-
147-
SpansSize -= item.Size;
148-
s.Swap(sspan->add_spans());
149-
NextSendTimestamp += TDuration::MicroSeconds(1'000'000 / MaxSpansPerSecond);
222+
223+
TBatch::TData batch = std::move(BatchQueue.front());
224+
BatchQueue.pop();
225+
226+
ALOG_DEBUG(WILSON_SERVICE_ID, "exporting batch of " << batch.SizeSpans << " spans, total spans size: " << batch.SizeBytes);
227+
Y_ABORT_UNLESS(batch.Request.resource_spansSize() == 1 && batch.Request.resource_spans(0).scope_spansSize() == 1);
228+
for (const auto& span : batch.Request.resource_spans(0).scope_spans(0).spans()) {
229+
ALOG_DEBUG(WILSON_SERVICE_ID, "exporting span"
230+
<< " TraceId# " << HexEncode(span.trace_id())
231+
<< " SpanId# " << HexEncode(span.span_id())
232+
<< " ParentSpanId# " << HexEncode(span.parent_span_id())
233+
<< " Name# " << span.name());
150234
}
151235

236+
NextSendTimestamp = now + TDuration::MicroSeconds((batch.SizeSpans * 1'000'000) / MaxSpansPerSecond);
237+
SpansSize -= batch.SizeBytes;
238+
152239
ScheduleWakeup(NextSendTimestamp);
153240
Context = std::make_unique<grpc::ClientContext>();
154241
if (GrpcSigner) {
155242
GrpcSigner->SignClientContext(*Context);
156243
}
157-
Reader = Stub->AsyncExport(Context.get(), std::move(request), &CQ);
244+
Reader = Stub->AsyncExport(Context.get(), std::move(batch.Request), &CQ);
158245
Reader->Finish(&Response, &Status, nullptr);
159246
}
160247

@@ -179,15 +266,20 @@ namespace NWilson {
179266
template<typename T>
180267
void ScheduleWakeup(T&& deadline) {
181268
if (!WakeupScheduled) {
182-
TActivationContext::Schedule(deadline, new IEventHandle(TEvents::TSystem::Wakeup, 0, SelfId(), {},
183-
nullptr, 0));
269+
TActivationContext::Schedule(deadline,
270+
new IEventHandle(SelfId(), {}, new TEvents::TEvWakeup));
184271
WakeupScheduled = true;
185272
}
186273
}
187274

188-
void HandleWakeup() {
189-
Y_ABORT_UNLESS(WakeupScheduled);
190-
WakeupScheduled = false;
275+
void HandleWakeup(TEvents::TEvWakeup::TPtr& ev) {
276+
const auto tag = ev->Get()->Tag;
277+
if (tag == SendBatchId) {
278+
CompleteCurrentBatch();
279+
} else if (tag == 0) {
280+
Y_ABORT_UNLESS(WakeupScheduled);
281+
WakeupScheduled = false;
282+
}
191283
TryMakeProgress();
192284
}
193285

@@ -198,7 +290,7 @@ namespace NWilson {
198290

199291
STRICT_STFUNC(StateWork,
200292
hFunc(TEvWilson, Handle);
201-
cFunc(TEvents::TSystem::Wakeup, HandleWakeup);
293+
hFunc(TEvents::TEvWakeup, HandleWakeup);
202294
);
203295

204296
STRICT_STFUNC(StateBroken,

0 commit comments

Comments
 (0)