Skip to content

Commit 0b0bdf5

Browse files
committed
refactoring
1 parent 45c8bad commit 0b0bdf5

File tree

4 files changed

+148
-28
lines changed

4 files changed

+148
-28
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,6 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
636636
// return !HasMemoryForProcessing();
637637
}
638638

639-
640639
void SwitchMode(EOperatingMode mode, TComputationContext& ctx) {
641640
switch(mode) {
642641
case EOperatingMode::InMemory: {
@@ -816,8 +815,17 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
816815
RightPacker->TablePtr->UpdateSpilling();
817816
}
818817

819-
bool HasRunningAsyncOperation() const {
820-
return LeftPacker->TablePtr->HasRunningAsyncIoOperation() || RightPacker->TablePtr->HasRunningAsyncIoOperation();
818+
819+
bool IsSpillingFinished() const {
820+
return LeftPacker->TablePtr->IsSpillingFinished() && RightPacker->TablePtr->IsSpillingFinished();
821+
}
822+
823+
bool IsReadyForSpilledDataProcessing() const {
824+
return LeftPacker->TablePtr->IsSpillingAcceptingDataRequests() && RightPacker->TablePtr->IsSpillingAcceptingDataRequests();
825+
}
826+
827+
bool IsRestoringSpilledBuckets() const {
828+
return LeftPacker->TablePtr->IsRestoringSpilledBuckets() && RightPacker->TablePtr->IsRestoringSpilledBuckets();
821829
}
822830

823831
void DoCalculateWithSpilling(TComputationContext& ctx) {
@@ -834,15 +842,20 @@ void DoCalculateWithSpilling(TComputationContext& ctx) {
834842
}
835843

836844
if (!*HaveMoreLeftRows && !*HaveMoreRightRows) {
837-
UpdateSpilling();
838-
if (HasRunningAsyncOperation()) return;
845+
std::cerr << "[MISHA] everything fetched\n";
846+
if (!IsSpillingFinished()) return;
847+
839848
if (!IsSpillingFinalized) {
840849
LeftPacker->TablePtr->FinalizeSpilling();
841850
RightPacker->TablePtr->FinalizeSpilling();
842851
IsSpillingFinalized = true;
843852

844-
if (HasRunningAsyncOperation()) return;
853+
UpdateSpilling();
845854
}
855+
if (!IsReadyForSpilledDataProcessing()) return;
856+
857+
LeftPacker->TablePtr->PrintSpillersState("LEFT");
858+
RightPacker->TablePtr->PrintSpillersState("RIGHT");
846859
SwitchMode(EOperatingMode::ProcessSpilled, ctx);
847860
return;
848861
}
@@ -851,14 +864,25 @@ void DoCalculateWithSpilling(TComputationContext& ctx) {
851864
EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const* output) {
852865
while (NextBucketToJoin != GraceJoin::NumberOfBuckets) {
853866
UpdateSpilling();
867+
if (IsRestoringSpilledBuckets()) return EFetchResult::Yield;
854868

855-
if (HasRunningAsyncOperation()) return EFetchResult::Yield;
869+
if (LeftPacker->TablePtr->IsSpilledBucketWaitingForExtraction(NextBucketToJoin)) {
870+
std::cerr << std::format("[MISHA][LEFT] extracting bucket {}\n", NextBucketToJoin);
871+
LeftPacker->TablePtr->PrepareBucket(NextBucketToJoin);
872+
}
873+
874+
if (RightPacker->TablePtr->IsSpilledBucketWaitingForExtraction(NextBucketToJoin)) {
875+
std::cerr << std::format("[MISHA][RIGHT] extracting bucket {}\n", NextBucketToJoin);
876+
RightPacker->TablePtr->PrepareBucket(NextBucketToJoin);
877+
}
856878

857879
if (!LeftPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) {
880+
std::cerr << std::format("[MISHA][LEFT] restoring bucket {}\n", NextBucketToJoin);
858881
LeftPacker->TablePtr->StartLoadingBucket(NextBucketToJoin);
859882
}
860883

861884
if (!RightPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) {
885+
std::cerr << std::format("[MISHA][RIGHT] restoring bucket {}\n", NextBucketToJoin);
862886
RightPacker->TablePtr->StartLoadingBucket(NextBucketToJoin);
863887
}
864888

@@ -882,8 +906,6 @@ EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const*
882906

883907
NextBucketToJoin++;
884908
} else {
885-
LeftPacker->TablePtr->PrepareBucket(NextBucketToJoin);
886-
RightPacker->TablePtr->PrepareBucket(NextBucketToJoin);
887909
*PartialJoinCompleted = true;
888910
LeftPacker->StartTime = std::chrono::system_clock::now();
889911
RightPacker->StartTime = std::chrono::system_clock::now();

ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp

Lines changed: 54 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1147,7 +1147,7 @@ bool TTable::TryToReduceMemoryAndWait() {
11471147
i32 largestBucketIndex = 0;
11481148
ui64 largestBucketSize = 0;
11491149
for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) {
1150-
if (TableBucketsSpillers[bucket].HasRunningAsyncIoOperation()) return true;
1150+
if (TableBucketsSpillers[bucket].IsProcessingSpilling()) return true;
11511151

11521152
ui64 bucketSize = GetSizeOfBucket(bucket);
11531153
if (bucketSize > largestBucketSize) {
@@ -1157,6 +1157,7 @@ bool TTable::TryToReduceMemoryAndWait() {
11571157
}
11581158

11591159
if (!largestBucketSize) return false;
1160+
std::cerr << std::format("[MISHA] spilling {}\n", largestBucketIndex);
11601161
TableBucketsSpillers[largestBucketIndex].SpillBucket(std::move(TableBuckets[largestBucketIndex]));
11611162
TableBuckets[largestBucketIndex] = TTableBucket{};
11621163

@@ -1169,11 +1170,36 @@ void TTable::UpdateSpilling() {
11691170
}
11701171
}
11711172

1173+
bool TTable::IsSpillingFinished() const {
1174+
for (ui64 i = 0; i < NumberOfBuckets; ++i) {
1175+
if (TableBucketsSpillers[i].IsProcessingSpilling()) return false;
1176+
}
1177+
return true;
1178+
}
1179+
1180+
// Returns true when every one of the first NumberOfBuckets bucket spillers reports
// IsAcceptingDataRequests(). On the first spiller that does not, a trace line naming
// the bucket is written to stderr and false is returned.
bool TTable::IsSpillingAcceptingDataRequests() const {
    for (ui64 bucket = 0; bucket < NumberOfBuckets; ++bucket) {
        const bool accepting = TableBucketsSpillers[bucket].IsAcceptingDataRequests();
        if (accepting) {
            continue;
        }
        std::cerr << std::format("[MISHA] bucket {} not accepting data requests\n", bucket);
        return false;
    }
    return true;
}
1189+
1190+
bool TTable::IsRestoringSpilledBuckets() const {
1191+
for (ui64 i = 0; i < NumberOfBuckets; ++i) {
1192+
if (TableBucketsSpillers[i].IsRestoring()) return true;
1193+
}
1194+
return false;
1195+
}
1196+
11721197
void TTable::FinalizeSpilling() {
11731198
MKQL_ENSURE(!HasRunningAsyncIoOperation(), "Internal logic error");
11741199

11751200
for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) {
11761201
if (!TableBucketsSpillers[bucket].IsInMemory()) {
1202+
std::cerr << std::format("[MISHA] finalizing {}\n", bucket);
11771203
TableBucketsSpillers[bucket].SpillBucket(std::move(TableBuckets[bucket]));
11781204
TableBuckets[bucket] = TTableBucket{};
11791205
TableBucketsSpillers[bucket].Finalize();
@@ -1192,6 +1218,10 @@ bool TTable::IsBucketInMemory(ui32 bucket) const {
11921218
return TableBucketsSpillers[bucket].IsInMemory();
11931219
}
11941220

1221+
// Tells whether the spiller of the given bucket reports that extraction is required.
bool TTable::IsSpilledBucketWaitingForExtraction(ui32 bucket) const {
    const auto& spiller = TableBucketsSpillers[bucket];
    return spiller.IsExtractionRequired();
}
1224+
11951225
void TTable::StartLoadingBucket(ui32 bucket) {
11961226
MKQL_ENSURE(!TableBucketsSpillers[bucket].IsInMemory(), "Internal logic error");
11971227

@@ -1272,19 +1302,20 @@ void TTableBucketSpiller::Update() {
12721302

12731303
if (State == EState::Spilling) {
12741304
ProcessBucketSpilling();
1305+
} else if (State == EState::Finalizing) {
1306+
ProcessFinalizing();
12751307
} else if (State == EState::Restoring) {
12761308
ProcessBucketRestoration();
12771309
}
12781310
}
12791311

12801312
// Records a finalization request by raising a boolean flag; nothing else is done here.
// Diff view: the flag written is renamed from IsFinalizing (removed line) to
// IsFinalizingRequested (added line).
void TTableBucketSpiller::Finalize() {
1281-
IsFinalizing = true;
1313+
IsFinalizingRequested = true;
12821314
}
12831315

12841316
void TTableBucketSpiller::SpillBucket(TTableBucket&& bucket) {
12851317
MKQL_ENSURE(NextVectorToProcess == ENextVectorToProcess::None, "Internal logic error");
12861318
State = EState::Spilling;
1287-
IsBucketOwnedBySpiller = true;
12881319

12891320
CurrentBucket = std::move(bucket);
12901321
NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
@@ -1295,7 +1326,7 @@ void TTableBucketSpiller::SpillBucket(TTableBucket&& bucket) {
12951326
// Hands the restored bucket back to the caller and returns the spiller to the InMemory state.
// Precondition: the spiller is in WaitingForExtraction (the state IsExtractionRequired()
// reports, entered by ProcessBucketRestoration once the last spilled chunk is read) and
// no spilled buckets remain. Checking for InMemory here would reject every bucket that
// actually needs extraction.
TTableBucket&& TTableBucketSpiller::ExtractBucket() {
    MKQL_ENSURE(State == EState::WaitingForExtraction, "Internal logic error");
    MKQL_ENSURE(SpilledBucketsCount == 0, "Internal logic error");
    State = EState::InMemory;
    return std::move(CurrentBucket);
}
13011332

@@ -1310,14 +1341,16 @@ bool TTableBucketSpiller::IsInMemory() const {
13101341
}
13111342

13121343
// Reports whether the bucket held by the spiller still has to be extracted.
// Diff view: the removed line answered from the IsBucketOwnedBySpiller flag; the added
// line answers from the state machine (State == WaitingForExtraction).
bool TTableBucketSpiller::IsExtractionRequired() const {
1313-
return IsBucketOwnedBySpiller;
1344+
return State == EState::WaitingForExtraction;
13141345
}
13151346

13161347
// Starts restoring the spilled bucket.
// Requires the spiller to be in AcceptingDataRequests; the failure message carries the
// numeric value of the actual state. If a vector is already scheduled for processing the
// call is a no-op. Otherwise the first vector (KeyAndVals) is scheduled, the state becomes
// Restoring and the first restoration step runs immediately.
void TTableBucketSpiller::StartBucketRestoration() {
    MKQL_ENSURE(State == EState::AcceptingDataRequests, std::format("STATE: {}\n", static_cast<int>(State)));

    // A vector is already queued: nothing to start. (A second ensure on the same
    // condition after this return could never fail, so it is not repeated.)
    if (NextVectorToProcess != ENextVectorToProcess::None) {
        return;
    }

    NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
    State = EState::Restoring;
    ProcessBucketRestoration();
}
13231356

@@ -1366,15 +1399,24 @@ void TTableBucketSpiller::ProcessBucketSpilling() {
13661399
return;
13671400
}
13681401
}
1369-
if (!HasRunningAsyncIoOperation() && IsFinalizing) {
13701402

1403+
if (IsFinalizingRequested) {
1404+
if (!StateCharAdapter.IsAcceptingData() || !StateUi32Adapter.IsAcceptingData() || !StateUi64Adapter.IsAcceptingData()) return;
1405+
std::cerr << "[MISHA] actually finalizing\n";
1406+
State = EState::Finalizing;
13711407
StateUi64Adapter.Finalize();
13721408
StateUi32Adapter.Finalize();
13731409
StateCharAdapter.Finalize();
13741410

1375-
if (StateCharAdapter.IsAcceptingDataRequests() && StateUi32Adapter.IsAcceptingDataRequests() && StateUi64Adapter.IsAcceptingDataRequests()) {
1376-
State = EState::Restoring;
1377-
}
1411+
ProcessFinalizing();
1412+
return;
1413+
}
1414+
State = EState::AcceptingData;
1415+
}
1416+
1417+
void TTableBucketSpiller::ProcessFinalizing() {
1418+
if (StateCharAdapter.IsAcceptingDataRequests() && StateUi32Adapter.IsAcceptingDataRequests() && StateUi64Adapter.IsAcceptingDataRequests()) {
1419+
State = EState::AcceptingDataRequests;
13781420
}
13791421
}
13801422

@@ -1452,7 +1494,7 @@ void TTableBucketSpiller::ProcessBucketRestoration() {
14521494
SpilledBucketsCount--;
14531495
if (SpilledBucketsCount == 0) {
14541496
NextVectorToProcess = ENextVectorToProcess::None;
1455-
State = EState::InMemory;
1497+
State = EState::WaitingForExtraction;
14561498
} else {
14571499
NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
14581500
}

ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,17 +119,54 @@ class TTableBucketSpiller {
119119
bool IsInMemory() const;
120120
bool IsExtractionRequired() const;
121121

122+
bool IsProcessingSpilling() const {
123+
return State == EState::Spilling;
124+
}
125+
bool IsAcceptingDataRequests() const {
126+
return State == EState::AcceptingDataRequests;
127+
}
128+
129+
bool IsRestoring() const {
130+
return State == EState::Restoring;
131+
}
132+
133+
std::string GetStateName() const {
134+
switch (State) {
135+
136+
case EState::InMemory:
137+
return "InMemory";
138+
case EState::Spilling:
139+
return "Spilling";
140+
case EState::AcceptingData:
141+
return "AcceptingData";
142+
case EState::Finalizing:
143+
return "Finalizing";
144+
case EState::AcceptingDataRequests:
145+
return "AcceptingDataRequests";
146+
case EState::Restoring:
147+
return "Restoring";
148+
case EState::WaitingForExtraction:
149+
return "WaitingForExtraction";
150+
}
151+
}
152+
122153
private:
123154
void ProcessBucketSpilling();
124155
template <class T>
125156
void AppendVector(std::vector<T, TMKQLAllocator<T>>& first, std::vector<T, TMKQLAllocator<T>>&& second) const;
126157
void ProcessBucketRestoration();
158+
void ProcessFinalizing();
127159

128160
private:
161+
129162
enum class EState {
163+
InMemory,
130164
Spilling,
165+
AcceptingData,
166+
Finalizing,
167+
AcceptingDataRequests,
131168
Restoring,
132-
InMemory
169+
WaitingForExtraction
133170
};
134171

135172
enum class ENextVectorToProcess {
@@ -152,11 +189,9 @@ class TTableBucketSpiller {
152189

153190
ui64 SpilledBucketsCount = 0;
154191

155-
bool IsFinalizing = false;
192+
bool IsFinalizingRequested = false;
156193

157194
TTableBucket CurrentBucket;
158-
159-
bool IsBucketOwnedBySpiller = false;
160195
};
161196

162197

@@ -285,9 +320,17 @@ class TTable {
285320
// Checks if there any async operation running. If return value is true it's safe to return Yield.
286321
bool HasRunningAsyncIoOperation() const;
287322

323+
bool IsSpillingFinished() const;
324+
325+
bool IsSpillingAcceptingDataRequests() const;
326+
327+
bool IsRestoringSpilledBuckets() const;
328+
288329
// Checks if bucket fully loaded to memory and may be joined.
289330
bool IsBucketInMemory(ui32 bucket) const;
290331

332+
bool IsSpilledBucketWaitingForExtraction(ui32 bucket) const;
333+
291334
// Starts loading spilled bucket to memory.
292335
void StartLoadingBucket(ui32 bucket);
293336

@@ -300,6 +343,18 @@ class TTable {
300343
// Clears table content
301344
void Clear();
302345

346+
// Debug helper: for each of the first NumberOfBuckets buckets writes one line to stderr
// of the form "[MISHA][<name>][<bucket index>][<spiller state name>]".
// `name` is a caller-chosen label for the table; it is taken by const reference to avoid
// copying, and the method is const because it only reads the spillers.
void PrintSpillersState(const std::string& name) const {
    for (ui64 bucket = 0; bucket < NumberOfBuckets; ++bucket) {
        std::cerr << std::format("[MISHA][{}][{}][{}]\n",
                                 name, bucket, TableBucketsSpillers[bucket].GetStateName());
    }
}
357+
303358
// Creates new table with key columns and data columns
304359
TTable(ui64 numberOfKeyIntColumns = 0, ui64 numberOfKeyStringColumns = 0,
305360
ui64 numberOfDataIntColumns = 0, ui64 numberOfDataStringColumns = 0,

ydb/library/yql/minikql/computation/mkql_vector_spiller_adapter.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include <format>
34
#include <queue>
45

56
#include <ydb/library/yql/minikql/defs.h>
@@ -111,11 +112,11 @@ class TVectorSpillerAdapter {
111112
return std::move(CurrentVector);
112113
}
113114

114-
///Start restoring next vector. If th eentire contents of the vector are in memory
115-
///State will be changed to DataREady without any async read operation. ExtractVector is expected
115+
///Start restoring next vector. If the entire contents of the vector are in memory
116+
///State will be changed to DataReady without any async read operation. ExtractVector is expected
116117
///to be called immediately.
117118
void RequestNextVector() {
118-
MKQL_ENSURE(State == EState::AcceptingDataRequests, "Internal logic error");
119+
MKQL_ENSURE(State == EState::AcceptingDataRequests, std::format("[MISHA ERROR], State = {}\n", (int)State));
119120
MKQL_ENSURE(CurrentVector.empty(), "Internal logic error");
120121
MKQL_ENSURE(!StoredChunksElementsCount.empty(), "Internal logic error");
121122

0 commit comments

Comments
 (0)