[SPARK-30428][SQL] File source V2: support partition pruning #27112
Changes from all commits
11287f6
acfe4f1
51a42a0
65200b6
13e9535
31c8c14
7652c35
58a4a07
@@ -36,10 +36,15 @@ import org.apache.commons.io.FileUtils | |
import org.apache.spark.{SparkConf, SparkException} | ||
import org.apache.spark.sql._ | ||
import org.apache.spark.sql.TestingUDT.{IntervalData, NullData, NullUDT} | ||
import org.apache.spark.sql.execution.datasources.DataSource | ||
import org.apache.spark.sql.catalyst.expressions.AttributeReference | ||
import org.apache.spark.sql.catalyst.plans.logical.Filter | ||
import org.apache.spark.sql.execution.SparkPlan | ||
import org.apache.spark.sql.execution.datasources.{DataSource, FilePartition} | ||
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec | ||
import org.apache.spark.sql.internal.SQLConf | ||
import org.apache.spark.sql.test.SharedSparkSession | ||
import org.apache.spark.sql.types._ | ||
import org.apache.spark.sql.v2.avro.AvroScan | ||
import org.apache.spark.util.Utils | ||
|
||
abstract class AvroSuite extends QueryTest with SharedSparkSession { | ||
|
@@ -1502,8 +1507,75 @@ class AvroV1Suite extends AvroSuite { | |
} | ||
|
||
class AvroV2Suite extends AvroSuite { | ||
import testImplicits._ | ||
|
||
override protected def sparkConf: SparkConf = | ||
super | ||
.sparkConf | ||
.set(SQLConf.USE_V1_SOURCE_LIST, "") | ||
|
||
test("Avro source v2: support partition pruning") { | ||
Review comment: not related to this PR, but we should think of how to share test cases between the avro suite and … (one possible shape for such sharing is sketched after this file's diff). |
||
withTempPath { dir => | ||
Seq(("a", 1, 2), ("b", 1, 2), ("c", 2, 1)) | ||
.toDF("value", "p1", "p2") | ||
.write | ||
.format("avro") | ||
.partitionBy("p1", "p2") | ||
.option("header", true) | ||
.save(dir.getCanonicalPath) | ||
val df = spark | ||
.read | ||
.format("avro") | ||
.option("header", true) | ||
.load(dir.getCanonicalPath) | ||
.where("p1 = 1 and p2 = 2 and value != \"a\"") | ||
|
||
val filterCondition = df.queryExecution.optimizedPlan.collectFirst { | ||
case f: Filter => f.condition | ||
} | ||
assert(filterCondition.isDefined) | ||
// The partition filters should be pushed down and don't need to be reevaluated. | ||
assert(filterCondition.get.collectFirst { | ||
case a: AttributeReference if a.name == "p1" || a.name == "p2" => a | ||
}.isEmpty) | ||
|
||
val fileScan = df.queryExecution.executedPlan collectFirst { | ||
case BatchScanExec(_, f: AvroScan) => f | ||
} | ||
assert(fileScan.nonEmpty) | ||
assert(fileScan.get.partitionFilters.nonEmpty) | ||
assert(fileScan.get.planInputPartitions().forall { partition => | ||
partition.asInstanceOf[FilePartition].files.forall { file => | ||
file.filePath.contains("p1=1") && file.filePath.contains("p2=2") | ||
} | ||
}) | ||
checkAnswer(df, Row("b", 1, 2)) | ||
} | ||
} | ||
|
||
private def getBatchScanExec(plan: SparkPlan): BatchScanExec = { | ||
plan.find(_.isInstanceOf[BatchScanExec]).get.asInstanceOf[BatchScanExec] | ||
} | ||
|
||
test("Avro source v2: same result with different orders of data filters and partition filters") { | ||
withTempPath { path => | ||
val tmpDir = path.getCanonicalPath | ||
spark | ||
.range(10) | ||
.selectExpr("id as a", "id + 1 as b", "id + 2 as c", "id + 3 as d") | ||
.write | ||
.partitionBy("a", "b") | ||
.format("avro") | ||
.save(tmpDir) | ||
val df = spark.read.format("avro").load(tmpDir) | ||
// partition filters: a > 1 AND b < 9 | ||
// data filters: c > 1 AND d < 9 | ||
val plan1 = df.where("a > 1 AND b < 9 AND c > 1 AND d < 9").queryExecution.sparkPlan | ||
val plan2 = df.where("b < 9 AND a > 1 AND d < 9 AND c > 1").queryExecution.sparkPlan | ||
assert(plan1.sameResult(plan2)) | ||
val scan1 = getBatchScanExec(plan1) | ||
val scan2 = getBatchScanExec(plan2) | ||
assert(scan1.sameResult(scan2)) | ||
} | ||
} | ||
} |
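
The review note above suggests sharing test cases between the Avro suite and the built-in file sources. Below is a minimal sketch of one possible shape, assuming a mixin trait parameterized by the format's short name; the trait name FilePruningTests and the test body are illustrative and not part of this PR.

```scala
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
import org.apache.spark.sql.test.SharedSparkSession

// Hypothetical shared trait: each format suite mixes it in and only overrides `format`,
// e.g. `class AvroV2Suite extends AvroSuite with FilePruningTests { def format = "avro" }`.
trait FilePruningTests extends QueryTest with SharedSparkSession {
  // Short name passed to DataFrameReader/Writer.format(), e.g. "avro", "orc", "parquet".
  protected def format: String

  test(s"$format source v2: partition pruning limits the planned input files") {
    withTempPath { dir =>
      spark.range(10).selectExpr("id", "id % 2 as p")
        .write.format(format).partitionBy("p").save(dir.getCanonicalPath)

      val df = spark.read.format(format).load(dir.getCanonicalPath).where("p = 1")
      val scan = df.queryExecution.executedPlan.collectFirst {
        case BatchScanExec(_, f: FileScan) => f
      }
      // The pushed partition filter should be recorded on the scan...
      assert(scan.nonEmpty && scan.get.partitionFilters.nonEmpty)
      // ...and only files under the matching partition directory should be planned.
      assert(scan.get.planInputPartitions().forall {
        case p: FilePartition => p.files.forall(_.filePath.contains("p=1"))
      })
    }
  }
}
```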
@@ -17,13 +17,46 @@ | |
|
||
package org.apache.spark.sql.execution.datasources | ||
|
||
import org.apache.spark.sql.SparkSession | ||
import org.apache.spark.sql.catalyst.catalog.CatalogStatistics | ||
import org.apache.spark.sql.catalyst.expressions._ | ||
import org.apache.spark.sql.catalyst.planning.PhysicalOperation | ||
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} | ||
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project} | ||
import org.apache.spark.sql.catalyst.rules.Rule | ||
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, FileScan, FileTable} | ||
import org.apache.spark.sql.types.StructType | ||
|
||
private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { | ||
|
||
private def getPartitionKeyFilters( | ||
sparkSession: SparkSession, | ||
relation: LeafNode, | ||
partitionSchema: StructType, | ||
filters: Seq[Expression], | ||
output: Seq[AttributeReference]): ExpressionSet = { | ||
val normalizedFilters = DataSourceStrategy.normalizeExprs( | ||
filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f)), output) | ||
val partitionColumns = | ||
relation.resolve(partitionSchema, sparkSession.sessionState.analyzer.resolver) | ||
val partitionSet = AttributeSet(partitionColumns) | ||
ExpressionSet(normalizedFilters.filter { f => | ||
f.references.subsetOf(partitionSet) | ||
}) | ||
} | ||
|
||
private def rebuildPhysicalOperation( | ||
projects: Seq[NamedExpression], | ||
filters: Seq[Expression], | ||
relation: LeafNode): Project = { | ||
val withFilter = if (filters.nonEmpty) { | ||
val filterExpression = filters.reduceLeft(And) | ||
Filter(filterExpression, relation) | ||
} else { | ||
relation | ||
} | ||
Project(projects, withFilter) | ||
} | ||
|
||
override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { | ||
case op @ PhysicalOperation(projects, filters, | ||
logicalRelation @ | ||
|
@@ -39,31 +72,35 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { | |
_, | ||
_)) | ||
if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined => | ||
val normalizedFilters = DataSourceStrategy.normalizeExprs( | ||
filters.filterNot(SubqueryExpression.hasSubquery), logicalRelation.output) | ||
|
||
val sparkSession = fsRelation.sparkSession | ||
val partitionColumns = | ||
logicalRelation.resolve( | ||
partitionSchema, sparkSession.sessionState.analyzer.resolver) | ||
val partitionSet = AttributeSet(partitionColumns) | ||
val partitionKeyFilters = ExpressionSet(normalizedFilters.filter { f => | ||
f.references.subsetOf(partitionSet) | ||
}) | ||
|
||
val partitionKeyFilters = getPartitionKeyFilters( | ||
fsRelation.sparkSession, logicalRelation, partitionSchema, filters, logicalRelation.output) | ||
if (partitionKeyFilters.nonEmpty) { | ||
val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq) | ||
val prunedFsRelation = | ||
fsRelation.copy(location = prunedFileIndex)(sparkSession) | ||
fsRelation.copy(location = prunedFileIndex)(fsRelation.sparkSession) | ||
Review comment: I suggest to pass also the …
Review comment: @guykhazma Thanks for the suggestion.
Review comment: @gengliangwang this is useful for enabling data skipping on all file formats, including formats that don't support pushdown (e.g. CSV, JSON), by replacing the FileIndex implementation with a FileIndex that also uses the …
Review comment: This is the old v1 code path, let's not touch it in this PR.
Review comment: and for the v2 code path, the data filters are already pushed in the rule …
||
// Change table stats based on the sizeInBytes of pruned files | ||
val withStats = logicalRelation.catalogTable.map(_.copy( | ||
stats = Some(CatalogStatistics(sizeInBytes = BigInt(prunedFileIndex.sizeInBytes))))) | ||
val prunedLogicalRelation = logicalRelation.copy( | ||
relation = prunedFsRelation, catalogTable = withStats) | ||
// Keep partition-pruning predicates so that they are visible in physical planning | ||
val filterExpression = filters.reduceLeft(And) | ||
val filter = Filter(filterExpression, prunedLogicalRelation) | ||
Project(projects, filter) | ||
rebuildPhysicalOperation(projects, filters, prunedLogicalRelation) | ||
} else { | ||
op | ||
} | ||
|
||
case op @ PhysicalOperation(projects, filters, | ||
Review comment: CSV datasource in #26973 doesn't fall into this case, but parquet/orc does. And …
||
v2Relation @ DataSourceV2ScanRelation(_, scan: FileScan, output)) | ||
if filters.nonEmpty && scan.readDataSchema.nonEmpty => | ||
val partitionKeyFilters = getPartitionKeyFilters(scan.sparkSession, | ||
v2Relation, scan.readPartitionSchema, filters, output) | ||
if (partitionKeyFilters.nonEmpty) { | ||
val prunedV2Relation = | ||
v2Relation.copy(scan = scan.withPartitionFilters(partitionKeyFilters.toSeq)) | ||
// The pushed down partition filters don't need to be reevaluated. | ||
val afterScanFilters = | ||
ExpressionSet(filters) -- partitionKeyFilters.filter(_.references.nonEmpty) | ||
rebuildPhysicalOperation(projects, afterScanFilters.toSeq, prunedV2Relation) | ||
} else { | ||
op | ||
} | ||
|
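
To make the new v2 branch concrete, here is an illustrative walk-through of what the rule does to a simple query; the path and the column names (`p` as a partition column, `c` as a data column) are made up for the example, and the format is only chosen to match the test suite above.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: assumes an Avro dataset under /tmp/events partitioned by `p`.
val spark = SparkSession.builder().master("local[*]").appName("pruning-demo").getOrCreate()

val df = spark.read.format("avro").load("/tmp/events").where("p = 1 AND c > 5")

// After PruneFileSourcePartitions runs on the optimized plan:
//  - the FileScan's partitionFilters contain `p = 1`, so file listing only touches
//    the p=1 directories;
//  - the Filter kept above the scan only re-evaluates the data predicate `c > 5`,
//    because the pushed partition filter is subtracted from the remaining filters.
df.explain(true)
```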
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path | |
import org.apache.spark.internal.Logging | ||
import org.apache.spark.internal.config.IO_WARNING_LARGEFILETHRESHOLD | ||
import org.apache.spark.sql.{AnalysisException, SparkSession} | ||
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet} | ||
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection | ||
import org.apache.spark.sql.connector.read.{Batch, InputPartition, Scan, Statistics, SupportsReportStatistics} | ||
import org.apache.spark.sql.execution.PartitionedFileUtil | ||
|
@@ -32,20 +33,38 @@ import org.apache.spark.sql.sources.Filter | |
import org.apache.spark.sql.types.StructType | ||
import org.apache.spark.util.Utils | ||
|
||
abstract class FileScan( | ||
sparkSession: SparkSession, | ||
fileIndex: PartitioningAwareFileIndex, | ||
readDataSchema: StructType, | ||
readPartitionSchema: StructType) | ||
extends Scan | ||
with Batch with SupportsReportStatistics with Logging { | ||
trait FileScan extends Scan with Batch with SupportsReportStatistics with Logging { | ||
/** | ||
* Returns whether a file with `path` could be split or not. | ||
*/ | ||
def isSplitable(path: Path): Boolean = { | ||
false | ||
} | ||
|
||
def sparkSession: SparkSession | ||
|
||
def fileIndex: PartitioningAwareFileIndex | ||
|
||
/** | ||
* Returns the required data schema | ||
*/ | ||
def readDataSchema: StructType | ||
|
||
/** | ||
* Returns the required partition schema | ||
*/ | ||
def readPartitionSchema: StructType | ||
|
||
/** | ||
* Returns the filters that can be used for partition pruning | ||
*/ | ||
def partitionFilters: Seq[Expression] | ||
|
||
/** | ||
* Create a new `FileScan` instance from the current one with different `partitionFilters`. | ||
*/ | ||
def withPartitionFilters(partitionFilters: Seq[Expression]): FileScan | ||
|
||
/** | ||
* If a file with `path` is unsplittable, return the unsplittable reason, | ||
* otherwise return `None`. | ||
|
@@ -55,11 +74,24 @@ abstract class FileScan( | |
"undefined" | ||
} | ||
|
||
protected def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]") | ||
|
||
override def equals(obj: Any): Boolean = obj match { | ||
case f: FileScan => | ||
fileIndex == f.fileIndex && readSchema == f.readSchema && | ||
ExpressionSet(partitionFilters) == ExpressionSet(f.partitionFilters) | ||
|
||
case _ => false | ||
} | ||
|
||
override def hashCode(): Int = getClass.hashCode() | ||
|
||
override def description(): String = { | ||
val locationDesc = | ||
fileIndex.getClass.getSimpleName + fileIndex.rootPaths.mkString("[", ", ", "]") | ||
val metadata: Map[String, String] = Map( | ||
"ReadSchema" -> readDataSchema.catalogString, | ||
"PartitionFilters" -> seqToString(partitionFilters), | ||
"Location" -> locationDesc) | ||
val metadataStr = metadata.toSeq.sorted.map { | ||
case (key, value) => | ||
|
@@ -71,7 +103,7 @@ abstract class FileScan( | |
} | ||
|
||
protected def partitions: Seq[FilePartition] = { | ||
val selectedPartitions = fileIndex.listFiles(Seq.empty, Seq.empty) | ||
val selectedPartitions = fileIndex.listFiles(partitionFilters, Seq.empty) | ||
Review comment: @gengliangwang @cloud-fan continuing the discussion from above (the comment was on the wrong line).
Review comment: This makes sense to me. @gengliangwang what do you think? At least you can disable v2 file source to bring back this feature.
Review comment: Yes, it makes sense if there is a …
Review comment: @gengliangwang @cloud-fan sure, thanks.
||
val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions) | ||
val partitionAttributes = fileIndex.partitionSchema.toAttributes | ||
val attributeMap = partitionAttributes.map(a => normalizeName(a.name) -> a).toMap | ||
|
Review comment: I found the indent is wrong in AvroScan. Fix it as well.
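
For reference, a hedged sketch of what a concrete scan has to provide now that FileScan is a trait. The class name SimpleScan and its createReaderFactory stub are illustrative; the real implementations (AvroScan, CSVScan, OrcScan, ...) carry additional fields such as options and pushed data filters.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.types.StructType

case class SimpleScan(
    sparkSession: SparkSession,
    fileIndex: PartitioningAwareFileIndex,
    readDataSchema: StructType,
    readPartitionSchema: StructType,
    partitionFilters: Seq[Expression] = Seq.empty) extends FileScan {

  // Being a case class makes the required copy-with-new-filters a one-liner, so
  // PruneFileSourcePartitions can re-plan the scan without the format knowing about pruning.
  override def withPartitionFilters(partitionFilters: Seq[Expression]): FileScan =
    this.copy(partitionFilters = partitionFilters)

  // Delegate to FileScan's equality, which compares the file index, the read schema and
  // the partition filters as an ExpressionSet (i.e. insensitive to filter order).
  override def equals(obj: Any): Boolean = obj match {
    case s: SimpleScan => super.equals(s)
    case _ => false
  }

  override def hashCode(): Int = super.hashCode()

  // A real format would build its PartitionReaderFactory here.
  override def createReaderFactory(): PartitionReaderFactory =
    throw new UnsupportedOperationException("illustrative stub only")
}
```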