Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 19 additions & 18 deletions ydb/library/yql/minikql/comp_nodes/mkql_block_map_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ class TBlockJoinState : public TBlockState {
return true;
}

bool HasBlocks() {
return Count > 0;
}

bool IsNotFull() const {
return OutputRows_ < MaxLength_
&& BuilderAllocatedSize_ <= MaxBuilderAllocatedSize_;
Expand Down Expand Up @@ -190,7 +194,7 @@ using TState = TBlockJoinState<RightRequired>;
auto** fields = ctx.WideFields.data() + WideFieldsIndex_;
const auto dict = Dict_->GetValue(ctx);

do {
while (!blockState.HasBlocks()) {
while (blockState.IsNotFull() && blockState.NextRow()) {
const auto key = MakeKeysTuple(ctx, blockState, LeftKeyColumns_);
if constexpr (WithoutRight) {
Expand All @@ -205,7 +209,7 @@ using TState = TBlockJoinState<RightRequired>;
blockState.MakeRow(dict.Lookup(key));
}
}
if (!blockState.IsFinished()) {
if (blockState.IsNotFull() && !blockState.IsFinished()) {
switch (Flow_->FetchValues(ctx, fields)) {
case EFetchResult::Yield:
return EFetchResult::Yield;
Expand All @@ -216,16 +220,15 @@ using TState = TBlockJoinState<RightRequired>;
blockState.Finish();
break;
}
// Leave the loop, if no values left in the flow.
Y_DEBUG_ABORT_UNLESS(blockState.IsFinished());
}
// Leave the outer loop, if no values left in the flow.
Y_DEBUG_ABORT_UNLESS(blockState.IsFinished());
break;
} while (true);

if (blockState.IsEmpty()) {
return EFetchResult::Finish;
if (blockState.IsEmpty()) {
return EFetchResult::Finish;
}
blockState.MakeBlocks(ctx.HolderFactory);
}
blockState.MakeBlocks(ctx.HolderFactory);

const auto sliceSize = blockState.Slice();

for (size_t i = 0; i < ResultJoinItems_.size(); i++) {
Expand Down Expand Up @@ -294,7 +297,7 @@ using TState = TBlockJoinState<RightRequired>;
auto** fields = ctx.WideFields.data() + WideFieldsIndex_;
const auto dict = Dict_->GetValue(ctx);

do {
while (!blockState.HasBlocks()) {
if (iterState) {
NUdf::TUnboxedValue lookupItem;
// Process the remaining items from the iterator.
Expand Down Expand Up @@ -329,15 +332,13 @@ using TState = TBlockJoinState<RightRequired>;
}
// Leave the loop, if no values left in the flow.
Y_DEBUG_ABORT_UNLESS(blockState.IsFinished());
break;
}
break;
} while(true);

if (blockState.IsEmpty()) {
return EFetchResult::Finish;
if (blockState.IsEmpty()) {
return EFetchResult::Finish;
}
blockState.MakeBlocks(ctx.HolderFactory);
}
blockState.MakeBlocks(ctx.HolderFactory);

const auto sliceSize = blockState.Slice();

for (size_t i = 0; i < ResultJoinItems_.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,5 +470,74 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinBasicTest) {

} // Y_UNIT_TEST_SUITE

Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinMoreTest) {

constexpr size_t testSize = 1 << 14;
constexpr size_t valueSize = 3;
static const TVector<TString> threeLetterValues = GenerateValues(valueSize);
static const TString hugeString(128, '1');

const TVector<TKSV> MakeFillTKSV(const TVector<ui64>& keyInit,
const ui64 subkeyMultiplier, const TVector<TString>& valuePayload
) {
TVector<TKSV> testKSV;
for (size_t i = 0; i < keyInit.size(); i++) {
testKSV.push_back(std::make_tuple(keyInit[i],
keyInit[i] * subkeyMultiplier,
valuePayload[i]));
}
return testKSV;
}

Y_UNIT_TEST(TestInnerOn1) {
TVector<ui64> keyInit(testSize);
std::fill(keyInit.begin(), keyInit.end(), 1);
const auto leftFlow = MakeFillTKSV(keyInit, 1001, threeLetterValues);
TKSWMap rightMap = {{1, hugeString}};
TestBlockJoinWithRightOnUint64(EJoinKind::Inner, leftFlow, rightMap);
}

Y_UNIT_TEST(TestInnerMultiOn1) {
TVector<ui64> keyInit(testSize);
std::fill(keyInit.begin(), keyInit.end(), 1);
const auto leftFlow = MakeFillTKSV(keyInit, 1001, threeLetterValues);
TKSWMultiMap rightMultiMap = {{1, {"1", hugeString}}};
TestBlockMultiJoinWithRightOnUint64(EJoinKind::Inner, leftFlow, rightMultiMap);
}

Y_UNIT_TEST(TestLeftOn1) {
TVector<ui64> keyInit(testSize);
std::fill(keyInit.begin(), keyInit.end(), 1);
const auto leftFlow = MakeFillTKSV(keyInit, 1001, threeLetterValues);
TKSWMap rightMap = {{1, hugeString}};
TestBlockJoinWithRightOnUint64(EJoinKind::Left, leftFlow, rightMap);;
}

Y_UNIT_TEST(TestLeftMultiOn1) {
TVector<ui64> keyInit(testSize);
std::fill(keyInit.begin(), keyInit.end(), 1);
const auto leftFlow = MakeFillTKSV(keyInit, 1001, threeLetterValues);
TKSWMultiMap rightMultiMap = {{1, {"1", hugeString}}};
TestBlockMultiJoinWithRightOnUint64(EJoinKind::Left, leftFlow, rightMultiMap);
}

Y_UNIT_TEST(TestLeftSemiOn1) {
TVector<ui64> keyInit(testSize);
std::fill(keyInit.begin(), keyInit.end(), 1);
const auto leftFlow = MakeFillTKSV(keyInit, 1001, threeLetterValues);
const TKSVSet rightSet({1});
TestBlockJoinWithoutRightOnUint64(EJoinKind::LeftSemi, leftFlow, rightSet);
}

Y_UNIT_TEST(TestLeftOnlyOn1) {
TVector<ui64> keyInit(testSize);
std::fill(keyInit.begin(), keyInit.end(), 1);
const auto leftFlow = MakeFillTKSV(keyInit, 1001, threeLetterValues);
const TKSVSet rightSet({1});
TestBlockJoinWithoutRightOnUint64(EJoinKind::LeftOnly, leftFlow, rightSet);
}

} // Y_UNIT_TEST_SUITE

} // namespace NMiniKQL
} // namespace NKikimr
1 change: 1 addition & 0 deletions ydb/library/yql/minikql/computation/mkql_block_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ void TBlockState::ClearValues() {
}

void TBlockState::FillArrays() {
MKQL_ENSURE(Count == 0, "All existing arrays have to be processed");
auto& counterDatum = TArrowBlock::From(Values.back()).GetDatum();
MKQL_ENSURE(counterDatum.is_scalar(), "Unexpected block length type (expecting scalar)");
Count = counterDatum.scalar_as<arrow::UInt64Scalar>().value;
Expand Down