-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-23896][SQL]Improve PartitioningAwareFileIndex #21004
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
51b385b
5028fe2
603a836
378d0cc
4b5e2db
71d98ed
553a412
2b99b12
00438cd
9a2af2d
114737f
8c8bf69
12ac191
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,13 +34,13 @@ import org.apache.spark.sql.types.{StringType, StructType} | |
* It provides the necessary methods to parse partition data based on a set of files. | ||
* | ||
* @param parameters as set of options to control partition discovery | ||
* @param userPartitionSchema an optional partition schema that will be use to provide types for | ||
* the discovered partitions | ||
* @param userSpecifiedSchema an optional user specified schema that will be use to provide | ||
* types for the discovered partitions | ||
*/ | ||
abstract class PartitioningAwareFileIndex( | ||
sparkSession: SparkSession, | ||
parameters: Map[String, String], | ||
userPartitionSchema: Option[StructType], | ||
userSpecifiedSchema: Option[StructType], | ||
fileStatusCache: FileStatusCache = NoopCache) extends FileIndex with Logging { | ||
import PartitioningAwareFileIndex.BASE_PATH_PARAM | ||
|
||
|
@@ -126,35 +126,32 @@ abstract class PartitioningAwareFileIndex( | |
val caseInsensitiveOptions = CaseInsensitiveMap(parameters) | ||
val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION) | ||
.getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone) | ||
|
||
userPartitionSchema match { | ||
val inferredPartitionSpec = PartitioningUtils.parsePartitions( | ||
leafDirs, | ||
typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is causing a behavior change in Spark 2.4.0 reported in SPARK-26188. Why did we need this change? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Before this patch, there was a subtle difference between with and without a user-provided partition schema:
So it was wrong to unify these 2 code paths. @gengliangwang can you change it back? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @mgaido91 Thanks for the investigation!! There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. actually the investigation was done by the reporter of SPARK-26188, I did nothing... Thanks for doing that @gengliangwang and thanks for your comment @cloud-fan |
||
basePaths = basePaths, | ||
timeZoneId = timeZoneId) | ||
userSpecifiedSchema match { | ||
case Some(userProvidedSchema) if userProvidedSchema.nonEmpty => | ||
val spec = PartitioningUtils.parsePartitions( | ||
leafDirs, | ||
typeInference = false, | ||
basePaths = basePaths, | ||
timeZoneId = timeZoneId) | ||
val userPartitionSchema = | ||
combineInferredAndUserSpecifiedPartitionSchema(inferredPartitionSpec) | ||
|
||
// Without auto inference, all of value in the `row` should be null or in StringType, | ||
// we need to cast into the data type that user specified. | ||
def castPartitionValuesToUserSchema(row: InternalRow) = { | ||
InternalRow((0 until row.numFields).map { i => | ||
val dt = inferredPartitionSpec.partitionColumns.fields(i).dataType | ||
Cast( | ||
Literal.create(row.getUTF8String(i), StringType), | ||
userProvidedSchema.fields(i).dataType, | ||
Literal.create(row.get(i, dt), dt), | ||
userPartitionSchema.fields(i).dataType, | ||
Option(timeZoneId)).eval() | ||
}: _*) | ||
} | ||
|
||
PartitionSpec(userProvidedSchema, spec.partitions.map { part => | ||
PartitionSpec(userPartitionSchema, inferredPartitionSpec.partitions.map { part => | ||
part.copy(values = castPartitionValuesToUserSchema(part.values)) | ||
}) | ||
case _ => | ||
PartitioningUtils.parsePartitions( | ||
leafDirs, | ||
typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled, | ||
basePaths = basePaths, | ||
timeZoneId = timeZoneId) | ||
inferredPartitionSpec | ||
} | ||
} | ||
|
||
|
@@ -236,6 +233,25 @@ abstract class PartitioningAwareFileIndex( | |
val name = path.getName | ||
!((name.startsWith("_") && !name.contains("=")) || name.startsWith(".")) | ||
} | ||
|
||
/** | ||
* In the read path, only managed tables by Hive provide the partition columns properly when | ||
* initializing this class. All other file based data sources will try to infer the partitioning, | ||
* and then cast the inferred types to user specified dataTypes if the partition columns exist | ||
* inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510, or | ||
* inconsistent data types as reported in SPARK-21463. | ||
* @param spec A partition inference result | ||
* @return The PartitionSchema resolved from inference and cast according to `userSpecifiedSchema` | ||
*/ | ||
private def combineInferredAndUserSpecifiedPartitionSchema(spec: PartitionSpec): StructType = { | ||
val equality = sparkSession.sessionState.conf.resolver | ||
val resolved = spec.partitionColumns.map { partitionField => | ||
// SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred | ||
userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse( | ||
partitionField) | ||
} | ||
StructType(resolved) | ||
} | ||
} | ||
|
||
object PartitioningAwareFileIndex { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can be
def createInMemoryFileIndex(checkEmptyGlobPath: Boolean)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and we can merge
checkAndGlobPathIfNecessary
and createInMemoryFileIndex
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, we can't. In some cases we need to check the glob files, while we don't need to create
InMemoryFileIndex