Skip to content

Commit 57e5ffd

Browse files
Merge a0dfb94 into 4981a3b
2 parents 4981a3b + a0dfb94 commit 57e5ffd

File tree

1 file changed

+46
-49
lines changed

1 file changed

+46
-49
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp

Lines changed: 46 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -364,8 +364,13 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
364364
enum class ETasteResult: i8 {
365365
Init = -1,
366366
Update,
367-
ConsumeRawData,
368-
ExtractRawData
367+
ConsumeRawData
368+
};
369+
370+
enum class EUpdateResult: i8 {
371+
Yield = -1,
372+
ExtractRawData,
373+
None
369374
};
370375
TSpillingSupportState(
371376
TMemoryUsageInfo* memInfo,
@@ -398,28 +403,28 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
398403
bool IsProcessingRequired() const {
399404
if (InputStatus != EFetchResult::Finish) return true;
400405

401-
return HasRawDataToExtract || HasDataForProcessing;
406+
return !SpilledBuckets.empty() && SpilledBuckets.front().BucketState != TSpilledBucket::EBucketState::InMemory;
402407
}
403408

404-
bool UpdateAndWait() {
409+
EUpdateResult Update() {
405410
switch (GetMode()) {
406411
case EOperatingMode::InMemory: {
407412
if (CheckMemoryAndSwitchToSpilling()) {
408-
return UpdateAndWait();
413+
return Update();
409414
}
410-
return false;
415+
return EUpdateResult::None;
411416
}
412417

413418
case EOperatingMode::ProcessSpilled:
414419
return ProcessSpilledDataAndWait();
415420
case EOperatingMode::Spilling: {
416421
UpdateSpillingBuckets();
417422

418-
if (!HasMemoryForProcessing() && InputStatus != EFetchResult::Finish && TryToReduceMemoryAndWait()) return true;
423+
if (!HasMemoryForProcessing() && InputStatus != EFetchResult::Finish && TryToReduceMemoryAndWait()) return EUpdateResult::Yield;
419424

420425
if (BufferForUsedInputItems.size()) {
421426
auto& bucket = SpilledBuckets[BufferForUsedInputItemsBucketId];
422-
if (bucket.AsyncWriteOperation.has_value()) return true;
427+
if (bucket.AsyncWriteOperation.has_value()) return EUpdateResult::Yield;
423428

424429
bucket.AsyncWriteOperation = bucket.SpilledData->WriteWideItem(BufferForUsedInputItems);
425430
BufferForUsedInputItems.resize(0); //for freeing allocated key value asap
@@ -429,7 +434,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
429434

430435
// Prepare buffer for reading new key
431436
BufferForKeyAndState.resize(KeyWidth);
432-
return false;
437+
return EUpdateResult::None;
433438
}
434439
}
435440
}
@@ -442,14 +447,6 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
442447
return isNew ? ETasteResult::Init : ETasteResult::Update;
443448
}
444449
if (GetMode() == EOperatingMode::ProcessSpilled) {
445-
if (HasRawDataToExtract) {
446-
// Tongue not used here.
447-
Throat = BufferForUsedInputItems.data();
448-
HasRawDataToExtract = false;
449-
HasDataForProcessing = true;
450-
return ETasteResult::ExtractRawData;
451-
}
452-
HasDataForProcessing = false;
453450
// during restoration we process buckets one by one, starting from the first in the queue
454451
bool isNew = SpilledBuckets.front().InMemoryProcessingState->TasteIt();
455452
Throat = SpilledBuckets.front().InMemoryProcessingState->Throat;
@@ -476,8 +473,9 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
476473
MKQL_ENSURE(BufferForUsedInputItems.size() == 0, "Internal logic error");
477474
BufferForUsedInputItems.resize(ItemNodesSize);
478475
BufferForUsedInputItemsBucketId = bucketId;
476+
479477
Throat = BufferForUsedInputItems.data();
480-
478+
481479
return ETasteResult::ConsumeRawData;
482480
}
483481

@@ -503,7 +501,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
503501
BufferForKeyAndState.resize(0);
504502
}
505503

506-
bool FlushSpillingBuffersAndWait() {
504+
EUpdateResult FlushSpillingBuffersAndWait() {
507505
UpdateSpillingBuckets();
508506

509507
ui64 finishedCount = 0;
@@ -519,7 +517,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
519517
}
520518
}
521519

522-
if (finishedCount != SpilledBuckets.size()) return true;
520+
if (finishedCount != SpilledBuckets.size()) return EUpdateResult::Yield;
523521

524522
SwitchMode(EOperatingMode::ProcessSpilled);
525523

@@ -628,11 +626,11 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
628626
return false;
629627
}
630628

