Skip to content

Commit 22ef9c2

Browse files
Merge 5d2dd8b into 2d3caea
2 parents 2d3caea + 5d2dd8b commit 22ef9c2

File tree

1 file changed

+57
-37
lines changed

1 file changed

+57
-37
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp

Lines changed: 57 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,24 @@ struct TCombinerNodes {
132132
}
133133
}
134134

135+
void ExtractValues(TComputationContext& ctx, NUdf::TUnboxedValue** from, NUdf::TUnboxedValue* to) const {
136+
for (ui32 i = 0U; i < ItemNodes.size(); ++i) {
137+
if (from[i]) {
138+
to[i] = std::move(*(from[i]));
139+
}
140+
}
141+
}
142+
143+
void ExtractValues(TComputationContext& ctx, NUdf::TUnboxedValue* from, NUdf::TUnboxedValue** to) const {
144+
for (size_t i = 0, j = 0; i != ItemNodes.size(); ++i) {
145+
if (IsInputItemNodeUsed(i)) {
146+
*to[i] = std::move(from[j++]);
147+
} else {
148+
to[i] = nullptr;
149+
}
150+
}
151+
}
152+
135153
void ProcessItem(TComputationContext& ctx, NUdf::TUnboxedValue* keys, NUdf::TUnboxedValue* state) const {
136154
if (keys) {
137155
std::fill_n(keys, KeyResultNodes.size(), NUdf::TUnboxedValuePod());
@@ -346,16 +364,17 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
346364
enum class ETasteResult: i8 {
347365
Init = -1,
348366
Update,
367+
DataRequired,
368+
RawDataExtractionRequired,
349369
Skip
350370
};
351371
TSpillingSupportState(
352-
TMemoryUsageInfo* memInfo, size_t wideFieldsIndex,
372+
TMemoryUsageInfo* memInfo,
353373
const TMultiType* usedInputItemType, const TMultiType* keyAndStateType, ui32 keyWidth, size_t itemNodesSize,
354374
const THashFunc& hash, const TEqualsFunc& equal, bool allowSpilling, TComputationContext& ctx
355375
)
356376
: TBase(memInfo)
357377
, InMemoryProcessingState(memInfo, keyWidth, keyAndStateType->GetElementsCount() - keyWidth, hash, equal)
358-
, WideFieldsIndex(wideFieldsIndex)
359378
, UsedInputItemType(usedInputItemType)
360379
, KeyAndStateType(keyAndStateType)
361380
, KeyWidth(keyWidth)
@@ -380,7 +399,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
380399
bool IsProcessingRequired() const {
381400
if (InputStatus != EFetchResult::Finish) return true;
382401

383-
return HasDataForProcessing;
402+
return HasRawDataToExtract || HasDataForProcessing;
384403
}
385404

386405
bool UpdateAndWait() {
@@ -424,10 +443,19 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
424443
return isNew ? ETasteResult::Init : ETasteResult::Update;
425444
}
426445
if (GetMode() == EOperatingMode::ProcessSpilled) {
446+
if (HasRawDataToExtract) {
447+
// Tongue not used here.
448+
Throat = BufferForUsedInputItems.data();
449+
HasRawDataToExtract = false;
450+
HasDataForProcessing = true;
451+
return ETasteResult::RawDataExtractionRequired;
452+
}
453+
HasDataForProcessing = false;
427454
// while restoration we process buckets one by one starting from the first in a queue
428455
bool isNew = SpilledBuckets.front().InMemoryProcessingState->TasteIt();
429456
Throat = SpilledBuckets.front().InMemoryProcessingState->Throat;
430457
Tongue = SpilledBuckets.front().InMemoryProcessingState->Tongue;
458+
BufferForUsedInputItems.resize(0);
431459
return isNew ? ETasteResult::Init : ETasteResult::Update;
432460
}
433461

@@ -445,9 +473,13 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
445473

446474
// Corresponding bucket is spilled, we don't need a key anymore, full input will be spilled
447475
BufferForKeyAndState.resize(0);
448-
TryToSpillRawData(bucket, bucketId);
476+
// Prepare space for raw data
477+
MKQL_ENSURE(BufferForUsedInputItems.size() == 0, "Internal logic error");
478+
BufferForUsedInputItems.resize(ItemNodesSize);
479+
BufferForUsedInputItemsBucketId = bucketId;
480+
Throat = BufferForUsedInputItems.data();
449481

450-
return ETasteResult::Skip;
482+
return ETasteResult::DataRequired;
451483
}
452484

453485
NUdf::TUnboxedValuePod* Extract() {
@@ -472,25 +504,6 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
472504
BufferForKeyAndState.resize(0);
473505
}
474506

475-
// Copies data from WideFields to local and tries to spill it using suitable bucket.
476-
// if the bucket is already busy, then the buffer will wait for the next iteration.
477-
void TryToSpillRawData(TSpilledBucket& bucket, size_t bucketId) {
478-
auto **fields = Ctx.WideFields.data() + WideFieldsIndex;
479-
MKQL_ENSURE(BufferForUsedInputItems.empty(), "Internal logic error");
480-
481-
for (size_t i = 0; i < ItemNodesSize; ++i) {
482-
if (fields[i]) {
483-
BufferForUsedInputItems.push_back(*fields[i]);
484-
}
485-
}
486-
if (bucket.AsyncWriteOperation.has_value()) {
487-
BufferForUsedInputItemsBucketId = bucketId;
488-
return;
489-
}
490-
bucket.AsyncWriteOperation = bucket.SpilledData->WriteWideItem(BufferForUsedInputItems);
491-
BufferForUsedInputItems.resize(0);
492-
}
493-
494507
bool FlushSpillingBuffersAndWait() {
495508
UpdateSpillingBuckets();
496509

@@ -620,8 +633,14 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
620633
}
621634
AsyncReadOperation = std::nullopt;
622635
}
636+
623637
auto& bucket = SpilledBuckets.front();
624638
if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return false;
639+
if (HasDataForProcessing) {
640+
Tongue = bucket.InMemoryProcessingState->Tongue;
641+
Throat = bucket.InMemoryProcessingState->Throat;
642+
return false;
643+
}
625644
//recover spilled state
626645
while(!bucket.SpilledState->Empty()) {
627646
RecoverState = true;
@@ -651,17 +670,11 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
651670
if (AsyncReadOperation) {
652671
return true;
653672
}
654-
auto **fields = Ctx.WideFields.data() + WideFieldsIndex;
655-
for (size_t i = 0, j = 0; i < ItemNodesSize; ++i) {
656-
if (fields[i]) {
657-
fields[i] = &(BufferForUsedInputItems[j++]);
658-
}
659-
}
660673

661674
Tongue = bucket.InMemoryProcessingState->Tongue;
662675
Throat = bucket.InMemoryProcessingState->Throat;
663676

664-
HasDataForProcessing = true;
677+
HasRawDataToExtract = true;
665678
return false;
666679
}
667680
bucket.BucketState = TSpilledBucket::EBucketState::InMemory;
@@ -725,8 +738,9 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
725738

