@@ -66,7 +66,12 @@ HashBuild::HashBuild(
       joinBridge_(operatorCtx_->task()->getHashJoinBridgeLocked(
           operatorCtx_->driverCtx()->splitGroupId,
           planNodeId())),
-      keyChannelMap_(joinNode_->rightKeys().size()) {
+      dropDuplicates_(joinNode_->canDropDuplicates()),
+      keyChannelMap_(joinNode_->rightKeys().size()),
+      abandonBuildNoDupHashMinRows_(
+          driverCtx->queryConfig().abandonBuildNoDupHashMinRows()),
+      abandonBuildNoDupHashMinPct_(
+          driverCtx->queryConfig().abandonBuildNoDupHashMinPct()) {
   VELOX_CHECK(pool()->trackUsage());
   VELOX_CHECK_NOT_NULL(joinBridge_);

@@ -86,19 +91,22 @@ HashBuild::HashBuild(

   // Identify the non-key build side columns and make a decoder for each.
   const int32_t numDependents = inputType->size() - numKeys;
-  if (numDependents > 0) {
-    // Number of join keys (numKeys) may be greater than the number of input
-    // columns (inputType->size()). In this case numDependents is negative and
-    // cannot be used to call 'reserve'. This happens when we join different
-    // probe side keys with the same build side key: SELECT * FROM t LEFT JOIN
-    // u ON t.k1 = u.k AND t.k2 = u.k.
-    dependentChannels_.reserve(numDependents);
-    decoders_.reserve(numDependents);
-  }
-  for (auto i = 0; i < inputType->size(); ++i) {
-    if (keyChannelMap_.find(i) == keyChannelMap_.end()) {
-      dependentChannels_.emplace_back(i);
-      decoders_.emplace_back(std::make_unique<DecodedVector>());
+  if (!dropDuplicates_) {
+    if (numDependents > 0) {
+      // Number of join keys (numKeys) may be greater than the number of input
+      // columns (inputType->size()). In this case numDependents is negative
+      // and cannot be used to call 'reserve'. This happens when we join
+      // different probe side keys with the same build side key: SELECT * FROM
+      // t LEFT JOIN u ON t.k1 = u.k AND t.k2 = u.k.
+      dependentChannels_.reserve(numDependents);
+      decoders_.reserve(numDependents);
+    }
+
+    for (auto i = 0; i < inputType->size(); ++i) {
+      if (keyChannelMap_.find(i) == keyChannelMap_.end()) {
+        dependentChannels_.emplace_back(i);
+        decoders_.emplace_back(std::make_unique<DecodedVector>());
+      }
     }
   }

@@ -146,11 +154,6 @@ void HashBuild::setupTable() {
             .minTableRowsForParallelJoinBuild(),
         pool());
   } else {
-    // (Left) semi and anti join with no extra filter only needs to know whether
-    // there is a match. Hence, no need to store entries with duplicate keys.
-    const bool dropDuplicates = !joinNode_->filter() &&
-        (joinNode_->isLeftSemiFilterJoin() ||
-         joinNode_->isLeftSemiProjectJoin() || isAntiJoin(joinType_));
     // Right semi join needs to tag build rows that were probed.
     const bool needProbedFlag = joinNode_->isRightSemiFilterJoin();
     if (isLeftNullAwareJoinWithFilter(joinNode_)) {
@@ -159,7 +162,7 @@ void HashBuild::setupTable() {
       table_ = HashTable<false>::createForJoin(
           std::move(keyHashers),
           dependentTypes,
-          !dropDuplicates, // allowDuplicates
+          !dropDuplicates_, // allowDuplicates
           needProbedFlag, // hasProbedFlag
           operatorCtx_->driverCtx()
               ->queryConfig()
@@ -170,15 +173,22 @@ void HashBuild::setupTable() {
       table_ = HashTable<true>::createForJoin(
           std::move(keyHashers),
           dependentTypes,
-          !dropDuplicates, // allowDuplicates
+          !dropDuplicates_, // allowDuplicates
           needProbedFlag, // hasProbedFlag
           operatorCtx_->driverCtx()
               ->queryConfig()
               .minTableRowsForParallelJoinBuild(),
           pool());
     }
   }
+  lookup_ = std::make_unique<HashLookup>(table_->hashers(), pool());
   analyzeKeys_ = table_->hashMode() != BaseHashTable::HashMode::kHash;
+  if (abandonBuildNoDupHashMinPct_ == 0) {
+    // Building a HashTable without duplicates is disabled if
+    // abandonBuildNoDupHashMinPct_ is 0.
+    abandonBuildNoDupHash_ = true;
+    table_->joinTableMayHaveDuplicates();
+  }
 }

 void HashBuild::setupSpiller(SpillPartition* spillPartition) {
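
The new setupTable() logic has two independent switches for the de-duplicating build: the join type must allow dropping duplicates at all (canDropDuplicates()), and a zero abandonBuildNoDupHashMinPct turns the adaptive path off before any input is seen. A minimal standalone sketch of that gate; the function and parameter names below are chosen for illustration only and are not Velox APIs:

```cpp
#include <cstdint>

// Illustrative gate mirroring the intent of the diff above: returns true when
// the build should start out inserting only distinct keys into the hash table.
bool startWithNoDupHashBuild(bool canDropDuplicates, int32_t abandonMinPct) {
  // Only join types that never need duplicate build rows qualify (e.g. left
  // semi / anti joins without an extra filter, per the comment removed above).
  if (!canDropDuplicates) {
    return false;
  }
  // A 0% threshold means "abandon immediately": the feature is effectively off
  // and the table is marked as possibly containing duplicate keys right away.
  return abandonMinPct != 0;
}
```
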
@@ -377,6 +387,31 @@ void HashBuild::addInput(RowVectorPtr input) {
     return;
   }

+  if (dropDuplicates_ && !abandonBuildNoDupHash_) {
+    const bool abandonEarly = abandonBuildNoDupHashEarly(table_->numDistinct());
+    numHashInputRows_ += activeRows_.countSelected();
+    if (abandonEarly) {
+      // The hash table is no longer directly constructed in addInput. The data
+      // that was previously inserted into the hash table is already in the
+      // RowContainer.
+      addRuntimeStat("abandonBuildNoDupHash", RuntimeCounter(1));
+      abandonBuildNoDupHash_ = true;
+      table_->joinTableMayHaveDuplicates();
+    } else {
+      table_->prepareForGroupProbe(
+          *lookup_,
+          input,
+          activeRows_,
+          BaseHashTable::kNoSpillInputStartPartitionBit);
+      if (lookup_->rows.empty()) {
+        return;
+      }
+      table_->groupProbe(
+          *lookup_, BaseHashTable::kNoSpillInputStartPartitionBit);
+      return;
+    }
+  }
+
   if (analyzeKeys_ && hashes_.size() < activeRows_.end()) {
     hashes_.resize(activeRows_.end());
   }
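
For each input batch, the new branch in addInput() either keeps de-duplicating by group-probing the hash table or abandons that effort once the heuristic fires; in both cases the rows remain in the RowContainer. A simplified, hypothetical summary of that per-batch decision (the enum and function below are illustrative only, not part of the patch):

```cpp
// Hypothetical summary of the control flow added to addInput() while
// dropDuplicates_ is set and abandonBuildNoDupHash_ is still false.
enum class NoDupBuildStep {
  kAbandon,     // too many distinct keys: stop maintaining the hash table
  kSkipBatch,   // the group probe prepared no rows for this batch
  kGroupProbe,  // insert only previously unseen keys via groupProbe()
};

NoDupBuildStep decideNoDupBuildStep(bool abandonEarly, bool lookupRowsEmpty) {
  if (abandonEarly) {
    // Matches the branch that bumps the "abandonBuildNoDupHash" runtime stat
    // and calls joinTableMayHaveDuplicates().
    return NoDupBuildStep::kAbandon;
  }
  if (lookupRowsEmpty) {
    return NoDupBuildStep::kSkipBatch;
  }
  return NoDupBuildStep::kGroupProbe;
}
```
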
@@ -756,7 +791,8 @@ bool HashBuild::finishHashBuild() {
         isInputFromSpill() ? spillConfig()->startPartitionBit
                            : BaseHashTable::kNoSpillInputStartPartitionBit,
         allowParallelJoinBuild ? operatorCtx_->task()->queryCtx()->executor()
-                                : nullptr);
+                                : nullptr,
+        dropDuplicates_);
   }
   stats_.wlock()->addRuntimeStat(
       BaseHashTable::kBuildWallNanos,
@@ -879,6 +915,7 @@ void HashBuild::setupSpillInput(HashJoinBridge::SpillInput spillInput) {
   setupTable();
   setupSpiller(spillInput.spillPartition.get());
   stateCleared_ = false;
+  numHashInputRows_ = 0;

   // Start to process spill input.
   processSpillInput();
@@ -1240,4 +1277,10 @@ void HashBuildSpiller::extractSpill(
         rows.data(), rows.size(), false, false, result->childAt(types.size()));
   }
 }
+
+bool HashBuild::abandonBuildNoDupHashEarly(int64_t numDistinct) const {
+  VELOX_CHECK(dropDuplicates_);
+  return numHashInputRows_ > abandonBuildNoDupHashMinRows_ &&
+      100 * numDistinct / numHashInputRows_ >= abandonBuildNoDupHashMinPct_;
+}
 } // namespace facebook::velox::exec
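
The heuristic gives up on de-duplication once enough rows have been seen and the observed distinct-key ratio, expressed as a percentage, reaches the configured threshold. Below is a self-contained sketch with assumed example thresholds; only the formula comes from the diff above, and the concrete numbers are made up for illustration:

```cpp
#include <cstdint>
#include <iostream>

// Standalone restatement of HashBuild::abandonBuildNoDupHashEarly(), taking the
// thresholds as parameters instead of reading them from the query config.
bool abandonBuildNoDupHashEarly(
    int64_t numHashInputRows, // rows fed to the de-duplicating build so far
    int64_t numDistinct,      // table_->numDistinct()
    int64_t minRows,          // queryConfig().abandonBuildNoDupHashMinRows()
    int32_t minPct) {         // queryConfig().abandonBuildNoDupHashMinPct()
  return numHashInputRows > minRows &&
      100 * numDistinct / numHashInputRows >= minPct;
}

int main() {
  // Assumed thresholds for the sake of the example: 100,000 rows and 80%.
  constexpr int64_t kMinRows = 100'000;
  constexpr int32_t kMinPct = 80;

  // 190,000 distinct keys out of 200,000 rows: 95% >= 80%, abandon (prints 1).
  std::cout << abandonBuildNoDupHashEarly(200'000, 190'000, kMinRows, kMinPct)
            << "\n";

  // 50,000 distinct keys out of 200,000 rows: 25% < 80%, keep going (prints 0).
  std::cout << abandonBuildNoDupHashEarly(200'000, 50'000, kMinRows, kMinPct)
            << "\n";
  return 0;
}
```
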