
Hybrid Scan for File/Partition Mutable Datasets #150

@sezruby

Description


Describe the problem

After new files/partitions are appended to the original dataset and/or some files/partitions are deleted, the index cannot be used until the user calls the refresh API.

Describe your proposed solution

Hybrid Scan is introduced to support these datasets seamlessly and to avoid frequent index refreshes. This feature makes it possible to utilize an outdated index by analyzing the difference between the indexed and current source data and modifying the query plan accordingly. The basic ideas for the two cases are:

  • appended files - build index data for the appended files on the fly & merge it into the existing index data
  • deleted files - inject a pushdown filter condition on the lineage column into the index relation, so that rows from the deleted files are excluded from the results. (The lineage column is already introduced by Add lineage to covering index records #104)

Hybrid Scan can handle a file/partition mutable dataset, for example:

// original dataset
["/path/to/part-0000.parquet", "/path/to/part-0001.parquet", "/path/to/part-0002.parquet"]

// got appended & deleted
[                              "/path/to/part-0001.parquet", "/path/to/part-0002.parquet", "/path/to/part-0003.parquet"]
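
As a first step, the appended and deleted files can be derived by diffing the current source files against the files recorded in the index metadata. A minimal sketch of that diff (the names below are hypothetical, not Hyperspace's actual API):

case class SourceFile(path: String, size: Long, modifiedTime: Long)

// Hypothetical helper: a file counts as "appended" if its path is new or its
// size/mtime changed, and as "deleted" if it was indexed but no longer exists
// unchanged in the current source data.
def diffSourceFiles(
    currentFiles: Seq[SourceFile],
    indexedFiles: Seq[SourceFile]): (Seq[SourceFile], Seq[SourceFile]) = {
  val indexed = indexedFiles.map(f => f.path -> f).toMap
  val current = currentFiles.map(f => f.path -> f).toMap
  val appended = currentFiles.filterNot { f =>
    indexed.get(f.path).exists(i => i.size == f.size && i.modifiedTime == f.modifiedTime)
  }
  val deleted = indexedFiles.filterNot { f =>
    current.get(f.path).exists(c => c.size == f.size && c.modifiedTime == f.modifiedTime)
  }
  (appended, deleted)
}
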
Append-only

#165 addresses the plan modification for append-only datasets.

Basically there are 3 cases for now:

  • Case 1) Filter Index Rule & parquet source format
    • In this case, we can simply add the list of appended files to the file list of the index relation (i.e. the index data), because:
      • newly appended source files are guaranteed to have all the columns in the index data (except for the lineage column).
      • Filter Index Rule does not utilize bucketing information for now, so both the index data & the newly appended data can be read with 1 FileScan node. See the InMemoryFileIndex in the plan below:
...
:- *(1) Project [name#1, id#0]
:  +- *(1) Filter (isnotnull(id#0) && (id#0 >= 1))
:     +- *(1) FileScan parquet [id#0,name#1] Batched: true, Format: Parquet,
                 Location: InMemoryFileIndex[<list of index data files>, <list of newly appended files>], PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThanOrEqual(id,1)], ReadSchema: struct<id:int,name:string>
...
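
A minimal sketch of how the merged location could be built, using Spark's internal InMemoryFileIndex (the constructor below is the Spark 2.4-era signature and is an assumption, not Hyperspace's exact code):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex

// Sketch only (internal Spark API; signature varies across versions):
// build one file index over both the index data files and the newly appended
// source files, so a single FileScan node can read them together.
def mergedFileIndex(
    spark: SparkSession,
    indexDataFiles: Seq[Path],
    appendedFiles: Seq[Path]): InMemoryFileIndex =
  new InMemoryFileIndex(
    spark,
    rootPathsSpecified = indexDataFiles ++ appendedFiles,
    parameters = Map.empty,
    userSpecifiedSchema = None)
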
  • Case 2) Filter Index Rule & non-parquet source format
    • For source formats other than Parquet, we cannot read the appended data together with the index data files (which are stored in Parquet) using 1 FileScan node, so the two have to be merged in a different way. We could reuse Case 3 below for this, but since Filter Index Rule doesn't use the bucketing information, there is no need to shuffle the newly appended data.
    • Therefore, Spark's Union can be used here (Improvement of HybridScan for FilterIndexRule #145), as sketched after the diagram:
Union => Project => Filter => Relation (index)
      |=> Project => Filter => Relation (appended data)
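
In terms of Catalyst's logical plans, the merge could look like the following sketch (illustrative only, not Hyperspace's exact code):

import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Union}

// Filter Index Rule does not rely on bucketing, so a plain Union of the
// index plan and the plan over the appended data is sufficient.
def unionWithAppended(indexPlan: LogicalPlan, appendedPlan: LogicalPlan): LogicalPlan =
  Union(indexPlan, appendedPlan)
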
  • Case 3) Join Index Rule
    • Join Index Rule relies on the index's bucketing information, so the newly appended data is shuffled on the fly with the index's bucket spec and then merged with the index plan via BucketUnion:
 +- BucketUnion 200 buckets, bucket columns: [l_orderkey]      <===== merge both plans
            :- Project [l_orderkey#21L]                        <===== original index plan
            :  +- Filter ((isnotnull(l_commitdate#32) && isnotnull(l_receiptdate#33)) && (l_commitdate#32 < l_receiptdate#33))
            :     +- Relation[l_orderkey#21L,l_partkey#22L,l_suppkey#23L,l_quantity#25,l_extendedprice#26,l_discount#27,l_returnflag#29,l_shipdate#31,l_commitdate#32,l_receiptdate#33,l_shipmode#35] parquet
            +- RepartitionByExpression [l_orderkey#21L], 200   <===== on-the-fly shuffle with index spec
               +- Project [l_orderkey#21L]                     <===== newly appended data
                  +- Filter ((isnotnull(l_commitdate#32) && isnotnull(l_receiptdate#33)) && (l_commitdate#32 < l_receiptdate#33))
                     +- Relation[l_orderkey#21L,l_partkey#22L,l_suppkey#23L,l_quantity#25,l_extendedprice#26,l_discount#27,l_returnflag#29,l_shipdate#31,l_commitdate#32,l_receiptdate#33,l_shipmode#35] parquet
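
The on-the-fly shuffle in the plan above could be injected roughly as follows, using Catalyst's RepartitionByExpression (a sketch, not Hyperspace's exact code):

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RepartitionByExpression}

// Repartition the appended data by the index's bucket columns and bucket
// count so its partitioning lines up with the index data before BucketUnion.
def shuffleWithIndexSpec(
    appendedPlan: LogicalPlan,
    bucketColumnNames: Seq[String],
    numBuckets: Int): LogicalPlan = {
  val bucketAttrs: Seq[Attribute] =
    bucketColumnNames.flatMap(name => appendedPlan.output.find(_.name == name))
  RepartitionByExpression(bucketAttrs, appendedPlan, numBuckets)
}
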
Delete-only

With the lineage column, we can inject pushdown filters for deleted files into the index relation:

Filter(Not(In(<lineage column>, Array(Literal("deleted_file_path1"), Literal("deleted_file_path2")))))
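
In Catalyst terms, the injected condition could be built with Spark's Not/In expressions; the helper below is hypothetical:

import org.apache.spark.sql.catalyst.expressions.{Attribute, In, Literal, Not}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}

// Wrap the index relation with a filter that drops rows whose lineage
// column points at a deleted source file.
def excludeDeletedFiles(
    indexPlan: LogicalPlan,
    lineageAttr: Attribute,
    deletedFilePaths: Seq[String]): LogicalPlan =
  Filter(Not(In(lineageAttr, deletedFilePaths.map(Literal(_)))), indexPlan)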

=============================================================
Plan with indexes:
=============================================================
Project [id#0, name#1, _data_file_name#26]
+- Filter ((NOT _data_file_name#26 IN (part-00003-0fc50086-fd6e-4527-ab41-7c48989f928e-c000.snappy.parquet) && isnotnull(id#0)) && (id#0 = 1))
   +- FileScan parquet [id#0,name#1,_data_file_name#26] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/eunsong/IdeaProjects/spark2/spark-warehouse/indexes/index2/v__=0], PartitionFilters: [], PushedFilters: [Not(In(_data_file_name, [part-00003-0fc50086-fd6e-4527-ab41-7c48989f928e-c000.snappy.parquet])),..., ReadSchema: struct<id:int,name:string,_data_file_name:string>, SelectedBucketsCount: 1 out of 200
Append & Delete

When there are both appended and deleted files, we cannot apply Case 1 of the append-only scenario. Since the newly appended source files do not have the lineage column, we cannot read them together with the index data using 1 FileScan node (schema mismatch). So in this case, append-only Case 1 is also handled as append-only Case 2.

For the other cases, since appended files are processed with a separate FileScan node, datasets with both appends and deletes can be supported. A sketch of the overall strategy selection is shown below.
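
Putting the cases together, the strategy selection could be summarized as follows (all names are hypothetical, not Hyperspace's actual API):

sealed trait HybridScanStrategy
case object MergedFileScan extends HybridScanStrategy         // append-only Case 1
case object UnionWithIndex extends HybridScanStrategy         // append-only Case 2
case object BucketUnionWithShuffle extends HybridScanStrategy // Case 3

def chooseStrategy(
    usesFilterIndexRule: Boolean,
    sourceIsParquet: Boolean,
    hasDeletedFiles: Boolean): HybridScanStrategy =
  if (usesFilterIndexRule) {
    // With deletes, appended files lack the lineage column, so Case 1's
    // single-FileScan merge is not applicable; fall back to Case 2 (Union).
    if (sourceIsParquet && !hasDeletedFiles) MergedFileScan else UnionWithIndex
  } else {
    BucketUnionWithShuffle
  }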

Work Items

Future work

Describe alternatives you've considered
Call refreshIndex API
