|
16 | 16 |
|
17 | 17 | #include <util/string/cast.h> |
18 | 18 |
|
| 19 | +#include <contrib/libs/xxhash/xxhash.h> |
| 20 | + |
19 | 21 | namespace NKikimr { |
20 | 22 | namespace NMiniKQL { |
21 | 23 |
|
@@ -485,7 +487,8 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> { |
485 | 487 | } |
486 | 488 |
|
487 | 489 | auto hash = Hasher(ViewForKeyAndState.data()); |
488 | | - auto bucketId = hash % SpilledBucketCount; |
| 490 | + XXH64_hash_t hashed_hash = XXH64(&hash, sizeof(hash), 0); |
| 491 | + auto bucketId = hashed_hash % SpilledBucketCount; |
489 | 492 | auto& bucket = SpilledBuckets[bucketId]; |
490 | 493 |
|
491 | 494 | if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) { |
@@ -592,8 +595,9 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> { |
592 | 595 | SplitStateSpillingBucket = -1; |
593 | 596 | } |
594 | 597 | while (const auto keyAndState = static_cast<NUdf::TUnboxedValue *>(InMemoryProcessingState.Extract())) { |
595 | | - auto hash = Hasher(keyAndState); //Hasher uses only key for hashing |
596 | | - auto bucketId = hash % SpilledBucketCount; |
| 598 | + auto hash = Hasher(keyAndState); //Hasher uses only key for hashing |
| 599 | + XXH64_hash_t hashed_hash = XXH64(&hash, sizeof(hash), 0); |
| 600 | + auto bucketId = hashed_hash % SpilledBucketCount; |
597 | 601 | auto& bucket = SpilledBuckets[bucketId]; |
598 | 602 |
|
599 | 603 | bucket.LineCount++; |
@@ -842,6 +846,12 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> { |
842 | 846 | break; |
843 | 847 | } |
844 | 848 | case EOperatingMode::ProcessSpilled: { |
| 849 | + std::sort(SpilledBuckets.begin(), SpilledBuckets.end(), [](const TSpilledBucket& lhs, const TSpilledBucket& rhs) { |
| 850 | + bool lhs_in_memory = lhs.BucketState == TSpilledBucket::EBucketState::InMemory; |
| 851 | + bool rhs_in_memory = rhs.BucketState == TSpilledBucket::EBucketState::InMemory; |
| 852 | + return lhs_in_memory > rhs_in_memory; |
| 853 | + }); |
| 854 | + |
845 | 855 | YQL_LOG(INFO) << "switching Memory mode to ProcessSpilled"; |
846 | 856 | MKQL_ENSURE(EOperatingMode::Spilling == Mode, "Internal logic error"); |
847 | 857 | MKQL_ENSURE(SpilledBuckets.size() == SpilledBucketCount, "Internal logic error"); |
|
0 commit comments