Skip to content

Commit ee0ebe9

Browse files
committed
move choosing bucket to separate function
1 parent d95780d commit ee0ebe9

File tree

1 file changed

+12
-7
lines changed

1 file changed

+12
-7
lines changed

ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -486,9 +486,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
486486
return isNew ? ETasteResult::Init : ETasteResult::Update;
487487
}
488488

489-
auto hash = Hasher(ViewForKeyAndState.data());
490-
XXH64_hash_t hashed_hash = XXH64(&hash, sizeof(hash), 0);
491-
auto bucketId = hashed_hash % SpilledBucketCount;
489+
auto bucketId = ChooseBucket(ViewForKeyAndState.data());
492490
auto& bucket = SpilledBuckets[bucketId];
493491

494492
if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) {
@@ -532,7 +530,15 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
532530

533531
return value;
534532
}
533+
535534
private:
535+
// Maps a key to the index of the spilled bucket it belongs to.
//
// The value returned by Hasher for `key` is hashed once more with XXH64
// (seed 0), and the result is reduced modulo SpilledBucketCount.
//
// @param key  pointer passed through to Hasher unchanged.
// @return     bucket index in the range [0, SpilledBucketCount).
ui64 ChooseBucket(const NUdf::TUnboxedValuePod *const key) {
    const auto keyHash = Hasher(key);
    const XXH64_hash_t mixedHash = XXH64(&keyHash, sizeof(keyHash), 0);
    const XXH64_hash_t bucketIndex = mixedHash % SpilledBucketCount;
    // Defensive check: the index is later used to address SpilledBuckets.
    YQL_ENSURE(bucketIndex < SpilledBucketCount, "bucket index is out of bounds");
    return bucketIndex;
}
541+
536542
EUpdateResult FlushSpillingBuffersAndWait() {
537543
UpdateSpillingBuckets();
538544

@@ -595,9 +601,7 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
595601
SplitStateSpillingBucket = -1;
596602
}
597603
while (const auto keyAndState = static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Extract())) {
598-
auto hash = Hasher(keyAndState); //Hasher uses only key for hashing
599-
XXH64_hash_t hashed_hash = XXH64(&hash, sizeof(hash), 0);
600-
auto bucketId = hashed_hash % SpilledBucketCount;
604+
auto bucketId = ChooseBucket(keyAndState); // This uses only key for hashing
601605
auto& bucket = SpilledBuckets[bucketId];
602606

603607
bucket.LineCount++;
@@ -890,7 +894,8 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
890894
bool RecoverState; //sub mode for ProcessSpilledData
891895

892896
TAsyncReadOperation AsyncReadOperation = std::nullopt;
893-
static constexpr size_t SpilledBucketCount = 128;
897+
static constexpr size_t SpilledBucketBits = 7;
898+
static constexpr size_t SpilledBucketCount = 1 << SpilledBucketBits;
894899
std::deque<TSpilledBucket> SpilledBuckets;
895900
ui32 SpillingBucketsCount = 0;
896901
ui32 InMemoryBucketsCount = SpilledBucketCount;

0 commit comments

Comments
 (0)