Skip to content

Commit 70713cc

Browse files
Yql-18391 gracejoin refactoring for spilling (#4559)
1 parent ddd1720 commit 70713cc

File tree

2 files changed

+50
-46
lines changed

2 files changed

+50
-46
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp

Lines changed: 42 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@
44
#include <ydb/library/yql/utils/log/log.h>
55

66
#include <contrib/libs/xxhash/xxhash.h>
7-
#include <chrono>
87
#include <string_view>
9-
#include <format>
108

119

1210
namespace NKikimr {
@@ -65,52 +63,43 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings
6563
std::memcpy(currStrPtr, IColumnsVals[i].data(), IColumnsVals[i].size() );
6664
currStrPtr+=IColumnsVals[i].size();
6765
}
68-
69-
7066
}
7167

72-
7368
TempTuple[0] &= ui64(0x1); // Setting only nulls in key bit, all other bits are ignored for key hash
7469
for (ui32 i = 1; i < NullsBitmapSize_; i ++) {
7570
TempTuple[i] = 0;
7671
}
7772

78-
7973
XXH64_hash_t hash = XXH64(TempTuple.data() + NullsBitmapSize_, (TempTuple.size() - NullsBitmapSize_) * sizeof(ui64), 0);
8074

8175
if (!hash) hash = 1;
8276

8377
ui64 bucket = hash & BucketsMask;
8478

85-
86-
8779
std::vector<ui64, TMKQLAllocator<ui64>> & keyIntVals = TableBuckets[bucket].KeyIntVals;
8880
std::vector<ui32, TMKQLAllocator<ui32>> & stringsOffsets = TableBuckets[bucket].StringsOffsets;
8981
std::vector<ui64, TMKQLAllocator<ui64>> & dataIntVals = TableBuckets[bucket].DataIntVals;
9082
std::vector<char, TMKQLAllocator<char>> & stringVals = TableBuckets[bucket].StringsValues;
9183
KeysHashTable & kh = TableBuckets[bucket].AnyHashTable;
9284

93-
ui32 offset = keyIntVals.size(); // Offset of tuple inside the keyIntVals vector
85+
ui32 offset = keyIntVals.size(); // Offset of tuple inside the keyIntVals vector
9486

9587
keyIntVals.push_back(hash);
9688
keyIntVals.insert(keyIntVals.end(), intColumns, intColumns + NullsBitmapSize_);
9789
keyIntVals.insert(keyIntVals.end(), TempTuple.begin() + NullsBitmapSize_, TempTuple.end());
9890

99-
100-
10191
if (IsAny_) {
10292
if ( !AddKeysToHashTable(kh, keyIntVals.begin() + offset) ) {
10393
keyIntVals.resize(offset);
10494
return;
10595
}
10696
}
10797

108-
109-
TableBuckets[bucket].TuplesNum++;
98+
TableBucketsStats[bucket].TuplesNum++;
11099

111100
if (NumberOfStringColumns || NumberOfIColumns ) {
112-
stringsOffsets.push_back(offset); // Adding offset to tuple in keyIntVals vector
113-
stringsOffsets.push_back(stringVals.size()); // Adding offset to string values
101+
stringsOffsets.push_back(TableBucketsStats[bucket].KeyIntValsTotalSize); // Adding offset to tuple in keyIntVals vector
102+
stringsOffsets.push_back(TableBucketsStats[bucket].StringValuesTotalSize); // Adding offset to string values
114103

115104

116105
// Adding strings sizes for keys and data
@@ -128,10 +117,8 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings
128117
stringsOffsets.push_back(IColumnsVals[i].size());
129118
}
130119
}
131-
132120
}
133121

134-
135122
// Adding data values
136123
ui64 * dataColumns = intColumns + NullsBitmapSize_ + NumberOfKeyIntColumns;
137124
dataIntVals.insert(dataIntVals.end(), dataColumns, dataColumns + NumberOfDataIntColumns);
@@ -140,17 +127,18 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings
140127
char ** dataStringsColumns = stringColumns + NumberOfKeyStringColumns;
141128
ui32 * dataStringsSizes = stringsSizes + NumberOfKeyStringColumns;
142129

130+
ui64 initialStringsSize = stringVals.size();
143131
for( ui64 i = 0; i < NumberOfDataStringColumns; i++) {
144132
ui32 currStringSize = *(dataStringsSizes + i);
145133
stringVals.insert(stringVals.end(), *(dataStringsColumns + i), *(dataStringsColumns + i) + currStringSize);
146134
}
147135

