Skip to content

Commit e10ef55

Browse files
authored
Fix MakeBlocks usage in BlockMapJoinCore computation node (#8686)
1 parent 8c72783 commit e10ef55

File tree

3 files changed

+89
-18
lines changed

3 files changed

+89
-18
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,10 @@ class TBlockJoinState : public TBlockState {
126126
return true;
127127
}
128128

129+
bool HasBlocks() {
130+
return Count > 0;
131+
}
132+
129133
bool IsNotFull() const {
130134
return OutputRows_ < MaxLength_
131135
&& BuilderAllocatedSize_ <= MaxBuilderAllocatedSize_;
@@ -190,7 +194,7 @@ using TState = TBlockJoinState<RightRequired>;
190194
auto** fields = ctx.WideFields.data() + WideFieldsIndex_;
191195
const auto dict = Dict_->GetValue(ctx);
192196

193-
do {
197+
while (!blockState.HasBlocks()) {
194198
while (blockState.IsNotFull() && blockState.NextRow()) {
195199
const auto key = MakeKeysTuple(ctx, blockState, LeftKeyColumns_);
196200
if constexpr (WithoutRight) {
@@ -205,7 +209,7 @@ using TState = TBlockJoinState<RightRequired>;
205209
blockState.MakeRow(dict.Lookup(key));
206210
}
207211
}
208-
if (!blockState.IsFinished()) {
212+
if (blockState.IsNotFull() && !blockState.IsFinished()) {
209213
switch (Flow_->FetchValues(ctx, fields)) {
210214
case EFetchResult::Yield:
211215
return EFetchResult::Yield;
@@ -216,16 +220,15 @@ using TState = TBlockJoinState<RightRequired>;
216220
blockState.Finish();
217221
break;
218222
}
223+
// Leave the loop, if no values left in the flow.
224+
Y_DEBUG_ABORT_UNLESS(blockState.IsFinished());
219225
}
220-
// Leave the outer loop, if no values left in the flow.
221-
Y_DEBUG_ABORT_UNLESS(blockState.IsFinished());
222-
break;
223-
} while (true);
224-
225-
if (blockState.IsEmpty()) {
226-
return EFetchResult::Finish;
226+
if (blockState.IsEmpty()) {
227+
return EFetchResult::Finish;
228+
}
229+
blockState.MakeBlocks(ctx.HolderFactory);
227230
}
228-
blockState.MakeBlocks(ctx.HolderFactory);
231+
229232
const auto sliceSize = blockState.Slice();
230233

231234
for (size_t i = 0; i < ResultJoinItems_.size(); i++) {
@@ -294,7 +297,7 @@ using TState = TBlockJoinState<RightRequired>;
294297
auto** fields = ctx.WideFields.data() + WideFieldsIndex_;
295298
const auto dict = Dict_->GetValue(ctx);
296299

297-
do {
300+
while (!blockState.HasBlocks()) {
298301
if (iterState) {
299302
NUdf::TUnboxedValue lookupItem;
300303
// Process the remaining items from the iterator.
@@ -329,15 +332,13 @@ using TState = TBlockJoinState<RightRequired>;
329332
}
330333
// Leave the loop, if no values left in the flow.
331334
Y_DEBUG_ABORT_UNLESS(blockState.IsFinished());
332-
break;
333335
}
334-
break;
335-
} while(true);
336-
337-
if (blockState.IsEmpty()) {
338-
return EFetchResult::Finish;
336+
if (blockState.IsEmpty()) {
337+
return EFetchResult::Finish;
338+
}
339+
blockState.MakeBlocks(ctx.HolderFactory);
339340
}
340-
blockState.MakeBlocks(ctx.HolderFactory);
341+
341342
const auto sliceSize = blockState.Slice();
342343

343344
for (size_t i = 0; i < ResultJoinItems_.size(); i++) {

ydb/library/yql/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,5 +470,74 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinBasicTest) {
470470

471471
} // Y_UNIT_TEST_SUITE
472472

473+
// Regression tests for block map join where every left row carries the same
// key (1). With testSize left rows all matching the right side, the join has
// to emit its output in more than one batch.
Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinMoreTest) {

    constexpr size_t testSize = 1 << 14;
    constexpr size_t valueSize = 3;
    static const TVector<TString> threeLetterValues = GenerateValues(valueSize);
    static const TString hugeString(128, '1');

    // Builds one (key, subkey, value) row per element of keyInit:
    //   key    = keyInit[i]
    //   subkey = keyInit[i] * subkeyMultiplier
    //   value  = valuePayload[i]
    // valuePayload must have at least keyInit.size() elements; a shorter
    // payload makes at() throw std::out_of_range instead of reading past
    // the end of the vector.
    // Returned by non-const value so callers can move from the result.
    TVector<TKSV> MakeFillTKSV(const TVector<ui64>& keyInit,
        const ui64 subkeyMultiplier, const TVector<TString>& valuePayload
    ) {
        TVector<TKSV> testKSV;
        testKSV.reserve(keyInit.size());
        for (size_t i = 0; i < keyInit.size(); i++) {
            testKSV.emplace_back(keyInit[i],
                                 keyInit[i] * subkeyMultiplier,
                                 valuePayload.at(i));
        }
        return testKSV;
    }

    // Left input shared by every test in this suite: testSize rows, all with
    // key 1, subkey 1001 and successive three-letter payload values.
    TVector<TKSV> MakeLeftFlowOnKey1() {
        const TVector<ui64> keyInit(testSize, 1);
        return MakeFillTKSV(keyInit, 1001, threeLetterValues);
    }

    Y_UNIT_TEST(TestInnerOn1) {
        const auto leftFlow = MakeLeftFlowOnKey1();
        TKSWMap rightMap = {{1, hugeString}};
        TestBlockJoinWithRightOnUint64(EJoinKind::Inner, leftFlow, rightMap);
    }

    Y_UNIT_TEST(TestInnerMultiOn1) {
        const auto leftFlow = MakeLeftFlowOnKey1();
        TKSWMultiMap rightMultiMap = {{1, {"1", hugeString}}};
        TestBlockMultiJoinWithRightOnUint64(EJoinKind::Inner, leftFlow, rightMultiMap);
    }

    Y_UNIT_TEST(TestLeftOn1) {
        const auto leftFlow = MakeLeftFlowOnKey1();
        TKSWMap rightMap = {{1, hugeString}};
        TestBlockJoinWithRightOnUint64(EJoinKind::Left, leftFlow, rightMap);
    }

    Y_UNIT_TEST(TestLeftMultiOn1) {
        const auto leftFlow = MakeLeftFlowOnKey1();
        TKSWMultiMap rightMultiMap = {{1, {"1", hugeString}}};
        TestBlockMultiJoinWithRightOnUint64(EJoinKind::Left, leftFlow, rightMultiMap);
    }

    Y_UNIT_TEST(TestLeftSemiOn1) {
        const auto leftFlow = MakeLeftFlowOnKey1();
        const TKSVSet rightSet({1});
        TestBlockJoinWithoutRightOnUint64(EJoinKind::LeftSemi, leftFlow, rightSet);
    }

    Y_UNIT_TEST(TestLeftOnlyOn1) {
        const auto leftFlow = MakeLeftFlowOnKey1();
        const TKSVSet rightSet({1});
        TestBlockJoinWithoutRightOnUint64(EJoinKind::LeftOnly, leftFlow, rightSet);
    }

} // Y_UNIT_TEST_SUITE
541+
473542
} // namespace NMiniKQL
474543
} // namespace NKikimr

ydb/library/yql/minikql/computation/mkql_block_impl.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,7 @@ void TBlockState::ClearValues() {
343343
}
344344

345345
void TBlockState::FillArrays() {
346+
MKQL_ENSURE(Count == 0, "All existing arrays have to be processed");
346347
auto& counterDatum = TArrowBlock::From(Values.back()).GetDatum();
347348
MKQL_ENSURE(counterDatum.is_scalar(), "Unexpected block length type (expecting scalar)");
348349
Count = counterDatum.scalar_as<arrow::UInt64Scalar>().value;

0 commit comments

Comments
 (0)