From 0f6d7643aea0a82ebb2a7179b9422edb2c3d13d8 Mon Sep 17 00:00:00 2001 From: huangxiaoping <1754789345@qq.com> Date: Tue, 23 Jul 2024 12:14:55 +0800 Subject: [PATCH] [KYUUBI #6554] Delete redundant code related to zorder MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # :mag: Description ## Issue References 🔗 This pull request fixes #6554 ## Describe Your Solution 🔧 - Delete `/kyuubi/extensions/spark/kyuubi-extension-spark-3-x/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala` file - Rename `InsertZorderBeforeWriting33.scala` to `InsertZorderBeforeWriting.scala` - Rename `InsertZorderHelper33, InsertZorderBeforeWritingDatasource33, InsertZorderBeforeWritingHive33, ZorderSuiteSpark33` to `InsertZorderHelper, InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive, ZorderSuiteSpark` ## Types of changes :bookmark: - [ ] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests --- # Checklist 📝 - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6555 from huangxiaopingRD/6554. 
Closes #6554 26de4fa09 [huangxiaoping] [KYUUBI #6554] Delete redundant code related to zorder Authored-by: huangxiaoping <1754789345@qq.com> Signed-off-by: Cheng Pan --- .../sql/KyuubiSparkSQLCommonExtension.scala | 6 +- ....scala => InsertZorderBeforeWriting.scala} | 14 +- .../InsertZorderBeforeWritingBase.scala | 188 ------------------ .../org/apache/spark/sql/ZorderSuite.scala | 6 +- .../sql/KyuubiSparkSQLCommonExtension.scala | 6 +- .../zorder/InsertZorderBeforeWriting.scala | 14 +- .../InsertZorderBeforeWritingBase.scala | 155 --------------- .../sql/KyuubiSparkSQLCommonExtension.scala | 6 +- .../zorder/InsertZorderBeforeWriting.scala | 14 +- .../InsertZorderBeforeWritingBase.scala | 155 --------------- 10 files changed, 39 insertions(+), 525 deletions(-) rename extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/{InsertZorderBeforeWriting33.scala => InsertZorderBeforeWriting.scala} (96%) delete mode 100644 extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala delete mode 100644 extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala delete mode 100644 extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala index c001ffc6c3b..3dda669a8a3 100644 --- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala @@ -19,7 +19,7 @@ package org.apache.kyuubi.sql import 
org.apache.spark.sql.SparkSessionExtensions -import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource33, InsertZorderBeforeWritingHive33, ResolveZorder} +import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive, ResolveZorder} class KyuubiSparkSQLCommonExtension extends (SparkSessionExtensions => Unit) { override def apply(extensions: SparkSessionExtensions): Unit = { @@ -38,8 +38,8 @@ object KyuubiSparkSQLCommonExtension { // should be applied before // RepartitionBeforeWriting and RebalanceBeforeWriting // because we can only apply one of them (i.e. Global Sort or Repartition/Rebalance) - extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource33) - extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive33) + extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource) + extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive) extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule) extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin) diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting33.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala similarity index 96% rename from extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting33.scala rename to extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala index 4ae2057dce6..4b2494bc84f 100644 --- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting33.scala +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala @@ -28,7 +28,11 @@ import 
org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, Inse import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException} -trait InsertZorderHelper33 extends Rule[LogicalPlan] with ZorderBuilder { +trait ZorderBuilder { + def buildZorder(children: Seq[Expression]): ZorderBase +} + +trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder { private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled" private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols" @@ -140,8 +144,8 @@ trait InsertZorderHelper33 extends Rule[LogicalPlan] with ZorderBuilder { } } -case class InsertZorderBeforeWritingDatasource33(session: SparkSession) - extends InsertZorderHelper33 { +case class InsertZorderBeforeWritingDatasource(session: SparkSession) + extends InsertZorderHelper { override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match { case insert: InsertIntoHadoopFsRelationCommand if insert.query.resolved && @@ -172,8 +176,8 @@ case class InsertZorderBeforeWritingDatasource33(session: SparkSession) } } -case class InsertZorderBeforeWritingHive33(session: SparkSession) - extends InsertZorderHelper33 { +case class InsertZorderBeforeWritingHive(session: SparkSession) + extends InsertZorderHelper { override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match { case insert: InsertIntoHiveTable if insert.query.resolved && diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala deleted file mode 100644 index a99b6cca73d..00000000000 --- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kyuubi.sql.zorder - -import java.util.Locale - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, NullsLast, SortOrder} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Repartition, RepartitionByExpression, Sort} -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand -import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand -import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable, OptimizedCreateHiveTableAsSelectCommand} - -import org.apache.kyuubi.sql.KyuubiSQLConf - -/** - * TODO: shall we forbid zorder if it's dynamic partition inserts ? 
- * Insert zorder before writing datasource if the target table properties has zorder properties - */ -abstract class InsertZorderBeforeWritingDatasourceBase - extends InsertZorderHelper { - override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match { - case insert: InsertIntoHadoopFsRelationCommand - if insert.query.resolved && insert.bucketSpec.isEmpty && insert.catalogTable.isDefined && - isZorderEnabled(insert.catalogTable.get.properties) => - val newQuery = insertZorder(insert.catalogTable.get, insert.query) - if (newQuery.eq(insert.query)) { - insert - } else { - insert.copy(query = newQuery) - } - - case ctas: CreateDataSourceTableAsSelectCommand - if ctas.query.resolved && ctas.table.bucketSpec.isEmpty && - isZorderEnabled(ctas.table.properties) => - val newQuery = insertZorder(ctas.table, ctas.query) - if (newQuery.eq(ctas.query)) { - ctas - } else { - ctas.copy(query = newQuery) - } - - case _ => plan - } -} - -/** - * TODO: shall we forbid zorder if it's dynamic partition inserts ? 
- * Insert zorder before writing hive if the target table properties has zorder properties - */ -abstract class InsertZorderBeforeWritingHiveBase - extends InsertZorderHelper { - override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match { - case insert: InsertIntoHiveTable - if insert.query.resolved && insert.table.bucketSpec.isEmpty && - isZorderEnabled(insert.table.properties) => - val newQuery = insertZorder(insert.table, insert.query) - if (newQuery.eq(insert.query)) { - insert - } else { - insert.copy(query = newQuery) - } - - case ctas: CreateHiveTableAsSelectCommand - if ctas.query.resolved && ctas.tableDesc.bucketSpec.isEmpty && - isZorderEnabled(ctas.tableDesc.properties) => - val newQuery = insertZorder(ctas.tableDesc, ctas.query) - if (newQuery.eq(ctas.query)) { - ctas - } else { - ctas.copy(query = newQuery) - } - - case octas: OptimizedCreateHiveTableAsSelectCommand - if octas.query.resolved && octas.tableDesc.bucketSpec.isEmpty && - isZorderEnabled(octas.tableDesc.properties) => - val newQuery = insertZorder(octas.tableDesc, octas.query) - if (newQuery.eq(octas.query)) { - octas - } else { - octas.copy(query = newQuery) - } - - case _ => plan - } -} - -trait ZorderBuilder { - def buildZorder(children: Seq[Expression]): ZorderBase -} - -trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder { - private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled" - private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols" - - def isZorderEnabled(props: Map[String, String]): Boolean = { - props.contains(KYUUBI_ZORDER_ENABLED) && - "true".equalsIgnoreCase(props(KYUUBI_ZORDER_ENABLED)) && - props.contains(KYUUBI_ZORDER_COLS) - } - - def getZorderColumns(props: Map[String, String]): Seq[String] = { - val cols = props.get(KYUUBI_ZORDER_COLS) - assert(cols.isDefined) - cols.get.split(",").map(_.trim.toLowerCase(Locale.ROOT)) - } - - def canInsertZorder(query: LogicalPlan): Boolean = query match { - case Project(_, child) => 
canInsertZorder(child) - // TODO: actually, we can force zorder even if existed some shuffle - case _: Sort => false - case _: RepartitionByExpression => false - case _: Repartition => false - case _ => true - } - - def insertZorder(catalogTable: CatalogTable, plan: LogicalPlan): LogicalPlan = { - if (!canInsertZorder(plan)) { - return plan - } - val cols = getZorderColumns(catalogTable.properties) - val attrs = plan.output.map(attr => (attr.name, attr)).toMap - if (cols.exists(!attrs.contains(_))) { - logWarning(s"target table does not contain all zorder cols: ${cols.mkString(",")}, " + - s"please check your table properties ${KYUUBI_ZORDER_COLS}.") - plan - } else { - val bound = cols.map(attrs(_)) - val orderExpr = - if (bound.length == 1) { - bound.head - } else { - buildZorder(bound) - } - // TODO: We can do rebalance partitions before local sort of zorder after SPARK 3.3 - // see https://github.com/apache/spark/pull/34542 - Sort( - SortOrder(orderExpr, Ascending, NullsLast, Seq.empty) :: Nil, - conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED), - plan) - } - } - - def applyInternal(plan: LogicalPlan): LogicalPlan - - final override def apply(plan: LogicalPlan): LogicalPlan = { - if (conf.getConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING)) { - applyInternal(plan) - } else { - plan - } - } -} - -/** - * TODO: shall we forbid zorder if it's dynamic partition inserts ? - * Insert zorder before writing datasource if the target table properties has zorder properties - */ -case class InsertZorderBeforeWritingDatasource(session: SparkSession) - extends InsertZorderBeforeWritingDatasourceBase { - override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children) -} - -/** - * TODO: shall we forbid zorder if it's dynamic partition inserts ? 
- * Insert zorder before writing hive if the target table properties has zorder properties - */ -case class InsertZorderBeforeWritingHive(session: SparkSession) - extends InsertZorderBeforeWritingHiveBase { - override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children) -} diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ZorderSuite.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ZorderSuite.scala index a08366f1d4a..d18e3035945 100644 --- a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ZorderSuite.scala +++ b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/ZorderSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.kyuubi.sql.{KyuubiSQLConf, SparkKyuubiSparkSQLParser} import org.apache.kyuubi.sql.zorder.Zorder -trait ZorderSuiteSpark33 extends ZorderSuiteBase { +trait ZorderSuiteSpark extends ZorderSuiteBase { test("Add rebalance before zorder") { Seq("true" -> false, "false" -> true).foreach { case (useOriginalOrdering, zorder) => @@ -115,10 +115,10 @@ trait ParserSuite { self: ZorderSuiteBase => class ZorderWithCodegenEnabledSuite extends ZorderWithCodegenEnabledSuiteBase - with ZorderSuiteSpark33 + with ZorderSuiteSpark with ParserSuite {} class ZorderWithCodegenDisabledSuite extends ZorderWithCodegenDisabledSuiteBase - with ZorderSuiteSpark33 + with ZorderSuiteSpark with ParserSuite {} diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala index f39ad3cc390..c4ddcef2b13 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala +++ 
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala @@ -19,7 +19,7 @@ package org.apache.kyuubi.sql import org.apache.spark.sql.SparkSessionExtensions -import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource33, InsertZorderBeforeWritingHive33, ResolveZorder} +import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive, ResolveZorder} class KyuubiSparkSQLCommonExtension extends (SparkSessionExtensions => Unit) { override def apply(extensions: SparkSessionExtensions): Unit = { @@ -38,8 +38,8 @@ object KyuubiSparkSQLCommonExtension { // should be applied before // RepartitionBeforeWriting and RebalanceBeforeWriting // because we can only apply one of them (i.e. Global Sort or Repartition/Rebalance) - extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource33) - extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive33) + extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource) + extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive) extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule) extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin) diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala index 73ed5e253bb..003ba6b68a6 100644 --- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala +++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala @@ -27,7 +27,11 @@ import org.apache.spark.sql.hive.execution.InsertIntoHiveTable import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException} -trait 
InsertZorderHelper33 extends Rule[LogicalPlan] with ZorderBuilder { +trait ZorderBuilder { + def buildZorder(children: Seq[Expression]): ZorderBase +} + +trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder { private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled" private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols" @@ -139,8 +143,8 @@ trait InsertZorderHelper33 extends Rule[LogicalPlan] with ZorderBuilder { } } -case class InsertZorderBeforeWritingDatasource33(session: SparkSession) - extends InsertZorderHelper33 { +case class InsertZorderBeforeWritingDatasource(session: SparkSession) + extends InsertZorderHelper { override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match { case insert: InsertIntoHadoopFsRelationCommand if insert.query.resolved && @@ -159,8 +163,8 @@ case class InsertZorderBeforeWritingDatasource33(session: SparkSession) } } -case class InsertZorderBeforeWritingHive33(session: SparkSession) - extends InsertZorderHelper33 { +case class InsertZorderBeforeWritingHive(session: SparkSession) + extends InsertZorderHelper { override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match { case insert: InsertIntoHiveTable if insert.query.resolved && diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala deleted file mode 100644 index 2c59d148e98..00000000000 --- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kyuubi.sql.zorder - -import java.util.Locale - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, NullsLast, SortOrder} -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand -import org.apache.spark.sql.hive.execution.InsertIntoHiveTable - -import org.apache.kyuubi.sql.KyuubiSQLConf - -/** - * TODO: shall we forbid zorder if it's dynamic partition inserts ? - * Insert zorder before writing datasource if the target table properties has zorder properties - */ -abstract class InsertZorderBeforeWritingDatasourceBase - extends InsertZorderHelper { - override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match { - case insert: InsertIntoHadoopFsRelationCommand - if insert.query.resolved && insert.bucketSpec.isEmpty && insert.catalogTable.isDefined && - isZorderEnabled(insert.catalogTable.get.properties) => - val newQuery = insertZorder(insert.catalogTable.get, insert.query) - if (newQuery.eq(insert.query)) { - insert - } else { - insert.copy(query = newQuery) - } - case _ => plan - } -} - -/** - * TODO: shall we forbid zorder if it's dynamic partition inserts ? 
- * Insert zorder before writing hive if the target table properties has zorder properties - */ -abstract class InsertZorderBeforeWritingHiveBase - extends InsertZorderHelper { - override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match { - case insert: InsertIntoHiveTable - if insert.query.resolved && insert.table.bucketSpec.isEmpty && - isZorderEnabled(insert.table.properties) => - val newQuery = insertZorder(insert.table, insert.query) - if (newQuery.eq(insert.query)) { - insert - } else { - insert.copy(query = newQuery) - } - case _ => plan - } -} - -trait ZorderBuilder { - def buildZorder(children: Seq[Expression]): ZorderBase -} - -trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder { - private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled" - private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols" - - def isZorderEnabled(props: Map[String, String]): Boolean = { - props.contains(KYUUBI_ZORDER_ENABLED) && - "true".equalsIgnoreCase(props(KYUUBI_ZORDER_ENABLED)) && - props.contains(KYUUBI_ZORDER_COLS) - } - - def getZorderColumns(props: Map[String, String]): Seq[String] = { - val cols = props.get(KYUUBI_ZORDER_COLS) - assert(cols.isDefined) - cols.get.split(",").map(_.trim.toLowerCase(Locale.ROOT)) - } - - def canInsertZorder(query: LogicalPlan): Boolean = query match { - case Project(_, child) => canInsertZorder(child) - // TODO: actually, we can force zorder even if existed some shuffle - case _: Sort => false - case _: RepartitionByExpression => false - case _: Repartition => false - case _ => true - } - - def insertZorder(catalogTable: CatalogTable, plan: LogicalPlan): LogicalPlan = { - if (!canInsertZorder(plan)) { - return plan - } - val cols = getZorderColumns(catalogTable.properties) - val attrs = plan.output.map(attr => (attr.name, attr)).toMap - if (cols.exists(!attrs.contains(_))) { - logWarning(s"target table does not contain all zorder cols: ${cols.mkString(",")}, " + - s"please check your table properties 
${KYUUBI_ZORDER_COLS}.") - plan - } else { - val bound = cols.map(attrs(_)) - val orderExpr = - if (bound.length == 1) { - bound.head - } else { - buildZorder(bound) - } - // TODO: We can do rebalance partitions before local sort of zorder after SPARK 3.3 - // see https://github.com/apache/spark/pull/34542 - Sort( - SortOrder(orderExpr, Ascending, NullsLast, Seq.empty) :: Nil, - conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED), - plan) - } - } - - def applyInternal(plan: LogicalPlan): LogicalPlan - - final override def apply(plan: LogicalPlan): LogicalPlan = { - if (conf.getConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING)) { - applyInternal(plan) - } else { - plan - } - } -} - -/** - * TODO: shall we forbid zorder if it's dynamic partition inserts ? - * Insert zorder before writing datasource if the target table properties has zorder properties - */ -case class InsertZorderBeforeWritingDatasource(session: SparkSession) - extends InsertZorderBeforeWritingDatasourceBase { - override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children) -} - -/** - * TODO: shall we forbid zorder if it's dynamic partition inserts ? 
- * Insert zorder before writing hive if the target table properties has zorder properties - */ -case class InsertZorderBeforeWritingHive(session: SparkSession) - extends InsertZorderBeforeWritingHiveBase { - override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children) -} diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala index ad95ac4295e..450a2c35e89 100644 --- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala +++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLCommonExtension.scala @@ -19,7 +19,7 @@ package org.apache.kyuubi.sql import org.apache.spark.sql.SparkSessionExtensions -import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource33, InsertZorderBeforeWritingHive33, ResolveZorder} +import org.apache.kyuubi.sql.zorder.{InsertZorderBeforeWritingDatasource, InsertZorderBeforeWritingHive, ResolveZorder} class KyuubiSparkSQLCommonExtension extends (SparkSessionExtensions => Unit) { override def apply(extensions: SparkSessionExtensions): Unit = { @@ -38,8 +38,8 @@ object KyuubiSparkSQLCommonExtension { // should be applied before // RepartitionBeforeWriting and RebalanceBeforeWriting // because we can only apply one of them (i.e. 
Global Sort or Repartition/Rebalance) - extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource33) - extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive33) + extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingDatasource) + extensions.injectPostHocResolutionRule(InsertZorderBeforeWritingHive) extensions.injectPostHocResolutionRule(FinalStageConfigIsolationCleanRule) extensions.injectQueryStagePrepRule(_ => InsertShuffleNodeBeforeJoin) diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala index 73ed5e253bb..003ba6b68a6 100644 --- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala +++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala @@ -27,7 +27,11 @@ import org.apache.spark.sql.hive.execution.InsertIntoHiveTable import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException} -trait InsertZorderHelper33 extends Rule[LogicalPlan] with ZorderBuilder { +trait ZorderBuilder { + def buildZorder(children: Seq[Expression]): ZorderBase +} + +trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder { private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled" private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols" @@ -139,8 +143,8 @@ trait InsertZorderHelper33 extends Rule[LogicalPlan] with ZorderBuilder { } } -case class InsertZorderBeforeWritingDatasource33(session: SparkSession) - extends InsertZorderHelper33 { +case class InsertZorderBeforeWritingDatasource(session: SparkSession) + extends InsertZorderHelper { override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match { case insert: InsertIntoHadoopFsRelationCommand if insert.query.resolved && @@ -159,8 
+163,8 @@ case class InsertZorderBeforeWritingDatasource33(session: SparkSession) } } -case class InsertZorderBeforeWritingHive33(session: SparkSession) - extends InsertZorderHelper33 { +case class InsertZorderBeforeWritingHive(session: SparkSession) + extends InsertZorderHelper { override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match { case insert: InsertIntoHiveTable if insert.query.resolved && diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala deleted file mode 100644 index 2c59d148e98..00000000000 --- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWritingBase.scala +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kyuubi.sql.zorder - -import java.util.Locale - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.catalog.CatalogTable -import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, NullsLast, SortOrder} -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand -import org.apache.spark.sql.hive.execution.InsertIntoHiveTable - -import org.apache.kyuubi.sql.KyuubiSQLConf - -/** - * TODO: shall we forbid zorder if it's dynamic partition inserts ? - * Insert zorder before writing datasource if the target table properties has zorder properties - */ -abstract class InsertZorderBeforeWritingDatasourceBase - extends InsertZorderHelper { - override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match { - case insert: InsertIntoHadoopFsRelationCommand - if insert.query.resolved && insert.bucketSpec.isEmpty && insert.catalogTable.isDefined && - isZorderEnabled(insert.catalogTable.get.properties) => - val newQuery = insertZorder(insert.catalogTable.get, insert.query) - if (newQuery.eq(insert.query)) { - insert - } else { - insert.copy(query = newQuery) - } - case _ => plan - } -} - -/** - * TODO: shall we forbid zorder if it's dynamic partition inserts ? 
- * Insert zorder before writing hive if the target table properties has zorder properties - */ -abstract class InsertZorderBeforeWritingHiveBase - extends InsertZorderHelper { - override def applyInternal(plan: LogicalPlan): LogicalPlan = plan match { - case insert: InsertIntoHiveTable - if insert.query.resolved && insert.table.bucketSpec.isEmpty && - isZorderEnabled(insert.table.properties) => - val newQuery = insertZorder(insert.table, insert.query) - if (newQuery.eq(insert.query)) { - insert - } else { - insert.copy(query = newQuery) - } - case _ => plan - } -} - -trait ZorderBuilder { - def buildZorder(children: Seq[Expression]): ZorderBase -} - -trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder { - private val KYUUBI_ZORDER_ENABLED = "kyuubi.zorder.enabled" - private val KYUUBI_ZORDER_COLS = "kyuubi.zorder.cols" - - def isZorderEnabled(props: Map[String, String]): Boolean = { - props.contains(KYUUBI_ZORDER_ENABLED) && - "true".equalsIgnoreCase(props(KYUUBI_ZORDER_ENABLED)) && - props.contains(KYUUBI_ZORDER_COLS) - } - - def getZorderColumns(props: Map[String, String]): Seq[String] = { - val cols = props.get(KYUUBI_ZORDER_COLS) - assert(cols.isDefined) - cols.get.split(",").map(_.trim.toLowerCase(Locale.ROOT)) - } - - def canInsertZorder(query: LogicalPlan): Boolean = query match { - case Project(_, child) => canInsertZorder(child) - // TODO: actually, we can force zorder even if existed some shuffle - case _: Sort => false - case _: RepartitionByExpression => false - case _: Repartition => false - case _ => true - } - - def insertZorder(catalogTable: CatalogTable, plan: LogicalPlan): LogicalPlan = { - if (!canInsertZorder(plan)) { - return plan - } - val cols = getZorderColumns(catalogTable.properties) - val attrs = plan.output.map(attr => (attr.name, attr)).toMap - if (cols.exists(!attrs.contains(_))) { - logWarning(s"target table does not contain all zorder cols: ${cols.mkString(",")}, " + - s"please check your table properties 
${KYUUBI_ZORDER_COLS}.") - plan - } else { - val bound = cols.map(attrs(_)) - val orderExpr = - if (bound.length == 1) { - bound.head - } else { - buildZorder(bound) - } - // TODO: We can do rebalance partitions before local sort of zorder after SPARK 3.3 - // see https://github.com/apache/spark/pull/34542 - Sort( - SortOrder(orderExpr, Ascending, NullsLast, Seq.empty) :: Nil, - conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED), - plan) - } - } - - def applyInternal(plan: LogicalPlan): LogicalPlan - - final override def apply(plan: LogicalPlan): LogicalPlan = { - if (conf.getConf(KyuubiSQLConf.INSERT_ZORDER_BEFORE_WRITING)) { - applyInternal(plan) - } else { - plan - } - } -} - -/** - * TODO: shall we forbid zorder if it's dynamic partition inserts ? - * Insert zorder before writing datasource if the target table properties has zorder properties - */ -case class InsertZorderBeforeWritingDatasource(session: SparkSession) - extends InsertZorderBeforeWritingDatasourceBase { - override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children) -} - -/** - * TODO: shall we forbid zorder if it's dynamic partition inserts ? - * Insert zorder before writing hive if the target table properties has zorder properties - */ -case class InsertZorderBeforeWritingHive(session: SparkSession) - extends InsertZorderBeforeWritingHiveBase { - override def buildZorder(children: Seq[Expression]): ZorderBase = Zorder(children) -}