
Commit 12ac191

address comments

1 parent: 8c8bf69

1 file changed: +13 -15 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 13 additions & 15 deletions
@@ -23,7 +23,6 @@ import scala.collection.JavaConverters._
 import scala.language.{existentials, implicitConversions}
 import scala.util.{Failure, Success, Try}
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -95,14 +94,6 @@ case class DataSource(
9594
lazy val sourceInfo: SourceInfo = sourceSchema()
9695
private val caseInsensitiveOptions = CaseInsensitiveMap(options)
9796
private val equality = sparkSession.sessionState.conf.resolver
98-
// The operations below are expensive therefore try not to do them if we don't need to, e.g.,
99-
// in streaming mode, we have already inferred and registered partition columns, we will
100-
// never have to materialize the lazy val below
101-
private lazy val tempFileIndex = {
102-
val globbedPaths =
103-
checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false)
104-
createInMemoryFileIndex(globbedPaths)
105-
}
10697

10798
bucketSpec.map { bucket =>
10899
SchemaUtils.checkColumnNameDuplication(
@@ -137,22 +128,29 @@ case class DataSource(
   private def getOrInferFileFormatSchema(
       format: FileFormat,
       fileIndex: Option[InMemoryFileIndex] = None): (StructType, StructType) = {
-    def inMemoryFileIndex = fileIndex.getOrElse(tempFileIndex)
+    // The operations below are expensive therefore try not to do them if we don't need to, e.g.,
+    // in streaming mode, we have already inferred and registered partition columns, we will
+    // never have to materialize the lazy val below
+    lazy val tempFileIndex = fileIndex.getOrElse {
+      val globbedPaths =
+        checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false)
+      createInMemoryFileIndex(globbedPaths)
+    }
 
     val partitionSchema = if (partitionColumns.isEmpty) {
       // Try to infer partitioning, because no DataSource in the read path provides the partitioning
       // columns properly unless it is a Hive DataSource
-      inMemoryFileIndex.partitionSchema
+      tempFileIndex.partitionSchema
     } else {
       // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
       // partitioning
       if (userSpecifiedSchema.isEmpty) {
-        val inferredPartitions = inMemoryFileIndex.partitionSchema
+        val inferredPartitions = tempFileIndex.partitionSchema
         inferredPartitions
       } else {
         val partitionFields = partitionColumns.map { partitionColumn =>
           userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse {
-            val inferredPartitions = inMemoryFileIndex.partitionSchema
+            val inferredPartitions = tempFileIndex.partitionSchema
             val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn))
             if (inferredOpt.isDefined) {
               logDebug(
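
The core of this change: the expensive temporary file index moves from a class-level `private lazy val` (deleted in the previous hunk) to a `lazy val` local to `getOrInferFileFormatSchema`, preferring a pre-built `fileIndex` argument when one was passed in. Globbing and `InMemoryFileIndex` construction therefore run only on calls that actually dereference the index. A minimal standalone sketch of the pattern, with illustrative names only (not Spark APIs):

    // `Option.getOrElse` takes its default by name, and a local `lazy val`
    // caches the result, so the expensive branch runs at most once per call,
    // and only if `index` is actually dereferenced.
    object LazyLocalIndexDemo {
      def buildExpensiveIndex(): Seq[String] = {
        println("globbing paths and listing files...") // stand-in for the real work
        Seq("part=2017/file0", "part=2018/file1")
      }

      def describe(precomputed: Option[Seq[String]], needFiles: Boolean): Int = {
        lazy val index = precomputed.getOrElse(buildExpensiveIndex())
        if (needFiles) index.size else 0
      }

      def main(args: Array[String]): Unit = {
        println(describe(None, needFiles = false)) // prints 0; nothing is built
        println(describe(None, needFiles = true))  // builds the index exactly once
      }
    }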
@@ -181,7 +179,7 @@ case class DataSource(
       format.inferSchema(
         sparkSession,
         caseInsensitiveOptions,
-        inMemoryFileIndex.allFiles())
+        tempFileIndex.allFiles())
     }.getOrElse {
       throw new AnalysisException(
         s"Unable to infer schema for $format. It must be specified manually.")
@@ -360,7 +358,7 @@ case class DataSource(
         checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
       val useCatalogFileIndex = sparkSession.sqlContext.conf.manageFilesourcePartitions &&
         catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog &&
-        catalogTable.get.partitionSchema.nonEmpty
+        catalogTable.get.partitionColumnNames.nonEmpty
       val (fileCatalog, dataSchema, partitionSchema) = if (useCatalogFileIndex) {
         val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
         val index = new CatalogFileIndex(
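
The last hunk swaps `partitionSchema.nonEmpty` for `partitionColumnNames.nonEmpty` when deciding whether to use the catalog file index. The likely reason (inferred here, not stated in the commit message): `CatalogTable.partitionSchema` is derived by filtering the table's schema down to its partition column names, so a table whose schema has not been resolved yet can report an empty `partitionSchema` even though partition columns are declared. A toy model of that difference (simplified, not the real `CatalogTable`):

    object PartitionCheckDemo extends App {
      case class Field(name: String)
      case class TableMeta(schema: Seq[Field], partitionColumnNames: Seq[String]) {
        // simplified analogue: partitionSchema is only as complete as `schema`
        def partitionSchema: Seq[Field] =
          schema.filter(f => partitionColumnNames.contains(f.name))
      }

      val notYetResolved = TableMeta(schema = Nil, partitionColumnNames = Seq("dt"))
      assert(notYetResolved.partitionSchema.isEmpty)       // old check: looks unpartitioned
      assert(notYetResolved.partitionColumnNames.nonEmpty) // new check: partitions declared
      println("both assertions passed")
    }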

0 commit comments