szehon-ho
Member

What changes were proposed in this pull request?

Add new DSV2 interfaces, HasPartitionKeys and KeyedPartitioning, that let a data source report partition values. These are a superset of HasPartitionKey and KeyGroupedPartitioning (the latter requires the data source to group its InputPartitions by partition values and is mainly intended for storage-partitioned join, SPJ). Spark uses the reported values for further partition-column filtering.

Why are the changes needed?

Currently, Spark converts a Catalyst Expression to either a Filter or a Predicate and pushes it to DSV2 via the SupportsPushDownFilters and SupportsPushDownV2Filters APIs.

However, some Spark filters do not convert cleanly, for example trim(part_col) = 'a'. In some cases a DSV2 source can return the exact partition value(s) of each InputPartition to Spark, and Spark can then filter using the original Catalyst expression.
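To make the motivation concrete, here is a minimal sketch in plain Scala with no Spark dependency. `ReportedPartition`, `trimEqualsA`, and `prune` are hypothetical names used only for illustration; they are not the interfaces added in this PR. The sketch assumes the source reports one partition value per input partition and that the engine evaluates the original predicate against that value.

```scala
object PartitionValueFiltering {
  // Hypothetical stand-in for an InputPartition that reports its partition value.
  final case class ReportedPartition(id: Int, partCol: String)

  // Stands in for the Catalyst expression trim(part_col) = 'a',
  // which does not translate cleanly into a pushed-down filter.
  val trimEqualsA: String => Boolean = value => value.trim == "a"

  // Engine-side pruning: keep only partitions whose reported value satisfies
  // the original, untranslated predicate.
  def prune(
      partitions: Seq[ReportedPartition],
      predicate: String => Boolean): Seq[ReportedPartition] =
    partitions.filter(p => predicate(p.partCol))

  // Example partition values as a source might report them (made up for illustration).
  val reported: Seq[ReportedPartition] = Seq(
    ReportedPartition(0, " a "),
    ReportedPartition(1, "b"),
    ReportedPartition(2, "a")
  )

  def main(args: Array[String]): Unit =
    prune(reported, trimEqualsA).foreach(println)
}
```

With the sample values above, partitions 0 and 2 are kept and partition 1 is pruned, even though the source never saw the `trim` expression.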

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit test

Was this patch authored or co-authored using generative AI tooling?

No

val filterableScan = scan.asInstanceOf[SupportsRuntimeV2Filtering]
filterableScan.filter(dataSourceFilters.toArray)
// Apply additional filtering based on partition keys if available
if (allFilters.nonEmpty) {
Contributor

It seems that when you create BatchScanExec you pass in allFilters, which contains both runtimeFilters and postScanFilters, but we already have runtimeFilters.
Would it make sense to just pass in postScanFilters and compute allFilters here?

Can allFilters be computed as the runtimeFilters that can't be translated to V2, plus postScanFilters? Or can we not be sure that filterableScan.filter() applies all translated runtimeFilters?
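One way to read the suggestion, as a rough sketch in plain Scala: `SketchFilter` and `remainingFilters` are hypothetical, and the `translatable` flag stands in for whatever check decides that a runtime filter converts to a V2 predicate. The sketch assumes the scan really applies every translated filter, which is exactly the open question raised here.

```scala
object AllFiltersSketch {
  // Hypothetical model of a filter: its text plus whether it translates to a V2 predicate.
  final case class SketchFilter(sql: String, translatable: Boolean)

  // Filters Spark would still need to evaluate against reported partition values:
  // runtime filters that could not be translated, followed by the post-scan filters.
  def remainingFilters(
      runtimeFilters: Seq[SketchFilter],
      postScanFilters: Seq[SketchFilter]): Seq[SketchFilter] =
    runtimeFilters.filterNot(_.translatable) ++ postScanFilters
}
```

If the scan may skip some translated filters, dropping them here would be unsafe and the full runtimeFilters list would have to be kept.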

}

d.copy(keyGroupedPartitioning = catalystPartitioning)
val catalystKeyedPartitioning = scan.outputPartitioning() match {
Contributor

@peter-toth peter-toth Oct 16, 2025

Since scan.outputPartitioning() is either KeyGroupedPartitioning or KeyedPartitioning (or something that we don't care about), can we merge the two matches that compute catalystPartitioning and catalystKeyedPartitioning into one?
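A rough illustration of the merged match, in plain Scala: the types below are hypothetical stand-ins that only borrow the names from this PR and say nothing about the real class hierarchy. In particular, the sketch assumes the two partitioning types are unrelated; if one extends the other, the more specific case would have to come first.

```scala
object MergedMatchSketch {
  // Hypothetical stand-ins for the connector partitioning types.
  sealed trait Partitioning
  final case class KeyGroupedPartitioning(keys: Seq[String]) extends Partitioning
  final case class KeyedPartitioning(keys: Seq[String]) extends Partitioning
  case object UnknownPartitioning extends Partitioning

  // A single match that yields both results at once: (key-grouped, keyed).
  def classify(
      p: Partitioning): (Option[KeyGroupedPartitioning], Option[KeyedPartitioning]) =
    p match {
      case kg: KeyGroupedPartitioning => (Some(kg), None)
      case k: KeyedPartitioning       => (None, Some(k))
      case _                          => (None, None)
    }
}
```

A caller can destructure the result, for example `val (grouped, keyed) = MergedMatchSketch.classify(partitioning)`, instead of matching on the same value twice.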
