Skip to content

Commit 2e82c91

Browse files
Merge 5302382 into c808caf
2 parents c808caf + 5302382 commit 2e82c91

File tree

5 files changed

+31
-8
lines changed

5 files changed

+31
-8
lines changed

ydb/library/yql/minikql/aligned_page_pool.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,7 @@ void TAlignedPagePoolImpl<T>::Free(void* ptr, size_t size) noexcept {
504504
template<typename T>
505505
void TAlignedPagePoolImpl<T>::UpdateMemoryYellowZone() {
506506
if (Limit == 0) return;
507-
if (IncreaseMemoryLimitCallback && !IsMaximumLimitValueReached) return;
507+
// if (IncreaseMemoryLimitCallback && !IsMaximumLimitValueReached) return;
508508

509509
ui8 usedMemoryPercent = 100 * GetUsed() / Limit;
510510
if (usedMemoryPercent >= EnableMemoryYellowZoneThreshold) {

ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "mkql_grace_join.h"
22
#include "mkql_grace_join_imp.h"
33

4+
#include <format>
45
#include <ydb/library/yql/public/udf/udf_data_type.h>
56
#include <ydb/library/yql/public/udf/udf_value.h>
67
#include <ydb/library/yql/public/decimal/yql_decimal_serialize.h>
@@ -631,9 +632,9 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
631632
}
632633

633634
bool IsSwitchToSpillingModeCondition() const {
634-
return false;
635+
// return false;
635636
// TODO: YQL-18033
636-
// return !HasMemoryForProcessing();
637+
return !HasMemoryForProcessing();
637638
}
638639

639640
void SwitchMode(EOperatingMode mode, TComputationContext& ctx) {
@@ -804,7 +805,10 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
804805
}
805806

806807
bool TryToReduceMemoryAndWait() {
808+
std::cerr << std::format("[MISHA][LEFT] Trying to reduce memory {}MB/{}MB\n", TlsAllocState->GetAllocated() / 1024 / 1024, TlsAllocState->GetLimit() / 1024 / 1024);
807809
bool isWaitingLeftForReduce = LeftPacker->TablePtr->TryToReduceMemoryAndWait();
810+
if (isWaitingLeftForReduce) return true;
811+
std::cerr << std::format("[MISHA][RIGHT] Trying to reduce memory {}MB/{}MB\n", TlsAllocState->GetAllocated() / 1024 / 1024, TlsAllocState->GetLimit() / 1024 / 1024);
808812
bool isWaitingRightForReduce = RightPacker->TablePtr->TryToReduceMemoryAndWait();
809813

810814
return isWaitingLeftForReduce || isWaitingRightForReduce;
@@ -862,6 +866,8 @@ EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const*
862866
UpdateSpilling();
863867
if (IsRestoringSpilledBuckets()) return EFetchResult::Yield;
864868

869+
870+
865871
if (LeftPacker->TablePtr->IsSpilledBucketWaitingForExtraction(NextBucketToJoin)) {
866872
LeftPacker->TablePtr->PrepareBucket(NextBucketToJoin);
867873
}
@@ -885,6 +891,8 @@ EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const*
885891
return EFetchResult::One;
886892
}
887893

894+
std::cerr << std::format("[MISHA] Bucket {} joined {}MB/{}MB\n", NextBucketToJoin, TlsAllocState->GetAllocated() / 1024 / 1024, TlsAllocState->GetLimit() / 1024 / 1024);
895+
888896
LeftPacker->TuplesBatchPacked = 0;
889897
LeftPacker->TablePtr->ClearBucket(NextBucketToJoin); // Clear content of returned bucket
890898

@@ -895,6 +903,8 @@ EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const*
895903
JoinedTablePtr->ResetIterator();
896904
*PartialJoinCompleted = false;
897905

906+
std::cerr << std::format("[MISHA] Bucket {} cleared {}MB/{}MB\n", NextBucketToJoin, TlsAllocState->GetAllocated() / 1024 / 1024, TlsAllocState->GetLimit() / 1024 / 1024);
907+
898908
NextBucketToJoin++;
899909
} else {
900910
*PartialJoinCompleted = true;

ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "mkql_grace_join_imp.h"
22

3+
#include <format>
34
#include <ydb/library/yql/public/udf/udf_data_type.h>
45
#include <ydb/library/yql/utils/log/log.h>
56

@@ -1113,12 +1114,19 @@ void TTable::Clear() {
11131114

11141115
void TTable::ClearBucket(ui64 bucket) {
11151116
TTableBucket & tb = TableBuckets[bucket];
1116-
tb.KeyIntVals.clear();
1117+
std::vector<ui64, TMKQLAllocator<ui64>>().swap(tb.KeyIntVals);
1118+
std::vector<ui64, TMKQLAllocator<ui64>>().swap(tb.DataIntVals);
1119+
std::vector<ui32, TMKQLAllocator<ui32>>().swap(tb.StringsOffsets);
1120+
std::vector<char, TMKQLAllocator<char>>().swap(tb.StringsValues);
1121+
std::vector<char, TMKQLAllocator<char>>().swap(tb.InterfaceValues);
1122+
std::vector<ui32, TMKQLAllocator<ui32>>().swap(tb.InterfaceOffsets);
1123+
1124+
/* tb.KeyIntVals.clear();
11171125
tb.DataIntVals.clear();
11181126
tb.StringsOffsets.clear();
11191127
tb.StringsValues.clear();
11201128
tb.InterfaceValues.clear();
1121-
tb.InterfaceOffsets.clear();
1129+
tb.InterfaceOffsets.clear();*/
11221130
tb.JoinIds.clear();
11231131
tb.RightIds.clear();
11241132

@@ -1147,8 +1155,10 @@ bool TTable::TryToReduceMemoryAndWait() {
11471155
i32 largestBucketIndex = 0;
11481156
ui64 largestBucketSize = 0;
11491157
for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) {
1150-
if (TableBucketsSpillers[bucket].IsProcessingSpilling()) return true;
1151-
1158+
if (TableBucketsSpillers[bucket].IsProcessingSpilling()) {
1159+
std::cerr << std::format("[MISHA] NOT spilling because of bucket {}\n", bucket);
1160+
return true;
1161+
}
11521162
ui64 bucketSize = GetSizeOfBucket(bucket);
11531163
if (bucketSize > largestBucketSize) {
11541164
largestBucketSize = bucketSize;
@@ -1157,6 +1167,8 @@ bool TTable::TryToReduceMemoryAndWait() {
11571167
}
11581168

11591169
if (!largestBucketSize) return false;
1170+
TotalSpilled += largestBucketSize;
1171+
std::cerr << std::format("[MISHA][{}MB] spilling {} of size {}KB\n", TotalSpilled / 1024 / 1024, largestBucketIndex, largestBucketSize / 1024);
11601172
TableBucketsSpillers[largestBucketIndex].SpillBucket(std::move(TableBuckets[largestBucketIndex]));
11611173
TableBuckets[largestBucketIndex] = TTableBucket{};
11621174

ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ class TTableBucketSpiller {
172172

173173
// Class which represents single table data stored in buckets
174174
class TTable {
175+
ui64 TotalSpilled = 0;
175176
ui64 NumberOfKeyIntColumns = 0; // Key int columns always first and padded to sizeof(ui64).
176177
ui64 NumberOfKeyStringColumns = 0; // String key columns go after key int columns
177178
ui64 NumberOfKeyIColumns = 0; // Number of interface - provided key columns

ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ namespace NYql::NDqs {
3030
NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr TaskRunnerActorFactory;
3131
THashMap<TString, TString> ClusterNamesMapping;
3232

33-
ui64 MkqlInitialMemoryLimit = 8_GB;
33+
ui64 MkqlInitialMemoryLimit = 50_MB;
3434
ui64 MkqlTotalMemoryLimit = 0;
3535
ui64 MkqlMinAllocSize = 30_MB;
3636
ui64 MkqlProgramHardMemoryLimit = 0;

0 commit comments

Comments
 (0)