Skip to content

Commit e66697f

Browse files
Merge 390ef80 into 582cd7b
2 parents 582cd7b + 390ef80 commit e66697f

File tree

1 file changed

+39
-45
lines changed

1 file changed

+39
-45
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp

Lines changed: 39 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -364,8 +364,13 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
364364
enum class ETasteResult: i8 {
365365
Init = -1,
366366
Update,
367-
ConsumeRawData,
368-
ExtractRawData
367+
ConsumeRawData
368+
};
369+
370+
enum class EUpdateResult: i8 {
371+
Yield = -1,
372+
ExtractRawData,
373+
None
369374
};
370375
TSpillingSupportState(
371376
TMemoryUsageInfo* memInfo,
@@ -398,28 +403,28 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
398403
bool IsProcessingRequired() const {
399404
if (InputStatus != EFetchResult::Finish) return true;
400405

401-
return HasRawDataToExtract || HasDataForProcessing;
406+
return !SpilledBuckets.empty() && SpilledBuckets.front().BucketState != TSpilledBucket::EBucketState::InMemory;
402407
}
403408

404-
bool UpdateAndWait() {
409+
EUpdateResult Update() {
405410
switch (GetMode()) {
406411
case EOperatingMode::InMemory: {
407412
if (CheckMemoryAndSwitchToSpilling()) {
408-
return UpdateAndWait();
413+
return Update();
409414
}
410-
return false;
415+
return EUpdateResult::None;
411416
}
412417

413418
case EOperatingMode::ProcessSpilled:
414419
return ProcessSpilledDataAndWait();
415420
case EOperatingMode::Spilling: {
416421
UpdateSpillingBuckets();
417422

418-
if (!HasMemoryForProcessing() && InputStatus != EFetchResult::Finish && TryToReduceMemoryAndWait()) return true;
423+
if (!HasMemoryForProcessing() && InputStatus != EFetchResult::Finish && TryToReduceMemoryAndWait()) return EUpdateResult::Yield;
419424

420425
if (BufferForUsedInputItems.size()) {
421426
auto& bucket = SpilledBuckets[BufferForUsedInputItemsBucketId];
422-
if (bucket.AsyncWriteOperation.has_value()) return true;
427+
if (bucket.AsyncWriteOperation.has_value()) return EUpdateResult::Yield;
423428

424429
bucket.AsyncWriteOperation = bucket.SpilledData->WriteWideItem(BufferForUsedInputItems);
425430
BufferForUsedInputItems.resize(0); //for freeing allocated key value asap
@@ -429,7 +434,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
429434

430435
// Prepare buffer for reading new key
431436
BufferForKeyAndState.resize(KeyWidth);
432-
return false;
437+
return EUpdateResult::None;
433438
}
434439
}
435440
}
@@ -442,14 +447,6 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
442447
return isNew ? ETasteResult::Init : ETasteResult::Update;
443448
}
444449
if (GetMode() == EOperatingMode::ProcessSpilled) {
445-
if (HasRawDataToExtract) {
446-
// Tongue not used here.
447-
Throat = BufferForUsedInputItems.data();
448-
HasRawDataToExtract = false;
449-
HasDataForProcessing = true;
450-
return ETasteResult::ExtractRawData;
451-
}
452-
HasDataForProcessing = false;
453450
// while restoration we process buckets one by one starting from the first in a queue
454451
bool isNew = SpilledBuckets.front().InMemoryProcessingState->TasteIt();
455452
Throat = SpilledBuckets.front().InMemoryProcessingState->Throat;
@@ -476,8 +473,11 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
476473
MKQL_ENSURE(BufferForUsedInputItems.size() == 0, "Internal logic error");
477474
BufferForUsedInputItems.resize(ItemNodesSize);
478475
BufferForUsedInputItemsBucketId = bucketId;
476+
479477
Throat = BufferForUsedInputItems.data();
480-
478+
Tongue = nullptr;
479+
480+
481481
return ETasteResult::ConsumeRawData;
482482
}
483483

@@ -503,7 +503,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
503503
BufferForKeyAndState.resize(0);
504504
}
505505

506-
bool FlushSpillingBuffersAndWait() {
506+
EUpdateResult FlushSpillingBuffersAndWait() {
507507
UpdateSpillingBuckets();
508508

509509
ui64 finishedCount = 0;
@@ -519,7 +519,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
519519
}
520520
}
521521

522-
if (finishedCount != SpilledBuckets.size()) return true;
522+
if (finishedCount != SpilledBuckets.size()) return EUpdateResult::Yield;
523523

524524
SwitchMode(EOperatingMode::ProcessSpilled);
525525

@@ -628,11 +628,11 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
628628
return false;
629629
}
630630

