Skip to content

[SPARK-23896][SQL]Improve PartitioningAwareFileIndex #21004

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed

Conversation

gengliangwang
Copy link
Member

What changes were proposed in this pull request?

Currently PartitioningAwareFileIndex accepts an optional parameter userPartitionSchema. If provided, it will combine the inferred partition schema with the parameter.

However,

  1. to get userPartitionSchema, we need to combine inferred partition schema with userSpecifiedSchema
  2. to get the inferred partition schema, we have to create a temporary file index.

Only after that, a final version of PartitioningAwareFileIndex can be created.

This can be improved by passing userSpecifiedSchema to PartitioningAwareFileIndex.

With the improvement, we can reduce redundant code and avoid parsing the file partition twice.

How was this patch tested?

Unit test

@SparkQA
Copy link

SparkQA commented Apr 8, 2018

Test build #89034 has finished for PR 21004 at commit 35aff24.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 9, 2018

Test build #89044 has finished for PR 21004 at commit 10536a6.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Copy link
Member Author

retest this please.

@SparkQA
Copy link

SparkQA commented Apr 9, 2018

Test build #89049 has finished for PR 21004 at commit 10536a6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Copy link
Member Author

@cloud-fan @gatorsmile

// we need to cast into the data type that user specified.
def castPartitionValuesToUserSchema(row: InternalRow) = {
InternalRow((0 until row.numFields).map { i =>
val expr = inferredPartitionSpec.partitionColumns.fields(i).dataType match {
case StringType => Literal.create(row.getUTF8String(i), StringType)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why special case string type?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

row.get(i, StringType) throws exception

@@ -81,7 +81,7 @@ class PartitionProviderCompatibilitySuite
HiveCatalogMetrics.reset()
assert(spark.sql("select * from test where partCol < 2").count() == 2)
assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 7)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happened here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all the files should be parsed once for creating file index. So it is 5 + 2

@SparkQA
Copy link

SparkQA commented Apr 11, 2018

Test build #89215 has finished for PR 21004 at commit d12efab.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Copy link
Member Author

retest this please.

@SparkQA
Copy link

SparkQA commented Apr 12, 2018

Test build #89224 has finished for PR 21004 at commit d12efab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 12, 2018

Test build #89233 has finished for PR 21004 at commit 43f6b77.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Copy link
Member Author

retest this please.

@cloud-fan
Copy link
Contributor

retest this please

val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)
checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
val (dataSchema, partitionSchema) =
getOrInferFileFormatSchema(format)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now it can be merged to the above line


val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)
checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now we may glob the path twice?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Originally it glob twice too. I don't have a good solution to avoid this.

@SparkQA
Copy link

SparkQA commented Apr 12, 2018

Test build #89244 has finished for PR 21004 at commit 43f6b77.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 12, 2018

Test build #89245 has finished for PR 21004 at commit 630fb8c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}.toArray
new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
}
optionalFileIndex: Option[FileIndex] = None): (StructType, StructType) = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

existingFileIndex

val index = fileIndex match {
case i: InMemoryFileIndex => i
case _ => tempFileIndex
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why?

@SparkQA
Copy link

SparkQA commented Apr 12, 2018

Test build #89259 has finished for PR 21004 at commit 91946a0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 12, 2018

Test build #89272 has finished for PR 21004 at commit d871ea8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 12, 2018

Test build #89273 has finished for PR 21004 at commit e9b6e90.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Copy link
Member Author

retest this please.

@SparkQA
Copy link

SparkQA commented Apr 12, 2018

Test build #89277 has finished for PR 21004 at commit 60d5b6b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 12, 2018

Test build #89288 has finished for PR 21004 at commit 60d5b6b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Copy link
Member Author

retest this please.

// The operations below are expensive therefore try not to do them if we don't need to, e.g.,
// in streaming mode, we have already inferred and registered partition columns, we will
// never have to materialize the lazy val below
private lazy val tempFileIndex = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's only used once, no need to be a lazy val, we can just inline it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved it here on purpose. So it may be avoid being created twice in the future.
I am OK to inline it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's just inline it. People can still create a new index in the future, technically this can't prevent users from doing that.

checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
val useCatalogFileIndex = sparkSession.sqlContext.conf.manageFilesourcePartitions &&
catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog &&
catalogTable.get.partitionSchema.nonEmpty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use partitionColumnNames over partitionSchema, since partitionColumnNames is a val and partitionSchema is def

@@ -552,6 +523,40 @@ case class DataSource(
sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
}
}

/** Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list. */
private def createInMemoryFileIndex(globbedPaths: Seq[Path]): InMemoryFileIndex = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be def createInMemoryFileIndex(checkEmptyGlobPath: Boolean)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and we can merge checkAndGlobPathIfNecessary and createInMemoryFileIndex

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we can't. In some case we need to check the glob files, while we don't need to create InMemoryFileIndex

@SparkQA
Copy link

SparkQA commented Apr 13, 2018

Test build #89306 has finished for PR 21004 at commit 60d5b6b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang gengliangwang force-pushed the PartitioningAwareFileIndex branch from 60d5b6b to 12ac191 Compare April 13, 2018 05:27
@SparkQA
Copy link

SparkQA commented Apr 13, 2018

Test build #89315 has finished for PR 21004 at commit 12ac191.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gengliangwang
Copy link
Member Author

retest this please.

@SparkQA
Copy link

SparkQA commented Apr 13, 2018

Test build #89319 has finished for PR 21004 at commit 12ac191.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@asfgit asfgit closed this in 4dfd746 Apr 13, 2018
@HyukjinKwon
Copy link
Member

HyukjinKwon commented Apr 14, 2018

(let's avoid to describe the PR title just saying improvement next time)

userPartitionSchema match {
val inferredPartitionSpec = PartitioningUtils.parsePartitions(
leafDirs,
typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is causing a behavior change in Spark 2.4.0 reported in SPARK-26188. Why did we need this change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before this patch, there was a subtle difference between with and without a user-provided partition schema:

  1. with user-provided partition schema, we should not infer data types. We should infer as string and cast to user-provided type
  2. without user-provided partition schema, we should infer the data type(with a config)

So it was wrong to unify these 2 code paths. @gengliangwang can you change it back?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mgaido91 Thanks for the investigation!!
I will fix it and add test case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually the investigation was done by the reported of SPARK-26188, I did nothing... Thanks for doing that @gengliangwang and thanks for your comment @cloud-fan

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants