[SPARK-39858][SQL] Remove unnecessary AliasHelper or PredicateHelper for some rules #37272

Closed · wants to merge 3 commits
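
For context: `PredicateHelper` and `AliasHelper` are stateless mix-in traits that only contribute protected helper methods (splitting a conjunctive predicate, inlining aliases, and similar). Dropping the mixin from a rule or test suite that never calls those helpers therefore changes no behavior. A minimal sketch of what such a mixin provides, assuming `spark-catalyst` on the classpath; the demo object below is illustrative and not part of this change:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper}

// Hypothetical demo object: it mixes in PredicateHelper solely to reach the
// protected splitConjunctivePredicates helper.
object PredicateHelperDemo extends PredicateHelper {
  def conjuncts(cond: Expression): Seq[Expression] = splitConjunctivePredicates(cond)

  def main(args: Array[String]): Unit = {
    // 'a.boolean && 'b.boolean builds And(a, b) via the catalyst DSL;
    // conjuncts splits it back into Seq(a, b).
    println(conjuncts('a.boolean && 'b.boolean))
  }
}
```

A rule that never makes calls like these gains nothing from the mixin, which is why it can be removed below without touching any rule logic.
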
@@ -739,7 +739,7 @@ class Analyzer(override val catalogManager: CatalogManager)
}
}

-object ResolvePivot extends Rule[LogicalPlan] with AliasHelper {
+object ResolvePivot extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
_.containsPattern(PIVOT), ruleId) {
case p: Pivot if !p.childrenResolved || !p.aggregates.forall(_.resolved)
@@ -2358,7 +2358,7 @@ class Analyzer(override val catalogManager: CatalogManager)
*
* Note: CTEs are handled in CTESubstitution.
*/
-object ResolveSubquery extends Rule[LogicalPlan] with PredicateHelper {
+object ResolveSubquery extends Rule[LogicalPlan] {
/**
* Resolve the correlated expressions in a subquery, as if the expressions live in the outer
* plan. All resolved outer references are wrapped in an [[OuterReference]]
@@ -2531,7 +2531,7 @@ class Analyzer(override val catalogManager: CatalogManager)
* those in a HAVING clause or ORDER BY clause. These expressions are pushed down to the
* underlying aggregate operator and then projected away after the original operator.
*/
-object ResolveAggregateFunctions extends Rule[LogicalPlan] with AliasHelper {
+object ResolveAggregateFunctions extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
_.containsPattern(AGGREGATE), ruleId) {
// Resolve aggregate with having clause to Filter(..., Aggregate()). Note, to avoid wrongly
@@ -401,7 +401,7 @@ case class Cost(card: BigInt, size: BigInt) {
*
* Filters (2) and (3) are not implemented.
*/
-object JoinReorderDPFilters extends PredicateHelper {
+object JoinReorderDPFilters {
/**
* Builds join graph information to be used by the filtering strategies.
* Currently, it builds the sets of star/non-star joins.
@@ -100,7 +100,7 @@ import org.apache.spark.sql.types.DataType
* : +- ReusedSubquery Subquery scalar-subquery#242, [id=#125]
* +- *(1) Scan OneRowRelation[]
*/
-object MergeScalarSubqueries extends Rule[LogicalPlan] with PredicateHelper {
+object MergeScalarSubqueries extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
plan match {
// Subquery reuse needs to be enabled for this optimization.
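
For reference, the doc comment above describes merging scalar subqueries that compute over the same source into a single subquery whose result is reused. A hedged spark-shell illustration, assuming a running `SparkSession` named `spark` and an existing table `t` (not part of this change):

```scala
// Both scalar subqueries scan the same table, so after MergeScalarSubqueries
// (with subquery reuse enabled) they can share one merged subquery whose
// result feeds both output columns.
val df = spark.sql(
  """SELECT (SELECT avg(a) FROM t) AS avg_a,
    |       (SELECT sum(a) FROM t) AS sum_a""".stripMargin)
df.explain(true)
```
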
@@ -768,7 +768,7 @@ object LimitPushDown extends Rule[LogicalPlan] {
* safe to pushdown Filters and Projections through it. Filter pushdown is handled by another
* rule PushDownPredicates. Once we add UNION DISTINCT, we will not be able to pushdown Projections.
*/
-object PushProjectionThroughUnion extends Rule[LogicalPlan] with PredicateHelper {
+object PushProjectionThroughUnion extends Rule[LogicalPlan] {

/**
* Maps Attributes from the left side to the corresponding Attribute on the right side.
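
The mapping described in that comment can be sketched as follows, assuming Spark's `AttributeMap` API; the object and method names are illustrative, not the rule's actual code:

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object UnionRewriteSketch {
  // Union children expose the same number of columns in the same order, so a
  // positional zip is enough to translate left-side attributes into the
  // attributes of any other child.
  def leftToRight(left: LogicalPlan, right: LogicalPlan): AttributeMap[Attribute] = {
    assert(left.output.size == right.output.size)
    AttributeMap(left.output.zip(right.output))
  }
}
```
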
@@ -1617,7 +1617,7 @@ object PruneFilters extends Rule[LogicalPlan] with PredicateHelper {
* This rule improves performance of predicate pushdown for cascading joins such as:
* Filter-Join-Join-Join. Most predicates can be pushed down in a single pass.
*/
-object PushDownPredicates extends Rule[LogicalPlan] with PredicateHelper {
+object PushDownPredicates extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
_.containsAnyPattern(FILTER, JOIN)) {
CombineFilters.applyLocally
@@ -109,7 +109,7 @@ object ConstantFolding extends Rule[LogicalPlan] {
* - Using this mapping, replace occurrence of the attributes with the corresponding constant values
* in the AND node.
*/
-object ConstantPropagation extends Rule[LogicalPlan] with PredicateHelper {
+object ConstantPropagation extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning(
_.containsAllPatterns(LITERAL, FILTER), ruleId) {
case f: Filter =>
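
A small illustration of the substitution described above, written with the catalyst expression DSL (illustrative only, not code from this change):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.Expression

object ConstantPropagationExample {
  // Given the conjunct a = 1, the rule substitutes a in the sibling conjunct,
  // so b = a + 2 becomes b = 1 + 2; ConstantFolding then collapses it to b = 3.
  val before: Expression = 'a.int === 1 && 'b.int === ('a.int + 2)
  val after: Expression = 'a.int === 1 && 'b.int === 3
}
```
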
@@ -532,7 +532,7 @@ object SimplifyBinaryComparison
/**
* Simplifies conditional expressions (if / case).
*/
-object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper {
+object SimplifyConditionals extends Rule[LogicalPlan] {
private def falseOrNullLiteral(e: Expression): Boolean = e match {
case FalseLiteral => true
case Literal(null, _) => true
@@ -617,7 +617,7 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper {
/**
* Push the foldable expression into (if / case) branches.
*/
-object PushFoldableIntoBranches extends Rule[LogicalPlan] with PredicateHelper {
+object PushFoldableIntoBranches extends Rule[LogicalPlan] {

// To be conservative here: it's only a guaranteed win if all but at most only one branch
// end up being not foldable.
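
A small illustration of the rewrite this rule performs, again with the catalyst DSL (illustrative only, not code from this change):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{If, Literal}

object PushFoldableIntoBranchesExample {
  // If(cond, 1, 2) + 3  ==>  If(cond, 1 + 3, 2 + 3)  ==>  If(cond, 4, 5)
  // Every resulting branch stays foldable, so pushing the foldable addend
  // into the branches is a guaranteed win.
  val before = If('cond.boolean, Literal(1), Literal(2)) + 3
}
```
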
@@ -29,7 +29,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
import org.apache.spark.sql.internal.SQLConf

-trait OperationHelper extends AliasHelper with PredicateHelper {
+trait OperationHelper extends PredicateHelper {
import org.apache.spark.sql.catalyst.optimizer.CollapseProject.canCollapseExpressions

type ReturnType =
@@ -119,7 +119,7 @@ trait OperationHelper extends AliasHelper with PredicateHelper {
* [[org.apache.spark.sql.catalyst.expressions.Alias Aliases]] are in-lined/substituted if
* necessary.
*/
-object PhysicalOperation extends OperationHelper with PredicateHelper {
+object PhysicalOperation extends OperationHelper {
override protected def legacyMode: Boolean = true
}

@@ -128,7 +128,7 @@ object PhysicalOperation extends OperationHelper with PredicateHelper {
* operations even if they are non-deterministic, as long as they satisfy the
* requirement of CollapseProject and CombineFilters.
*/
-object ScanOperation extends OperationHelper with PredicateHelper {
+object ScanOperation extends OperationHelper {
override protected def legacyMode: Boolean = false
}

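
For reference, both extractors are used at planner call sites roughly as sketched below, assuming the catalyst planning API; the object and method are illustrative, not code from this change:

```scala
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object PhysicalOperationUsageSketch {
  // The extractor collapses adjacent Project/Filter nodes above a leaf and
  // returns the accumulated projections, filter conjuncts, and the leaf plan.
  def describe(plan: LogicalPlan): String = plan match {
    case PhysicalOperation(projects, filters, leaf) =>
      s"${projects.size} projections, ${filters.size} filter conjuncts over ${leaf.nodeName}"
    case _ => "no PhysicalOperation match"
  }
}
```
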
@@ -22,10 +22,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.types.BooleanType

-class ExtractPredicatesWithinOutputSetSuite
-  extends SparkFunSuite
-  with PredicateHelper
-  with PlanTest {
+class ExtractPredicatesWithinOutputSetSuite extends SparkFunSuite with PlanTest {
private val a = AttributeReference("A", BooleanType)(exprId = ExprId(1))
private val b = AttributeReference("B", BooleanType)(exprId = ExprId(2))
private val c = AttributeReference("C", BooleanType)(exprId = ExprId(3))
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

-class BinaryComparisonSimplificationSuite extends PlanTest with PredicateHelper {
+class BinaryComparisonSimplificationSuite extends PlanTest {

object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.types.BooleanType

-class BooleanSimplificationSuite extends PlanTest with ExpressionEvalHelper with PredicateHelper {
+class BooleanSimplificationSuite extends PlanTest with ExpressionEvalHelper {

object Optimize extends RuleExecutor[LogicalPlan] {
val batches =
@@ -20,14 +20,13 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._


-class EliminateSubqueryAliasesSuite extends PlanTest with PredicateHelper {
+class EliminateSubqueryAliasesSuite extends PlanTest {

object Optimize extends RuleExecutor[LogicalPlan] {
val batches = Batch("EliminateSubqueryAliases", Once, EliminateSubqueryAliases) :: Nil
@@ -32,8 +32,7 @@ import org.apache.spark.sql.types.{BooleanType, IntegerType, StringType, Timesta
import org.apache.spark.unsafe.types.CalendarInterval


-class PushFoldableIntoBranchesSuite
-  extends PlanTest with ExpressionEvalHelper with PredicateHelper {
+class PushFoldableIntoBranchesSuite extends PlanTest with ExpressionEvalHelper {

object Optimize extends RuleExecutor[LogicalPlan] {
val batches = Batch("PushFoldableIntoBranches", FixedPoint(50),
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.types.MetadataBuilder

-class RemoveRedundantAliasAndProjectSuite extends PlanTest with PredicateHelper {
+class RemoveRedundantAliasAndProjectSuite extends PlanTest {

object Optimize extends RuleExecutor[LogicalPlan] {
val batches = Batch(
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.types.{BooleanType, IntegerType}


-class SimplifyConditionalSuite extends PlanTest with ExpressionEvalHelper with PredicateHelper {
+class SimplifyConditionalSuite extends PlanTest with ExpressionEvalHelper {

object Optimize extends RuleExecutor[LogicalPlan] {
val batches = Batch("SimplifyConditionals", FixedPoint(50),
@@ -160,7 +160,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
* A pattern that finds the partitioned table relation node inside the given plan, and returns a
* pair of the partition attributes and the table relation node.
*/
-object PartitionedRelation extends PredicateHelper {
+object PartitionedRelation {

def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = {
plan match {
@@ -168,9 +168,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
* Supports both equi-joins and non-equi-joins.
* Supports only inner like joins.
*/
-object JoinSelection extends Strategy
-  with PredicateHelper
-  with JoinSelectionHelper {
+object JoinSelection extends Strategy with JoinSelectionHelper {
private val hintErrorHandler = conf.hintErrorHandler

private def checkHintBuildSide(
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.adaptive

import org.apache.spark.sql.Strategy
-import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractSingleColumnNullAwareAntiJoin}
import org.apache.spark.sql.catalyst.plans.LeftAnti
@@ -35,7 +34,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNes
* stage in case of the larger join child relation finishes before the smaller relation. Note
* that this rule needs to be applied before regular join strategies.
*/
-object LogicalQueryStageStrategy extends Strategy with PredicateHelper {
+object LogicalQueryStageStrategy extends Strategy {

private def isBroadcastStage(plan: LogicalPlan): Boolean = plan match {
case LogicalQueryStage(_, _: BroadcastQueryStageExec) => true
@@ -32,8 +32,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
* statistics will be updated. And the partition filters will be kept in the filters of returned
* logical plan.
*/
-private[sql] object PruneFileSourcePartitions
-  extends Rule[LogicalPlan] with PredicateHelper {
+private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {

private def rebuildPhysicalOperation(
projects: Seq[NamedExpression],
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2

import scala.collection.mutable

-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, NamedExpression, PredicateHelper, SchemaPruning}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, NamedExpression, SchemaPruning}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.expressions.SortOrder
import org.apache.spark.sql.connector.expressions.filter.Predicate
@@ -29,7 +29,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources
import org.apache.spark.sql.types.StructType

-object PushDownUtils extends PredicateHelper {
+object PushDownUtils {
/**
* Pushes down filters to the data source reader
*
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2

import scala.collection.mutable

-import org.apache.spark.sql.catalyst.expressions.{aggregate, Alias, AliasHelper, And, Attribute, AttributeReference, AttributeSet, Cast, Expression, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SortOrder, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{aggregate, Alias, And, Attribute, AttributeReference, AttributeSet, Cast, Expression, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SortOrder, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.optimizer.CollapseProject
import org.apache.spark.sql.catalyst.planning.ScanOperation
@@ -34,7 +34,7 @@ import org.apache.spark.sql.sources
import org.apache.spark.sql.types.{DataType, DecimalType, IntegerType, StructType}
import org.apache.spark.sql.util.SchemaUtils._

-object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper with AliasHelper {
+object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
import DataSourceV2Implicits._

def apply(plan: LogicalPlan): LogicalPlan = {
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.dynamicpruning

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeSeq, BindReferences, DynamicPruningExpression, DynamicPruningSubquery, Expression, ListQuery, Literal, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeSeq, BindReferences, DynamicPruningExpression, DynamicPruningSubquery, Expression, ListQuery, Literal}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.logical.Aggregate
import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
@@ -34,8 +34,7 @@ import org.apache.spark.sql.execution.joins._
* results of broadcast. For joins that are not planned as broadcast hash joins we keep
* the fallback mechanism with subquery duplicate.
*/
-case class PlanDynamicPruningFilters(sparkSession: SparkSession)
-  extends Rule[SparkPlan] with PredicateHelper {
+case class PlanDynamicPruningFilters(sparkSession: SparkSession) extends Rule[SparkPlan] {

/**
* Identify the shape in which keys of a given plan are broadcasted.
@@ -157,7 +157,7 @@ object ExtractGroupingPythonUDFFromAggregate extends Rule[LogicalPlan] {
* This has the limitation that the input to the Python UDF is not allowed to include attributes from
* multiple child operators.
*/
-object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
+object ExtractPythonUDFs extends Rule[LogicalPlan] {

private type EvalType = Int
private type EvalTypeChecker = EvalType => Boolean
@@ -29,7 +29,7 @@ import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet}
import org.apache.spark.sql.catalyst.util
import org.apache.spark.sql.execution.{DataSourceScanExec, FileSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
@@ -40,7 +40,7 @@ import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}
import org.apache.spark.util.Utils

-class FileSourceStrategySuite extends QueryTest with SharedSparkSession with PredicateHelper {
+class FileSourceStrategySuite extends QueryTest with SharedSparkSession {
import testImplicits._

protected override def sparkConf = super.sparkConf.set("spark.default.parallelism", "1")
@@ -20,10 +20,9 @@ package org.apache.spark.sql.sources
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.catalyst.catalog.CatalogUtils
-import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.types._

-class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with PredicateHelper {
+class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName

// We have a very limited number of supported types at here since it is just for a