@@ -104,20 +104,12 @@ case class DataSource(
104
104
* dataTypes in `userSpecifiedSchema`. All subsequent triggers for this stream will re-use
105
105
* this information, therefore calls to this method should be very cheap, i.e. there won't
106
106
* be any further inference in any triggers.
107
- * 4. `df.saveAsTable(tableThatExisted)`: In this case, we call this method to resolve the
108
- * existing table's partitioning scheme. This is achieved by not providing
109
- * `userSpecifiedSchema`. For this case, we add the boolean `justPartitioning` for an early
110
- * exit, if we don't care about the schema of the original table.
111
107
*
112
108
* @param format the file format object for this DataSource
113
- * @param justPartitioning Whether to exit early and provide just the schema partitioning.
114
109
* @return A pair of the data schema (excluding partition columns) and the schema of the partition
115
- * columns. If `justPartitioning` is `true`, then the dataSchema will be provided as
116
- * `null`.
110
+ * columns.
117
111
*/
118
- private def getOrInferFileFormatSchema (
119
- format : FileFormat ,
120
- justPartitioning : Boolean = false ): (StructType , StructType ) = {
112
+ private def getOrInferFileFormatSchema (format : FileFormat ): (StructType , StructType ) = {
121
113
// the operations below are expensive therefore try not to do them if we don't need to, e.g.,
122
114
// in streaming mode, we have already inferred and registered partition columns, we will
123
115
// never have to materialize the lazy val below
@@ -174,9 +166,7 @@ case class DataSource(
174
166
StructType (partitionFields)
175
167
}
176
168
}
177
- if (justPartitioning) {
178
- return (null , partitionSchema)
179
- }
169
+
180
170
val dataSchema = userSpecifiedSchema.map { schema =>
181
171
val equality = sparkSession.sessionState.conf.resolver
182
172
StructType (schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name))))
@@ -434,26 +424,6 @@ case class DataSource(
434
424
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
435
425
PartitioningUtils .validatePartitionColumn(data.schema, partitionColumns, caseSensitive)
436
426
437
- // If we are appending to a table that already exists, make sure the partitioning matches
438
- // up. If we fail to load the table for whatever reason, ignore the check.
439
- if (mode == SaveMode .Append ) {
440
- val existingPartitionColumns = Try {
441
- getOrInferFileFormatSchema(format, justPartitioning = true )._2.fieldNames.toList
442
- }.getOrElse(Seq .empty[String ])
443
- // TODO: Case sensitivity.
444
- val sameColumns =
445
- existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
446
- if (existingPartitionColumns.nonEmpty && ! sameColumns) {
447
- throw new AnalysisException (
448
- s """ Requested partitioning does not match existing partitioning.
449
- |Existing partitioning columns:
450
- | ${existingPartitionColumns.mkString(" , " )}
451
- |Requested partitioning columns:
452
- | ${partitionColumns.mkString(" , " )}
453
- | """ .stripMargin)
454
- }
455
- }
456
-
457
427
// SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
458
428
// not need to have the query as child, to avoid to analyze an optimized query,
459
429
// because InsertIntoHadoopFsRelationCommand will be optimized first.
0 commit comments