Skip to content

Commit 451d38c

Browse files
Merge 8167088 into 893f325
2 parents 893f325 + 8167088 commit 451d38c

File tree

4 files changed

+173
-51
lines changed

4 files changed

+173
-51
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,6 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
521521

522522
if (finishedCount != SpilledBuckets.size()) return true;
523523

524-
YQL_LOG(INFO) << "switching to ProcessSpilled";
525524
SwitchMode(EOperatingMode::ProcessSpilled);
526525

527526
return ProcessSpilledDataAndWait();
@@ -551,11 +550,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
551550

552551
bool CheckMemoryAndSwitchToSpilling() {
553552
if (AllowSpilling && Ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) {
554-
const auto used = TlsAllocState->GetUsed();
555-
const auto limit = TlsAllocState->GetLimit();
556-
557-
YQL_LOG(INFO) << "yellow zone reached " << (used*100/limit) << "%=" << used << "/" << limit;
558-
YQL_LOG(INFO) << "switching Memory mode to Spilling";
553+
LogMemoryUsage();
559554

560555
SwitchMode(EOperatingMode::Spilling);
561556
return true;
@@ -564,6 +559,19 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
564559
return false;
565560
}
566561

562+
void LogMemoryUsage() const {
563+
const auto used = TlsAllocState->GetUsed();
564+
const auto limit = TlsAllocState->GetLimit();
565+
TStringBuilder logmsg;
566+
logmsg << "Memory usage: ";
567+
if (limit) {
568+
logmsg << (used*100/limit) << "% =";
569+
}
570+
logmsg << (used/1_MB) << "MB/" << (limit/1_MB) << "MB";
571+
572+
YQL_LOG(INFO) << logmsg;
573+
}
574+
567575
void SpillMoreStateFromBucket(TSpilledBucket& bucket) {
568576
MKQL_ENSURE(!bucket.AsyncWriteOperation.has_value(), "Internal logic error");
569577

@@ -688,10 +696,12 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
688696
void SwitchMode(EOperatingMode mode) {
689697
switch(mode) {
690698
case EOperatingMode::InMemory: {
699+
YQL_LOG(INFO) << "switching Memory mode to InMemory";
691700
MKQL_ENSURE(false, "Internal logic error");
692701
break;
693702
}
694703
case EOperatingMode::Spilling: {
704+
YQL_LOG(INFO) << "switching Memory mode to Spilling";
695705
MKQL_ENSURE(EOperatingMode::InMemory == Mode, "Internal logic error");
696706
SpilledBuckets.resize(SpilledBucketCount);
697707
auto spiller = Ctx.SpillerFactory->CreateSpiller();
@@ -707,6 +717,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
707717
break;
708718
}
709719
case EOperatingMode::ProcessSpilled: {
720+
YQL_LOG(INFO) << "switching Memory mode to ProcessSpilled";
710721
MKQL_ENSURE(EOperatingMode::Spilling == Mode, "Internal logic error");
711722
MKQL_ENSURE(SpilledBuckets.size() == SpilledBucketCount, "Internal logic error");
712723
BufferForKeyAndState.resize(0);
@@ -722,9 +733,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
722733
}
723734

724735
bool IsSwitchToSpillingModeCondition() const {
725-
return false;
726-
// TODO: YQL-18033
727-
// return !HasMemoryForProcessing();
736+
return !HasMemoryForProcessing();
728737
}
729738

730739
public:
@@ -1250,7 +1259,6 @@ using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TWideLastCombinerWra
12501259
, AllowSpilling(allowSpilling)
12511260
{}
12521261

1253-
// MARK: DoCAlculate
12541262
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
12551263
if (!state.HasValue()) {
12561264
MakeState(ctx, state);

ydb/library/yql/minikql/comp_nodes/ut/mkql_computation_node_ut.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
void N(NUnitTest::TTestContext&)
4949

5050
#define Y_UNIT_TEST_LLVM(N) Y_UNIT_TEST_TWIN(N, LLVM)
51+
#define Y_UNIT_TEST_LLVM_SPILLING(N) Y_UNIT_TEST_QUAD(N, LLVM, SPILLING)
5152

5253
#define Y_UNIT_TEST_QUAD(N, OPT1, OPT2) \
5354
template<bool OPT1, bool OPT2> void N(NUnitTest::TTestContext&); \
@@ -79,7 +80,7 @@ struct TUdfModuleInfo {
7980
NUdf::TUniquePtr<NUdf::IUdfModule> Module;
8081
};
8182

82-
template<bool UseLLVM>
83+
template<bool UseLLVM, bool EnableSpilling = false>
8384
struct TSetup {
8485
explicit TSetup(TComputationNodeFactory nodeFactory = GetTestFactory(), TVector<TUdfModuleInfo>&& modules = {})
8586
: Alloc(__LOCATION__)
@@ -96,6 +97,8 @@ struct TSetup {
9697
FunctionRegistry = mutableRegistry;
9798
}
9899

100+
Alloc.Ref().ForcefullySetMemoryYellowZone(EnableSpilling);
101+
99102
RandomProvider = CreateDeterministicRandomProvider(1);
100103
TimeProvider = CreateDeterministicTimeProvider(10000000);
101104

ydb/library/yql/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp

Lines changed: 133 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
#include <ydb/library/yql/minikql/mkql_string_util.h>
66

77
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
8+
#include <ydb/library/yql/minikql/computation/mock_spiller_factory_ut.h>
89

910
#include <cstring>
10-
#include <random>
1111
#include <algorithm>
1212

1313
namespace NKikimr {
@@ -30,6 +30,7 @@ using TBaseComputation = TMutableComputationNode<TTestStreamWrapper>;
3030
{}
3131
private:
3232
NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
33+
3334
constexpr auto size = Y_ARRAY_SIZE(g_TestYieldStreamData);
3435
if (Index == size) {
3536
return NUdf::EFetchStatus::Finish;
@@ -47,6 +48,7 @@ using TBaseComputation = TMutableComputationNode<TTestStreamWrapper>;
4748
items[1] = NUdf::TUnboxedValuePod(MakeString(ToString(val)));
4849

4950
++Index;
51+
5052
return NUdf::EFetchStatus::Ok;
5153
}
5254

@@ -996,8 +998,13 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerPerfTest) {
996998
#endif
997999
#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 29u
9981000
Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
999-
Y_UNIT_TEST_LLVM(TestLongStringsRefCounting) {
1000-
TSetup<LLVM> setup;
1001+
Y_UNIT_TEST_LLVM_SPILLING(TestLongStringsRefCounting) {
1002+
// Currently LLVM version doesn't support spilling.
1003+
if (LLVM && SPILLING) return;
1004+
// callable WideLastCombinerWithSpilling was introduced in 49 version of runtime
1005+
if (MKQL_RUNTIME_VERSION < 49U && SPILLING) return;
1006+
1007+
TSetup<LLVM, SPILLING> setup;
10011008
TProgramBuilder& pb = *setup.PgmBuilder;
10021009

10031010
const auto dataType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
@@ -1035,7 +1042,12 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
10351042

10361043
const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
10371044

1038-
const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideLastCombiner(pb.ExpandMap(pb.ToFlow(list),
1045+
auto wideLastCombinerCollable = &TProgramBuilder::WideLastCombiner;
1046+
if (SPILLING) {
1047+
wideLastCombinerCollable = &TProgramBuilder::WideLastCombinerWithSpilling;
1048+
}
1049+
1050+
const auto pgmReturn = pb.Collect(pb.NarrowMap((pb.*wideLastCombinerCollable)(pb.ExpandMap(pb.ToFlow(list),
10391051
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }),
10401052
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
10411053
[&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList {
@@ -1059,22 +1071,37 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
10591071
));
10601072

10611073
const auto graph = setup.BuildGraph(pgmReturn);
1074+
if (SPILLING) {
1075+
graph->GetContext().SpillerFactory = std::make_shared<TMockSpillerFactory>();
1076+
}
10621077
const auto iterator = graph->GetValue().GetListIterator();
1078+
1079+
std::unordered_set<TString> expected {
1080+
"key one",
1081+
"very long value 2 / key two",
1082+
"very long key one",
1083+
"very long value 8 / very long value 7 / very long value 6"
1084+
};
1085+
10631086
NUdf::TUnboxedValue item;
1064-
UNIT_ASSERT(iterator.Next(item));
1065-
UNBOXED_VALUE_STR_EQUAL(item, "key one");
1066-
UNIT_ASSERT(iterator.Next(item));
1067-
UNBOXED_VALUE_STR_EQUAL(item, "very long value 2 / key two");
1068-
UNIT_ASSERT(iterator.Next(item));
1069-
UNBOXED_VALUE_STR_EQUAL(item, "very long key one");
1070-
UNIT_ASSERT(iterator.Next(item));
1071-
UNBOXED_VALUE_STR_EQUAL(item, "very long value 8 / very long value 7 / very long value 6");
1087+
while (!expected.empty()) {
1088+
UNIT_ASSERT(iterator.Next(item));
1089+
TString actual = item.AsStringRef().Data();
1090+
1091+
auto it = expected.find(actual);
1092+
UNIT_ASSERT(it != expected.end());
1093+
expected.erase(it);
1094+
}
10721095
UNIT_ASSERT(!iterator.Next(item));
10731096
UNIT_ASSERT(!iterator.Next(item));
10741097
}
10751098

1076-
Y_UNIT_TEST_LLVM(TestLongStringsPasstroughtRefCounting) {
1077-
TSetup<LLVM> setup;
1099+
Y_UNIT_TEST_LLVM_SPILLING(TestLongStringsPasstroughtRefCounting) {
1100+
// Currently LLVM version doesn't support spilling.
1101+
if (LLVM && SPILLING) return;
1102+
// callable WideLastCombinerWithSpilling was introduced in 49 version of runtime
1103+
if (MKQL_RUNTIME_VERSION < 49U && SPILLING) return;
1104+
TSetup<LLVM, SPILLING> setup;
10781105
TProgramBuilder& pb = *setup.PgmBuilder;
10791106

10801107
const auto dataType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
@@ -1111,7 +1138,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
11111138

11121139
const auto list = pb.NewList(tupleType, {data1, data2, data3, data4, data5, data6, data7, data8, data9});
11131140

1114-
const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideLastCombiner(pb.ExpandMap(pb.ToFlow(list),
1141+
auto wideLastCombinerCollable = &TProgramBuilder::WideLastCombiner;
1142+
if (SPILLING) {
1143+
wideLastCombinerCollable = &TProgramBuilder::WideLastCombinerWithSpilling;
1144+
}
1145+
const auto pgmReturn = pb.Collect(pb.NarrowMap((pb.*wideLastCombinerCollable)(pb.ExpandMap(pb.ToFlow(list),
11151146
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }),
11161147
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
11171148
[&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList {
@@ -1134,22 +1165,40 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
11341165
));
11351166

11361167
const auto graph = setup.BuildGraph(pgmReturn);
1168+
if (SPILLING) {
1169+
graph->GetContext().SpillerFactory = std::make_shared<TMockSpillerFactory>();
1170+
}
11371171
const auto iterator = graph->GetValue().GetListIterator();
1172+
1173+
std::unordered_set<TString> expected {
1174+
"very long value 1 / key one / very long value 1 / key one",
1175+
"very long value 3 / key two / very long value 2 / key two",
1176+
"very long value 4 / very long key one / very long value 4 / very long key one",
1177+
"very long value 9 / very long key two / very long value 5 / very long key two"
1178+
};
1179+
11381180
NUdf::TUnboxedValue item;
1139-
UNIT_ASSERT(iterator.Next(item));
1140-
UNBOXED_VALUE_STR_EQUAL(item, "very long value 1 / key one / very long value 1 / key one");
1141-
UNIT_ASSERT(iterator.Next(item));
1142-
UNBOXED_VALUE_STR_EQUAL(item, "very long value 3 / key two / very long value 2 / key two");
1143-
UNIT_ASSERT(iterator.Next(item));
1144-
UNBOXED_VALUE_STR_EQUAL(item, "very long value 4 / very long key one / very long value 4 / very long key one");
1145-
UNIT_ASSERT(iterator.Next(item));
1146-
UNBOXED_VALUE_STR_EQUAL(item, "very long value 9 / very long key two / very long value 5 / very long key two");
1181+
while (!expected.empty()) {
1182+
UNIT_ASSERT(iterator.Next(item));
1183+
TString actual = item.AsStringRef().Data();
1184+
1185+
auto it = expected.find(actual);
1186+
UNIT_ASSERT(it != expected.end());
1187+
expected.erase(it);
1188+
}
11471189
UNIT_ASSERT(!iterator.Next(item));
11481190
UNIT_ASSERT(!iterator.Next(item));
11491191
}
11501192

1151-
Y_UNIT_TEST_LLVM(TestDoNotCalculateUnusedInput) {
1152-
TSetup<LLVM> setup;
1193+
Y_UNIT_TEST_LLVM_SPILLING(TestDoNotCalculateUnusedInput) {
1194+
// Test is broken. Remove this if after YQL-18808.
1195+
if (SPILLING) return;
1196+
1197+
// Currently LLVM version doesn't support spilling.
1198+
if (LLVM && SPILLING) return;
1199+
// callable WideLastCombinerWithSpilling was introduced in 49 version of runtime
1200+
if (MKQL_RUNTIME_VERSION < 49U && SPILLING) return;
1201+
TSetup<LLVM, SPILLING> setup;
11531202
TProgramBuilder& pb = *setup.PgmBuilder;
11541203

11551204
const auto dataType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
@@ -1183,7 +1232,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
11831232

11841233
const auto landmine = pb.NewDataLiteral<NUdf::EDataSlot::String>("ACHTUNG MINEN!");
11851234

1186-
const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideLastCombiner(pb.ExpandMap(pb.ToFlow(list),
1235+
auto wideLastCombinerCollable = &TProgramBuilder::WideLastCombiner;
1236+
if (SPILLING) {
1237+
wideLastCombinerCollable = &TProgramBuilder::WideLastCombinerWithSpilling;
1238+
}
1239+
const auto pgmReturn = pb.Collect(pb.NarrowMap((pb.*wideLastCombinerCollable)(pb.ExpandMap(pb.ToFlow(list),
11871240
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Unwrap(pb.Nth(item, 1U), landmine, __FILE__, __LINE__, 0), pb.Nth(item, 2U)}; }),
11881241
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
11891242
[&](TRuntimeNode::TList keys, TRuntimeNode::TList items) -> TRuntimeNode::TList {
@@ -1207,18 +1260,34 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
12071260
));
12081261

12091262
const auto graph = setup.BuildGraph(pgmReturn);
1263+
if (SPILLING) {
1264+
graph->GetContext().SpillerFactory = std::make_shared<TMockSpillerFactory>();
1265+
}
1266+
std::unordered_set<TString> expected {
1267+
"key one / value 2 / value 1 / value 5 / value 4",
1268+
"key two / value 4 / value 3 / value 3 / value 2"
1269+
};
1270+
12101271
const auto iterator = graph->GetValue().GetListIterator();
12111272
NUdf::TUnboxedValue item;
1212-
UNIT_ASSERT(iterator.Next(item));
1213-
UNBOXED_VALUE_STR_EQUAL(item, "key one / value 2 / value 1 / value 5 / value 4");
1214-
UNIT_ASSERT(iterator.Next(item));
1215-
UNBOXED_VALUE_STR_EQUAL(item, "key two / value 4 / value 3 / value 3 / value 2");
1273+
while (!expected.empty()) {
1274+
UNIT_ASSERT(iterator.Next(item));
1275+
TString actual = item.AsStringRef().Data();
1276+
1277+
auto it = expected.find(actual);
1278+
UNIT_ASSERT(it != expected.end());
1279+
expected.erase(it);
1280+
}
12161281
UNIT_ASSERT(!iterator.Next(item));
12171282
UNIT_ASSERT(!iterator.Next(item));
12181283
}
12191284

1220-
Y_UNIT_TEST_LLVM(TestDoNotCalculateUnusedOutput) {
1221-
TSetup<LLVM> setup;
1285+
Y_UNIT_TEST_LLVM_SPILLING(TestDoNotCalculateUnusedOutput) {
1286+
// Currently LLVM version doesn't support spilling.
1287+
if (LLVM && SPILLING) return;
1288+
// callable WideLastCombinerWithSpilling was introduced in 49 version of runtime
1289+
if (MKQL_RUNTIME_VERSION < 49U && SPILLING) return;
1290+
TSetup<LLVM, SPILLING> setup;
12221291
TProgramBuilder& pb = *setup.PgmBuilder;
12231292

12241293
const auto dataType = pb.NewDataType(NUdf::TDataType<const char*>::Id);
@@ -1252,7 +1321,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
12521321

12531322
const auto landmine = pb.NewDataLiteral<NUdf::EDataSlot::String>("ACHTUNG MINEN!");
12541323

1255-
const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideLastCombiner(pb.ExpandMap(pb.ToFlow(list),
1324+
auto wideLastCombinerCollable = &TProgramBuilder::WideLastCombiner;
1325+
if (SPILLING) {
1326+
wideLastCombinerCollable = &TProgramBuilder::WideLastCombinerWithSpilling;
1327+
}
1328+
const auto pgmReturn = pb.Collect(pb.NarrowMap((pb.*wideLastCombinerCollable)(pb.ExpandMap(pb.ToFlow(list),
12561329
[&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U)}; }),
12571330
[&](TRuntimeNode::TList items) -> TRuntimeNode::TList { return {items.front()}; },
12581331
[&](TRuntimeNode::TList, TRuntimeNode::TList items) -> TRuntimeNode::TList {
@@ -1268,26 +1341,46 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
12681341
));
12691342

12701343
const auto graph = setup.BuildGraph(pgmReturn);
1344+
if (SPILLING) {
1345+
graph->GetContext().SpillerFactory = std::make_shared<TMockSpillerFactory>();
1346+
}
1347+
std::unordered_set<TString> expected {
1348+
"key one: value 1, value 4, value 5, value 1, value 2",
1349+
"key two: value 2, value 3, value 3, value 4"
1350+
};
1351+
12711352
const auto iterator = graph->GetValue().GetListIterator();
12721353
NUdf::TUnboxedValue item;
1273-
UNIT_ASSERT(iterator.Next(item));
1274-
UNBOXED_VALUE_STR_EQUAL(item, "key one: value 1, value 4, value 5, value 1, value 2");
1275-
UNIT_ASSERT(iterator.Next(item));
1276-
UNBOXED_VALUE_STR_EQUAL(item, "key two: value 2, value 3, value 3, value 4");
1354+
while (!expected.empty()) {
1355+
UNIT_ASSERT(iterator.Next(item));
1356+
TString actual = item.AsStringRef().Data();
1357+
1358+
auto it = expected.find(actual);
1359+
UNIT_ASSERT(it != expected.end());
1360+
expected.erase(it);
1361+
}
12771362
UNIT_ASSERT(!iterator.Next(item));
12781363
UNIT_ASSERT(!iterator.Next(item));
12791364
}
12801365

1281-
Y_UNIT_TEST_LLVM(TestThinAllLambdas) {
1282-
TSetup<LLVM> setup;
1366+
Y_UNIT_TEST_LLVM_SPILLING(TestThinAllLambdas) {
1367+
// Currently LLVM version doesn't support spilling.
1368+
if (LLVM && SPILLING) return;
1369+
// callable WideLastCombinerWithSpilling was introduced in 49 version of runtime
1370+
if (MKQL_RUNTIME_VERSION < 49U && SPILLING) return;
1371+
TSetup<LLVM, SPILLING> setup;
12831372
TProgramBuilder& pb = *setup.PgmBuilder;
12841373

12851374
const auto tupleType = pb.NewTupleType({});
12861375
const auto data = pb.NewTuple({});
12871376

12881377
const auto list = pb.NewList(tupleType, {data, data, data, data});
12891378

1290-
const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.WideLastCombiner(pb.ExpandMap(pb.ToFlow(list),
1379+
auto wideLastCombinerCollable = &TProgramBuilder::WideLastCombiner;
1380+
if (SPILLING) {
1381+
wideLastCombinerCollable = &TProgramBuilder::WideLastCombinerWithSpilling;
1382+
}
1383+
const auto pgmReturn = pb.Collect(pb.NarrowMap((pb.*wideLastCombinerCollable)(pb.ExpandMap(pb.ToFlow(list),
12911384
[](TRuntimeNode) -> TRuntimeNode::TList { return {}; }),
12921385
[](TRuntimeNode::TList items) { return items; },
12931386
[](TRuntimeNode::TList, TRuntimeNode::TList items) { return items; },

0 commit comments

Comments
 (0)