Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class TInputTransformStreamLookupBase
, MaxDelayedRows(maxDelayedRows)
, CacheTtl(cacheTtl)
, ReadyQueue(OutputRowType)
, LastLruSize(0)
{
Y_ABORT_UNLESS(Alloc);
for (size_t i = 0; i != LookupInputIndexes.size(); ++i) {
Expand Down Expand Up @@ -172,11 +173,14 @@ class TInputTransformStreamLookupBase
LruCache->Update(NUdf::TUnboxedValue(const_cast<NUdf::TUnboxedValue&&>(k)), std::move(v), now + CacheTtl);
}
KeysForLookup->clear();
auto deltaLruSize = (i64)LruCache->Size() - LastLruSize;
auto deltaTime = GetCpuTimeDelta(startCycleCount);
CpuTime += deltaTime;
if (CpuTimeUs) {
LruSize->Add(deltaLruSize); // Note: there can be several streamlookup tied to same counter, so Add instead of Set
CpuTimeUs->Add(deltaTime.MicroSeconds());
}
LastLruSize += deltaLruSize;
Send(ComputeActorId, new TEvNewAsyncInputDataArrived{InputIndex});
}

Expand All @@ -200,6 +204,10 @@ class TInputTransformStreamLookupBase
}

void Free() {
if (LruSize && LastLruSize) {
LruSize->Add(-LastLruSize);
LastLruSize = 0;
}
auto guard = BindAllocator();
//All resources, held by this class, that have been created with mkql allocator, must be deallocated here
KeysForLookup.reset();
Expand Down Expand Up @@ -290,6 +298,7 @@ class TInputTransformStreamLookupBase
auto component = taskCounters->GetSubgroup("component", "Lookup");
LruHits = component->GetCounter("Hits");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

в моем понимании тут два сенсора должны быть deriv и один non deriv

LruMiss = component->GetCounter("Miss");
LruSize = component->GetCounter("Size");
CpuTimeUs = component->GetCounter("CpuUs");
Batches = component->GetCounter("Batches");
}
Expand Down Expand Up @@ -358,9 +367,11 @@ class TInputTransformStreamLookupBase
NKikimr::NMiniKQL::TUnboxedValueBatch ReadyQueue;
NYql::NDq::TDqAsyncStats IngressStats;
std::shared_ptr<IDqAsyncLookupSource::TUnboxedValueMap> KeysForLookup;
i64 LastLruSize;

::NMonitoring::TDynamicCounters::TCounterPtr LruHits;
::NMonitoring::TDynamicCounters::TCounterPtr LruMiss;
::NMonitoring::TDynamicCounters::TCounterPtr LruSize;
::NMonitoring::TDynamicCounters::TCounterPtr CpuTimeUs;
::NMonitoring::TDynamicCounters::TCounterPtr Batches;
TDuration CpuTime;
Expand Down
Loading