
[SPARK-14796][SQL] Add spark.sql.optimizer.inSetConversionThreshold config option. #12562


Closed · wants to merge 2 commits

CatalystConf.scala

@@ -29,6 +29,7 @@ trait CatalystConf {
   def groupByOrdinal: Boolean
 
   def optimizerMaxIterations: Int
+  def optimizerInSetConversionThreshold: Int
   def maxCaseBranchesForCodegen: Int
 
   /**
@@ -47,6 +48,7 @@ case class SimpleCatalystConf(
     orderByOrdinal: Boolean = true,
     groupByOrdinal: Boolean = true,
     optimizerMaxIterations: Int = 100,
+    optimizerInSetConversionThreshold: Int = 10,
     maxCaseBranchesForCodegen: Int = 20)
   extends CatalystConf {
 }
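
As a quick illustration of the new knob, a minimal sketch (SimpleCatalystConf and the default of 10 come from this diff; the lowered value of 2 is only an example):

import org.apache.spark.sql.catalyst.SimpleCatalystConf

// Default behaviour: In(...) is rewritten to InSet only once the literal
// list has more than 10 elements.
val defaultConf = SimpleCatalystConf(caseSensitiveAnalysis = true)
assert(defaultConf.optimizerInSetConversionThreshold == 10)

// Lowering the threshold makes the conversion kick in for shorter lists.
val eagerConf = SimpleCatalystConf(
  caseSensitiveAnalysis = true,
  optimizerInSetConversionThreshold = 2)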

Optimizer.scala

@@ -87,7 +87,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
       CombineUnions,
       // Constant folding and strength reduction
       NullPropagation,
-      OptimizeIn,
+      OptimizeIn(conf),
       ConstantFolding,
       LikeSimplification,
       BooleanSimplification,
@@ -682,10 +682,11 @@ object ConstantFolding extends Rule[LogicalPlan] {
  * Replaces [[In (value, seq[Literal])]] with optimized version [[InSet (value, HashSet[Literal])]],
  * which is much faster.
  */
-object OptimizeIn extends Rule[LogicalPlan] {
+case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
-      case In(v, list) if !list.exists(!_.isInstanceOf[Literal]) && list.size > 10 =>
+      case In(v, list) if !list.exists(!_.isInstanceOf[Literal]) &&
+          list.size > conf.optimizerInSetConversionThreshold =>
         val hSet = list.map(e => e.eval(EmptyRow))
         InSet(v, HashSet() ++ hSet)
     }
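
For intuition about why the rewrite pays off, a plain-Scala sketch of the trade-off (illustrative only; inLinear and inSetLookup are made-up names, not Catalyst APIs):

import scala.collection.immutable.HashSet

// In(v, list) evaluates like a linear scan over the literal list: O(n) per row.
def inLinear(v: Int, literals: Seq[Int]): Boolean = literals.contains(v)

// InSet(v, hSet) evaluates as a hash-set lookup: O(1) per row, at the cost
// of building the set once, as the rule does with HashSet() ++ hSet.
val hSet: HashSet[Int] = HashSet() ++ Seq(1, 2, 3, 4)
def inSetLookup(v: Int): Boolean = hSet.contains(v)

// Under the default threshold of 10 this 4-element list stays as In;
// a threshold below 4 would flip it to InSet.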

ConstantFoldingSuite.scala

@@ -17,6 +17,7 @@

 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedExtractValue}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
@@ -33,7 +34,7 @@ class ConstantFoldingSuite extends PlanTest {
Batch("AnalysisNodes", Once,
EliminateSubqueryAliases) ::
Batch("ConstantFolding", Once,
OptimizeIn,
OptimizeIn(SimpleCatalystConf(true)),
ConstantFolding,
BooleanSimplification) :: Nil
}

OptimizeInSuite.scala

@@ -17,11 +17,12 @@

 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.types._
@@ -36,7 +37,7 @@ class OptimizeInSuite extends PlanTest {
         NullPropagation,
         ConstantFolding,
         BooleanSimplification,
-        OptimizeIn) :: Nil
+        OptimizeIn(SimpleCatalystConf(caseSensitiveAnalysis = true))) :: Nil
   }
 
   val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
@@ -128,4 +129,23 @@
     comparePlans(optimized, correctAnswer)
   }
 
+  test("OptimizedIn test: Setting the threshold for turning Set into InSet.") {
+    val plan =
+      testRelation
+        .where(In(UnresolvedAttribute("a"), Seq(Literal(1), Literal(2), Literal(3))))
+        .analyze
+
+    val notOptimizedPlan = OptimizeIn(SimpleCatalystConf(caseSensitiveAnalysis = true))(plan)
+    comparePlans(notOptimizedPlan, plan)
+
+    // Reduce the threshold so that the 3-element list is converted to InSet.
+    val optimizedPlan = OptimizeIn(SimpleCatalystConf(caseSensitiveAnalysis = true,
+      optimizerInSetConversionThreshold = 2))(plan)
+    optimizedPlan match {
+      case Filter(cond, _)
+        if cond.isInstanceOf[InSet] && cond.asInstanceOf[InSet].getHSet().size == 3 =>
+        // pass
+      case _ => fail("Unexpected result for OptimizedIn")
+    }
+  }
 }
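
One detail worth noting: the guard in OptimizeIn uses a strict comparison (list.size > threshold), which is exactly what the test's expectations hinge on: 3 literals do not convert under the default of 10, but do convert once the threshold is lowered to 2. A minimal plain-Scala check of that boundary (converts is a hypothetical helper, not suite code):

// Mirrors the guard in OptimizeIn: convert only when the list size
// strictly exceeds the threshold.
def converts(listSize: Int, threshold: Int): Boolean = listSize > threshold

assert(!converts(listSize = 3, threshold = 10)) // default conf: plan unchanged
assert(converts(listSize = 3, threshold = 2))   // lowered conf: rewritten to InSet
assert(!converts(listSize = 2, threshold = 2))  // equal size does NOT convert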

SQLConf.scala

@@ -54,10 +54,17 @@ object SQLConf {

   val OPTIMIZER_MAX_ITERATIONS = SQLConfigBuilder("spark.sql.optimizer.maxIterations")
     .internal()
-    .doc("The max number of iterations the optimizer and analyzer runs")
+    .doc("The max number of iterations the optimizer and analyzer runs.")
     .intConf
     .createWithDefault(100)
 
+  val OPTIMIZER_INSET_CONVERSION_THRESHOLD =
+    SQLConfigBuilder("spark.sql.optimizer.inSetConversionThreshold")
+      .internal()
+      .doc("The threshold of set size for InSet conversion.")
+      .intConf
+      .createWithDefault(10)
+
   val ALLOW_MULTIPLE_CONTEXTS = SQLConfigBuilder("spark.sql.allowMultipleContexts")
     .doc("When set to true, creating multiple SQLContexts/HiveContexts is allowed. " +
       "When set to false, only one SQLContext/HiveContext is allowed to be created " +
@@ -529,6 +536,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

   def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS)
 
+  def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)
+
   def checkpointLocation: String = getConf(CHECKPOINT_LOCATION)
 
   def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
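
Finally, a sketch of how the option could be toggled at runtime (hedged: the config is marked internal, and spark below is an assumed SparkSession in a 2.x shell, not part of this diff):

// Set via the runtime conf (passing the value as a string):
spark.conf.set("spark.sql.optimizer.inSetConversionThreshold", "5")

// Equivalent SQL form:
spark.sql("SET spark.sql.optimizer.inSetConversionThreshold=5")

// IN predicates with more than 5 literal values are now planned as InSet.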