Derive predicates on Delta Lake generated columns from predicates on source columns #19455
Description
Delta Lake generated columns can be used as partition columns. In Spark SQL, the Delta reader detects the relationship between a generated column and its source column and uses it for partition pruning (I think implemented here). It would be great if Trino also supported this type of partition pruning.
For example, if I create a new Delta table:
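from delta.tables import DeltaTable
from pyspark.sql import types

# `spark` is an existing SparkSession; <path> is the table location.
table = (
    DeltaTable.createIfNotExists(spark)
    .addColumn("a", types.DoubleType())
    .addColumn("b", types.DoubleType())
    .addColumn("update_ts", types.TimestampType())
    # Generated column derived from update_ts, used as the partition column.
    .addColumn("p_update_date", types.DateType(), generatedAlwaysAs="cast(update_ts as DATE)")
    .partitionedBy("p_update_date")
    .location(<path>)
    .execute()
)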
we can see in the first query below that Spark converts the filter update_ts >= current_date to Filter (((p_update_date#401 >= cast(2023-10-19 00:00:00 as date)) OR isnull((p_update_date#401 >= cast(2023-10-19 00:00:00 as date)))) AND (update_ts#400 >= 2023-10-19 00:00:00)):
spark-sql> explain extended select avg(a * b) from <table> where update_ts >= current_date;
== Parsed Logical Plan ==
'Project [unresolvedalias('avg(('a * b)), None)]
+- 'Filter ('update_ts >= 'current_date)
+- 'UnresolvedRelation [<table>], [], false
== Analyzed Logical Plan ==
avg((voltage * current)): double
Aggregate [avg((a#368 * cast(b as double))) AS avg((voltage * current))#405]
+- Filter (update_ts#400 >= cast(current_date(Some(Etc/UTC)) as timestamp))
+- SubqueryAlias spark_catalog.<table>
+- Relation <table>[a, b, ingestion_ts, p_update_date] parquet
== Optimized Logical Plan ==
Aggregate [avg(null) AS avg((a * b))#405]
+- Project
+- Filter (((p_update_date#401 >= cast(2023-10-19 00:00:00 as date)) OR isnull((p_update_date#401 >= cast(2023-10-19 00:00:00 as date)))) AND (update_ts#400 >= 2023-10-19 00:00:00))
+- Relation <table>[a, b, ingestion_ts, p_update_date] parquet
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[avg(null)], output=[avg((voltage * current))#405])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#207]
+- HashAggregate(keys=[], functions=[partial_avg(null)], output=[sum#599, count#600L])
+- Project
+- Filter (update_ts#400 >= 2023-10-19 00:00:00)
+- FileScan parquet <table>[update_ts#400,p_update_date#401] Batched: true, DataFilters: [(update_ts#400 >= 2023-10-19 00:00:00)], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[<path>], PartitionFilters: [((p_update_date#401 >= cast(2023-10-19 00:00:00 as date)) OR isnull((p_update_date#401 >= cast(2..., PushedFilters: [GreaterThanOrEqual(update_ts,2023-10-19T00:00:00Z)], ReadSchema: struct<update_ts:timestamp>
spark-sql> explain extended select avg(a * b) from <table> where p_update_date >= current_date;
== Parsed Logical Plan ==
'Project [unresolvedalias('avg(('a * b)), None)]
+- 'Filter ('p_update_date >= 'current_date)
+- 'UnresolvedRelation [<table>], [], false
== Analyzed Logical Plan ==
avg((voltage * current)): double
Aggregate [avg((a#608 * cast(b as double))) AS avg((a * b))#645]
+- Filter (p_update_date#641 >= current_date(Some(Etc/UTC)))
+- SubqueryAlias spark_catalog.<table>
+- Relation <table>[a, b, ingestion_ts, p_update_date] parquet
== Optimized Logical Plan ==
Aggregate [avg(null) AS avg((a * b))#645]
+- Project
+- Filter (isnotnull(p_update_date#641) AND (p_update_date#641 >= 2023-10-19))
+- Relation <table>[a, b, ingestion_ts, p_update_date] parquet
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[avg(null)], output=[avg((voltage * current))#645])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#260]
+- HashAggregate(keys=[], functions=[partial_avg(null)], output=[sum#722, count#723L])
+- Project
+- FileScan parquet <table>[p_update_date#641] Batched: true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[<path>], PartitionFilters: [isnotnull(p_update_date#641), (p_update_date#641 >= 2023-10-19)], PushedFilters: [], ReadSchema: struct<>
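The rewrite in the first plan above works because cast(update_ts as DATE) is monotonic: any row with update_ts >= T also has cast(update_ts as DATE) >= cast(T as DATE), so the extra partition filter can never drop a matching row. Here is a minimal sketch of that reasoning in plain Python. It is not Spark's or Trino's actual code; the function names are hypothetical, and it uses naive datetimes, so time zone handling is ignored.

```python
from datetime import date, datetime


def derive_partition_bound(ts_lower_bound: datetime) -> date:
    """Derive a lower bound on p_update_date from `update_ts >= ts_lower_bound`.

    Assumes p_update_date = cast(update_ts as DATE). That cast is monotonic
    non-decreasing, so truncating the bound to a date keeps the filter sound.
    """
    return ts_lower_bound.date()


def row_matches(update_ts: datetime, ts_lower_bound: datetime) -> bool:
    """The original data filter on the source column."""
    return update_ts >= ts_lower_bound


def partition_may_match(p_update_date: date, ts_lower_bound: datetime) -> bool:
    """The derived partition filter on the generated column."""
    return p_update_date >= derive_partition_bound(ts_lower_bound)
```

The derived filter is weaker than the original one (it keeps the whole day containing the bound), which is why Spark keeps update_ts >= ... as a data filter alongside the partition filter.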
Trino either does not recognize the relationship between a generated column and its base column, or does not use that relationship to optimize partition pruning. With the same filters as above, Trino scans the full table (3.7B rows) and takes 3.33s to execute the query when filtering on update_ts. When filtering on p_update_date, Trino scans only one partition (6.4M rows) and takes 125ms to execute the query.
trino > explain analyze select count(*) from <table> where update_ts >= current_date
Trino version: 428
Queued: 242.59us, Analysis: 193.05ms, Planning: 38.58ms, Execution: 3.33s
Fragment 1 [SINGLE]
CPU: 106.68ms, Scheduled: 117.55ms, Blocked 46.07s (Input: 43.13s, Output: 0.00ns), Input: 2753 rows (24.20kB); per task: avg.: 2753.00 std.dev.: 0.00, Output: 1 row (9B)
Output layout: [count]
Output partitioning: SINGLE []
Aggregate[type = FINAL]
│ Layout: [count:bigint]
│ Estimates: {rows: 1 (9B), cpu: 13.97G, memory: 9B, network: 0B}
│ CPU: 10.00ms (0.02%), Scheduled: 10.00ms (0.00%), Blocked: 0.00ns (0.00%), Output: 1 row (9B)
│ Input avg.: 2753.00 rows, Input std.dev.: 0.00%
│ count := count("count_0")
└─ LocalExchange[partitioning = SINGLE]
│ Layout: [count_0:bigint]
│ Estimates: {rows: 1666757698 (13.97GB), cpu: 0, memory: 0B, network: 0B}
│ CPU: 40.00ms (0.08%), Scheduled: 43.00ms (0.01%), Blocked: 2.94s (6.38%), Output: 2753 rows (24.20kB)
│ Input avg.: 172.06 rows, Input std.dev.: 123.72%
└─ RemoteSource[sourceFragmentIds = [2]]
Layout: [count_0:bigint]
CPU: 35.00ms (0.07%), Scheduled: 37.00ms (0.01%), Blocked: 43.13s (93.62%), Output: 2753 rows (24.20kB)
Input avg.: 172.06 rows, Input std.dev.: 123.72%
Fragment 2 [SOURCE]
CPU: 47.89s, Scheduled: 4.90m, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 3703905995 rows (31.05GB); per task: avg.: 740781199.00 std.dev.: 81107909.36, Output: 2753 rows (24.20kB)
Output layout: [count_0]
Output partitioning: SINGLE []
Aggregate[type = PARTIAL]
│ Layout: [count_0:bigint]
│ Estimates: {rows: 1666757698 (13.97GB), cpu: ?, memory: ?, network: ?}
│ CPU: 185.00ms (0.39%), Scheduled: 196.00ms (0.07%), Blocked: 0.00ns (0.00%), Output: 2753 rows (24.20kB)
│ Input avg.: 2320.29 rows, Input std.dev.: 3885.13%
│ count_0 := count(*)
└─ ScanFilterProject[table = delta:<table>, filterPredicate = ("update_ts" >= TIMESTAMP '2023-10-18 00:00:00.000 UTC')]
Layout: []
Estimates: {rows: 3703905995 (0B), cpu: 28.29G, memory: 0B, network: 0B}/{rows: 1666757698 (0B), cpu: 28.29G, memory: 0B, network: 0B}/{rows: 1666757698 (0B), cpu: 0, memory: 0B, network: 0B}
CPU: 47.66s (99.44%), Scheduled: 4.89m (99.90%), Blocked: 0.00ns (0.00%), Output: 6387760 rows (0B)
Input avg.: 1345407.19 rows, Input std.dev.: 112.93%
update_ts := update_ts:timestamp(3) with time zone:REGULAR
Input: 3703905995 rows (31.05GB), Filtered: 99.83%, Physical input: 173.50MB, Physical input time: 160740.00ms
trino > explain analyze select count(*) from <table> where p_update_date >= current_date
Trino version: 428
Queued: 225.64us, Analysis: 279.47ms, Planning: 10.65ms, Execution: 124.97ms
Fragment 1 [SINGLE]
CPU: 1.20ms, Scheduled: 1.24ms, Blocked 102.05ms (Input: 95.86ms, Output: 0.00ns), Input: 1 row (9B); per task: avg.: 1.00 std.dev.: 0.00, Output: 1 row (9B)
Output layout: [count]
Output partitioning: SINGLE []
Aggregate[type = FINAL]
│ Layout: [count:bigint]
│ Estimates: {rows: 1 (9B), cpu: 54.83M, memory: 9B, network: 0B}
│ CPU: 0.00ns (?%), Scheduled: 0.00ns (?%), Blocked: 0.00ns (0.00%), Output: 1 row (9B)
│ Input avg.: 1.00 rows, Input std.dev.: 0.00%
│ count := count("count_0")
└─ LocalExchange[partitioning = SINGLE]
│ Layout: [count_0:bigint]
│ Estimates: {rows: 6387760 (54.83MB), cpu: 0, memory: 0B, network: 0B}
│ CPU: 0.00ns (?%), Scheduled: 0.00ns (?%), Blocked: 5.00ms (4.95%), Output: 1 row (9B)
│ Input avg.: 0.06 rows, Input std.dev.: 387.30%
└─ RemoteSource[sourceFragmentIds = [2]]
Layout: [count_0:bigint]
CPU: 0.00ns (?%), Scheduled: 0.00ns (?%), Blocked: 96.00ms (95.05%), Output: 1 row (9B)
Input avg.: 0.06 rows, Input std.dev.: 387.30%
Fragment 2 [SOURCE]
CPU: 308.51us, Scheduled: 309.19us, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 6387760 rows (0B); per task: avg.: 6387760.00 std.dev.: 0.00, Output: 1 row (9B)
Output layout: [count_0]
Output partitioning: SINGLE []
Aggregate[type = PARTIAL]
│ Layout: [count_0:bigint]
│ Estimates: {rows: 6387760 (54.83MB), cpu: ?, memory: ?, network: ?}
│ CPU: 0.00ns (?%), Scheduled: 0.00ns (?%), Blocked: 0.00ns (0.00%), Output: 1 row (9B)
│ Input avg.: 6387760.00 rows, Input std.dev.: 0.00%
│ count_0 := count(*)
└─ TableScan[table = delta:<table>]
Layout: []
Estimates: {rows: 6387760 (0B), cpu: 0, memory: 0B, network: 0B}
CPU: 0.00ns (?%), Scheduled: 0.00ns (?%), Blocked: 0.00ns (0.00%), Output: 6387760 rows (0B)
Input avg.: 6387760.00 rows, Input std.dev.: 0.00%
p_update_date:date:PARTITION_KEY
:: [[2023-10-18, <max>)]
Input: 6387760 rows (0B)
It would be great if Trino could use generated columns to prune partitions during query planning (at least the same subset of relationships that Spark SQL supports), especially since Delta doesn't support bucketing. Our main use cases would be a table partitioned based on a high-cardinality string column and the date partitioning example from above.
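To make the date-partitioning request concrete, here is a rough sketch of the pruning step as it could look at planning time, written in plain Python rather than against any Trino API. The `prune_partitions` helper is hypothetical. It assumes the generated column is cast(update_ts as DATE) and keeps partitions whose value is null, mirroring the `OR isnull(...)` branch in the Spark plan above.

```python
from datetime import date, datetime
from typing import List, Optional


def prune_partitions(
    partition_values: List[Optional[date]],
    ts_lower_bound: datetime,
) -> List[Optional[date]]:
    """Keep only partitions that may contain rows with update_ts >= ts_lower_bound.

    Assumes each partition value equals cast(update_ts as DATE) for its rows.
    A null partition value is kept, because the derived comparison evaluates
    to null there and cannot be used to rule the partition out.
    """
    date_bound = ts_lower_bound.date()
    return [p for p in partition_values if p is None or p >= date_bound]


if __name__ == "__main__":
    partitions = [date(2023, 10, 17), date(2023, 10, 18), date(2023, 10, 19), None]
    # Only the surviving partitions would need to be scanned and filtered on update_ts.
    print(prune_partitions(partitions, datetime(2023, 10, 18, 12, 0)))
```

The original filter on update_ts still has to be applied to the rows of the surviving partitions, since the first surviving day can contain rows earlier than the bound.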