Skip to content

Commit f1d16fa

Browse files
committed
fixup
1 parent 2798fe7 commit f1d16fa

File tree

1 file changed

+22
-5
lines changed

1 file changed

+22
-5
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,14 @@ struct TCombinerNodes {
133133
}
134134

135135
void ExtractValues(TComputationContext& ctx, NUdf::TUnboxedValue** values, NUdf::TUnboxedValue* keys) const {
136+
for (size_t i = 0U; i < ItemNodes.size(); ++i) {
137+
keys[i] = std::move(*(values[i]));
138+
}
139+
}
140+
141+
void ExtractValues(TComputationContext& ctx, NUdf::TUnboxedValue* keys, NUdf::TUnboxedValue** values) const {
136142
for (size_t i = 0U, j = 0U; i < ItemNodes.size(); ++i) {
137-
keys[i] = *(values[j++]);
143+
keys[i] = std::move(*(values[j++]));
138144
}
139145
}
140146

@@ -353,6 +359,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
353359
Init = -1,
354360
Update,
355361
DataRequired,
362+
DataExtractionRequired,
356363
Skip
357364
};
358365
TSpillingSupportState(
@@ -629,8 +636,15 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
629636
}
630637
AsyncReadOperation = std::nullopt;
631638
}
639+
632640
auto& bucket = SpilledBuckets.front();
633641
if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return false;
642+
if (HasDataForProcessing) {
643+
Tongue = bucket.InMemoryProcessingState->Tongue;
644+
Throat = bucket.InMemoryProcessingState->Throat;
645+
HasDataForProcessing = false;
646+
return false;
647+
}
634648
//recover spilled state
635649
while(!bucket.SpilledState->Empty()) {
636650
RecoverState = true;
@@ -660,15 +674,14 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
660674
if (AsyncReadOperation) {
661675
return true;
662676
}
663-
auto **fields = Ctx.WideFields.data() + WideFieldsIndex;
677+
/* auto **fields = Ctx.WideFields.data() + WideFieldsIndex;
664678
for (size_t i = 0, j = 0; i < ItemNodesSize; ++i) {
665679
if (fields[i]) {
666680
fields[i] = &(BufferForUsedInputItems[j++]);
667681
}
668-
}
682+
}*/
669683

670-
Tongue = bucket.InMemoryProcessingState->Tongue;
671-
Throat = bucket.InMemoryProcessingState->Throat;
684+
Throat = BufferForKeyAndState.data();
672685

673686
HasDataForProcessing = true;
674687
return false;
@@ -1287,6 +1300,10 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
12871300
case TSpillingSupportState::ETasteResult::DataRequired:
12881301
Nodes.ExtractValues(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Throat));
12891302
break;
1303+
case TSpillingSupportState::ETasteResult::DataExtractionRequired:
1304+
Nodes.ExtractValues(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Throat), fields);
1305+
break;
1306+
12901307
case TSpillingSupportState::ETasteResult::Skip:
12911308
break;
12921309
}

0 commit comments

Comments
 (0)