Hybrid Scan for File/Partition Mutable Datasets #150
Description
Describe the problem
After new files/partitions are appended to the original dataset and/or some files/partitions are deleted, the index cannot be used until a user calls the refresh API.
Describe your proposed solution
Hybrid Scan is introduced to support these datasets seamlessly and to avoid frequent refreshes of indexes. This feature makes it possible to utilize an outdated index by analyzing the difference between the source data and the indexed data and modifying the query plan accordingly. The basic ideas for the two cases are:
- appended files - build index data for the appended files on the fly & merge it with the existing index data
- deleted files - inject a pushdown filter condition on the lineage column into the index relation, so that rows from the deleted files are excluded from the results. (The lineage column was introduced by Add lineage to covering index records #104)
Hybrid Scan can handle a file/partition mutable dataset, for example:

```
// original dataset
["/path/to/part-0000.parquet", "/path/to/part-0001.parquet", "/path/to/part-0002.parquet"]
// after appends & deletes
["/path/to/part-0001.parquet", "/path/to/part-0002.parquet", "/path/to/part-0003.parquet"]
```
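The difference between the files recorded at index creation time and the current source files comes down to simple set operations. A minimal sketch (all variable names are illustrative, not Hyperspace APIs):

```python
# Classifying current source files relative to the file list recorded
# when the index was created. Names here are illustrative only.
indexed_files = {
    "/path/to/part-0000.parquet",
    "/path/to/part-0001.parquet",
    "/path/to/part-0002.parquet",
}
current_files = {
    "/path/to/part-0001.parquet",
    "/path/to/part-0002.parquet",
    "/path/to/part-0003.parquet",
}

appended = current_files - indexed_files  # needs on-the-fly index build & merge
deleted = indexed_files - current_files   # needs a lineage pushdown filter
common = indexed_files & current_files    # still covered by the index as-is

print(sorted(appended))  # ['/path/to/part-0003.parquet']
print(sorted(deleted))   # ['/path/to/part-0000.parquet']
```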
Append-only
#165 addresses the plan modification for append-only datasets.
Basically there are 3 cases for now:
- Case 1) Filter Index Rule & parquet source format
- In this case, we can just add the list of appended files to the file list of the index relation (i.e. index data), because:
- it's guaranteed that newly appended source files always contain all the columns in the index data (except for lineage).
- Filter Index Rule does not utilize bucketing information for now, so both the index data & the newly appended data can be read with 1 FileScan node, using a merged InMemoryFileIndex as shown below:
```
...
:- *(1) Project [name#1, id#0]
:  +- *(1) Filter (isnotnull(id#0) && (id#0 >= 1))
:     +- *(1) FileScan parquet [id#0,name#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[<list of index data files>, <list of newly appended files>], PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThanOrEqual(id,1)], ReadSchema: struct<id:int,name:string>
...
```
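The effect of Case 1 can be sketched as follows: the plan shape is unchanged, and only the relation's file list is extended so that a single scan covers both the index data and the appended source files. A toy model, not the actual Spark/Hyperspace code:

```python
# Toy model of Case 1: the scan node simply reads a merged file list.
# read_file stands in for a Parquet reader; all names are illustrative.
def read_file(path):
    # Pretend each file yields one record tagged with its origin.
    return [{"source": path}]

def file_scan(file_list):
    rows = []
    for path in file_list:
        rows.extend(read_file(path))
    return rows

index_data_files = ["/index/v0/part-0.parquet", "/index/v0/part-1.parquet"]
appended_files = ["/path/to/part-0003.parquet"]

# One FileScan over index data plus newly appended source files.
rows = file_scan(index_data_files + appended_files)
print(len(rows))  # 3
```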
- Case 2) Filter Index Rule & non-parquet source format
- For source formats other than Parquet, we cannot read the appended source data and the index data files (which are stored in Parquet) with 1 FileScan node, so we need to merge the two in a different way. We could use Case 3 below for this, but since Filter Index Rule doesn't use the bucketing information, there is no need to shuffle the newly appended data.
- Therefore, Spark's Union can be used here (Improvement of HybridScan for FilterIndexRule #145):

```
Union => Project => Filter => Relation (index)
     |=> Project => Filter => Relation (appended data)
```
- Case 3) Join Index Rule
- We can utilize BucketUnion (Hybrid scan operator for leveraging index alongside newly appended data - BucketUnion #151) to merge index data and appended data. In this way, we retain the bucketing information of the index, which avoids unnecessary shuffling of the index data.
```
+- BucketUnion 200 buckets, bucket columns: [l_orderkey]                 <===== merge both plans
   :- Project [l_orderkey#21L]                                           <===== original index plan
   :  +- Filter ((isnotnull(l_commitdate#32) && isnotnull(l_receiptdate#33)) && (l_commitdate#32 < l_receiptdate#33))
   :     +- Relation[l_orderkey#21L,l_partkey#22L,l_suppkey#23L,l_quantity#25,l_extendedprice#26,l_discount#27,l_returnflag#29,l_shipdate#31,l_commitdate#32,l_receiptdate#33,l_shipmode#35] parquet
   +- RepartitionByExpression [l_orderkey#21L], 200                      <===== on-the-fly shuffle with index spec
      +- Project [l_orderkey#21L]                                        <===== newly appended data
         +- Filter ((isnotnull(l_commitdate#32) && isnotnull(l_receiptdate#33)) && (l_commitdate#32 < l_receiptdate#33))
            +- Relation[l_orderkey#21L,l_partkey#22L,l_suppkey#23L,l_quantity#25,l_extendedprice#26,l_discount#27,l_returnflag#29,l_shipdate#31,l_commitdate#32,l_receiptdate#33,l_shipmode#35] parquet
```
Delete-only
With the lineage column, we can inject a push-down filter for the deleted files into the index relation:

```
Filter(Not(In(<lineage column>, Array(Literal("deleted_file_path1"), Literal("deleted_file_path2")))))
```
```
=============================================================
Plan with indexes:
=============================================================
Project [id#0, name#1, _data_file_name#26]
+- Filter ((NOT _data_file_name#26 IN (part-00003-0fc50086-fd6e-4527-ab41-7c48989f928e-c000.snappy.parquet) && isnotnull(id#0)) && (id#0 = 1))
   +- FileScan parquet [id#0,name#1,_data_file_name#26] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/eunsong/IdeaProjects/spark2/spark-warehouse/indexes/index2/v__=0], PartitionFilters: [], PushedFilters: [Not(In(_data_file_name, [part-00003-0fc50086-fd6e-4527-ab41-7c48989f928e-c000.snappy.parquet])),..., ReadSchema: struct<id:int,name:string,_data_file_name:string>, SelectedBucketsCount: 1 out of 200
```
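The semantics of this filter are simple to model: every index row carries the source file it came from in the lineage column, so rows from deleted files are masked with a NOT IN predicate. A minimal sketch with illustrative file names:

```python
# Sketch of the lineage pushdown filter. Each index row records its source
# file in _data_file_name; rows from deleted files are excluded via NOT IN.
# File names and rows here are made up for illustration.
index_rows = [
    {"id": 1, "name": "a", "_data_file_name": "part-0001.parquet"},
    {"id": 1, "name": "b", "_data_file_name": "part-0003.parquet"},
    {"id": 2, "name": "c", "_data_file_name": "part-0002.parquet"},
]
deleted_files = {"part-0003.parquet"}

# Equivalent of Filter(Not(In(_data_file_name, deleted_files)) && (id = 1))
result = [
    r for r in index_rows
    if r["_data_file_name"] not in deleted_files and r["id"] == 1
]
print(result)  # [{'id': 1, 'name': 'a', '_data_file_name': 'part-0001.parquet'}]
```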
Append & Delete
In case there are both appended and deleted files, we cannot apply Case 1 of the append-only scenario. Because the newly appended source files do not have the lineage column, we cannot read the appended files together with the index data in 1 FileScan node (schema mismatch). So in this case, append-only Case 1 is also handled as append-only Case 2. In the other cases, appended files are already processed with a separate FileScan node, so they can support datasets with both appends and deletes.
Work Items
- BucketUnion (PR Hybrid scan operator for leveraging index alongside newly appended data - BucketUnion #151)
- HybridScanEnabled config (PR Add HyperspaceConf #152)
- getCandidateIndex (PR Modify getCandidateIndex for hybrid scan #153)
- Rank algorithm (PR Fix rank algorithm for Hybrid Scan #164)
- Plan modification & Join/Filter Index Rule (PR Modify logical plan to merge newly appended files and index data #165)
- Union for non-parquet format + Filter Rule (ISSUE Improvement of HybridScan for FilterIndexRule #145, PR Modify logical plan to merge newly appended files and index data #165)
- Delete support with a new config (PR Inject push-down filter to exclude indexed rows from deleted files #171)
Future work
- Introduce a similarity threshold for Hybrid Scan (ISSUE [FEATURE REQUEST]: Introduce a similarity threshold for Hybrid Scan #159)
- Add plan signature validation for Hybrid Scan (ISSUE Add plan signature validation for Hybrid Scan #158)
- Optimization of applying Rules for Hybrid Scan (ISSUE Optimization of index selection process #160)
Describe alternatives you've considered
Call the refreshIndex API.