Skip to content

Commit fcbd4b9

Browse files
committed
fixed all tests
1 parent a6eed35 commit fcbd4b9

File tree

3 files changed

+14
-3
lines changed

3 files changed

+14
-3
lines changed

ydb/library/yql/dq/actors/spilling/spilling_file.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ struct TFileSpillingServiceConfig {
1717
ui64 MaxFilePartSize = 0;
1818

1919
ui32 IoThreadPoolWorkersCount = 2;
20-
ui32 IoThreadPoolQueueSize = 1000;
20+
ui32 IoThreadPoolQueueSize = 100000;
2121
bool CleanupOnShutdown = false;
2222
};
2323

ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -596,6 +596,10 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
596596
}
597597

598598
EFetchResult FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) {
599+
if (!IsLogPrinted) {
600+
std::cerr << "[MISHA] new join\n";
601+
IsLogPrinted = true;
602+
}
599603
while (true) {
600604
switch(GetMode()) {
601605
case EOperatingMode::InMemory: {
@@ -622,6 +626,8 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
622626
}
623627

624628
private:
629+
bool IsLogPrinted = false;
630+
625631
EOperatingMode GetMode() const {
626632
return Mode;
627633
}
@@ -800,6 +806,7 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
800806
}
801807

802808
}
809+
std::cerr << "[MISHA] join finished\n";
803810

804811
return EFetchResult::Finish;
805812
}
@@ -832,7 +839,7 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
832839
void DoCalculateWithSpilling(TComputationContext& ctx) {
833840
UpdateSpilling();
834841

835-
if (!HasMemoryForProcessing()) {
842+
if (!HasMemoryForProcessing() && !IsSpillingFinalized) {
836843
bool isWaitingForReduce = TryToReduceMemoryAndWait();
837844
if (isWaitingForReduce) return;
838845
}
@@ -906,6 +913,7 @@ EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const*
906913

907914
NextBucketToJoin++;
908915
} else {
916+
std::cerr << "[MISHA] joining\n";
909917
*PartialJoinCompleted = true;
910918
LeftPacker->StartTime = std::chrono::system_clock::now();
911919
RightPacker->StartTime = std::chrono::system_clock::now();

ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1172,7 +1172,10 @@ void TTable::UpdateSpilling() {
11721172

11731173
bool TTable::IsSpillingFinished() const {
11741174
for (ui64 i = 0; i < NumberOfBuckets; ++i) {
1175-
if (TableBucketsSpillers[i].IsProcessingSpilling()) return false;
1175+
if (TableBucketsSpillers[i].IsProcessingSpilling()) {
1176+
std::cerr << std::format("[MISHA] bucket {} still spilling : {}\n", i, TableBucketsSpillers[i].GetStateName());
1177+
return false;
1178+
}
11761179
}
11771180
return true;
11781181
}

0 commit comments

Comments
 (0)