Skip to content

Commit 98b05b7

Browse files
committed
streamlookup join: add lru size monitoring
1 parent f1166a9 commit 98b05b7

File tree

1 file changed

+11
-0
lines changed

1 file changed

+11
-0
lines changed

ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ class TInputTransformStreamLookupBase
6969
, MaxDelayedRows(maxDelayedRows)
7070
, CacheTtl(cacheTtl)
7171
, ReadyQueue(OutputRowType)
72+
, LastLruSize(0)
7273
{
7374
Y_ABORT_UNLESS(Alloc);
7475
for (size_t i = 0; i != LookupInputIndexes.size(); ++i) {
@@ -172,11 +173,14 @@ class TInputTransformStreamLookupBase
172173
LruCache->Update(NUdf::TUnboxedValue(const_cast<NUdf::TUnboxedValue&&>(k)), std::move(v), now + CacheTtl);
173174
}
174175
KeysForLookup->clear();
176+
auto deltaLruSize = (i64)LruCache->Size() - LastLruSize;
175177
auto deltaTime = GetCpuTimeDelta(startCycleCount);
176178
CpuTime += deltaTime;
177179
if (CpuTimeUs) {
180+
LruSize->Add(deltaLruSize); // Note: there can be several streamlookup tied to same counter, so Add instead of Set
178181
CpuTimeUs->Add(deltaTime.MicroSeconds());
179182
}
183+
LastLruSize += deltaLruSize;
180184
Send(ComputeActorId, new TEvNewAsyncInputDataArrived{InputIndex});
181185
}
182186

@@ -200,6 +204,10 @@ class TInputTransformStreamLookupBase
200204
}
201205

202206
void Free() {
207+
if (LruSize && LastLruSize) {
208+
LruSize->Add(-LastLruSize);
209+
LastLruSize = 0;
210+
}
203211
auto guard = BindAllocator();
204212
//All resources, held by this class, that have been created with mkql allocator, must be deallocated here
205213
KeysForLookup.reset();
@@ -290,6 +298,7 @@ class TInputTransformStreamLookupBase
290298
auto component = taskCounters->GetSubgroup("component", "Lookup");
291299
LruHits = component->GetCounter("Hits");
292300
LruMiss = component->GetCounter("Miss");
301+
LruSize = component->GetCounter("LruSize");
293302
CpuTimeUs = component->GetCounter("CpuUs");
294303
Batches = component->GetCounter("Batches");
295304
}
@@ -358,9 +367,11 @@ class TInputTransformStreamLookupBase
358367
NKikimr::NMiniKQL::TUnboxedValueBatch ReadyQueue;
359368
NYql::NDq::TDqAsyncStats IngressStats;
360369
std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> KeysForLookup;
370+
i64 LastLruSize;
361371

362372
::NMonitoring::TDynamicCounters::TCounterPtr LruHits;
363373
::NMonitoring::TDynamicCounters::TCounterPtr LruMiss;
374+
::NMonitoring::TDynamicCounters::TCounterPtr LruSize;
364375
::NMonitoring::TDynamicCounters::TCounterPtr CpuTimeUs;
365376
::NMonitoring::TDynamicCounters::TCounterPtr Batches;
366377
TDuration CpuTime;

0 commit comments

Comments
 (0)