631-
bool ProcessSpilledDataAndWait() {
632-
if (SpilledBuckets.empty()) return false;
629+
EUpdateResult ProcessSpilledDataAndWait() {
630+
if (SpilledBuckets.empty()) return EUpdateResult::None;
633631

634632
if (AsyncReadOperation) {
635-
if (!AsyncReadOperation->HasValue()) return true;
633+
if (!AsyncReadOperation->HasValue()) return EUpdateResult::Yield;
636634
if (RecoverState) {
637635
SpilledBuckets[0].SpilledState->AsyncReadCompleted(AsyncReadOperation->ExtractValue().value(), Ctx.HolderFactory);
638636
} else {
@@ -642,20 +640,16 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
642640
}
643641

644642
auto& bucket = SpilledBuckets.front();
645-
if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return false;
646-
if (HasDataForProcessing) {
647-
Tongue = bucket.InMemoryProcessingState->Tongue;
648-
Throat = bucket.InMemoryProcessingState->Throat;
649-
return false;
650-
}
643+
if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return EUpdateResult::None;
644+
651645
//recover spilled state
652646
while(!bucket.SpilledState->Empty()) {
653647
RecoverState = true;
654648
BufferForKeyAndState.resize(KeyAndStateType->GetElementsCount());
655649
AsyncReadOperation = bucket.SpilledState->ExtractWideItem(BufferForKeyAndState);
656650
if (AsyncReadOperation) {
657651
BufferForKeyAndState.resize(0);
658-
return true;
652+
return EUpdateResult::Yield;
659653
}
660654
for (size_t i = 0; i< KeyWidth; ++i) {
661655
//jumping into unsafe world, refusing ownership
@@ -675,18 +669,16 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
675669
BufferForUsedInputItems.resize(UsedInputItemType->GetElementsCount());
676670
AsyncReadOperation = bucket.SpilledData->ExtractWideItem(BufferForUsedInputItems);
677671
if (AsyncReadOperation) {
678-
return true;
672+
return EUpdateResult::Yield;
679673
}
680674

675+
Throat = BufferForUsedInputItems.data();
681676
Tongue = bucket.InMemoryProcessingState->Tongue;
682-
Throat = bucket.InMemoryProcessingState->Throat;
683677

684-
HasRawDataToExtract = true;
685-
return false;
678+
return EUpdateResult::ExtractRawData;
686679
}
687680
bucket.BucketState = TSpilledBucket::EBucketState::InMemory;
688-
HasDataForProcessing = false;
689-
return false;
681+
return EUpdateResult::None;
690682
}
691683

692684
EOperatingMode GetMode() const {
@@ -744,10 +736,6 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
744736
private:
745737
ui64 NextBucketToSpill = 0;
746738

747-
bool HasDataForProcessing = false;
748-
749-
bool HasRawDataToExtract = false;
750-
751739
TState InMemoryProcessingState;
752740
const TMultiType* const UsedInputItemType;
753741
const TMultiType* const KeyAndStateType;
@@ -1259,6 +1247,7 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
12591247
, AllowSpilling(allowSpilling)
12601248
{}
12611249

1250+
// MARK: DoCalculate
12621251
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
12631252
if (!state.HasValue()) {
12641253
MakeState(ctx, state);
@@ -1268,8 +1257,14 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
12681257
auto **fields = ctx.WideFields.data() + WideFieldsIndex;
12691258

12701259
while (true) {
1271-
if (ptr->UpdateAndWait()) {
1272-
return EFetchResult::Yield;
1260+
switch(ptr->Update()) {
1261+
case TSpillingSupportState::EUpdateResult::Yield:
1262+
return EFetchResult::Yield;
1263+
case TSpillingSupportState::EUpdateResult::ExtractRawData:
1264+
Nodes.ExtractValues(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Throat), fields);
1265+
break;
1266+
case TSpillingSupportState::EUpdateResult::None:
1267+
break;
12731268
}
12741269
if (ptr->InputStatus != EFetchResult::Finish) {
12751270
for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i)
@@ -1297,9 +1292,6 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
12971292
case TSpillingSupportState::ETasteResult::ConsumeRawData:
12981293
Nodes.ExtractValues(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
12991294
break;
1300-
case TSpillingSupportState::ETasteResult::ExtractRawData:
1301-
Nodes.ExtractValues(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Throat), fields);
1302-
break;
13031295
}
13041296
continue;
13051297
}
@@ -1366,13 +1358,18 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
13661358

13671359
block = more;
13681360

1369-
const auto waitMoreFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TSpillingSupportState::UpdateAndWait));
1370-
const auto waitMoreFuncPtr = CastInst::Create(Instruction::IntToPtr, waitMoreFunc, PointerType::getUnqual(boolFuncType), "wait_more_func", block);
1371-
const auto waitMore = CallInst::Create(boolFuncType, waitMoreFuncPtr, { stateArg }, "wait_more", block);
1361+
const auto updateFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TSpillingSupportState::Update));
1362+
const auto updateType = FunctionType::get(wayType, {stateArg->getType()}, false);
1363+
const auto updateFuncPtr = CastInst::Create(Instruction::IntToPtr, updateFunc, PointerType::getUnqual(updateType), "update_func", block);
1364+
const auto update = CallInst::Create(updateType, updateFuncPtr, { stateArg }, "update", block);
13721365

13731366
result->addIncoming(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::Yield)), block);
13741367

1375-
BranchInst::Create(over, test, waitMore, block);
1368+
const auto updateWay = SwitchInst::Create(update, test, 3U, block);
1369+
updateWay->addCase(ConstantInt::get(wayType, static_cast<i8>(TSpillingSupportState::EUpdateResult::Yield)), over);
1370+
// TODO add extraction code and jump there
1371+
updateWay->addCase(ConstantInt::get(wayType, static_cast<i8>(TSpillingSupportState::EUpdateResult::ExtractRawData)), test);
1372+
updateWay->addCase(ConstantInt::get(wayType, static_cast<i8>(TSpillingSupportState::EUpdateResult::None)), test);
13761373

13771374
block = test;
13781375

0 commit comments

Comments
 (0)