631-
bool ProcessSpilledDataAndWait() {
632-
if (SpilledBuckets.empty()) return false;
631+
EUpdateResult ProcessSpilledDataAndWait() {
632+
if (SpilledBuckets.empty()) return EUpdateResult::None;
633633

634634
if (AsyncReadOperation) {
635-
if (!AsyncReadOperation->HasValue()) return true;
635+
if (!AsyncReadOperation->HasValue()) return EUpdateResult::Yield;
636636
if (RecoverState) {
637637
SpilledBuckets[0].SpilledState->AsyncReadCompleted(AsyncReadOperation->ExtractValue().value(), Ctx.HolderFactory);
638638
} else {
@@ -642,20 +642,16 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
642642
}
643643

644644
auto& bucket = SpilledBuckets.front();
645-
if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return false;
646-
if (HasDataForProcessing) {
647-
Tongue = bucket.InMemoryProcessingState->Tongue;
648-
Throat = bucket.InMemoryProcessingState->Throat;
649-
return false;
650-
}
645+
if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return EUpdateResult::None;
646+
651647
//recover spilled state
652648
while(!bucket.SpilledState->Empty()) {
653649
RecoverState = true;
654650
BufferForKeyAndState.resize(KeyAndStateType->GetElementsCount());
655651
AsyncReadOperation = bucket.SpilledState->ExtractWideItem(BufferForKeyAndState);
656652
if (AsyncReadOperation) {
657653
BufferForKeyAndState.resize(0);
658-
return true;
654+
return EUpdateResult::Yield;
659655
}
660656
for (size_t i = 0; i< KeyWidth; ++i) {
661657
//jumping into unsafe world, refusing ownership
@@ -675,18 +671,16 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
675671
BufferForUsedInputItems.resize(UsedInputItemType->GetElementsCount());
676672
AsyncReadOperation = bucket.SpilledData->ExtractWideItem(BufferForUsedInputItems);
677673
if (AsyncReadOperation) {
678-
return true;
674+
return EUpdateResult::Yield;
679675
}
680676

677+
Throat = BufferForUsedInputItems.data();
681678
Tongue = bucket.InMemoryProcessingState->Tongue;
682-
Throat = bucket.InMemoryProcessingState->Throat;
683679

684-
HasRawDataToExtract = true;
685-
return false;
680+
return EUpdateResult::ExtractRawData;
686681
}
687682
bucket.BucketState = TSpilledBucket::EBucketState::InMemory;
688-
HasDataForProcessing = false;
689-
return false;
683+
return EUpdateResult::None;
690684
}
691685

692686
EOperatingMode GetMode() const {
@@ -744,10 +738,6 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
744738
private:
745739
ui64 NextBucketToSpill = 0;
746740

747-
bool HasDataForProcessing = false;
748-
749-
bool HasRawDataToExtract = false;
750-
751741
TState InMemoryProcessingState;
752742
const TMultiType* const UsedInputItemType;
753743
const TMultiType* const KeyAndStateType;
@@ -1259,6 +1249,7 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
12591249
, AllowSpilling(allowSpilling)
12601250
{}
12611251

1252+
// MARK: DoCalculate
12621253
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
12631254
if (!state.HasValue()) {
12641255
MakeState(ctx, state);
@@ -1268,8 +1259,14 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
12681259
auto **fields = ctx.WideFields.data() + WideFieldsIndex;
12691260

12701261
while (true) {
1271-
if (ptr->UpdateAndWait()) {
1272-
return EFetchResult::Yield;
1262+
switch(ptr->Update()) {
1263+
case TSpillingSupportState::EUpdateResult::Yield:
1264+
return EFetchResult::Yield;
1265+
case TSpillingSupportState::EUpdateResult::ExtractRawData:
1266+
Nodes.ExtractValues(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Throat), fields);
1267+
break;
1268+
case TSpillingSupportState::EUpdateResult::None:
1269+
break;
12731270
}
12741271
if (ptr->InputStatus != EFetchResult::Finish) {
12751272
for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i)
@@ -1297,9 +1294,6 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
12971294
case TSpillingSupportState::ETasteResult::ConsumeRawData:
12981295
Nodes.ExtractValues(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
12991296
break;
1300-
case TSpillingSupportState::ETasteResult::ExtractRawData:
1301-
Nodes.ExtractValues(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Throat), fields);
1302-
break;
13031297
}
13041298
continue;
13051299
}

0 commit comments

Comments
 (0)