726739
bool HasDataForProcessing = false;
727740

741+
bool HasRawDataToExtract = false;
742+
728743
TState InMemoryProcessingState;
729-
const size_t WideFieldsIndex;
730744
const TMultiType* const UsedInputItemType;
731745
const TMultiType* const KeyAndStateType;
732746
const size_t KeyWidth;
@@ -1237,6 +1251,7 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
12371251
, AllowSpilling(allowSpilling)
12381252
{}
12391253

1254+
// MARK: DoCAlculate
12401255
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
12411256
if (!state.HasValue()) {
12421257
MakeState(ctx, state);
@@ -1246,14 +1261,12 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
12461261
auto **fields = ctx.WideFields.data() + WideFieldsIndex;
12471262

12481263
while (true) {
1249-
for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i)
1250-
fields[i] = Nodes.GetUsedInputItemNodePtrOrNull(ctx, i);
1251-
12521264
if (ptr->UpdateAndWait()) {
12531265
return EFetchResult::Yield;
12541266
}
1255-
12561267
if (ptr->InputStatus != EFetchResult::Finish) {
1268+
for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i)
1269+
fields[i] = Nodes.GetUsedInputItemNodePtrOrNull(ctx, i);
12571270
switch (ptr->InputStatus = Flow->FetchValues(ctx, fields)) {
12581271
case EFetchResult::One:
12591272
break;
@@ -1274,6 +1287,13 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
12741287
case TSpillingSupportState::ETasteResult::Update:
12751288
Nodes.ProcessItem(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
12761289
break;
1290+
case TSpillingSupportState::ETasteResult::DataRequired:
1291+
Nodes.ExtractValues(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
1292+
break;
1293+
case TSpillingSupportState::ETasteResult::RawDataExtractionRequired:
1294+
Nodes.ExtractValues(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Throat), fields);
1295+
break;
1296+
12771297
case TSpillingSupportState::ETasteResult::Skip:
12781298
break;
12791299
}
@@ -1553,7 +1573,7 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
15531573
#endif
15541574
private:
15551575
void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
1556-
state = ctx.HolderFactory.Create<TSpillingSupportState>(WideFieldsIndex,
1576+
state = ctx.HolderFactory.Create<TSpillingSupportState>(
15571577
UsedInputItemType, KeyAndStateType,
15581578
Nodes.KeyNodes.size(),
15591579
Nodes.ItemNodes.size(),

0 commit comments

Comments
 (0)