Skip to content

Commit a9adb69

Browse files
Merge f1d16fa into ee02567
2 parents ee02567 + f1d16fa commit a9adb69

File tree

1 file changed

+36
-6
lines changed

1 file changed

+36
-6
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,18 @@ struct TCombinerNodes {
132132
}
133133
}
134134

135+
void ExtractValues(TComputationContext& ctx, NUdf::TUnboxedValue** values, NUdf::TUnboxedValue* keys) const {
136+
for (size_t i = 0U; i < ItemNodes.size(); ++i) {
137+
keys[i] = std::move(*(values[i]));
138+
}
139+
}
140+
141+
void ExtractValues(TComputationContext& ctx, NUdf::TUnboxedValue* keys, NUdf::TUnboxedValue** values) const {
142+
for (size_t i = 0U, j = 0U; i < ItemNodes.size(); ++i) {
143+
keys[i] = std::move(*(values[j++]));
144+
}
145+
}
146+
135147
void ProcessItem(TComputationContext& ctx, NUdf::TUnboxedValue* keys, NUdf::TUnboxedValue* state) const {
136148
if (keys) {
137149
std::fill_n(keys, KeyResultNodes.size(), NUdf::TUnboxedValuePod());
@@ -346,6 +358,8 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
346358
enum class ETasteResult: i8 {
347359
Init = -1,
348360
Update,
361+
DataRequired,
362+
DataExtractionRequired,
349363
Skip
350364
};
351365
TSpillingSupportState(
@@ -445,9 +459,11 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
445459

446460
// Corresponding bucket is spilled, we don't need a key anymore, full input will be spilled
447461
BufferForKeyAndState.resize(0);
448-
TryToSpillRawData(bucket, bucketId);
462+
// Prepare space for raw data
463+
BufferForUsedInputItems.resize(ItemNodesSize);
464+
Throat = BufferForUsedInputItems.data();
449465

450-
return ETasteResult::Skip;
466+
return ETasteResult::DataRequired;
451467
}
452468

453469
NUdf::TUnboxedValuePod* Extract() {
@@ -620,8 +636,15 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
620636
}
621637
AsyncReadOperation = std::nullopt;
622638
}
639+
623640
auto& bucket = SpilledBuckets.front();
624641
if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return false;
642+
if (HasDataForProcessing) {
643+
Tongue = bucket.InMemoryProcessingState->Tongue;
644+
Throat = bucket.InMemoryProcessingState->Throat;
645+
HasDataForProcessing = false;
646+
return false;
647+
}
625648
//recover spilled state
626649
while(!bucket.SpilledState->Empty()) {
627650
RecoverState = true;
@@ -651,15 +674,14 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
651674
if (AsyncReadOperation) {
652675
return true;
653676
}
654-
auto **fields = Ctx.WideFields.data() + WideFieldsIndex;
677+
/* auto **fields = Ctx.WideFields.data() + WideFieldsIndex;
655678
for (size_t i = 0, j = 0; i < ItemNodesSize; ++i) {
656679
if (fields[i]) {
657680
fields[i] = &(BufferForUsedInputItems[j++]);
658681
}
659-
}
682+
}*/
660683

661-
Tongue = bucket.InMemoryProcessingState->Tongue;
662-
Throat = bucket.InMemoryProcessingState->Throat;
684+
Throat = BufferForKeyAndState.data();
663685

664686
HasDataForProcessing = true;
665687
return false;
@@ -1237,6 +1259,7 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
12371259
, AllowSpilling(allowSpilling)
12381260
{}
12391261

1262+
// MARK: DoCAlculate
12401263
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
12411264
if (!state.HasValue()) {
12421265
MakeState(ctx, state);
@@ -1274,6 +1297,13 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
12741297
case TSpillingSupportState::ETasteResult::Update:
12751298
Nodes.ProcessItem(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
12761299
break;
1300+
case TSpillingSupportState::ETasteResult::DataRequired:
1301+
Nodes.ExtractValues(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
1302+
break;
1303+
case TSpillingSupportState::ETasteResult::DataExtractionRequired:
1304+
Nodes.ExtractValues(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Throat), fields);
1305+
break;
1306+
12771307
case TSpillingSupportState::ETasteResult::Skip:
12781308
break;
12791309
}

0 commit comments

Comments
 (0)