Skip to content

Commit 234e72e

Browse files
Merge fcbd4b9 into 3d8e652
2 parents 3d8e652 + fcbd4b9 commit 234e72e

File tree

7 files changed

+164
-49
lines changed

7 files changed

+164
-49
lines changed

ydb/library/yql/dq/actors/spilling/spilling_file.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ struct TFileSpillingServiceConfig {
1717
ui64 MaxFilePartSize = 0;
1818

1919
ui32 IoThreadPoolWorkersCount = 2;
20-
ui32 IoThreadPoolQueueSize = 1000;
20+
ui32 IoThreadPoolQueueSize = 100000;
2121
bool CleanupOnShutdown = false;
2222
};
2323

ydb/library/yql/minikql/aligned_page_pool.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,7 @@ void TAlignedPagePoolImpl<T>::Free(void* ptr, size_t size) noexcept {
504504
template<typename T>
505505
void TAlignedPagePoolImpl<T>::UpdateMemoryYellowZone() {
506506
if (Limit == 0) return;
507-
if (IncreaseMemoryLimitCallback && !IsMaximumLimitValueReached) return;
507+
// if (IncreaseMemoryLimitCallback && !IsMaximumLimitValueReached) return;
508508

509509
ui8 usedMemoryPercent = 100 * GetUsed() / Limit;
510510
if (usedMemoryPercent >= EnableMemoryYellowZoneThreshold) {

ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -596,6 +596,10 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
596596
}
597597

598598
EFetchResult FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) {
599+
if (!IsLogPrinted) {
600+
std::cerr << "[MISHA] new join\n";
601+
IsLogPrinted = true;
602+
}
599603
while (true) {
600604
switch(GetMode()) {
601605
case EOperatingMode::InMemory: {
@@ -622,6 +626,8 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
622626
}
623627

624628
private:
629+
bool IsLogPrinted = false;
630+
625631
EOperatingMode GetMode() const {
626632
return Mode;
627633
}
@@ -631,13 +637,13 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
631637
}
632638

633639
bool IsSwitchToSpillingModeCondition() const {
634-
return false;
640+
// return false;
635641
// TODO: YQL-18033
636-
// return !HasMemoryForProcessing();
642+
return !HasMemoryForProcessing();
637643
}
638644

639-
640645
void SwitchMode(EOperatingMode mode, TComputationContext& ctx) {
646+
std::cerr << std::format("[MISHA] changing state {} -> {}\n", (int)Mode, (int)mode);
641647
switch(mode) {
642648
case EOperatingMode::InMemory: {
643649
MKQL_ENSURE(false, "Internal logic error");
@@ -800,6 +806,7 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
800806
}
801807

802808
}
809+
std::cerr << "[MISHA] join finished\n";
803810

804811
return EFetchResult::Finish;
805812
}
@@ -816,18 +823,23 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
816823
RightPacker->TablePtr->UpdateSpilling();
817824
}
818825

819-
bool HasRunningAsyncOperation() const {
820-
return LeftPacker->TablePtr->HasRunningAsyncIoOperation() || RightPacker->TablePtr->HasRunningAsyncIoOperation();
826+
827+
bool IsSpillingFinished() const {
828+
return LeftPacker->TablePtr->IsSpillingFinished() && RightPacker->TablePtr->IsSpillingFinished();
829+
}
830+
831+
bool IsReadyForSpilledDataProcessing() const {
832+
return LeftPacker->TablePtr->IsSpillingAcceptingDataRequests() && RightPacker->TablePtr->IsSpillingAcceptingDataRequests();
821833
}
822834

823-
bool IsProcessingFinished() {
824-
return LeftPacker->TablePtr->IsProcessingFinished() || RightPacker->TablePtr->IsProcessingFinished();
835+
bool IsRestoringSpilledBuckets() const {
836+
return LeftPacker->TablePtr->IsRestoringSpilledBuckets() || RightPacker->TablePtr->IsRestoringSpilledBuckets();
825837
}
826838

827839
void DoCalculateWithSpilling(TComputationContext& ctx) {
828840
UpdateSpilling();
829841

830-
if (!HasMemoryForProcessing()) {
842+
if (!HasMemoryForProcessing() && !IsSpillingFinalized) {
831843
bool isWaitingForReduce = TryToReduceMemoryAndWait();
832844
if (isWaitingForReduce) return;
833845
}
@@ -838,15 +850,19 @@ void DoCalculateWithSpilling(TComputationContext& ctx) {
838850
}
839851

840852
if (!*HaveMoreLeftRows && !*HaveMoreRightRows) {
841-
UpdateSpilling();
842-
if (HasRunningAsyncOperation() || !IsProcessingFinished()) return;
853+
std::cerr << "[MISHA] everything fetched\n";
854+
if (!IsSpillingFinished()) return;
843855
if (!IsSpillingFinalized) {
844856
LeftPacker->TablePtr->FinalizeSpilling();
845857
RightPacker->TablePtr->FinalizeSpilling();
846858
IsSpillingFinalized = true;
847859

848-
if (HasRunningAsyncOperation()) return;
860+
UpdateSpilling();
849861
}
862+
if (!IsReadyForSpilledDataProcessing()) return;
863+
864+
LeftPacker->TablePtr->PrintSpillersState("LEFT");
865+
RightPacker->TablePtr->PrintSpillersState("RIGHT");
850866
SwitchMode(EOperatingMode::ProcessSpilled, ctx);
851867
return;
852868
}
@@ -855,14 +871,25 @@ void DoCalculateWithSpilling(TComputationContext& ctx) {
855871
EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const* output) {
856872
while (NextBucketToJoin != GraceJoin::NumberOfBuckets) {
857873
UpdateSpilling();
874+
if (IsRestoringSpilledBuckets()) return EFetchResult::Yield;
858875

859-
if (HasRunningAsyncOperation()) return EFetchResult::Yield;
876+
if (LeftPacker->TablePtr->IsSpilledBucketWaitingForExtraction(NextBucketToJoin)) {
877+
std::cerr << std::format("[MISHA][LEFT] extracting bucket {}\n", NextBucketToJoin);
878+
LeftPacker->TablePtr->PrepareBucket(NextBucketToJoin);
879+
}
880+
881+
if (RightPacker->TablePtr->IsSpilledBucketWaitingForExtraction(NextBucketToJoin)) {
882+
std::cerr << std::format("[MISHA][RIGHT] extracting bucket {}\n", NextBucketToJoin);
883+
RightPacker->TablePtr->PrepareBucket(NextBucketToJoin);
884+
}
860885

861886
if (!LeftPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) {
887+
std::cerr << std::format("[MISHA][LEFT] restoring bucket {}\n", NextBucketToJoin);
862888
LeftPacker->TablePtr->StartLoadingBucket(NextBucketToJoin);
863889
}
864890

865891
if (!RightPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) {
892+
std::cerr << std::format("[MISHA][RIGHT] restoring bucket {}\n", NextBucketToJoin);
866893
RightPacker->TablePtr->StartLoadingBucket(NextBucketToJoin);
867894
}
868895

@@ -886,8 +913,7 @@ EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const*
886913

887914
NextBucketToJoin++;
888915
} else {
889-
LeftPacker->TablePtr->PrepareBucket(NextBucketToJoin);
890-
RightPacker->TablePtr->PrepareBucket(NextBucketToJoin);
916+
std::cerr << "[MISHA] joining\n";
891917
*PartialJoinCompleted = true;
892918
LeftPacker->StartTime = std::chrono::system_clock::now();
893919
RightPacker->StartTime = std::chrono::system_clock::now();

ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp

Lines changed: 61 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1147,7 +1147,7 @@ bool TTable::TryToReduceMemoryAndWait() {
11471147
i32 largestBucketIndex = 0;
11481148
ui64 largestBucketSize = 0;
11491149
for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) {
1150-
if (TableBucketsSpillers[bucket].HasRunningAsyncIoOperation() || !TableBucketsSpillers[bucket].IsProcessingFinished()) return true;
1150+
if (TableBucketsSpillers[bucket].IsProcessingSpilling()) return true;
11511151

11521152
ui64 bucketSize = GetSizeOfBucket(bucket);
11531153
if (bucketSize > largestBucketSize) {
@@ -1157,6 +1157,7 @@ bool TTable::TryToReduceMemoryAndWait() {
11571157
}
11581158

11591159
if (!largestBucketSize) return false;
1160+
std::cerr << std::format("[MISHA] spilling {}\n", largestBucketIndex);
11601161
TableBucketsSpillers[largestBucketIndex].SpillBucket(std::move(TableBuckets[largestBucketIndex]));
11611162
TableBuckets[largestBucketIndex] = TTableBucket{};
11621163

@@ -1169,14 +1170,44 @@ void TTable::UpdateSpilling() {
11691170
}
11701171
}
11711172

1173+
bool TTable::IsSpillingFinished() const {
1174+
for (ui64 i = 0; i < NumberOfBuckets; ++i) {
1175+
if (TableBucketsSpillers[i].IsProcessingSpilling()) {
1176+
std::cerr << std::format("[MISHA] bucket {} still spilling : {}\n", i, TableBucketsSpillers[i].GetStateName());
1177+
return false;
1178+
}
1179+
}
1180+
return true;
1181+
}
1182+
1183+
bool TTable::IsSpillingAcceptingDataRequests() const {
1184+
for (ui64 i = 0; i < NumberOfBuckets; ++i) {
1185+
if (TableBucketsSpillers[i].IsInMemory()) continue;
1186+
if (!TableBucketsSpillers[i].IsAcceptingDataRequests()) {
1187+
std::cerr << std::format("[MISHA] bucket {} not accepting data requests: {}\n", i, TableBucketsSpillers[i].GetStateName());
1188+
return false;
1189+
}
1190+
}
1191+
return true;
1192+
}
1193+
1194+
bool TTable::IsRestoringSpilledBuckets() const {
1195+
for (ui64 i = 0; i < NumberOfBuckets; ++i) {
1196+
if (TableBucketsSpillers[i].IsRestoring()) return true;
1197+
}
1198+
return false;
1199+
}
1200+
11721201
void TTable::FinalizeSpilling() {
11731202
MKQL_ENSURE(!HasRunningAsyncIoOperation(), "Internal logic error");
11741203

11751204
for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) {
11761205
if (!TableBucketsSpillers[bucket].IsInMemory()) {
1206+
TableBucketsSpillers[bucket].Finalize();
1207+
std::cerr << std::format("[MISHA] finalizing {}\n", bucket);
11771208
TableBucketsSpillers[bucket].SpillBucket(std::move(TableBuckets[bucket]));
11781209
TableBuckets[bucket] = TTableBucket{};
1179-
TableBucketsSpillers[bucket].Finalize();
1210+
11801211
}
11811212
}
11821213
}
@@ -1188,20 +1219,16 @@ bool TTable::HasRunningAsyncIoOperation() const {
11881219
return false;
11891220
}
11901221

1191-
bool TTable::IsProcessingFinished() const {
1192-
for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) {
1193-
if (!TableBucketsSpillers[bucket].IsProcessingFinished()) return false;
1194-
}
1195-
return true;
1196-
}
1197-
11981222
bool TTable::IsBucketInMemory(ui32 bucket) const {
11991223
return TableBucketsSpillers[bucket].IsInMemory();
12001224
}
12011225

1226+
bool TTable::IsSpilledBucketWaitingForExtraction(ui32 bucket) const {
1227+
return TableBucketsSpillers[bucket].IsExtractionRequired();
1228+
}
1229+
12021230
void TTable::StartLoadingBucket(ui32 bucket) {
12031231
MKQL_ENSURE(!TableBucketsSpillers[bucket].IsInMemory(), "Internal logic error");
1204-
if (!TableBucketsSpillers[bucket].IsProcessingFinished()) return;
12051232

12061233
TableBucketsSpillers[bucket].StartBucketRestoration();
12071234
}
@@ -1280,19 +1307,20 @@ void TTableBucketSpiller::Update() {
12801307

12811308
if (State == EState::Spilling) {
12821309
ProcessBucketSpilling();
1310+
} else if (State == EState::Finalizing) {
1311+
ProcessFinalizing();
12831312
} else if (State == EState::Restoring) {
12841313
ProcessBucketRestoration();
12851314
}
12861315
}
12871316

12881317
void TTableBucketSpiller::Finalize() {
1289-
IsFinalizing = true;
1318+
IsFinalizingRequested = true;
12901319
}
12911320

12921321
void TTableBucketSpiller::SpillBucket(TTableBucket&& bucket) {
12931322
MKQL_ENSURE(NextVectorToProcess == ENextVectorToProcess::None, "Internal logic error");
12941323
State = EState::Spilling;
1295-
IsBucketOwnedBySpiller = true;
12961324

12971325
CurrentBucket = std::move(bucket);
12981326
NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
@@ -1301,9 +1329,9 @@ void TTableBucketSpiller::SpillBucket(TTableBucket&& bucket) {
13011329
}
13021330

13031331
TTableBucket&& TTableBucketSpiller::ExtractBucket() {
1304-
MKQL_ENSURE(State == EState::InMemory, "Internal logic error");
1332+
MKQL_ENSURE(State == EState::WaitingForExtraction, "Internal logic error");
13051333
MKQL_ENSURE(SpilledBucketsCount == 0, "Internal logic error");
1306-
IsBucketOwnedBySpiller = false;
1334+
State = EState::InMemory;
13071335
return std::move(CurrentBucket);
13081336
}
13091337

@@ -1318,14 +1346,16 @@ bool TTableBucketSpiller::IsInMemory() const {
13181346
}
13191347

13201348
bool TTableBucketSpiller::IsExtractionRequired() const {
1321-
return IsBucketOwnedBySpiller;
1349+
return State == EState::WaitingForExtraction;
13221350
}
13231351

13241352
void TTableBucketSpiller::StartBucketRestoration() {
1325-
MKQL_ENSURE(State == EState::Restoring, "Internal logic error");
1326-
MKQL_ENSURE(NextVectorToProcess == ENextVectorToProcess::None, "Internal logic error");
1353+
MKQL_ENSURE(State == EState::AcceptingDataRequests, std::format("STATE: {}\n", (int)State));
1354+
if (NextVectorToProcess != ENextVectorToProcess::None) return;
1355+
MKQL_ENSURE(NextVectorToProcess == ENextVectorToProcess::None, std::format("NEXT VECTOR: {}\n", (int)NextVectorToProcess));
13271356

13281357
NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
1358+
State = EState::Restoring;
13291359
ProcessBucketRestoration();
13301360
}
13311361

@@ -1374,15 +1404,24 @@ void TTableBucketSpiller::ProcessBucketSpilling() {
13741404
return;
13751405
}
13761406
}
1377-
if (!HasRunningAsyncIoOperation() && IsFinalizing) {
13781407

1408+
if (IsFinalizingRequested) {
1409+
if (!StateCharAdapter.IsAcceptingData() || !StateUi32Adapter.IsAcceptingData() || !StateUi64Adapter.IsAcceptingData()) return;
1410+
std::cerr << "[MISHA] actually finalizing\n";
1411+
State = EState::Finalizing;
13791412
StateUi64Adapter.Finalize();
13801413
StateUi32Adapter.Finalize();
13811414
StateCharAdapter.Finalize();
13821415

1383-
if (StateCharAdapter.IsAcceptingDataRequests() && StateUi32Adapter.IsAcceptingDataRequests() && StateUi64Adapter.IsAcceptingDataRequests()) {
1384-
State = EState::Restoring;
1385-
}
1416+
ProcessFinalizing();
1417+
return;
1418+
}
1419+
State = EState::AcceptingData;
1420+
}
1421+
1422+
void TTableBucketSpiller::ProcessFinalizing() {
1423+
if (StateCharAdapter.IsAcceptingDataRequests() && StateUi32Adapter.IsAcceptingDataRequests() && StateUi64Adapter.IsAcceptingDataRequests()) {
1424+
State = EState::AcceptingDataRequests;
13861425
}
13871426
}
13881427

@@ -1460,7 +1499,7 @@ void TTableBucketSpiller::ProcessBucketRestoration() {
14601499
SpilledBucketsCount--;
14611500
if (SpilledBucketsCount == 0) {
14621501
NextVectorToProcess = ENextVectorToProcess::None;
1463-
State = EState::InMemory;
1502+
State = EState::WaitingForExtraction;
14641503
} else {
14651504
NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
14661505
}

0 commit comments

Comments
 (0)