
Commit 05a827d

Address comments
1 parent 2f6e0b6 commit 05a827d

File tree

5 files changed: +39 -36 lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/StagingTableCatalog.java

Lines changed: 8 additions & 5 deletions
@@ -21,6 +21,7 @@
 
 import org.apache.spark.sql.catalog.v2.expressions.Transform;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
 import org.apache.spark.sql.sources.v2.StagedTable;
 import org.apache.spark.sql.sources.v2.SupportsWrite;
@@ -87,10 +88,11 @@ StagedTable stageCreate(
    * can decide whether to move forward with the table replacement anyways or abort the commit
    * operation.
    * <p>
-   * If the table does not exist, committing the staged changes should fail. This differs from the
-   * semantics of {@link #stageCreateOrReplace(Identifier, StructType, Transform[], Map)}, which
-   * should create the table in the data source if the table does not exist at the time of
-   * committing the operation.
+   * If the table does not exist, committing the staged changes should fail with
+   * {@link NoSuchTableException}. This differs from the semantics of
+   * {@link #stageCreateOrReplace(Identifier, StructType, Transform[], Map)}, which should create
+   * the table in the data source if the table does not exist at the time of committing the
+   * operation.
    *
    * @param ident a table identifier
    * @param schema the schema of the new table, as a struct type
@@ -99,12 +101,13 @@ StagedTable stageCreate(
    * @return metadata for the new table
    * @throws UnsupportedOperationException If a requested partition transform is not supported
    * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
+   * @throws NoSuchTableException If the table does not exist
    */
   StagedTable stageReplace(
       Identifier ident,
       StructType schema,
       Transform[] partitions,
-      Map<String, String> properties) throws NoSuchNamespaceException;
+      Map<String, String> properties) throws NoSuchNamespaceException, NoSuchTableException;
 
   /**
    * Stage the creation or replacement of a table, preparing it to be committed into the metastore
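
For implementers of StagingTableCatalog, here is a minimal sketch (not part of this commit) of the kind of up-front check stageReplace could perform to honor the documented NoSuchTableException contract; StageReplaceGuard and requireExistsForReplace are hypothetical names, and the exception is raised through the existing (db, table) constructor:

    import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog}
    import org.apache.spark.sql.catalyst.analysis.NoSuchTableException

    object StageReplaceGuard {
      // Fail fast when the table to replace is missing, as the updated
      // stageReplace javadoc requires, instead of deferring the failure
      // to commit time.
      def requireExistsForReplace(catalog: TableCatalog, ident: Identifier): Unit = {
        if (!catalog.tableExists(ident)) {
          throw new NoSuchTableException(ident.namespace().mkString("."), ident.name())
        }
      }
    }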

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CannotReplaceMissingTableException.scala

Lines changed: 3 additions & 2 deletions
@@ -22,7 +22,8 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalog.v2.Identifier
 
 class CannotReplaceMissingTableException(
-    tableIdentifier: Identifier)
+    tableIdentifier: Identifier,
+    cause: Option[Throwable] = None)
   extends AnalysisException(
     s"Table $tableIdentifier cannot be replaced as it did not exist." +
-      s" Use CREATE OR REPLACE TABLE to create the table.")
+      s" Use CREATE OR REPLACE TABLE to create the table.", cause = cause)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 1 addition & 1 deletion
@@ -2308,7 +2308,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
    *
    * Expected format:
    * {{{
-   *   REPLACE TABLE [IF NOT EXISTS] [db_name.]table_name
+   *   [CREATE OR] REPLACE TABLE [db_name.]table_name
    *   USING table_provider
    *   replace_table_clauses
    *   [[AS] select_statement];
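
For reference, statements matching the documented format would look roughly like the following; this sketch assumes a v2 catalog registered as testcat (e.g. via spark.sql.catalog.testcat) and a placeholder foo provider, neither of which is part of this change, so the statements would fail at analysis without such a catalog configured:

    import org.apache.spark.sql.SparkSession

    object ReplaceTableSyntaxExample extends App {
      val spark = SparkSession.builder()
        .master("local[1]")
        .appName("replace-table-syntax")
        .getOrCreate()

      // Per the updated comment: CREATE OR REPLACE creates the table when it is
      // missing; plain REPLACE requires it to already exist.
      spark.sql("CREATE OR REPLACE TABLE testcat.db.t (id BIGINT) USING foo")
      spark.sql("REPLACE TABLE testcat.db.t USING foo AS SELECT 1 AS id")

      spark.stop()
    }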

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala

Lines changed: 17 additions & 18 deletions
@@ -23,7 +23,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalog.v2.{Identifier, StagingTableCatalog, TableCatalog}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException
+import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.sources.v2.StagedTable
@@ -32,19 +32,19 @@ import org.apache.spark.util.Utils
 
 case class ReplaceTableExec(
     catalog: TableCatalog,
-    identifier: Identifier,
+    ident: Identifier,
     tableSchema: StructType,
     partitioning: Seq[Transform],
     tableProperties: Map[String, String],
     orCreate: Boolean) extends LeafExecNode {
 
   override protected def doExecute(): RDD[InternalRow] = {
-    if (catalog.tableExists(identifier)) {
-      catalog.dropTable(identifier)
+    if (catalog.tableExists(ident)) {
+      catalog.dropTable(ident)
     } else if (!orCreate) {
-      throw new CannotReplaceMissingTableException(identifier)
+      throw new CannotReplaceMissingTableException(ident)
     }
-    catalog.createTable(identifier, tableSchema, partitioning.toArray, tableProperties.asJava)
+    catalog.createTable(ident, tableSchema, partitioning.toArray, tableProperties.asJava)
     sqlContext.sparkContext.parallelize(Seq.empty, 1)
   }
 
@@ -60,18 +60,17 @@ case class AtomicReplaceTableExec(
     orCreate: Boolean) extends LeafExecNode {
 
   override protected def doExecute(): RDD[InternalRow] = {
-    val staged = if (catalog.tableExists(identifier)) {
-      catalog.stageReplace(
-        identifier,
-        tableSchema,
-        partitioning.toArray,
-        tableProperties.asJava)
-    } else if (orCreate) {
-      catalog.stageCreate(
-        identifier,
-        tableSchema,
-        partitioning.toArray,
-        tableProperties.asJava)
+    val staged = if (orCreate) {
+      catalog.stageCreateOrReplace(
+        identifier, tableSchema, partitioning.toArray, tableProperties.asJava)
+    } else if (catalog.tableExists(identifier)) {
+      try {
+        catalog.stageReplace(
+          identifier, tableSchema, partitioning.toArray, tableProperties.asJava)
+      } catch {
+        case e: NoSuchTableException =>
+          throw new CannotReplaceMissingTableException(identifier, Some(e))
+      }
     } else {
       throw new CannotReplaceMissingTableException(identifier)
    }
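
The restructured branches check orCreate first, so CREATE OR REPLACE always goes through stageCreateOrReplace whether or not the table exists. Plain REPLACE still calls stageReplace, and a NoSuchTableException raised there (for example, when the table is dropped between the tableExists check and staging) is translated into the user-facing error with the original exception preserved as the cause. A hypothetical helper, not part of this commit, sketching that translation pattern:

    import java.util
    import org.apache.spark.sql.catalog.v2.{Identifier, StagingTableCatalog}
    import org.apache.spark.sql.catalog.v2.expressions.Transform
    import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException}
    import org.apache.spark.sql.sources.v2.StagedTable
    import org.apache.spark.sql.types.StructType

    object StagedReplaceSupport {
      // Stage a replacement and surface a missing table as the REPLACE-specific
      // analysis error, keeping the catalog's exception as the cause.
      def stageReplaceOrThrow(
          catalog: StagingTableCatalog,
          ident: Identifier,
          schema: StructType,
          partitions: Array[Transform],
          properties: util.Map[String, String]): StagedTable = {
        try {
          catalog.stageReplace(ident, schema, partitions, properties)
        } catch {
          case e: NoSuchTableException =>
            throw new CannotReplaceMissingTableException(ident, Some(e))
        }
      }
    }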

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala

Lines changed: 10 additions & 10 deletions
@@ -29,7 +29,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalog.v2.{Identifier, StagingTableCatalog, TableCatalog}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
@@ -213,21 +213,21 @@ case class AtomicReplaceTableAsSelectExec(
     orCreate: Boolean) extends AtomicTableWriteExec {
 
   override protected def doExecute(): RDD[InternalRow] = {
-    val stagedTable = if (catalog.tableExists(ident)) {
-      if (orCreate) {
-        catalog.stageCreateOrReplace(
-          ident, query.schema, partitioning.toArray, properties.asJava)
-      } else {
+    val staged = if (orCreate) {
+      catalog.stageCreateOrReplace(
+        ident, query.schema, partitioning.toArray, properties.asJava)
+    } else if (catalog.tableExists(ident)) {
+      try {
         catalog.stageReplace(
           ident, query.schema, partitioning.toArray, properties.asJava)
+      } catch {
+        case e: NoSuchTableException =>
+          throw new CannotReplaceMissingTableException(ident, Some(e))
       }
-    } else if (orCreate) {
-      catalog.stageCreateOrReplace(
-        ident, query.schema, partitioning.toArray, properties.asJava)
     } else {
       throw new CannotReplaceMissingTableException(ident)
     }
-    writeToStagedTable(stagedTable, writeOptions, ident)
+    writeToStagedTable(staged, writeOptions, ident)
   }
 }
 
233233