148136
for ( ui64 i = 0; i < NumberOfDataIColumns; i++) {
149137
stringVals.insert( stringVals.end(), IColumnsVals[NumberOfKeyIColumns + i].begin(), IColumnsVals[NumberOfKeyIColumns + i].end());
150-
151138
}
152139

153-
140+
TableBucketsStats[bucket].KeyIntValsTotalSize += keyIntVals.size() - offset;
141+
TableBucketsStats[bucket].StringValuesTotalSize += stringVals.size() - initialStringsSize;
154142
}
155143

156144
void TTable::ResetIterator() {
@@ -166,21 +154,21 @@ void TTable::ResetIterator() {
166154
}
167155

168156
// Checks if there are more tuples and sets bucketId and tupleId to next valid.
169-
inline bool HasMoreTuples(std::vector<TTableBucket> & tableBuckets, ui64 & bucketId, ui64 & tupleId ) {
157+
inline bool HasMoreTuples(std::vector<TTableBucketStats> & tableBucketsStats, ui64 & bucketId, ui64 & tupleId ) {
170158

171-
if (bucketId >= tableBuckets.size()) return false;
159+
if (bucketId >= tableBucketsStats.size()) return false;
172160

173-
if ( tupleId >= tableBuckets[bucketId].TuplesNum ) {
161+
if ( tupleId >= tableBucketsStats[bucketId].TuplesNum ) {
174162
tupleId = 0;
175163
bucketId ++;
176164

177-
if (bucketId == tableBuckets.size()) {
165+
if (bucketId == tableBucketsStats.size()) {
178166
return false;
179167
}
180168

181-
while( tableBuckets[bucketId].TuplesNum == 0 ) {
169+
while( tableBucketsStats[bucketId].TuplesNum == 0 ) {
182170
bucketId ++;
183-
if (bucketId == tableBuckets.size()) {
171+
if (bucketId == tableBucketsStats.size()) {
184172
return false;
185173
}
186174
}
@@ -193,7 +181,7 @@ inline bool HasMoreTuples(std::vector<TTableBucket> & tableBuckets, ui64 & bucke
193181

194182
// Returns value of next tuple. Returns true if there are more tuples
195183
bool TTable::NextTuple(TupleData & td){
196-
if (HasMoreTuples(TableBuckets, CurrIterBucket, CurrIterIndex )) {
184+
if (HasMoreTuples(TableBucketsStats, CurrIterBucket, CurrIterIndex )) {
197185
GetTupleData(CurrIterBucket, CurrIterIndex, td);
198186
CurrIterIndex++;
199187
return true;
@@ -316,6 +304,9 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
316304
TTableBucket * bucket1 = &JoinTable1->TableBuckets[bucket];
317305
TTableBucket * bucket2 = &JoinTable2->TableBuckets[bucket];
318306

307+
ui64 tuplesNum1 = JoinTable1->TableBucketsStats[bucket].TuplesNum;
308+
ui64 tuplesNum2 = JoinTable2->TableBucketsStats[bucket].TuplesNum;
309+
319310
ui64 headerSize1 = JoinTable1->HeaderSize;
320311
ui64 headerSize2 = JoinTable2->HeaderSize;
321312
ui64 nullsSize1 = JoinTable1->NullsBitmapSize_;
@@ -328,27 +319,28 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
328319
bool table2HasKeyIColumns = (JoinTable2->NumberOfKeyIColumns != 0);
329320

330321

331-
if ( bucket2->TuplesNum > bucket1->TuplesNum ) {
322+
if (tuplesNum2 > tuplesNum1) {
332323
std::swap(bucket1, bucket2);
333324
std::swap(headerSize1, headerSize2);
334325
std::swap(nullsSize1, nullsSize2);
335326
std::swap(keyIntOffset1, keyIntOffset2);
336327
std::swap(table1HasKeyStringColumns, table2HasKeyStringColumns);
337328
std::swap(table1HasKeyIColumns, table2HasKeyIColumns);
329+
std::swap(tuplesNum1, tuplesNum2);
338330
}
339331

340-
joinResults.reserve(3 * bucket1->TuplesNum );
332+
joinResults.reserve(3 * tuplesNum1 );
341333

342334
ui64 slotSize = headerSize2;
343335

344-
ui64 avgStringsSize = ( 3 * (bucket2->KeyIntVals.size() - bucket2->TuplesNum * headerSize2) ) / ( 2 * bucket2->TuplesNum + 1) + 1;
336+
ui64 avgStringsSize = ( 3 * (bucket2->KeyIntVals.size() - tuplesNum2 * headerSize2) ) / ( 2 * tuplesNum2 + 1) + 1;
345337

346338
if (table2HasKeyStringColumns || table2HasKeyIColumns ) {
347339
slotSize = slotSize + avgStringsSize;
348340
}
349341

350342

351-
ui64 nSlots = 3 * bucket2->TuplesNum + 1;
343+
ui64 nSlots = 3 * tuplesNum2 + 1;
352344
joinSlots.clear();
353345
spillSlots.clear();
354346
slotToIdx.clear();
@@ -501,7 +493,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
501493
JoinTuplesIds joinIds;
502494
joinIds.id1 = tuple1Idx;
503495
joinIds.id2 = slotToIdx[(slotIt - joinSlots.begin()) / slotSize];
504-
if (JoinTable2->TableBuckets[bucket].TuplesNum > JoinTable1->TableBuckets[bucket].TuplesNum)
496+
if (JoinTable2->TableBucketsStats[bucket].TuplesNum > JoinTable1->TableBucketsStats[bucket].TuplesNum)
505497
{
506498
std::swap(joinIds.id1, joinIds.id2);
507499
}
@@ -783,11 +775,11 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
783775

784776
if (JoinKind == EJoinKind::Cross) {
785777

786-
if (HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex))
778+
if (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex))
787779
{
788780
JoinTable1->GetTupleData(JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, td1);
789781

790-
if (HasMoreTuples(JoinTable2->TableBuckets, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex))
782+
if (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex))
791783
{
792784
JoinTable2->GetTupleData(JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex, td2);
793785
JoinTable2->CurrIterIndex++;
@@ -806,7 +798,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
806798
}
807799

808800
if ( JoinKind == EJoinKind::Inner ) {
809-
while(HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
801+
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
810802
ui32 tupleId2;
811803
if (HasJoinedTupleId(JoinTable1, tupleId2))
812804
{
@@ -822,7 +814,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
822814
}
823815

824816
if ( JoinKind == EJoinKind::Left ) {
825-
while (HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
817+
while (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
826818
ui32 tupleId2;
827819
if (HasJoinedTupleId(JoinTable1, tupleId2))
828820
{
@@ -857,7 +849,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
857849
}
858850

859851
if ( JoinKind == EJoinKind::Right ) {
860-
while(HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
852+
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
861853
ui32 tupleId2;
862854
if (HasJoinedTupleId(JoinTable1, tupleId2))
863855
{
@@ -898,7 +890,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
898890
if ( RightTableBatch_ && HasMoreRightTuples_ )
899891
return false;
900892

901-
while(HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
893+
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
902894
ui32 tupleId2;
903895

904896
bool globalMatchedId = false;
@@ -929,7 +921,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
929921
if (LeftTableBatch_ && HasMoreLeftTuples_ )
930922
return false;
931923

932-
while(HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
924+
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
933925
ui32 tupleId2;
934926

935927
bool globalMatchedId = false;
@@ -961,7 +953,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
961953
if (RightTableBatch_ && HasMoreRightTuples_ )
962954
return false;
963955

964-
while(HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
956+
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
965957
ui32 tupleId2;
966958

967959
if ( !RightTableBatch_ && HasJoinedTupleId(JoinTable1, tupleId2))
@@ -995,7 +987,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
995987
if (LeftTableBatch_ && HasMoreLeftTuples_ )
996988
return false;
997989

998-
while(HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
990+
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
999991
ui32 tupleId2;
1000992
if ( !LeftTableBatch_ && HasJoinedTupleId(JoinTable1, tupleId2))
1001993
{
@@ -1022,7 +1014,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
10221014
}
10231015

10241016
if ( JoinKind == EJoinKind::Full ) {
1025-
if(HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
1017+
if(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
10261018
ui32 tupleId2;
10271019
if (HasJoinedTupleId(JoinTable1, tupleId2))
10281020
{
@@ -1048,7 +1040,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
10481040
Table2Initialized_ = true;
10491041
}
10501042

1051-
while (HasMoreTuples(JoinTable2->TableBuckets, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex)) {
1043+
while (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex)) {
10521044

10531045
if (CurrIterBucket != JoinTable2->CurrIterBucket) {
10541046
CurrIterBucket = JoinTable2->CurrIterBucket;
@@ -1072,7 +1064,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
10721064
}
10731065

10741066
if ( JoinKind == EJoinKind::Exclusion ) {
1075-
while (HasMoreTuples(JoinTable1->TableBuckets, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
1067+
while (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
10761068
ui32 tupleId2;
10771069
if (HasJoinedTupleId(JoinTable1, tupleId2))
10781070
{
@@ -1090,7 +1082,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
10901082

10911083
td1.AllNulls = true;
10921084

1093-
while (HasMoreTuples(JoinTable2->TableBuckets, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex)) {
1085+
while (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex)) {
10941086

10951087
if (CurrIterBucket != JoinTable2->CurrIterBucket) {
10961088
CurrIterBucket = JoinTable2->CurrIterBucket;
@@ -1121,7 +1113,6 @@ void TTable::Clear() {
11211113

11221114
for (ui64 bucket = 0; bucket < NumberOfBuckets; bucket++) {
11231115
TTableBucket & tb = TableBuckets[bucket];
1124-
tb.TuplesNum = 0;
11251116
tb.KeyIntVals.clear();
11261117
tb.DataIntVals.clear();
11271118
tb.StringsOffsets.clear();
@@ -1130,6 +1121,11 @@ void TTable::Clear() {
11301121
tb.InterfaceOffsets.clear();
11311122
tb.JoinIds.clear();
11321123
tb.RightIds.clear();
1124+
1125+
TTableBucketStats & tbs = TableBucketsStats[bucket];
1126+
tbs.TuplesNum = 0;
1127+
tbs.KeyIntValsTotalSize = 0;
1128+
tbs.StringValuesTotalSize = 0;
11331129
}
11341130

11351131

@@ -1165,6 +1161,7 @@ TTable::TTable( ui64 numberOfKeyIntColumns, ui64 numberOfKeyStringColumns,
11651161
HeaderSize = HashSize + NullsBitmapSize_ + NumberOfKeyIntColumns + NumberOfKeyIColumns + TotalStringsSize;
11661162

11671163
TableBuckets.resize(NumberOfBuckets);
1164+
TableBucketsStats.resize(NumberOfBuckets);
11681165

11691166
const ui64 reservedSizePerTuple = (2 * DefaultTupleBytes) / sizeof(ui64);
11701167

ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ struct KeysHashTable {
5353
};
5454

5555
struct TTableBucket {
56-
ui64 TuplesNum = 0; // Total number of tuples in bucket
5756
std::vector<ui64, TMKQLAllocator<ui64>> KeyIntVals; // Vector to store table key values
5857
std::vector<ui64, TMKQLAllocator<ui64>> DataIntVals; // Vector to store data values in bucket
5958
std::vector<char, TMKQLAllocator<char>> StringsValues; // Vector to store data strings values
@@ -71,6 +70,12 @@ struct TTableBucket {
7170

7271
};
7372

73+
struct TTableBucketStats {
74+
ui64 TuplesNum = 0; // Total number of tuples in bucket. Incremented by TTable::AddTuple for every stored tuple, reset to 0 by TTable::Clear.
75+
ui64 StringValuesTotalSize = 0; // Running total of chars appended to the bucket's StringsValues by AddTuple. Pushed into StringsOffsets as the tuple's offset into string values; reset to 0 by Clear.
76+
ui64 KeyIntValsTotalSize = 0; // Running total of ui64 elements appended to the bucket's KeyIntVals by AddTuple. Pushed into StringsOffsets as the tuple's offset into KeyIntVals; reset to 0 by Clear.
77+
};
78+
7479
struct TupleData {
7580
ui64 * IntColumns = nullptr; // Array of packed int data of the table. Caller should allocate array of NumberOfIntColumns size
7681
char ** StrColumns = nullptr; // Pointers to values of strings for table. Strings are not null-terminated
@@ -118,6 +123,8 @@ class TTable {
118123

119124
// Table data is partitioned in buckets based on key value
120125
std::vector<TTableBucket> TableBuckets;
126+
// Statistics for buckets. Total number of tuples inside a single bucket and offsets.
127+
std::vector<TTableBucketStats> TableBucketsStats;
121128

122129
// Temporary vector for tuples manipulation;
123130
std::vector<ui64> TempTuple;

0 commit comments

Comments
 (0)