[SPARK-23896][SQL]Improve PartitioningAwareFileIndex #21004
Conversation
Test build #89034 has finished for PR 21004 at commit
Test build #89044 has finished for PR 21004 at commit
retest this please.
Test build #89049 has finished for PR 21004 at commit
// we need to cast into the data type that user specified.
def castPartitionValuesToUserSchema(row: InternalRow) = {
  InternalRow((0 until row.numFields).map { i =>
    val expr = inferredPartitionSpec.partitionColumns.fields(i).dataType match {
      case StringType => Literal.create(row.getUTF8String(i), StringType)
why special case string type?
row.get(i, StringType) throws an exception.
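The behavior under discussion can be sketched in plain Scala (hypothetical, simplified names; no Spark dependencies): partition values parsed from directory names start out as strings, and when the user supplies a partition schema, each raw string is cast to the declared type rather than type-inferred.

```scala
// Hypothetical, simplified sketch (not Spark's actual code): partition values
// extracted from paths like "year=2018/month=4" are always strings; with a
// user-supplied partition schema, each raw string is cast to the declared type.
sealed trait SimpleType
case object SimpleStringType extends SimpleType
case object SimpleIntType extends SimpleType

object PartitionCastSketch {
  // Cast one raw partition value (always a string) to the user's declared type.
  def castToUserType(raw: String, userType: SimpleType): Any = userType match {
    case SimpleStringType => raw       // already a string, no conversion needed
    case SimpleIntType    => raw.toInt // may throw if the directory name is not numeric
  }
}
```

Here `SimpleType` and `castToUserType` are invented for illustration; in the PR the string case is special because, as noted above, `row.get(i, StringType)` throws an exception on the internal representation.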
@@ -81,7 +81,7 @@ class PartitionProviderCompatibilitySuite
   HiveCatalogMetrics.reset()
   assert(spark.sql("select * from test where partCol < 2").count() == 2)
   assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2)
-  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
+  assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 7)
what happened here?
All the files should be parsed once when creating the file index, so it is 5 + 2 = 7.
Test build #89215 has finished for PR 21004 at commit
retest this please.
Test build #89224 has finished for PR 21004 at commit
Test build #89233 has finished for PR 21004 at commit
retest this please.
retest this please.
val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)
checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
val (dataSchema, partitionSchema) =
  getOrInferFileFormatSchema(format)
now it can be merged into the line above
val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)
checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
now we may glob the path twice?
Yes. Originally it globbed the path twice too. I don't have a good solution to avoid this.
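One possible way to avoid the double glob (a hypothetical sketch, not something this PR does) would be to memoize the expensive listing so that both call sites share a single result:

```scala
// Hypothetical sketch: memoize an expensive glob so that two call sites
// (schema inference and file-index creation) trigger it only once.
// `GlobOnce` and `doGlob` are invented names for illustration.
class GlobOnce(pattern: String, doGlob: String => Seq[String]) {
  private var calls = 0
  // lazy val: the glob runs at most once, on first access, and is cached.
  lazy val globbedPaths: Seq[String] = {
    calls += 1
    doGlob(pattern)
  }
  def globCount: Int = calls
}
```

With this shape, schema inference and file-index creation can both read `globbedPaths` while the underlying listing happens only once; whether this fits the real control flow in DataSource is an open question, which is presumably why no clean solution was found here.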
Test build #89244 has finished for PR 21004 at commit
Test build #89245 has finished for PR 21004 at commit
  }.toArray
  new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
}
optionalFileIndex: Option[FileIndex] = None): (StructType, StructType) = {
existingFileIndex
val index = fileIndex match {
  case i: InMemoryFileIndex => i
  case _ => tempFileIndex
}
why?
Test build #89259 has finished for PR 21004 at commit
Test build #89272 has finished for PR 21004 at commit
Test build #89273 has finished for PR 21004 at commit
retest this please.
Test build #89277 has finished for PR 21004 at commit
Test build #89288 has finished for PR 21004 at commit
retest this please.
// The operations below are expensive therefore try not to do them if we don't need to, e.g.,
// in streaming mode, we have already inferred and registered partition columns, we will
// never have to materialize the lazy val below
private lazy val tempFileIndex = {
it's only used once, no need to be a lazy val, we can just inline it.
I moved it here on purpose, so that it may avoid being created twice in the future.
I am OK with inlining it.
let's just inline it. People can still create a new index in the future; technically this can't prevent them from doing that.
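The trade-off behind this exchange can be illustrated in plain Scala (hypothetical names, not Spark code): a lazy val only pays off when a value may be referenced more than once (or never); for a value referenced exactly once, inlining the expression is equivalent.

```scala
// Hypothetical illustration: count how many times an expensive build runs.
object LazyValSketch {
  var builds = 0
  def buildIndex(): String = { builds += 1; "index" }

  // A lazy val defers the build until first access and caches the result
  // across repeated accesses...
  lazy val cached: String = buildIndex()

  // ...but for a value used exactly once, a plain inline call costs the same.
  def inlined(): String = buildIndex()
}
```

Accessing `cached` any number of times runs `buildIndex()` once, while each `inlined()` call runs it again; with exactly one use site, the two are indistinguishable, which is the reviewer's point.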
checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
val useCatalogFileIndex = sparkSession.sqlContext.conf.manageFilesourcePartitions &&
  catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog &&
  catalogTable.get.partitionSchema.nonEmpty
use `partitionColumnNames` over `partitionSchema`, since `partitionColumnNames` is a val and `partitionSchema` is a def
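The distinction the reviewer is drawing can be shown with a small plain-Scala sketch (hypothetical class, not Spark's CatalogTable): a val is evaluated once at construction, while a def re-evaluates its body on every call.

```scala
// Hypothetical sketch of the val-vs-def distinction behind the review comment.
class TableSketch(names: Seq[String]) {
  var schemaBuilds = 0
  // val: computed once when the instance is constructed.
  val partitionColumnNames: Seq[String] = names
  // def: the body runs again on every call.
  def partitionSchema: Seq[String] = { schemaBuilds += 1; names }
}
```

Reading `partitionColumnNames` repeatedly never re-runs anything, while each call to `partitionSchema` does, so in a hot path the val is the cheaper member to reference.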
@@ -552,6 +523,40 @@ case class DataSource(
      sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
    }
  }

  /** Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list. */
  private def createInMemoryFileIndex(globbedPaths: Seq[Path]): InMemoryFileIndex = {
this can be def createInMemoryFileIndex(checkEmptyGlobPath: Boolean)
and we can merge checkAndGlobPathIfNecessary and createInMemoryFileIndex
No, we can't. In some cases we need to check the globbed paths but don't need to create an InMemoryFileIndex.
Test build #89306 has finished for PR 21004 at commit
Force-pushed from 60d5b6b to 12ac191.
Test build #89315 has finished for PR 21004 at commit
retest this please.
Test build #89319 has finished for PR 21004 at commit
thanks, merging to master!
(let's avoid describing the PR with a title that just says "improvement" next time)
userPartitionSchema match {
val inferredPartitionSpec = PartitioningUtils.parsePartitions(
  leafDirs,
  typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
this is causing a behavior change in Spark 2.4.0 reported in SPARK-26188. Why did we need this change?
Before this patch, there was a subtle difference between the cases with and without a user-provided partition schema:
- with a user-provided partition schema, we should not infer data types; we should infer the values as strings and cast them to the user-provided types
- without a user-provided partition schema, we should infer the data types (controlled by a config)

So it was wrong to unify these 2 code paths. @gengliangwang can you change it back?
@mgaido91 Thanks for the investigation!
I will fix it and add a test case.
Actually the investigation was done by the reporter of SPARK-26188; I did nothing... Thanks for doing that @gengliangwang, and thanks for your comment @cloud-fan.
What changes were proposed in this pull request?
Currently PartitioningAwareFileIndex accepts an optional parameter userPartitionSchema. If provided, it will combine the inferred partition schema with the parameter. However, to get userPartitionSchema, we need to combine the inferred partition schema with userSpecifiedSchema, and only after that can a final version of PartitioningAwareFileIndex be created.

This can be improved by passing userSpecifiedSchema to PartitioningAwareFileIndex directly. With the improvement, we can reduce redundant code and avoid parsing the file partitions twice.
How was this patch tested?
Unit test