
Commit 568f133

fix1
1 parent 55b1a8c commit 568f133


7 files changed: +86 −59 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 10 additions & 8 deletions

@@ -137,14 +137,16 @@ case class CatalogTable(
     unsupportedFeatures: Seq[String] = Seq.empty) {
 
   // Verify that the provided columns are part of the schema
-  private val colNames = schema.map(_.name).toSet
-  private def requireSubsetOfSchema(cols: Seq[String], colType: String): Unit = {
-    require(cols.toSet.subsetOf(colNames), s"$colType columns (${cols.mkString(", ")}) " +
-      s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'")
-  }
-  requireSubsetOfSchema(partitionColumnNames, "partition")
-  requireSubsetOfSchema(sortColumnNames, "sort")
-  requireSubsetOfSchema(bucketColumnNames, "bucket")
+  // TODO: this restriction should be checked at the end of Analyzer. When building CatalogTable,
+  // the initial version might violate it.
+  // private val colNames = schema.map(_.name).toSet
+  // private def requireSubsetOfSchema(cols: Seq[String], colType: String): Unit = {
+  //   require(cols.toSet.subsetOf(colNames), s"$colType columns (${cols.mkString(", ")}) " +
+  //     s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'")
+  // }
+  // requireSubsetOfSchema(partitionColumnNames, "partition")
+  // requireSubsetOfSchema(sortColumnNames, "sort")
+  // requireSubsetOfSchema(bucketColumnNames, "bucket")
 
   /** Columns this table is partitioned by. */
   def partitionColumns: Seq[CatalogColumn] =
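Note: per the TODO above, the commit only disables the constructor-time column check; the rule itself is unchanged. The following is a minimal standalone sketch of that same subset check, written with plain Scala collections rather than Spark's CatalogTable, to illustrate where such a validation could run once the schema is known. All names here are illustrative stand-ins, not Spark APIs.

// Standalone sketch only; ColumnSubsetCheck and its parameters are hypothetical stand-ins.
object ColumnSubsetCheck {
  // The same rule the commented-out require enforced: every partition/sort/bucket
  // column must appear in the table schema.
  def requireSubsetOfSchema(
      schemaCols: Set[String],
      cols: Seq[String],
      colType: String,
      table: String): Unit = {
    require(cols.toSet.subsetOf(schemaCols),
      s"$colType columns (${cols.mkString(", ")}) must be a subset of schema " +
        s"(${schemaCols.mkString(", ")}) in table '$table'")
  }

  def main(args: Array[String]): Unit = {
    val schemaCols = Set("id", "name", "ds")
    requireSubsetOfSchema(schemaCols, Seq("ds"), "partition", "t")    // passes
    // requireSubsetOfSchema(schemaCols, Seq("oops"), "bucket", "t")  // would throw IllegalArgumentException
  }
}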

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 19 additions & 5 deletions

@@ -23,7 +23,8 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
 import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 
@@ -366,14 +367,27 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         throw new AnalysisException(s"Table $tableIdent already exists.")
 
       case _ =>
+        val bucketSpec = getBucketSpec
+        val sortColumnNames = bucketSpec.map(_.sortColumnNames).getOrElse(Seq.empty)
+        val bucketColumnNames = bucketSpec.map(_.bucketColumnNames).getOrElse(Seq.empty)
+        val numBuckets = bucketSpec.map(_.numBuckets).getOrElse(-1)
+
+        val tableDesc = CatalogTable(
+          identifier = tableIdent,
+          tableType = CatalogTableType.MANAGED,
+          storage = CatalogStorageFormat.empty,
+          schema = Seq.empty[CatalogColumn],
+          partitionColumnNames = partitioningColumns.getOrElse(Seq.empty[String]),
+          sortColumnNames = sortColumnNames,
+          bucketColumnNames = bucketColumnNames,
+          numBuckets = numBuckets,
+          properties = extraOptions.toMap)
+
         val cmd =
           CreateTableUsingAsSelect(
-            tableIdent,
+            tableDesc,
             source,
-            partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
-            getBucketSpec,
             mode,
-            extraOptions.toMap,
             df.logicalPlan)
         df.sparkSession.sessionState.executePlan(cmd).toRdd
     }
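Both this call site and the parser change below flatten the optional bucket spec into three scalar CatalogTable fields, with numBuckets = -1 standing in for "no bucketing". Below is a small self-contained sketch of that mapping; the BucketSpec case class and FlattenBucketSpec object are stand-ins for illustration, not Spark's real classes.

// Illustrative stand-ins only; Spark's real BucketSpec/CatalogTable live in other packages.
case class BucketSpec(numBuckets: Int, bucketColumnNames: Seq[String], sortColumnNames: Seq[String])

object FlattenBucketSpec {
  // Mirrors the pattern used in DataFrameWriter and SparkSqlAstBuilder:
  // absent spec => empty column lists and numBuckets = -1.
  def flatten(bucketSpec: Option[BucketSpec]): (Seq[String], Seq[String], Int) = {
    val sortColumnNames = bucketSpec.map(_.sortColumnNames).getOrElse(Seq.empty)
    val bucketColumnNames = bucketSpec.map(_.bucketColumnNames).getOrElse(Seq.empty)
    val numBuckets = bucketSpec.map(_.numBuckets).getOrElse(-1)
    (bucketColumnNames, sortColumnNames, numBuckets)
  }

  def main(args: Array[String]): Unit = {
    println(flatten(Some(BucketSpec(8, Seq("id"), Seq("ts")))))  // (List(id),List(ts),8)
    println(flatten(None))                                       // (List(),List(),-1)
  }
}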

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 20 additions & 8 deletions

@@ -340,8 +340,23 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         SaveMode.ErrorIfExists
       }
 
+      val sortColumnNames = bucketSpec.map(_.sortColumnNames).getOrElse(Seq.empty)
+      val bucketColumnNames = bucketSpec.map(_.bucketColumnNames).getOrElse(Seq.empty)
+      val numBuckets = bucketSpec.map(_.numBuckets).getOrElse(-1)
+
+      val tableDesc = CatalogTable(
+        identifier = table,
+        tableType = CatalogTableType.MANAGED,
+        storage = CatalogStorageFormat.empty,
+        schema = Seq.empty[CatalogColumn],
+        partitionColumnNames = partitionColumnNames,
+        sortColumnNames = sortColumnNames,
+        bucketColumnNames = bucketColumnNames,
+        numBuckets = numBuckets,
+        properties = options)
+
       CreateTableUsingAsSelect(
-        table, provider, partitionColumnNames, bucketSpec, mode, options, query)
+        tableDesc = tableDesc, provider = provider, mode = mode, child = query)
     } else {
       val struct = Option(ctx.colTypeList()).map(createStructType)
       CreateTableUsing(
@@ -1025,20 +1040,17 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     val hasStorageProperties = (ctx.createFileFormat != null) || (ctx.rowFormat != null)
     if (conf.convertCTAS && !hasStorageProperties) {
       val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
-      // At here, both rowStorage.serdeProperties and fileStorage.serdeProperties
-      // are empty Maps.
-      val optionsWithPath = if (location.isDefined) {
+      // At here, both rowStorage.serdeProperties and fileStorage.serdeProperties are empty.
+      // When converting Hive Table to Data Source Table, ignore user-specified table properties
+      val tableProperties = if (location.isDefined) {
         Map("path" -> location.get)
       } else {
         Map.empty[String, String]
       }
       CreateTableUsingAsSelect(
-        tableIdent = tableDesc.identifier,
+        tableDesc = tableDesc.copy(properties = tableProperties),
         provider = conf.defaultDataSourceName,
-        partitionColumns = Array.empty[String],
-        bucketSpec = None,
         mode = mode,
-        options = optionsWithPath,
         q
       )
     } else {

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 1 addition & 4 deletions

@@ -447,12 +447,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case c: CreateTableUsingAsSelect =>
         val cmd =
           CreateDataSourceTableAsSelectCommand(
-            c.tableIdent,
+            c.tableDesc,
             c.provider,
-            c.partitionColumns,
-            c.bucketSpec,
             c.mode,
-            c.options,
             c.child)
         ExecutedCommandExec(cmd) :: Nil
 

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 29 additions & 24 deletions

@@ -130,12 +130,9 @@ case class CreateDataSourceTableCommand(
   * }}}
   */
 case class CreateDataSourceTableAsSelectCommand(
-    tableIdent: TableIdentifier,
+    tableDesc: CatalogTable,
     provider: String,
-    partitionColumns: Array[String],
-    bucketSpec: Option[BucketSpec],
     mode: SaveMode,
-    options: Map[String, String],
     query: LogicalPlan)
   extends RunnableCommand {
 
@@ -146,31 +143,39 @@ case class CreateDataSourceTableAsSelectCommand(
     // the table name and database name we have for this query. MetaStoreUtils.validateName
     // is the method used by Hive to check if a table name or a database name is valid for
     // the metastore.
-    if (!CreateDataSourceTableUtils.validateName(tableIdent.table)) {
-      throw new AnalysisException(s"Table name ${tableIdent.table} is not a valid name for " +
-        s"metastore. Metastore only accepts table name containing characters, numbers and _.")
+    if (!CreateDataSourceTableUtils.validateName(tableDesc.identifier.table)) {
+      throw new AnalysisException(s"Table name ${tableDesc.identifier.table} is not a valid name " +
+        s"for metastore. Metastore only accepts table name containing characters, numbers and _.")
     }
-    if (tableIdent.database.isDefined &&
-      !CreateDataSourceTableUtils.validateName(tableIdent.database.get)) {
-      throw new AnalysisException(s"Database name ${tableIdent.database.get} is not a valid name " +
-        s"for metastore. Metastore only accepts database name containing " +
+    if (tableDesc.identifier.database.isDefined &&
+      !CreateDataSourceTableUtils.validateName(tableDesc.identifier.database.get)) {
+      throw new AnalysisException(s"Database name ${tableDesc.identifier.database.get} is not " +
+        s"a valid name for metastore. Metastore only accepts database name containing " +
         s"characters, numbers and _.")
     }
 
-    val tableName = tableIdent.unquotedString
+    val tableName = tableDesc.identifier.unquotedString
     val sessionState = sparkSession.sessionState
     var createMetastoreTable = false
    var isExternal = true
     val optionsWithPath =
-      if (!new CaseInsensitiveMap(options).contains("path")) {
+      if (!new CaseInsensitiveMap(tableDesc.properties).contains("path")) {
         isExternal = false
-        options + ("path" -> sessionState.catalog.defaultTablePath(tableIdent))
+        tableDesc.properties +
+          ("path" -> sessionState.catalog.defaultTablePath(tableDesc.identifier))
       } else {
-        options
+        tableDesc.properties
       }
 
+    val bucketSpec: Option[BucketSpec] = if (tableDesc.numBuckets > 0) {
+      Option(BucketSpec(
+        tableDesc.numBuckets, tableDesc.bucketColumnNames, tableDesc.sortColumnNames))
+    } else {
+      None
+    }
+
     var existingSchema = Option.empty[StructType]
-    if (sparkSession.sessionState.catalog.tableExists(tableIdent)) {
+    if (sparkSession.sessionState.catalog.tableExists(tableDesc.identifier)) {
       // Check if we need to throw an exception or just return.
       mode match {
         case SaveMode.ErrorIfExists =>
@@ -187,21 +192,21 @@ case class CreateDataSourceTableAsSelectCommand(
     val dataSource = DataSource(
       sparkSession = sparkSession,
       userSpecifiedSchema = Some(query.schema.asNullable),
-      partitionColumns = partitionColumns,
+      partitionColumns = tableDesc.partitionColumnNames,
       bucketSpec = bucketSpec,
       className = provider,
       options = optionsWithPath)
     // TODO: Check that options from the resolved relation match the relation that we are
     // inserting into (i.e. using the same compression).
 
     EliminateSubqueryAliases(
-      sessionState.catalog.lookupRelation(tableIdent)) match {
+      sessionState.catalog.lookupRelation(tableDesc.identifier)) match {
       case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
         // check if the file formats match
         l.relation match {
           case r: HadoopFsRelation if r.fileFormat.getClass != dataSource.providingClass =>
             throw new AnalysisException(
-              s"The file format of the existing table $tableIdent is " +
+              s"The file format of the existing table ${tableDesc.identifier} is " +
                 s"`${r.fileFormat.getClass.getName}`. It doesn't match the specified " +
                 s"format `$provider`")
           case _ =>
@@ -238,15 +243,15 @@ case class CreateDataSourceTableAsSelectCommand(
     val dataSource = DataSource(
       sparkSession,
       className = provider,
-      partitionColumns = partitionColumns,
+      partitionColumns = tableDesc.partitionColumnNames,
       bucketSpec = bucketSpec,
       options = optionsWithPath)
 
     val result = try {
       dataSource.write(mode, df)
     } catch {
       case ex: AnalysisException =>
-        logError(s"Failed to write to table ${tableIdent.identifier} in $mode mode", ex)
+        logError(s"Failed to write to table ${tableDesc.identifier} in $mode mode", ex)
         throw ex
     }
     if (createMetastoreTable) {
@@ -255,17 +260,17 @@ case class CreateDataSourceTableAsSelectCommand(
       // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
       CreateDataSourceTableUtils.createDataSourceTable(
         sparkSession = sparkSession,
-        tableIdent = tableIdent,
+        tableIdent = tableDesc.identifier,
         userSpecifiedSchema = Some(result.schema),
-        partitionColumns = partitionColumns,
+        partitionColumns = tableDesc.partitionColumnNames.toArray,
         bucketSpec = bucketSpec,
         provider = provider,
         options = optionsWithPath,
         isExternal = isExternal)
     }
 
     // Refresh the cache of the table in the catalog.
-    sessionState.catalog.refreshTable(tableIdent)
+    sessionState.catalog.refreshTable(tableDesc.identifier)
     Seq.empty[Row]
   }
 }
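This command reverses the flattening done at the call sites: it rebuilds an Option[BucketSpec] from the three CatalogTable fields, treating a non-positive numBuckets as "no bucketing". A minimal sketch of that inverse mapping follows, again with a stand-in BucketSpec case class rather than Spark's.

// Stand-in type for illustration; not Spark's BucketSpec.
case class BucketSpec(numBuckets: Int, bucketColumnNames: Seq[String], sortColumnNames: Seq[String])

object RebuildBucketSpec {
  // Mirrors the logic added to CreateDataSourceTableAsSelectCommand above.
  def rebuild(numBuckets: Int, bucketCols: Seq[String], sortCols: Seq[String]): Option[BucketSpec] =
    if (numBuckets > 0) Some(BucketSpec(numBuckets, bucketCols, sortCols)) else None

  def main(args: Array[String]): Unit = {
    println(rebuild(8, Seq("id"), Seq("ts")))  // Some(BucketSpec(8,List(id),List(ts)))
    println(rebuild(-1, Nil, Nil))             // None: the -1 default round-trips to "no bucketing"
  }
}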

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala

Lines changed: 2 additions & 4 deletions

@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -54,12 +55,9 @@ case class CreateTableUsing(
  * So, [[PreWriteCheck]] can detect cases that are not allowed.
  */
 case class CreateTableUsingAsSelect(
-    tableIdent: TableIdentifier,
+    tableDesc: CatalogTable,
     provider: String,
-    partitionColumns: Array[String],
-    bucketSpec: Option[BucketSpec],
     mode: SaveMode,
-    options: Map[String, String],
     child: LogicalPlan) extends logical.UnaryNode {
   override def output: Seq[Attribute] = Seq.empty[Attribute]
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 5 additions & 6 deletions

@@ -209,9 +209,9 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
       case c: CreateTableUsingAsSelect =>
         // When the SaveMode is Overwrite, we need to check if the table is an input table of
         // the query. If so, we will throw an AnalysisException to let users know it is not allowed.
-        if (c.mode == SaveMode.Overwrite && catalog.tableExists(c.tableIdent)) {
+        if (c.mode == SaveMode.Overwrite && catalog.tableExists(c.tableDesc.identifier)) {
           // Need to remove SubQuery operator.
-          EliminateSubqueryAliases(catalog.lookupRelation(c.tableIdent)) match {
+          EliminateSubqueryAliases(catalog.lookupRelation(c.tableDesc.identifier)) match {
             // Only do the check if the table is a data source table
             // (the relation is a BaseRelation).
             case l @ LogicalRelation(dest: BaseRelation, _, _) =>
@@ -221,7 +221,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
               }
               if (srcRelations.contains(dest)) {
                 failAnalysis(
-                  s"Cannot overwrite table ${c.tableIdent} that is also being read from.")
+                  s"Cannot overwrite table ${c.tableDesc.identifier} that is also being read from.")
               } else {
                 // OK
               }
@@ -233,11 +233,10 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
         }
 
         PartitioningUtils.validatePartitionColumn(
-          c.child.schema, c.partitionColumns, conf.caseSensitiveAnalysis)
+          c.child.schema, c.tableDesc.partitionColumnNames, conf.caseSensitiveAnalysis)
 
         for {
-          spec <- c.bucketSpec
-          sortColumnName <- spec.sortColumnNames
+          sortColumnName <- c.tableDesc.sortColumnNames
          sortColumn <- c.child.schema.find(_.name == sortColumnName)
         } {
           if (!RowOrdering.isOrderable(sortColumn.dataType)) {
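With sort columns now stored directly on the CatalogTable, the pre-write check no longer unwraps an Option[BucketSpec] first; it simply iterates the (possibly empty) sortColumnNames and validates each resolved column's data type. Below is a standalone sketch of that simplified iteration over a toy schema; Field, isOrderable, and SortColumnCheck are illustrative stand-ins, not Spark's StructField or RowOrdering.

// Toy stand-ins for the schema types and the orderability predicate; illustrative only.
case class Field(name: String, dataType: String)

object SortColumnCheck {
  // Pretend map and binary types are not orderable, standing in for RowOrdering.isOrderable.
  def isOrderable(dataType: String): Boolean = dataType != "map" && dataType != "binary"

  def check(schema: Seq[Field], sortColumnNames: Seq[String]): Unit = {
    for {
      sortColumnName <- sortColumnNames                    // empty when the table is not bucketed
      sortColumn <- schema.find(_.name == sortColumnName)  // skip names not (yet) in the schema
    } {
      if (!isOrderable(sortColumn.dataType)) {
        throw new IllegalArgumentException(
          s"Cannot use ${sortColumn.dataType} column ${sortColumn.name} as a sort column")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val schema = Seq(Field("id", "int"), Field("props", "map"))
    check(schema, Seq("id"))       // passes
    // check(schema, Seq("props")) // would throw
  }
}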
