Skip to content

Commit d4fadd8

Browse files
Merge 9d17636 into e830a11
2 parents e830a11 + 9d17636 commit d4fadd8

File tree

3 files changed

+40
-27
lines changed

3 files changed

+40
-27
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -868,23 +868,22 @@ EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const*
868868
UnpackJoinedData(output);
869869

870870
return EFetchResult::One;
871-
872871
}
873872

874873
LeftPacker->TuplesBatchPacked = 0;
875-
LeftPacker->TablePtr->Clear(); // Clear table content, ready to collect data for next batch
874+
LeftPacker->TablePtr->ClearBucket(NextBucketToJoin); // Clear content of returned bucket
876875

877876
RightPacker->TuplesBatchPacked = 0;
878-
RightPacker->TablePtr->Clear(); // Clear table content, ready to collect data for next batch
877+
RightPacker->TablePtr->ClearBucket(NextBucketToJoin); // Clear content of returned bucket
879878

880879
JoinedTablePtr->Clear();
881880
JoinedTablePtr->ResetIterator();
882881
*PartialJoinCompleted = false;
883882

884883
NextBucketToJoin++;
885884
} else {
886-
LeftPacker->TablePtr->ExtractBucket(NextBucketToJoin);
887-
RightPacker->TablePtr->ExtractBucket(NextBucketToJoin);
885+
LeftPacker->TablePtr->PrepareBucket(NextBucketToJoin);
886+
RightPacker->TablePtr->PrepareBucket(NextBucketToJoin);
888887
*PartialJoinCompleted = true;
889888
LeftPacker->StartTime = std::chrono::system_clock::now();
890889
RightPacker->StartTime = std::chrono::system_clock::now();

ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,6 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
296296

297297

298298
for (ui64 bucket = fromBucket; bucket < toBucket; bucket++) {
299-
300299
joinResults.clear();
301300
TTableBucket * bucket1 = &JoinTable1->TableBuckets[bucket];
302301
TTableBucket * bucket2 = &JoinTable2->TableBuckets[bucket];
@@ -1107,25 +1106,28 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
11071106
}
11081107

11091108
void TTable::Clear() {
1110-
11111109
for (ui64 bucket = 0; bucket < NumberOfBuckets; bucket++) {
1112-
TTableBucket & tb = TableBuckets[bucket];
1113-
tb.KeyIntVals.clear();
1114-
tb.DataIntVals.clear();
1115-
tb.StringsOffsets.clear();
1116-
tb.StringsValues.clear();
1117-
tb.InterfaceValues.clear();
1118-
tb.InterfaceOffsets.clear();
1119-
tb.JoinIds.clear();
1120-
tb.RightIds.clear();
1121-
1122-
TTableBucketStats & tbs = TableBucketsStats[bucket];
1123-
tbs.TuplesNum = 0;
1124-
tbs.KeyIntValsTotalSize = 0;
1125-
tbs.StringValuesTotalSize = 0;
1110+
ClearBucket(bucket);
11261111
}
11271112
}
11281113

1114+
void TTable::ClearBucket(ui64 bucket) {
1115+
TTableBucket & tb = TableBuckets[bucket];
1116+
tb.KeyIntVals.clear();
1117+
tb.DataIntVals.clear();
1118+
tb.StringsOffsets.clear();
1119+
tb.StringsValues.clear();
1120+
tb.InterfaceValues.clear();
1121+
tb.InterfaceOffsets.clear();
1122+
tb.JoinIds.clear();
1123+
tb.RightIds.clear();
1124+
1125+
TTableBucketStats & tbs = TableBucketsStats[bucket];
1126+
tbs.TuplesNum = 0;
1127+
tbs.KeyIntValsTotalSize = 0;
1128+
tbs.StringValuesTotalSize = 0;
1129+
}
1130+
11291131
void TTable::InitializeBucketSpillers(ISpiller::TPtr spiller) {
11301132
for (size_t i = 0; i < NumberOfBuckets; ++i) {
11311133
TableBucketsSpillers.emplace_back(spiller, 5_MB);
@@ -1154,8 +1156,7 @@ bool TTable::TryToReduceMemoryAndWait() {
11541156
}
11551157
}
11561158

1157-
if (largestBucketSize) return false;
1158-
1159+
if (!largestBucketSize) return false;
11591160
TableBucketsSpillers[largestBucketIndex].SpillBucket(std::move(TableBuckets[largestBucketIndex]));
11601161
TableBuckets[largestBucketIndex] = TTableBucket{};
11611162

@@ -1175,8 +1176,8 @@ void TTable::FinalizeSpilling() {
11751176
if (!TableBucketsSpillers[bucket].IsInMemory()) {
11761177
TableBucketsSpillers[bucket].SpillBucket(std::move(TableBuckets[bucket]));
11771178
TableBuckets[bucket] = TTableBucket{};
1179+
TableBucketsSpillers[bucket].Finalize();
11781180
}
1179-
TableBucketsSpillers[bucket].Finalize();
11801181
}
11811182
}
11821183

@@ -1197,7 +1198,8 @@ void TTable::StartLoadingBucket(ui32 bucket) {
11971198
TableBucketsSpillers[bucket].StartBucketRestoration();
11981199
}
11991200

1200-
void TTable::ExtractBucket(ui64 bucket) {
1201+
void TTable::PrepareBucket(ui64 bucket) {
1202+
if (!TableBucketsSpillers[bucket].IsExtractionRequired()) return;
12011203
TableBuckets[bucket] = std::move(TableBucketsSpillers[bucket].ExtractBucket());
12021204
}
12031205

@@ -1282,6 +1284,7 @@ void TTableBucketSpiller::Finalize() {
12821284
void TTableBucketSpiller::SpillBucket(TTableBucket&& bucket) {
12831285
MKQL_ENSURE(NextVectorToProcess == ENextVectorToProcess::None, "Internal logic error");
12841286
State = EState::Spilling;
1287+
IsBucketOwnedBySpiller = true;
12851288

12861289
CurrentBucket = std::move(bucket);
12871290
NextVectorToProcess = ENextVectorToProcess::KeyAndVals;
@@ -1292,6 +1295,7 @@ void TTableBucketSpiller::SpillBucket(TTableBucket&& bucket) {
12921295
TTableBucket&& TTableBucketSpiller::ExtractBucket() {
12931296
MKQL_ENSURE(State == EState::InMemory, "Internal logic error");
12941297
MKQL_ENSURE(SpilledBucketsCount == 0, "Internal logic error");
1298+
IsBucketOwnedBySpiller = false;
12951299
return std::move(CurrentBucket);
12961300
}
12971301

@@ -1305,6 +1309,10 @@ bool TTableBucketSpiller::IsInMemory() const {
13051309
return State == EState::InMemory;
13061310
}
13071311

1312+
bool TTableBucketSpiller::IsExtractionRequired() const {
1313+
return IsBucketOwnedBySpiller;
1314+
}
1315+
13081316
void TTableBucketSpiller::StartBucketRestoration() {
13091317
MKQL_ENSURE(State == EState::Restoring, "Internal logic error");
13101318
MKQL_ENSURE(NextVectorToProcess == ENextVectorToProcess::None, "Internal logic error");

ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ class TTableBucketSpiller {
117117
bool HasRunningAsyncIoOperation() const;
118118

119119
bool IsInMemory() const;
120+
bool IsExtractionRequired() const;
120121

121122
private:
122123
void ProcessBucketSpilling();
@@ -154,6 +155,8 @@ class TTableBucketSpiller {
154155
bool IsFinalizing = false;
155156

156157
TTableBucket CurrentBucket;
158+
159+
bool IsBucketOwnedBySpiller = false;
157160
};
158161

159162

@@ -288,8 +291,11 @@ class TTable {
288291
// Starts loading spilled bucket to memory.
289292
void StartLoadingBucket(ui32 bucket);
290293

291-
// Extracts loaded bucket from spilling.
292-
void ExtractBucket(ui64 bucket);
294+
// Prepares a bucket for joining: if the bucket was spilled and is still held by its spiller, moves it back into the table; otherwise does nothing.
295+
void PrepareBucket(ui64 bucket);
296+
297+
// Clears the stored data and resets the statistics of a single bucket.
298+
void ClearBucket(ui64 bucket);
293299

294300
// Clears table content
295301
void Clear();

0 commit comments

Comments
 (0)