[SPARK-23896][SQL] Improve PartitioningAwareFileIndex #21004

Closed
@@ -85,7 +85,7 @@ class CatalogFileIndex(
sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec, Option(timeNs))
} else {
new InMemoryFileIndex(
sparkSession, rootPaths, table.storage.properties, partitionSchema = None)
sparkSession, rootPaths, table.storage.properties, userSpecifiedSchema = None)
}
}

@@ -23,7 +23,6 @@ import scala.collection.JavaConverters._
import scala.language.{existentials, implicitConversions}
import scala.util.{Failure, Success, Try}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.deploy.SparkHadoopUtil
@@ -103,24 +102,6 @@ case class DataSource(
bucket.sortColumnNames, "in the sort definition", equality)
}

/**
* In the read path, only managed tables by Hive provide the partition columns properly when
* initializing this class. All other file based data sources will try to infer the partitioning,
* and then cast the inferred types to user specified dataTypes if the partition columns exist
* inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510, or
* inconsistent data types as reported in SPARK-21463.
* @param fileIndex A FileIndex that will perform partition inference
* @return The PartitionSchema resolved from inference and cast according to `userSpecifiedSchema`
*/
private def combineInferredAndUserSpecifiedPartitionSchema(fileIndex: FileIndex): StructType = {
val resolved = fileIndex.partitionSchema.map { partitionField =>
// SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
partitionField)
}
StructType(resolved)
}

/**
* Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
* it. In the read path, only managed tables by Hive provide the partition columns properly when
@@ -140,31 +121,26 @@
* be any further inference in any triggers.
*
* @param format the file format object for this DataSource
* @param fileStatusCache the shared cache for file statuses to speed up listing
* @param fileIndex optional [[InMemoryFileIndex]] for getting partition schema and file list
* @return A pair of the data schema (excluding partition columns) and the schema of the partition
* columns.
*/
private def getOrInferFileFormatSchema(
format: FileFormat,
fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType) = {
// the operations below are expensive therefore try not to do them if we don't need to, e.g.,
fileIndex: Option[InMemoryFileIndex] = None): (StructType, StructType) = {
// The operations below are expensive therefore try not to do them if we don't need to, e.g.,
// in streaming mode, we have already inferred and registered partition columns, we will
// never have to materialize the lazy val below
lazy val tempFileIndex = {
val allPaths = caseInsensitiveOptions.get("path") ++ paths
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val globbedPaths = allPaths.toSeq.flatMap { path =>
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
}.toArray
new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
lazy val tempFileIndex = fileIndex.getOrElse {
val globbedPaths =
checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false)
createInMemoryFileIndex(globbedPaths)
}

val partitionSchema = if (partitionColumns.isEmpty) {
// Try to infer partitioning, because no DataSource in the read path provides the partitioning
// columns properly unless it is a Hive DataSource
combineInferredAndUserSpecifiedPartitionSchema(tempFileIndex)
tempFileIndex.partitionSchema
} else {
// maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
// partitioning
@@ -356,13 +332,7 @@ case class DataSource(
caseInsensitiveOptions.get("path").toSeq ++ paths,
sparkSession.sessionState.newHadoopConf()) =>
val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
val tempFileCatalog = new MetadataLogFileIndex(sparkSession, basePath, None)
val fileCatalog = if (userSpecifiedSchema.nonEmpty) {
val partitionSchema = combineInferredAndUserSpecifiedPartitionSchema(tempFileCatalog)
new MetadataLogFileIndex(sparkSession, basePath, Option(partitionSchema))
} else {
tempFileCatalog
}
val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath, userSpecifiedSchema)
val dataSchema = userSpecifiedSchema.orElse {
format.inferSchema(
sparkSession,
@@ -384,24 +354,23 @@

// This is a non-streaming file based datasource.
case (format: FileFormat, _) =>
val allPaths = caseInsensitiveOptions.get("path") ++ paths
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val globbedPaths = allPaths.flatMap(
DataSource.checkAndGlobPathIfNecessary(hadoopConf, _, checkFilesExist)).toArray

val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)

val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
val globbedPaths =
checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
val useCatalogFileIndex = sparkSession.sqlContext.conf.manageFilesourcePartitions &&
catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog &&
catalogTable.get.partitionColumnNames.nonEmpty
val (fileCatalog, dataSchema, partitionSchema) = if (useCatalogFileIndex) {
val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
new CatalogFileIndex(
val index = new CatalogFileIndex(
sparkSession,
catalogTable.get,
catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
(index, catalogTable.get.dataSchema, catalogTable.get.partitionSchema)
} else {
new InMemoryFileIndex(
sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache)
val index = createInMemoryFileIndex(globbedPaths)
val (resultDataSchema, resultPartitionSchema) =
getOrInferFileFormatSchema(format, Some(index))
(index, resultDataSchema, resultPartitionSchema)
}

HadoopFsRelation(
@@ -552,6 +521,40 @@ case class DataSource(
sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
}
}

/** Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list. */
private def createInMemoryFileIndex(globbedPaths: Seq[Path]): InMemoryFileIndex = {
Review comment (Contributor): this can be def createInMemoryFileIndex(checkEmptyGlobPath: Boolean)

Review comment (Contributor): and we can merge checkAndGlobPathIfNecessary and createInMemoryFileIndex

Reply (Member, author): No, we can't. In some cases we need to check the globbed paths but don't need to create an InMemoryFileIndex.

val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
new InMemoryFileIndex(
sparkSession, globbedPaths, options, userSpecifiedSchema, fileStatusCache)
}

/**
* Checks and returns files in all the paths.
*/
private def checkAndGlobPathIfNecessary(
checkEmptyGlobPath: Boolean,
checkFilesExist: Boolean): Seq[Path] = {
val allPaths = caseInsensitiveOptions.get("path") ++ paths
val hadoopConf = sparkSession.sessionState.newHadoopConf()
allPaths.flatMap { path =>
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)

if (checkEmptyGlobPath && globPath.isEmpty) {
throw new AnalysisException(s"Path does not exist: $qualified")
}

// Sufficient to check head of the globPath seq for non-glob scenario
// Don't need to check once again if files exist in streaming mode
if (checkFilesExist && !fs.exists(globPath.head)) {
throw new AnalysisException(s"Path does not exist: ${globPath.head}")
}
globPath
}.toSeq
}
}

object DataSource extends Logging {
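The review thread above asks whether checkAndGlobPathIfNecessary and createInMemoryFileIndex could be folded into one helper. The author's point is that the catalog-managed branch still needs the path validation but never builds an in-memory index. Below is a minimal, self-contained sketch of that control flow; ResolvedIndex, CatalogBackedIndex, InMemoryIndex, checkPaths and resolveIndex are made-up stand-ins for illustration, and a plain java.nio existence check stands in for the HDFS globbing — this is not the actual Spark code.

```scala
import java.nio.file.{Files, Paths}

// Stand-ins for CatalogFileIndex / InMemoryFileIndex, for illustration only.
sealed trait ResolvedIndex
case class CatalogBackedIndex(table: String) extends ResolvedIndex
case class InMemoryIndex(paths: Seq[String], userSchema: Option[String]) extends ResolvedIndex

// Plays the role of checkAndGlobPathIfNecessary: validate (and, in the real
// code, glob) every input path regardless of which index is eventually used.
def checkPaths(paths: Seq[String], checkFilesExist: Boolean): Seq[String] =
  paths.map { p =>
    if (checkFilesExist && !Files.exists(Paths.get(p)))
      throw new IllegalArgumentException(s"Path does not exist: $p")
    p
  }

// Plays the role of the reworked branch in createRelation: the catalog-managed
// branch validates paths but reuses the table's own schemas and never lists
// files, which is why path checking cannot simply be merged into
// createInMemoryFileIndex.
def resolveIndex(
    paths: Seq[String],
    catalogTable: Option[String],
    userSchema: Option[String]): ResolvedIndex = {
  val validated = checkPaths(paths, checkFilesExist = true)
  catalogTable match {
    case Some(table) => CatalogBackedIndex(table)      // no file listing here
    case None        => InMemoryIndex(validated, userSchema)
  }
}

// A catalog-managed table: the paths are checked, but no in-memory index is built.
println(resolveIndex(Seq("/tmp"), catalogTable = Some("db.events"), userSchema = None))
```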
@@ -699,30 +702,6 @@ object DataSource extends Logging {
locationUri = path.map(CatalogUtils.stringToURI), properties = optionsWithoutPath)
}

/**
* If `path` is a file pattern, return all the files that match it. Otherwise, return itself.
* If `checkFilesExist` is `true`, also check the file existence.
*/
private def checkAndGlobPathIfNecessary(
hadoopConf: Configuration,
path: String,
checkFilesExist: Boolean): Seq[Path] = {
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(hadoopConf)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)

if (globPath.isEmpty) {
throw new AnalysisException(s"Path does not exist: $qualified")
}
// Sufficient to check head of the globPath seq for non-glob scenario
// Don't need to check once again if files exist in streaming mode
if (checkFilesExist && !fs.exists(globPath.head)) {
throw new AnalysisException(s"Path does not exist: ${globPath.head}")
}
globPath
}

/**
* Called before writing into a FileFormat based data source to make sure the
* supplied schema is not empty.
@@ -41,17 +41,17 @@ import org.apache.spark.util.SerializableConfiguration
* @param rootPathsSpecified the list of root table paths to scan (some of which might be
* filtered out later)
* @param parameters a set of options to control discovery
* @param partitionSchema an optional partition schema that will be used to provide types for the
discovered partitions
* @param userSpecifiedSchema an optional user specified schema that will be used to provide
types for the discovered partitions
*/
class InMemoryFileIndex(
sparkSession: SparkSession,
rootPathsSpecified: Seq[Path],
parameters: Map[String, String],
partitionSchema: Option[StructType],
userSpecifiedSchema: Option[StructType],
fileStatusCache: FileStatusCache = NoopCache)
extends PartitioningAwareFileIndex(
sparkSession, parameters, partitionSchema, fileStatusCache) {
sparkSession, parameters, userSpecifiedSchema, fileStatusCache) {

// Filter out streaming metadata dirs or files such as "/.../_spark_metadata" (the metadata dir)
// or "/.../_spark_metadata/0" (a file in the metadata dir). `rootPathsSpecified` might contain
@@ -34,13 +34,13 @@ import org.apache.spark.sql.types.{StringType, StructType}
* It provides the necessary methods to parse partition data based on a set of files.
*
* @param parameters a set of options to control partition discovery
* @param userPartitionSchema an optional partition schema that will be used to provide types for
the discovered partitions
* @param userSpecifiedSchema an optional user specified schema that will be used to provide
types for the discovered partitions
*/
abstract class PartitioningAwareFileIndex(
sparkSession: SparkSession,
parameters: Map[String, String],
userPartitionSchema: Option[StructType],
userSpecifiedSchema: Option[StructType],
fileStatusCache: FileStatusCache = NoopCache) extends FileIndex with Logging {
import PartitioningAwareFileIndex.BASE_PATH_PARAM

@@ -126,35 +126,32 @@ abstract class PartitioningAwareFileIndex(
val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
.getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)

userPartitionSchema match {
val inferredPartitionSpec = PartitioningUtils.parsePartitions(
leafDirs,
typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
Review comment (Contributor): this is causing a behavior change in Spark 2.4.0, reported in SPARK-26188. Why did we need this change?

Reply (Contributor): Before this patch, there was a subtle difference between having and not having a user-provided partition schema:

  1. with a user-provided partition schema, we should not infer data types; we should parse the values as strings and cast them to the user-provided types
  2. without a user-provided partition schema, we should infer the data types (controlled by a config)

So it was wrong to unify these two code paths. @gengliangwang can you change it back?

Reply (Member, author): @mgaido91 Thanks for the investigation!! I will fix it and add a test case.

Reply (Contributor): actually the investigation was done by the reporter of SPARK-26188, I did nothing... Thanks for doing that @gengliangwang and thanks for your comment @cloud-fan

basePaths = basePaths,
timeZoneId = timeZoneId)
userSpecifiedSchema match {
case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
val spec = PartitioningUtils.parsePartitions(
leafDirs,
typeInference = false,
basePaths = basePaths,
timeZoneId = timeZoneId)
val userPartitionSchema =
combineInferredAndUserSpecifiedPartitionSchema(inferredPartitionSpec)

// Without auto inference, all of value in the `row` should be null or in StringType,
// we need to cast into the data type that user specified.
def castPartitionValuesToUserSchema(row: InternalRow) = {
InternalRow((0 until row.numFields).map { i =>
val dt = inferredPartitionSpec.partitionColumns.fields(i).dataType
Cast(
Literal.create(row.getUTF8String(i), StringType),
userProvidedSchema.fields(i).dataType,
Literal.create(row.get(i, dt), dt),
userPartitionSchema.fields(i).dataType,
Option(timeZoneId)).eval()
}: _*)
}

PartitionSpec(userProvidedSchema, spec.partitions.map { part =>
PartitionSpec(userPartitionSchema, inferredPartitionSpec.partitions.map { part =>
part.copy(values = castPartitionValuesToUserSchema(part.values))
})
case _ =>
PartitioningUtils.parsePartitions(
leafDirs,
typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
basePaths = basePaths,
timeZoneId = timeZoneId)
inferredPartitionSpec
}
}
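
To make the behavior change discussed in the thread above concrete, here is a hypothetical end-user sketch (the /tmp/partition_demo path, the column names, and the local[*] session are illustrative assumptions, not taken from the PR). Before this patch, a user-provided partition schema meant the directory names were parsed as plain strings and cast to the declared types; with the unified inference path introduced here, the values are inferred first and then cast, which can rewrite a partition value such as "00001" to "1" — the regression reported in SPARK-26188.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical repro sketch; paths and names are made up. Run e.g. in spark-shell.
val spark = SparkSession.builder().master("local[*]").appName("partition-demo").getOrCreate()
import spark.implicits._

// Write a table partitioned by a string column whose values look numeric.
Seq(("a", "00001"), ("b", "00002"))
  .toDF("value", "pt")
  .write.mode("overwrite").partitionBy("pt").parquet("/tmp/partition_demo")

// The user-provided schema declares the partition column as STRING.
val userSchema = StructType(Seq(
  StructField("value", StringType),
  StructField("pt", StringType)))

// Pre-patch: directory names are parsed with typeInference = false and cast to
// the user-provided type, so `pt` keeps its leading zeros ("00001").
// With the unified path above: `pt` is inferred first (as an integer when
// partition type inference is enabled) and only then cast back to string, so it
// comes back as "1".
spark.read.schema(userSchema).parquet("/tmp/partition_demo").select("pt").show()
```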

@@ -236,6 +233,25 @@ abstract class PartitioningAwareFileIndex(
val name = path.getName
!((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
}

/**
* In the read path, only managed tables by Hive provide the partition columns properly when
* initializing this class. All other file based data sources will try to infer the partitioning,
* and then cast the inferred types to user specified dataTypes if the partition columns exist
* inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510, or
* inconsistent data types as reported in SPARK-21463.
* @param spec A partition inference result
* @return The PartitionSchema resolved from inference and cast according to `userSpecifiedSchema`
*/
private def combineInferredAndUserSpecifiedPartitionSchema(spec: PartitionSpec): StructType = {
val equality = sparkSession.sessionState.conf.resolver
val resolved = spec.partitionColumns.map { partitionField =>
// SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
partitionField)
}
StructType(resolved)
}
}

object PartitioningAwareFileIndex {
@@ -30,14 +30,14 @@ import org.apache.spark.sql.types.StructType
* A [[FileIndex]] that generates the list of files to process by reading them from the
* metadata log files generated by the [[FileStreamSink]].
*
* @param userPartitionSchema an optional partition schema that will be used to provide types for
* the discovered partitions
* @param userSpecifiedSchema an optional user specified schema that will be used to provide
* types for the discovered partitions
*/
class MetadataLogFileIndex(
sparkSession: SparkSession,
path: Path,
userPartitionSchema: Option[StructType])
extends PartitioningAwareFileIndex(sparkSession, Map.empty, userPartitionSchema) {
userSpecifiedSchema: Option[StructType])
extends PartitioningAwareFileIndex(sparkSession, Map.empty, userSpecifiedSchema) {

private val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
logInfo(s"Reading streaming file log from $metadataDirectory")
@@ -51,7 +51,7 @@ class MetadataLogFileIndex(
}

override protected val leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = {
allFilesFromLog.toArray.groupBy(_.getPath.getParent)
allFilesFromLog.groupBy(_.getPath.getParent)
}

override def rootPaths: Seq[Path] = path :: Nil
@@ -401,7 +401,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
sparkSession = spark,
rootPathsSpecified = Seq(new Path(tempDir)),
parameters = Map.empty[String, String],
partitionSchema = None)
userSpecifiedSchema = None)
// This should not fail.
fileCatalog.listLeafFiles(Seq(new Path(tempDir)))

@@ -419,7 +419,7 @@ class PartitionedTablePerfStatsSuite
HiveCatalogMetrics.reset()
spark.read.load(dir.getAbsolutePath)
assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 1)
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
}
}
}
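
The relaxed assertion above follows from the DataSource refactoring earlier in this diff: previously the read path built a throwaway InMemoryFileIndex for schema inference and then a second one for the relation, both backed by the shared FileStatusCache, so the second listing counted as a cache hit; now a single index is created up front and reused, so the files are listed once and the cache is never consulted a second time. Below is a minimal sketch of that accounting — CountingCache and discover are hypothetical helpers, not Spark's FileStatusCache API.

```scala
import scala.collection.mutable

// Hypothetical cache, only to illustrate the hit accounting described above;
// this is not Spark's FileStatusCache API.
class CountingCache {
  private val cached = mutable.Map.empty[String, Seq[String]]
  var hits = 0
  def listFiles(path: String)(list: => Seq[String]): Seq[String] =
    cached.get(path) match {
      case Some(files) =>
        hits += 1                        // served from cache
        files
      case None =>
        val files = list                 // "discover" the files once
        cached(path) = files
        files
    }
}

def discover(cache: CountingCache, path: String): Seq[String] =
  cache.listFiles(path)(Seq(s"$path/part-00000"))

// Old read path: one index for inference plus one for the relation, sharing the
// cache -> the second listing is a cache hit.
val shared = new CountingCache
discover(shared, "/tmp/table"); discover(shared, "/tmp/table")
assert(shared.hits == 1)

// New read path: a single index is created and reused -> no cache hit.
val single = new CountingCache
discover(single, "/tmp/table")
assert(single.hits == 0)
```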