-
Notifications
You must be signed in to change notification settings - Fork 115
Hybrid scan operator for leveraging index alongside newly appended data - BucketUnion #151
Conversation
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Few minor comments, but LGTM, thanks @sezruby!
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
@apoorvedave1 @rapoth @pirz Can you review as well? |
@sezruby Could you update the title to be a bit more descriptive? (it's used as a commit message). |
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a few minor comments, otherwise LGTM, thanks @sezruby
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
|
||
test("BucketUnion require test") { | ||
import spark.implicits._ | ||
val df1 = Seq((1, "name1"), (2, "name2")).toDF("id", "name") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some of these DF definitions are repeated per test; Do you think it is possible to define them once at the class level and initialize them in beforeAll? (similar to partitionedDataDF
and nonPartitionedDataDF
in CreateIndexTests.scala
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually prefer the current way. The dfs are simple enough and it makes it easier to read in this scope; I don't have to go back and forth to remember how it was defined.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot for opening a short and consise PR! I really appreciate it!
src/main/scala/com/microsoft/hyperspace/index/execution/BucketUnionExec.scala
Show resolved
Hide resolved
src/main/scala/com/microsoft/hyperspace/index/plans/logical/BucketUnion.scala
Outdated
Show resolved
Hide resolved
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
src/test/scala/com/microsoft/hyperspace/index/BucketUnionTest.scala
Outdated
Show resolved
Hide resolved
Co-authored-by: Rahul Potharaju <rapoth@microsoft.com> Co-authored-by: Apoorve Dave <66283785+apoorvedave1@users.noreply.github.com>
Co-authored-by: Rahul Potharaju <rapoth@microsoft.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM 👍 , thanks @sezruby
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @sezruby!
What changes were proposed in this pull request?
For full context on the "why" for this PR, please see the main issue: #150
Introducing a new
BucketUnion
operator that is useful for implementing hybrid scan, a technique that we propose that can leverage the index and appended data without the need to re-shuffle the index, thus preserving the benefits of the index.As part of this, the following classes are being implemented:
BucketUnion
: Logical Plan operator; Used during logical plan optimization when the newly appended data needs to be union-ed with the data being read from the index.BucketUnionExec
: SparkPlan (Physical operator); Calls intoBucketUnionRDD
BucketUnionRDD
: RDD operatorBucketUnionRDDPartition
: PartitionBucketUnionStrategy
: SparkStrategy that is used when the Logical Plan is being converted to SparkPlan; More specifically, convertsBucketUnion
toBucketUnionExec
Why are the changes needed?
Spark does not support Union using
PartitionSpecification
, but justPartitionerAwareUnionRDD
operation which does not retain outputPartitioning of result. Therefore, we define a new Union operation (being called theBucketUnion
) which works when the following conditions are satisfied:Unfortunately, since there is no explicit API to check Partitioning keys in RDD, we have to assure that on the caller side. Therefore,
BucketUnionRDD
is Hyperspace internal use only.BucketUnion
can be used to merge index data & newly appended data without losing the bucketing specification (outputPartitioning
).Does this PR introduce any user-facing change?
Existing experience:
When the underlying data changes, Hyperspace decides to not use the index anymore.
New experience:
When the underlying data changes and hybrid scan is enabled, Hyperspace utilizes the index to the extent possible and performs a linear scan on the new data.
How was this patch tested?
BucketUnionTest