
Commit 62dc24c

Merge branch 'master' of https://github.com/NetEase/kyuubi into spark-2.4
2 parents: 728454e + ffe4163

File tree: 203 files changed (+2687 / -2283 lines)


.scalafmt.conf

Lines changed: 28 additions & 0 deletions
@@ -0,0 +1,28 @@
+version = 3.1.1
+runner.dialect=scala212
+project.git=true
+
+align.preset = none
+align.openParenDefnSite = false
+align.openParenCallSite = false
+align.stripMargin = true
+align.tokens = []
+assumeStandardLibraryStripMargin = true
+danglingParentheses.preset = false
+docstrings.style = Asterisk
+docstrings.wrap = no
+importSelectors = singleLine
+indent.extendSite = 2
+literals.hexDigits = Upper
+maxColumn = 100
+newlines.source = keep
+newlines.topLevelStatementBlankLines = []
+optIn.configStyleArguments = false
+rewrite.imports.groups = [
+  ["javax?\\..*"],
+  ["scala\\..*"],
+  ["(?!org\\.apache\\.kyuubi\\.).*"],
+  ["org\\.apache\\.kyuubi\\..*"]
+]
+rewrite.imports.sort = scalastyle
+rewrite.rules = [Imports, SortModifiers]
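
Note: a minimal, hypothetical sketch of how the four rewrite.imports.groups patterns above partition import paths. The object name and the sample import strings are invented for illustration; scalafmt applies the same grouping when it rewrites import blocks during formatting.

object ImportGroupSketch extends App {
  // The four patterns, in the order declared in .scalafmt.conf above.
  val groups = Seq(
    "javax?\\..*", // java / javax
    "scala\\..*", // scala standard library
    "(?!org\\.apache\\.kyuubi\\.).*", // any other third-party package
    "org\\.apache\\.kyuubi\\..*") // Kyuubi itself

  // Index of the first pattern that fully matches the import path.
  def groupOf(importPath: String): Int = groups.indexWhere(importPath.matches)

  Seq(
    "java.util.concurrent.TimeUnit",
    "scala.collection.mutable",
    "org.apache.spark.sql.SparkSession",
    "org.apache.kyuubi.sql.KyuubiSQLConf")
    .foreach(i => println(s"$i -> group ${groupOf(i)}"))
}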

dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWriting.scala

Lines changed: 9 additions & 9 deletions
@@ -26,7 +26,8 @@ import org.apache.spark.sql.types.IntegerType
 
 trait RepartitionBuilderWithRepartitionByExpression extends RepartitionBuilder {
   override def buildRepartition(
-      dynamicPartitionColumns: Seq[Attribute], query: LogicalPlan): LogicalPlan = {
+      dynamicPartitionColumns: Seq[Attribute],
+      query: LogicalPlan): LogicalPlan = {
     if (dynamicPartitionColumns.isEmpty) {
       RepartitionByExpression(
         Seq.empty,
@@ -46,10 +47,11 @@ trait RepartitionBuilderWithRepartitionByExpression extends RepartitionBuilder {
     // Dynamic partition insertion will add repartition by partition column, but it could cause
     // data skew (one partition value has large data). So we add extra partition column for the
     // same dynamic partition to avoid skew.
-    Cast(Multiply(
-      new Rand(Literal(new Random().nextLong())),
-      Literal(partitionNumber.toDouble)
-    ), IntegerType) :: Nil
+    Cast(
+      Multiply(
+        new Rand(Literal(new Random().nextLong())),
+        Literal(partitionNumber.toDouble)),
+      IntegerType) :: Nil
   }
 }
 
@@ -61,8 +63,7 @@ trait RepartitionBuilderWithRepartitionByExpression {
  */
 case class RepartitionBeforeWritingDatasource(session: SparkSession)
   extends RepartitionBeforeWritingDatasourceBase
-  with RepartitionBuilderWithRepartitionByExpression {
-}
+  with RepartitionBuilderWithRepartitionByExpression {}
 
 /**
  * For Hive table, there two commands can write data to table
@@ -72,5 +73,4 @@ case class RepartitionBeforeWritingDatasource(session: SparkSession)
  */
 case class RepartitionBeforeWritingHive(session: SparkSession)
   extends RepartitionBeforeWritingHiveBase
-  with RepartitionBuilderWithRepartitionByExpression {
-}
+  with RepartitionBuilderWithRepartitionByExpression {}
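
Note: a minimal standalone sketch (not Kyuubi's exact code path) of the extra shuffle key reformatted above. Appending cast(rand(seed) * partitionNumber as int) to the dynamic partition columns spreads a single hot partition value across several shuffle partitions. The partitionNumber value below is an assumed stand-in for KyuubiSQLConf.DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM.

import scala.util.Random
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal, Multiply, Rand}
import org.apache.spark.sql.types.IntegerType

// Assumed repartition number; in Kyuubi this comes from configuration.
val partitionNumber = 100

// Extra random key in [0, partitionNumber), appended to the dynamic partition
// columns so one skewed partition value is split across several shuffle partitions.
val extraPartitionKey = Cast(
  Multiply(
    new Rand(Literal(new Random().nextLong())),
    Literal(partitionNumber.toDouble)),
  IntegerType)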

dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/sqlclassification/KyuubiGetSqlClassification.scala

Lines changed: 1 addition & 2 deletions
@@ -67,7 +67,6 @@ object KyuubiGetSqlClassification extends Logging {
       }
     }.getOrElse(
       throw new IllegalArgumentException(
-        s"You should restart engine with: ${SQL_CLASSIFICATION_ENABLED.key} true")
-    )
+        s"You should restart engine with: ${SQL_CLASSIFICATION_ENABLED.key} true"))
   }
 }

dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/ForcedMaxOutputRowsRule.scala

Lines changed: 43 additions & 40 deletions
@@ -34,29 +34,30 @@ object ForcedMaxOutputRowsConstraint {
 }
 
 /*
-* Add ForcedMaxOutputRows rule for output rows limitation
-* to avoid huge output rows of non_limit query unexpectedly
-* mainly applied to cases as below:
-*
-* case 1:
-* {{{
-* SELECT [c1, c2, ...]
-* }}}
-*
-* case 2:
-* {{{
-* WITH CTE AS (
-* ...)
-* SELECT [c1, c2, ...] FROM CTE ...
-* }}}
-*
-* The Logical Rule add a GlobalLimit node before root project
-* */
+ * Add ForcedMaxOutputRows rule for output rows limitation
+ * to avoid huge output rows of non_limit query unexpectedly
+ * mainly applied to cases as below:
+ *
+ * case 1:
+ * {{{
+ * SELECT [c1, c2, ...]
+ * }}}
+ *
+ * case 2:
+ * {{{
+ * WITH CTE AS (
+ * ...)
+ * SELECT [c1, c2, ...] FROM CTE ...
+ * }}}
+ *
+ * The Logical Rule add a GlobalLimit node before root project
+ * */
 case class ForcedMaxOutputRowsRule(session: SparkSession) extends Rule[LogicalPlan] {
 
   private def isChildAggregate(a: Aggregate): Boolean = a
-    .aggregateExpressions.exists(p => p.getTagValue(ForcedMaxOutputRowsConstraint.CHILD_AGGREGATE)
-    .contains(ForcedMaxOutputRowsConstraint.CHILD_AGGREGATE_FLAG))
+    .aggregateExpressions.exists(p =>
+      p.getTagValue(ForcedMaxOutputRowsConstraint.CHILD_AGGREGATE)
+        .contains(ForcedMaxOutputRowsConstraint.CHILD_AGGREGATE_FLAG))
 
   private def isView: Boolean = {
     val nestedViewDepth = AnalysisContext.get.nestedViewDepth
@@ -65,7 +66,7 @@ case class ForcedMaxOutputRowsRule(session: SparkSession) extends Rule[LogicalPl
 
   private def canInsertLimitInner(p: LogicalPlan): Boolean = p match {
 
-    case Aggregate(_, Alias(_, "havingCondition")::Nil, _) => false
+    case Aggregate(_, Alias(_, "havingCondition") :: Nil, _) => false
     case agg: Aggregate => !isChildAggregate(agg)
     case _: RepartitionByExpression => true
     case _: Distinct => true
@@ -87,17 +88,19 @@ case class ForcedMaxOutputRowsRule(session: SparkSession) extends Rule[LogicalPl
 
     maxOutputRowsOpt match {
       case Some(forcedMaxOutputRows) => canInsertLimitInner(p) &&
-        !p.maxRows.exists(_ <= forcedMaxOutputRows) &&
-        !isView
+          !p.maxRows.exists(_ <= forcedMaxOutputRows) &&
+          !isView
       case None => false
     }
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
     val maxOutputRowsOpt = conf.getConf(KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS)
     plan match {
-      case p if p.resolved && canInsertLimit(p, maxOutputRowsOpt) => Limit(
-        maxOutputRowsOpt.get, plan)
+      case p if p.resolved && canInsertLimit(p, maxOutputRowsOpt) =>
+        Limit(
+          maxOutputRowsOpt.get,
+          plan)
       case _ => plan
     }
   }
@@ -110,33 +113,33 @@ case class MarkAggregateOrderRule(session: SparkSession) extends Rule[LogicalPla
     // mark child aggregate
     a.aggregateExpressions.filter(_.resolved).foreach(_.setTagValue(
       ForcedMaxOutputRowsConstraint.CHILD_AGGREGATE,
-      ForcedMaxOutputRowsConstraint.CHILD_AGGREGATE_FLAG)
-    )
+      ForcedMaxOutputRowsConstraint.CHILD_AGGREGATE_FLAG))
   }
 
   private def findAndMarkChildAggregate(plan: LogicalPlan): LogicalPlan = plan match {
     /*
-    * The case mainly process order not aggregate column but grouping column as below
-    * SELECT c1, COUNT(*) as cnt
-    * FROM t1
-    * GROUP BY c1
-    * ORDER BY c1
-    * */
-    case a: Aggregate if a.aggregateExpressions
-      .exists(x => x.resolved && x.name.equals("aggOrder")) => markChildAggregate(a)
+     * The case mainly process order not aggregate column but grouping column as below
+     * SELECT c1, COUNT(*) as cnt
+     * FROM t1
+     * GROUP BY c1
+     * ORDER BY c1
+     * */
+    case a: Aggregate
+        if a.aggregateExpressions
+          .exists(x => x.resolved && x.name.equals("aggOrder")) =>
+      markChildAggregate(a)
       plan
 
-    case _ => plan.children.foreach(_.foreach {
+    case _ =>
+      plan.children.foreach(_.foreach {
        case agg: Aggregate => markChildAggregate(agg)
        case _ => Unit
-      }
-    )
+      })
      plan
  }
 
  override def apply(plan: LogicalPlan): LogicalPlan = conf.getConf(
-    KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS
-  ) match {
+    KyuubiSQLConf.WATCHDOG_FORCED_MAXOUTPUTROWS) match {
    case Some(_) => findAndMarkChildAggregate(plan)
    case _ => plan
  }
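
Note: a minimal sketch of the core idea documented in the rule's comment above: when a forced cap is configured, a resolved plan that does not already enforce a smaller limit is wrapped in a Limit (GlobalLimit over LocalLimit). The helper name is invented, and the plan-shape and view checks of the real rule are deliberately omitted; this is not Kyuubi's exact code.

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{Limit, LogicalPlan}

// Simplified stand-in for ForcedMaxOutputRowsRule.apply: wrap the plan in a Limit
// when a cap is configured and the plan does not already carry a smaller one.
def forceOutputRowsLimit(plan: LogicalPlan, maxOutputRows: Option[Int]): LogicalPlan =
  maxOutputRows match {
    case Some(cap) if plan.resolved && !plan.maxRows.exists(_ <= cap) =>
      Limit(Literal(cap), plan)
    case _ => plan
  }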

dev/kyuubi-extension-spark-3-1/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxHivePartitionStrategy.scala

Lines changed: 24 additions & 21 deletions
@@ -37,25 +37,28 @@ case class MaxHivePartitionStrategy(session: SparkSession)
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
     conf.getConf(KyuubiSQLConf.WATCHDOG_MAX_HIVEPARTITION) match {
       case Some(maxHivePartition) => plan match {
-        case ScanOperation(_, _, relation: HiveTableRelation) if relation.isPartitioned =>
+          case ScanOperation(_, _, relation: HiveTableRelation) if relation.isPartitioned =>
           relation.prunedPartitions match {
-            case Some(prunedPartitions) => if (prunedPartitions.size > maxHivePartition) {
-              throw new MaxHivePartitionExceedException(
-                s"""
-                   |SQL job scan hive partition: ${prunedPartitions.size}
-                   |exceed restrict of hive scan maxPartition $maxHivePartition
-                   |You should optimize your SQL logical according partition structure
-                   |or shorten query scope such as p_date, detail as below:
-                   |Table: ${relation.tableMeta.qualifiedName}
-                   |Owner: ${relation.tableMeta.owner}
-                   |Partition Structure: ${relation.partitionCols.map(_.name).mkString(" -> ")}
-                   |""".stripMargin)
-            } else {
-              Nil
-            }
-            case _ => val totalPartitions = session
-              .sessionState.catalog.externalCatalog.listPartitionNames(
-                relation.tableMeta.database, relation.tableMeta.identifier.table)
+            case Some(prunedPartitions) =>
+              if (prunedPartitions.size > maxHivePartition) {
+                throw new MaxHivePartitionExceedException(
+                  s"""
+                     |SQL job scan hive partition: ${prunedPartitions.size}
+                     |exceed restrict of hive scan maxPartition $maxHivePartition
+                     |You should optimize your SQL logical according partition structure
+                     |or shorten query scope such as p_date, detail as below:
+                     |Table: ${relation.tableMeta.qualifiedName}
+                     |Owner: ${relation.tableMeta.owner}
+                     |Partition Structure: ${relation.partitionCols.map(_.name).mkString(" -> ")}
+                     |""".stripMargin)
+              } else {
+                Nil
+              }
+            case _ =>
+              val totalPartitions = session
+                .sessionState.catalog.externalCatalog.listPartitionNames(
+                  relation.tableMeta.database,
+                  relation.tableMeta.identifier.table)
             if (totalPartitions.size > maxHivePartition) {
               throw new MaxHivePartitionExceedException(
                 s"""
@@ -66,12 +69,12 @@ case class MaxHivePartitionStrategy(session: SparkSession)
                   |Owner: ${relation.tableMeta.owner}
                   |Partition Structure: ${relation.partitionCols.map(_.name).mkString(" -> ")}
                   |""".stripMargin)
-            } else {
+              } else {
               Nil
             }
           }
-        case _ => Nil
-        }
+          case _ => Nil
+          }
       case _ => Nil
     }
   }
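
Note: a minimal, self-contained sketch of the guard this strategy applies — fail fast when a scan would touch more Hive partitions than the configured cap. It uses a plain RuntimeException and invented parameter names rather than Kyuubi's MaxHivePartitionExceedException and config plumbing.

// Simplified stand-in for the partition-count guard above.
def checkHivePartitionCount(scannedPartitions: Int, maxHivePartition: Int, table: String): Unit =
  if (scannedPartitions > maxHivePartition) {
    throw new RuntimeException(
      s"SQL job scans $scannedPartitions Hive partitions of $table, " +
        s"exceeding the configured maximum of $maxHivePartition; " +
        "prune by partition columns (e.g. p_date) or narrow the query scope.")
  }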

dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/FinalStageConfigIsolationSuite.scala

Lines changed: 11 additions & 9 deletions
@@ -23,13 +23,14 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.kyuubi.sql.{FinalStageConfigIsolation, KyuubiSQLConf}
 
 class FinalStageConfigIsolationSuite extends KyuubiSparkSQLExtensionTest {
-  protected override def beforeAll(): Unit = {
+  override protected def beforeAll(): Unit = {
     super.beforeAll()
     setupData()
   }
 
   test("final stage config set reset check") {
-    withSQLConf(KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key -> "true",
+    withSQLConf(
+      KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key -> "true",
       "spark.sql.finalStage.adaptive.coalescePartitions.minPartitionNum" -> "1",
       "spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes" -> "100") {
       // use loop to double check final stage config doesn't affect the sql query each other
@@ -79,8 +80,10 @@ class FinalStageConfigIsolationSuite extends KyuubiSparkSQLExtensionTest {
   }
 
   test("final stage config isolation") {
-    def checkPartitionNum(sqlString: String, previousPartitionNum: Int,
-      finalPartitionNum: Int): Unit = {
+    def checkPartitionNum(
+        sqlString: String,
+        previousPartitionNum: Int,
+        finalPartitionNum: Int): Unit = {
       val df = sql(sqlString)
       df.collect()
       val shuffleReaders = collect(df.queryExecution.executedPlan) {
@@ -99,7 +102,8 @@ class FinalStageConfigIsolationSuite extends KyuubiSparkSQLExtensionTest {
       assert(df.rdd.partitions.length === finalPartitionNum)
     }
 
-    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+    withSQLConf(
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
       SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
       SQLConf.SHUFFLE_PARTITIONS.key -> "3",
       KyuubiSQLConf.FINAL_STAGE_CONFIG_ISOLATION.key -> "true",
@@ -156,8 +160,7 @@ class FinalStageConfigIsolationSuite extends KyuubiSparkSQLExtensionTest {
          |) t1 ON t0.c2 = t1.c2
          |""".stripMargin,
        3,
-        1
-      )
+        1)
 
       // one shuffle reader
       checkPartitionNum(
@@ -169,8 +172,7 @@ class FinalStageConfigIsolationSuite extends KyuubiSparkSQLExtensionTest {
          |) t1 ON t0.c1 = t1.c1
          |""".stripMargin,
        1,
-        1
-      )
+        1)
     }
   }
 }

dev/kyuubi-extension-spark-3-1/src/test/scala/org/apache/spark/sql/RepartitionBeforeWritingSuite.scala

Lines changed: 6 additions & 6 deletions
@@ -33,8 +33,7 @@ class RepartitionBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
         assert(r.optNumPartitions ===
           spark.sessionState.conf.getConf(KyuubiSQLConf.INSERT_REPARTITION_NUM))
         r
-      }.size == 1
-    )
+      }.size == 1)
   }
 
   // It's better to set config explicitly in case of we change the default value.
@@ -75,8 +74,7 @@ class RepartitionBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
       assert(
         df.queryExecution.analyzed.collect {
           case r: RepartitionByExpression => r
-        }.isEmpty
-      )
+        }.isEmpty)
     }
 
     withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true") {
@@ -132,7 +130,8 @@ class RepartitionBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
       }.size == 1)
     }
 
-    withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true",
+    withSQLConf(
+      KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE.key -> "true",
       KyuubiSQLConf.DYNAMIC_PARTITION_INSERTION_REPARTITION_NUM.key -> "2") {
       Seq("USING PARQUET", "").foreach { storage =>
         withTable("tmp1") {
@@ -149,7 +148,8 @@ class RepartitionBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
   }
 
   test("OptimizedCreateHiveTableAsSelectCommand") {
-    withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true",
+    withSQLConf(
+      HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true",
       HiveUtils.CONVERT_METASTORE_CTAS.key -> "true") {
       withTable("t") {
         val df = sql(s"CREATE TABLE t STORED AS parquet AS SELECT 1 as a")
