Skip to content

Commit 29b51d3

Browse files
Merge 2798fe7 into 21bdc0e
2 parents 21bdc0e + 2798fe7 commit 29b51d3

File tree

1 file changed

+15
-2
lines changed

1 file changed

+15
-2
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,12 @@ struct TCombinerNodes {
132132
}
133133
}
134134

135+
void ExtractValues(TComputationContext& ctx, NUdf::TUnboxedValue** values, NUdf::TUnboxedValue* keys) const {
136+
for (size_t i = 0U, j = 0U; i < ItemNodes.size(); ++i) {
137+
keys[i] = *(values[j++]);
138+
}
139+
}
140+
135141
void ProcessItem(TComputationContext& ctx, NUdf::TUnboxedValue* keys, NUdf::TUnboxedValue* state) const {
136142
if (keys) {
137143
std::fill_n(keys, KeyResultNodes.size(), NUdf::TUnboxedValuePod());
@@ -346,6 +352,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
346352
enum class ETasteResult: i8 {
347353
Init = -1,
348354
Update,
355+
DataRequired,
349356
Skip
350357
};
351358
TSpillingSupportState(
@@ -445,9 +452,11 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
445452

446453
// Corresponding bucket is spilled, we don't need a key anymore, full input will be spilled
447454
BufferForKeyAndState.resize(0);
448-
TryToSpillRawData(bucket, bucketId);
455+
// Prepare space for raw data
456+
BufferForUsedInputItems.resize(ItemNodesSize);
457+
Throat = BufferForUsedInputItems.data();
449458

450-
return ETasteResult::Skip;
459+
return ETasteResult::DataRequired;
451460
}
452461

453462
NUdf::TUnboxedValuePod* Extract() {
@@ -1237,6 +1246,7 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
12371246
, AllowSpilling(allowSpilling)
12381247
{}
12391248

1249+
// MARK: DoCAlculate
12401250
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
12411251
if (!state.HasValue()) {
12421252
MakeState(ctx, state);
@@ -1274,6 +1284,9 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
12741284
case TSpillingSupportState::ETasteResult::Update:
12751285
Nodes.ProcessItem(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
12761286
break;
1287+
case TSpillingSupportState::ETasteResult::DataRequired:
1288+
Nodes.ExtractValues(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
1289+
break;
12771290
case TSpillingSupportState::ETasteResult::Skip:
12781291
break;
12791292
}

0 commit comments

Comments
 (0)