Skip to content

Commit 56c2149

Browse files
Merge cc8c8f6 into 08261c7
2 parents 08261c7 + cc8c8f6 commit 56c2149

File tree

3 files changed

+109
-51
lines changed

3 files changed

+109
-51
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,6 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
636636
// return !HasMemoryForProcessing();
637637
}
638638

639-
640639
void SwitchMode(EOperatingMode mode, TComputationContext& ctx) {
641640
switch(mode) {
642641
case EOperatingMode::InMemory: {
@@ -816,18 +815,23 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
816815
RightPacker->TablePtr->UpdateSpilling();
817816
}
818817

819-
bool HasRunningAsyncOperation() const {
820-
return LeftPacker->TablePtr->HasRunningAsyncIoOperation() || RightPacker->TablePtr->HasRunningAsyncIoOperation();
818+
819+
bool IsSpillingFinished() const {
820+
return LeftPacker->TablePtr->IsSpillingFinished() && RightPacker->TablePtr->IsSpillingFinished();
821821
}
822822

823-
bool IsProcessingFinished() {
824-
return LeftPacker->TablePtr->IsProcessingFinished() || RightPacker->TablePtr->IsProcessingFinished();
823+
bool IsReadyForSpilledDataProcessing() const {
824+
return LeftPacker->TablePtr->IsSpillingAcceptingDataRequests() && RightPacker->TablePtr->IsSpillingAcceptingDataRequests();
825+
}
826+
827+
bool IsRestoringSpilledBuckets() const {
828+
return LeftPacker->TablePtr->IsRestoringSpilledBuckets() || RightPacker->TablePtr->IsRestoringSpilledBuckets();
825829
}
826830

827831
void DoCalculateWithSpilling(TComputationContext& ctx) {
828832
UpdateSpilling();
829833

830-
if (!HasMemoryForProcessing()) {
834+
if (!HasMemoryForProcessing() && !IsSpillingFinalized) {
831835
bool isWaitingForReduce = TryToReduceMemoryAndWait();
832836
if (isWaitingForReduce) return;
833837
}
@@ -838,15 +842,16 @@ void DoCalculateWithSpilling(TComputationContext& ctx) {
838842
}
839843

840844
if (!*HaveMoreLeftRows && !*HaveMoreRightRows) {
841-
UpdateSpilling();
842-
if (HasRunningAsyncOperation() || !IsProcessingFinished()) return;
845+
if (!IsSpillingFinished()) return;
843846
if (!IsSpillingFinalized) {
844847
LeftPacker->TablePtr->FinalizeSpilling();
845848
RightPacker->TablePtr->FinalizeSpilling();
846849
IsSpillingFinalized = true;
847850

848-
if (HasRunningAsyncOperation()) return;
851+
UpdateSpilling();
849852
}
853+
if (!IsReadyForSpilledDataProcessing()) return;
854+
850855
SwitchMode(EOperatingMode::ProcessSpilled, ctx);
851856
return;
852857
}
@@ -855,8 +860,15 @@ void DoCalculateWithSpilling(TComputationContext& ctx) {
855860
EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const* output) {
856861
while (NextBucketToJoin != GraceJoin::NumberOfBuckets) {
857862
UpdateSpilling();
863+
if (IsRestoringSpilledBuckets()) return EFetchResult::Yield;
864+
865+
if (LeftPacker->TablePtr->IsSpilledBucketWaitingForExtraction(NextBucketToJoin)) {
866+
LeftPacker->TablePtr->PrepareBucket(NextBucketToJoin);
867+
}
858868

859-
if (HasRunningAsyncOperation()) return EFetchResult::Yield;
869+
if (RightPacker->TablePtr->IsSpilledBucketWaitingForExtraction(NextBucketToJoin)) {
870+
RightPacker->TablePtr->PrepareBucket(NextBucketToJoin);
871+
}
860872

861873
if (!LeftPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) {
862874
LeftPacker->TablePtr->StartLoadingBucket(NextBucketToJoin);
@@ -886,8 +898,6 @@ EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const*
886898

887899
NextBucketToJoin++;
888900
} else {
889-
LeftPacker->TablePtr->PrepareBucket(NextBucketToJoin);
890-
RightPacker->TablePtr->PrepareBucket(NextBucketToJoin);
891901
*PartialJoinCompleted = true;
892902
LeftPacker->StartTime = std::chrono::system_clock::now();
893903
RightPacker->StartTime = std::chrono::system_clock::now();

ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp

Lines changed: 60 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1147,7 +1147,7 @@ bool TTable::TryToReduceMemoryAndWait() {
11471147
i32 largestBucketIndex = 0;
11481148
ui64 largestBucketSize = 0;
11491149
for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) {
1150-
if (TableBucketsSpillers[bucket].HasRunningAsyncIoOperation() || !TableBucketsSpillers[bucket].IsProcessingFinished()) return true;
1150+
if (TableBucketsSpillers[bucket].IsProcessingSpilling()) return true;
11511151

11521152
ui64 bucketSize = GetSizeOfBucket(bucket);
11531153
if (bucketSize > largestBucketSize) {
@@ -1169,39 +1169,50 @@ void TTable::UpdateSpilling() {
11691169
}
11701170
}
11711171

1172-
void TTable::FinalizeSpilling() {
1173-
MKQL_ENSURE(!HasRunningAsyncIoOperation(), "Internal logic error");
1172+
bool TTable::IsSpillingFinished() const {
1173+
for (ui64 i = 0; i < NumberOfBuckets; ++i) {
1174+
if (TableBucketsSpillers[i].IsProcessingSpilling()) return false;
1175+
}
1176+
return true;
1177+
}
11741178

1175-
for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) {
1176-
if (!TableBucketsSpillers[bucket].IsInMemory()) {
1177-
TableBucketsSpillers[bucket].SpillBucket(std::move(TableBuckets[bucket]));
1178-
TableBuckets[bucket] = TTableBucket{};
1179-
TableBucketsSpillers[bucket].Finalize();
1180-
}
1179+
bool TTable::IsSpillingAcceptingDataRequests() const {
1180+
for (ui64 i = 0; i < NumberOfBuckets; ++i) {
1181+
if (TableBucketsSpillers[i].IsInMemory()) continue;
1182+
1183+
if (!TableBucketsSpillers[i].IsAcceptingDataRequests()) return false;
11811184
}
1185+
return true;
11821186
}
11831187

1184-
bool TTable::HasRunningAsyncIoOperation() const {
1185-
for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) {
1186-
if (TableBucketsSpillers[bucket].HasRunningAsyncIoOperation()) return true;
1188+
bool TTable::IsRestoringSpilledBuckets() const {
1189+
for (ui64 i = 0; i < NumberOfBuckets; ++i) {
1190+
if (TableBucketsSpillers[i].IsRestoring()) return true;
11871191
}
11881192
return false;
11891193
}
11901194

1191-
bool TTable::IsProcessingFinished() const {
1195+
void TTable::FinalizeSpilling() {
11921196
for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) {
1193-
if (!TableBucketsSpillers[bucket].IsProcessingFinished()) return false;
1197+
if (!TableBucketsSpillers[bucket].IsInMemory()) {
1198+
TableBucketsSpillers[bucket].Finalize();
1199+
TableBucketsSpillers[bucket].SpillBucket(std::move(TableBuckets[bucket]));
1200+
TableBuckets[bucket] = TTableBucket{};
1201+
1202+
}
11941203
}
1195-
return true;
11961204
}
11971205

11981206
bool TTable::IsBucketInMemory(ui32 bucket) const {
11991207
return TableBucketsSpillers[bucket].IsInMemory();
12001208
}
12011209

1210+
bool TTable::IsSpilledBucketWaitingForExtraction(ui32 bucket) const {
1211+
return TableBucketsSpillers[bucket].IsExtractionRequired();
1212+
}
1213+
12021214
void TTable::StartLoadingBucket(ui32 bucket) {
12031215
MKQL_ENSURE(!TableBucketsSpillers[bucket].IsInMemory(), "Internal logic error");
1204-
if (!TableBucketsSpillers[bucket].IsProcessingFinished()) return;
12051216

12061217
TableBucketsSpillers[bucket].StartBucketRestoration();
12071218
}
@@ -1280,19 +1291,20 @@ void TTableBucketSpiller::Update() {
12801291

12811292
if (State == EState::Spilling) {
12821293
ProcessBucketSpilling();
1294+
} else if (State == EState::Finalizing) {
1295+
ProcessFinalizing();
12831296
} else if (State == EState::Restoring) {
12841297
ProcessBucketRestoration();
12851298
}
12861299
}
12871300

12881301
void TTableBucketSpiller::Finalize() {
1289-
IsFinalizing = true;
1302+
IsFinalizingRequested = true;
12901303
}
12911304

12921305
void TTableBucketSpiller::SpillBucket(TTableBucket&& bucket) {
12931306
MKQL_ENSURE(NextVectorToProcess == ENextVectorToProcess::None, "Internal logic error");
12941307
State = EState::Spilling;
1295-
IsBucketOwnedBySpiller = true;
12961308

12971309
CurrentBucket = std::move(bucket);
12981310
NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
@@ -1301,9 +1313,9 @@ void TTableBucketSpiller::SpillBucket(TTableBucket&& bucket) {
13011313
}
13021314

13031315
TTableBucket&& TTableBucketSpiller::ExtractBucket() {
1304-
MKQL_ENSURE(State == EState::InMemory, "Internal logic error");
1316+
MKQL_ENSURE(State == EState::WaitingForExtraction, "Internal logic error");
13051317
MKQL_ENSURE(SpilledBucketsCount == 0, "Internal logic error");
1306-
IsBucketOwnedBySpiller = false;
1318+
State = EState::InMemory;
13071319
return std::move(CurrentBucket);
13081320
}
13091321

@@ -1318,14 +1330,27 @@ bool TTableBucketSpiller::IsInMemory() const {
13181330
}
13191331

13201332
bool TTableBucketSpiller::IsExtractionRequired() const {
1321-
return IsBucketOwnedBySpiller;
1333+
return State == EState::WaitingForExtraction;
1334+
}
1335+
1336+
bool TTableBucketSpiller::IsProcessingSpilling() const {
1337+
return State == EState::Spilling;
1338+
}
1339+
1340+
bool TTableBucketSpiller::IsAcceptingDataRequests() const {
1341+
return State == EState::AcceptingDataRequests;
1342+
}
1343+
1344+
bool TTableBucketSpiller::IsRestoring() const {
1345+
return State == EState::Restoring;
13221346
}
13231347

13241348
void TTableBucketSpiller::StartBucketRestoration() {
1325-
MKQL_ENSURE(State == EState::Restoring, "Internal logic error");
1349+
MKQL_ENSURE(State == EState::AcceptingDataRequests, "Internal logic error");
13261350
MKQL_ENSURE(NextVectorToProcess == ENextVectorToProcess::None, "Internal logic error");
13271351

13281352
NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
1353+
State = EState::Restoring;
13291354
ProcessBucketRestoration();
13301355
}
13311356

@@ -1374,15 +1399,23 @@ void TTableBucketSpiller::ProcessBucketSpilling() {
13741399
return;
13751400
}
13761401
}
1377-
if (!HasRunningAsyncIoOperation() && IsFinalizing) {
13781402

1403+
if (IsFinalizingRequested) {
1404+
if (!StateCharAdapter.IsAcceptingData() || !StateUi32Adapter.IsAcceptingData() || !StateUi64Adapter.IsAcceptingData()) return;
1405+
State = EState::Finalizing;
13791406
StateUi64Adapter.Finalize();
13801407
StateUi32Adapter.Finalize();
13811408
StateCharAdapter.Finalize();
13821409

1383-
if (StateCharAdapter.IsAcceptingDataRequests() && StateUi32Adapter.IsAcceptingDataRequests() && StateUi64Adapter.IsAcceptingDataRequests()) {
1384-
State = EState::Restoring;
1385-
}
1410+
ProcessFinalizing();
1411+
return;
1412+
}
1413+
State = EState::AcceptingData;
1414+
}
1415+
1416+
void TTableBucketSpiller::ProcessFinalizing() {
1417+
if (StateCharAdapter.IsAcceptingDataRequests() && StateUi32Adapter.IsAcceptingDataRequests() && StateUi64Adapter.IsAcceptingDataRequests()) {
1418+
State = EState::AcceptingDataRequests;
13861419
}
13871420
}
13881421

@@ -1460,7 +1493,7 @@ void TTableBucketSpiller::ProcessBucketRestoration() {
14601493
SpilledBucketsCount--;
14611494
if (SpilledBucketsCount == 0) {
14621495
NextVectorToProcess = ENextVectorToProcess::None;
1463-
State = EState::InMemory;
1496+
State = EState::WaitingForExtraction;
14641497
} else {
14651498
NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
14661499
}

ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -115,25 +115,35 @@ class TTableBucketSpiller {
115115
void Finalize();
116116
// Checks if spillers are waiting for any running async operation. No calls other than update are allowed when the method returns true.
117117
bool HasRunningAsyncIoOperation() const;
118-
118+
// Is bucket in memory. False if spilled.
119119
bool IsInMemory() const;
120+
// Is bucket loaded to memory but still owned by spiller.
121+
// ExtractBucket must be called if true.
120122
bool IsExtractionRequired() const;
121-
122-
bool IsProcessingFinished() const {
123-
return NextVectorToProcess == ENextVectorToProcess::None;
124-
}
123+
// Is there any bucket that is being spilled right now.
124+
bool IsProcessingSpilling() const;
125+
// Is spiller ready to start loading new bucket.
126+
bool IsAcceptingDataRequests() const;
127+
// Is there any bucket that is being restored right now.
128+
bool IsRestoring() const;
125129

126130
private:
127131
void ProcessBucketSpilling();
128132
template <class T>
129133
void AppendVector(std::vector<T, TMKQLAllocator<T>>& first, std::vector<T, TMKQLAllocator<T>>&& second) const;
130134
void ProcessBucketRestoration();
135+
void ProcessFinalizing();
131136

132137
private:
138+
133139
enum class EState {
140+
InMemory,
134141
Spilling,
142+
AcceptingData,
143+
Finalizing,
144+
AcceptingDataRequests,
135145
Restoring,
136-
InMemory
146+
WaitingForExtraction
137147
};
138148

139149
enum class ENextVectorToProcess {
@@ -156,11 +166,9 @@ class TTableBucketSpiller {
156166

157167
ui64 SpilledBucketsCount = 0;
158168

159-
bool IsFinalizing = false;
169+
bool IsFinalizingRequested = false;
160170

161171
TTableBucket CurrentBucket;
162-
163-
bool IsBucketOwnedBySpiller = false;
164172
};
165173

166174

@@ -286,14 +294,21 @@ class TTable {
286294
// Flushes all the spillers.
287295
void FinalizeSpilling();
288296

289-
// Checks if there any async operation running. If return value is true it's safe to return Yield.
290-
bool HasRunningAsyncIoOperation() const;
297+
// Checks that spilling has no running save operation (returns true when none is in progress).
298+
bool IsSpillingFinished() const;
299+
300+
// Checks if spilling is ready for requesting buckets for restoration.
301+
bool IsSpillingAcceptingDataRequests() const;
291302

292-
bool IsProcessingFinished() const;
303+
// Checks if spilling has any running load operation.
304+
bool IsRestoringSpilledBuckets() const;
293305

294306
// Checks if bucket fully loaded to memory and may be joined.
295307
bool IsBucketInMemory(ui32 bucket) const;
296308

309+
// Checks if extraction of bucket is required
310+
bool IsSpilledBucketWaitingForExtraction(ui32 bucket) const;
311+
297312
// Starts loading spilled bucket to memory.
298313
void StartLoadingBucket(ui32 bucket);
299314

0 commit comments

Comments
 (0)