
Commit 1c3b020

Hybrid scan operator for leveraging index alongside newly appended data - BucketUnion (#151)
1 parent c71d956 commit 1c3b020

4 files changed: +343 −0 lines changed
Lines changed: 117 additions & 0 deletions
@@ -0,0 +1,117 @@
/*
 * Copyright (2020) The Hyperspace Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.microsoft.hyperspace.index.execution

import scala.reflect.ClassTag

import org.apache.spark.{OneToOneDependency, Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.{PartitionerAwareUnionRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
import org.apache.spark.sql.execution.SparkPlan

import com.microsoft.hyperspace.index.plans.logical.BucketUnion

/**
 * [[BucketUnionRDD]] is required for the hybrid scan operation, which merges index data and
 * appended data without re-shuffling the index data. Spark does not support a Union that
 * retains the output partition specification (i.e., using PartitionSpecification). The default
 * operation, [[PartitionerAwareUnionRDD]], does not retain the outputPartitioning of its
 * result, i.e., even if both sides are bucketed in a compatible way, it will cause a shuffle.
 *
 * To avoid these issues, we define a new BucketUnion operation that avoids a shuffle when
 * the following conditions are satisfied:
 *   - input RDDs must have the same number of partitions.
 *   - input RDDs must have the same partitioning keys.
 *   - input RDDs must have the same column schema.
 *
 * Unfortunately, since there is no explicit API to check the partitioning keys of an RDD, we
 * have to assert the partitioning keys on the caller side. Therefore, [[BucketUnionRDD]] is
 * for Hyperspace internal use only.
 *
 * You can find more detailed information about bucketing optimization in:
 * ''Bucketing 2.0: Improve Spark SQL Performance by Removing Shuffle''
 * Video: [[https://youtu.be/7cvaH33S7uc ]]
 */
private[hyperspace] class BucketUnionRDD[T: ClassTag](
    sc: SparkContext,
    var rdds: Seq[RDD[T]],
    bucketSpec: BucketSpec)
  extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
  require(rdds.nonEmpty)
  require(rdds.forall(_.getNumPartitions == bucketSpec.numBuckets))

  // copy from org.apache.spark.rdd.PartitionerAwareUnionRDD
  override def getPartitions: Array[Partition] = {
    val numBuckets = bucketSpec.numBuckets
    (0 until numBuckets).map { index =>
      new BucketUnionRDDPartition(rdds, index)
    }.toArray
  }

  // copy from org.apache.spark.rdd.PartitionerAwareUnionRDD
  override def compute(s: Partition, context: TaskContext): Iterator[T] = {
    val parentPartitions = s.asInstanceOf[BucketUnionRDDPartition].parents
    rdds.zip(parentPartitions).iterator.flatMap {
      case (rdd, p) => rdd.iterator(p, context)
    }
  }

  // copy from org.apache.spark.rdd.PartitionerAwareUnionRDD
  override def clearDependencies() {
    super.clearDependencies()
    rdds = null
  }
}

/**
 * [[BucketUnionRDDPartition]] keeps the parent partitions for each partition index.
 * @param rdds Input RDDs.
 * @param index Partition index.
 */
private[hyperspace] class BucketUnionRDDPartition(
    @transient val rdds: Seq[RDD[_]],
    override val index: Int)
  extends Partition {
  val parents: Array[Partition] = rdds.map(_.partitions(index)).toArray

  override def hashCode(): Int = index
  override def equals(other: Any): Boolean = super.equals(other)
}

/**
 * [[BucketUnionExec]] is the Spark plan for [[BucketUnion]].
 *
 * @param children Child plans.
 * @param bucketSpec Bucket specification.
 */
private[hyperspace] case class BucketUnionExec(children: Seq[SparkPlan], bucketSpec: BucketSpec)
  extends SparkPlan {
  override protected def doExecute(): RDD[InternalRow] = {
    new BucketUnionRDD[InternalRow](sparkContext, children.map(_.execute()), bucketSpec)
  }

  override def output: Seq[Attribute] = children.head.output

  override def outputPartitioning: Partitioning = {
    assert(children.map(_.outputPartitioning).toSet.size == 1)
    assert(children.head.outputPartitioning.isInstanceOf[HashPartitioning])
    children.head.outputPartitioning
  }
}
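
For context, a minimal sketch (not part of this commit) of driving `BucketUnionRDD` directly: both inputs are hash-partitioned on the same key into the same number of partitions, which is the pre-condition the `require` checks enforce. Since the class is `private[hyperspace]`, code like this only compiles inside the `com.microsoft.hyperspace` package (as the test suite later in this commit does); the DataFrame names are illustrative only.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.BucketSpec

import com.microsoft.hyperspace.index.execution.BucketUnionRDD

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// "Index" side and "appended" side, both hash-partitioned on "id" into 8 partitions.
val indexSide = Seq((1, "a"), (2, "b")).toDF("id", "name").repartition(8, $"id")
val appendedSide = Seq((1, "c"), (3, "d")).toDF("id", "name").repartition(8, $"id")

// The caller guarantees matching partition counts and partitioning keys;
// BucketUnionRDD itself only re-checks the partition count against numBuckets.
val bucketSpec = BucketSpec(8, Seq("id"), Seq())
val unioned =
  new BucketUnionRDD[Row](spark.sparkContext, Seq(indexSide.rdd, appendedSide.rdd), bucketSpec)

// Rows with the same "id" from both sides land in the same output partition.
assert(unioned.getNumPartitions == 8)
```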
Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
/*
 * Copyright (2020) The Hyperspace Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.microsoft.hyperspace.index.execution

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}

import com.microsoft.hyperspace.index.plans.logical.BucketUnion

/**
 * [[BucketUnionStrategy]] is SparkStrategy for converting [[BucketUnion]] (Logical Plan)
 * to [[BucketUnionExec]] (Spark Plan)
 */
private[hyperspace] object BucketUnionStrategy extends SparkStrategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case p: BucketUnion =>
      BucketUnionExec(p.children.map(planLater), p.bucketSpec) :: Nil
    case _ => Nil
  }
}
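
As a usage note: this commit does not show where the strategy is registered. A custom SparkStrategy like this is typically made visible to the planner through Spark's `experimental.extraStrategies` hook, roughly as sketched below; this is an assumption for illustration, since Hyperspace may wire it up elsewhere, and the object is `private[hyperspace]` so the snippet only compiles inside the Hyperspace package.

```scala
import org.apache.spark.sql.SparkSession

import com.microsoft.hyperspace.index.execution.BucketUnionStrategy

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Once registered, any BucketUnion node in a logical plan is planned as BucketUnionExec.
spark.experimental.extraStrategies =
  spark.experimental.extraStrategies :+ BucketUnionStrategy
```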
Lines changed: 68 additions & 0 deletions
@@ -0,0 +1,68 @@
/*
 * Copyright (2020) The Hyperspace Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.microsoft.hyperspace.index.plans.logical

import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

/**
 * [[BucketUnion]] is the logical plan for a bucket-aware Union operation, which retains the
 * outputPartitioning of the result RDDs so as to avoid an unnecessary shuffle after the
 * per-bucket Union operation.
 *
 * @param children Child plans.
 * @param bucketSpec Bucket specification.
 */
private[hyperspace] case class BucketUnion(children: Seq[LogicalPlan], bucketSpec: BucketSpec)
  extends LogicalPlan {
  require(resolved)
  override def output: Seq[Attribute] = children.head.output

  // copy from org.apache.spark.sql.catalyst.plans.logical.Union
  override def maxRows: Option[Long] = {
    if (children.exists(_.maxRows.isEmpty)) {
      None
    } else {
      Some(children.flatMap(_.maxRows).sum)
    }
  }

  // copy from org.apache.spark.sql.catalyst.plans.logical.Union
  override def maxRowsPerPartition: Option[Long] = {
    if (children.exists(_.maxRowsPerPartition.isEmpty)) {
      None
    } else {
      Some(children.flatMap(_.maxRowsPerPartition).sum)
    }
  }

  // copy from org.apache.spark.sql.catalyst.plans.logical.Union
  override lazy val resolved: Boolean = {
    // allChildrenCompatible needs to be evaluated after childrenResolved
    def allChildrenCompatible: Boolean =
      children.tail.forall(
        child =>
          // compare the attribute number with the first child
          child.output.length == children.head.output.length &&
            // compare the data types with the first child
            child.output.zip(children.head.output).forall {
              case (l, r) => l.dataType.equals(r.dataType)
            })
    children.length > 1 && childrenResolved && allChildrenCompatible
  }
}
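
Tying the three pieces together, a minimal end-to-end sketch (hypothetical, and simplified relative to how Hyperspace's hybrid scan rule actually builds the plan) wraps two identically bucketed, schema-compatible children in a BucketUnion node and lets the registered strategy plan it as BucketUnionExec:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.BucketSpec

import com.microsoft.hyperspace.index.execution.BucketUnionStrategy
import com.microsoft.hyperspace.index.plans.logical.BucketUnion

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

spark.experimental.extraStrategies =
  spark.experimental.extraStrategies :+ BucketUnionStrategy

// Both children share HashPartitioning(4, id) and the same schema,
// so BucketUnion.resolved holds and the require() succeeds.
val indexData = Seq((1, "a"), (2, "b")).toDF("id", "name").repartition(4, $"id")
val appended = Seq((3, "c")).toDF("id", "name").repartition(4, $"id")

val union = BucketUnion(
  Seq(indexData.queryExecution.optimizedPlan, appended.queryExecution.optimizedPlan),
  BucketSpec(4, Seq("id"), Seq()))

// BucketUnionStrategy turns the logical BucketUnion into BucketUnionExec, which reports
// the children's HashPartitioning as its own instead of forcing a new shuffle.
val physical = spark.sessionState.executePlan(union).sparkPlan
```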
Lines changed: 124 additions & 0 deletions
@@ -0,0 +1,124 @@
/*
 * Copyright (2020) The Hyperspace Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.microsoft.hyperspace.index

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.catalog.BucketSpec

import com.microsoft.hyperspace.SparkInvolvedSuite
import com.microsoft.hyperspace.index.execution.{BucketUnionExec, BucketUnionRDD, BucketUnionRDDPartition, BucketUnionStrategy}
import com.microsoft.hyperspace.index.plans.logical.BucketUnion

class BucketUnionTest extends SparkFunSuite with SparkInvolvedSuite {

  test("BucketUnion test for operator pre-requisites") {
    import spark.implicits._
    val df1 = Seq((1, "name1"), (2, "name2")).toDF("id", "name")
    val df2 = Seq((1, "name1"), (2, "name2")).toDF("id", "name")
    val df3 = Seq(("name1", 1), ("name2", 2)).toDF("name", "id")
    val df4 = Seq((1, "name1", 20), (2, "name2", 10)).toDF("id", "name", "age")

    // different column schema
    intercept[IllegalArgumentException] {
      BucketUnion(
        Seq(df1.queryExecution.optimizedPlan, df4.queryExecution.optimizedPlan),
        BucketSpec(1, Seq(), Seq()))
    }

    // different order of columns
    intercept[IllegalArgumentException] {
      BucketUnion(
        Seq(df1.queryExecution.optimizedPlan, df3.queryExecution.optimizedPlan),
        BucketSpec(1, Seq(), Seq()))
    }

    BucketUnion(
      Seq(df1.queryExecution.optimizedPlan, df2.queryExecution.optimizedPlan),
      BucketSpec(1, Seq(), Seq()))
  }

  test("BucketUnionStrategy test if strategy introduces BucketUnionExec in the Spark Plan") {
    import spark.implicits._
    val df1 = Seq((1, "name1"), (2, "name2")).toDF("id", "name")
    val df2 = Seq((1, "name1"), (2, "name2")).toDF("id", "name")
    val bucket = BucketUnion(
      Seq(df1.queryExecution.optimizedPlan, df2.queryExecution.optimizedPlan),
      BucketSpec(1, Seq(), Seq()))

    assert(BucketUnionStrategy(bucket).collect {
      case BucketUnionExec(_, _) => true
    }.length == 1)

    assert(BucketUnionStrategy(df1.queryExecution.optimizedPlan).collect {
      case BucketUnionExec(_, _) => true
    }.isEmpty)
  }

  test("BucketUnionExec test that partition count matches on both sides") {
    import spark.implicits._
    val df1 = Seq((1, "name1"), (2, "name2")).toDF("id", "name")
    val p1 = df1.repartition(10)
    val df2 = Seq((1, "name1"), (2, "name2")).toDF("id", "name")
    val p2_1 = df2.repartition(9)
    val p2_2 = df2.repartition(10)

    // different number of partitions
    intercept[AssertionError] {
      val bucket = BucketUnion(
        Seq(p1.queryExecution.optimizedPlan, p2_1.queryExecution.optimizedPlan),
        BucketSpec(10, Seq(), Seq()))
      spark.sessionState.executePlan(bucket).sparkPlan
    }

    val bucket = BucketUnion(
      Seq(p1.queryExecution.optimizedPlan, p2_2.queryExecution.optimizedPlan),
      BucketSpec(10, Seq(), Seq()))

    assert(BucketUnionStrategy(bucket).collect {
      case p: BucketUnionExec =>
        assert(p.bucketSpec.numBuckets == 10)
        assert(p.children.length == 2)
        assert(p.output.length == p1.schema.fields.length)
        true
    }.length == 1)
  }

  test("BucketUnionRDD test that partition columns with same value fall in the same partition") {
    import spark.implicits._
    val df1 = Seq((2, "name1"), (3, "name2")).toDF("id", "name")
    val p1 = df1.repartition(10, $"id")
    val df2 = Seq((2, "name3"), (3, "name4")).toDF("id", "name")
    val p2 = df2.repartition(10, $"id")
    val bucketSpec = BucketSpec(10, Seq("id"), Seq())

    val rdd = new BucketUnionRDD[Row](spark.sparkContext, Seq(p1.rdd, p2.rdd), bucketSpec)
    assert(
      rdd.collect.sortBy(r => (r.getInt(0), r.getString(1))).map(r => r.toSeq.toList).toList
        == Seq(Seq(2, "name1"), Seq(2, "name3"), Seq(3, "name2"), Seq(3, "name4")))
    assert(rdd.getPartitions.length == 10)
    assert(rdd.partitions.head.isInstanceOf[BucketUnionRDDPartition])

    val partitionSum: Seq[Int] = rdd
      .mapPartitions(it => Iterator.single(it.map(r => r.getInt(0)).sum))
      .collect()
      .toSeq

    // Check that all partitioned keys with the same value fall in the same partition.
    assert(partitionSum.equals(Seq(0, 6, 0, 0, 4, 0, 0, 0, 0, 0)))
  }
}

0 commit comments
