
[SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 #24798


Closed
mccheah wants to merge 29 commits into apache:master from mccheah:spark-27724

Conversation

Contributor

@mccheah mccheah commented Jun 4, 2019

What changes were proposed in this pull request?

Implements the REPLACE TABLE and REPLACE TABLE AS SELECT logical plans. REPLACE TABLE is now a valid operation in spark-sql provided that the tables being modified are managed by V2 catalogs.

This also introduces an atomic mix-in that table catalogs can choose to implement. Table catalogs can now implement TransactionalTableCatalog. The semantics of this API are that table creation and replacement can be "staged" and then "committed".

On the execution of REPLACE TABLE AS SELECT, REPLACE TABLE, and CREATE TABLE AS SELECT, if the catalog implements transactional operations, the physical plan will use said functionality. Otherwise, these operations fall back on non-atomic variants. For REPLACE TABLE in particular, the usage of non-atomic operations can unfortunately lead to inconsistent state.
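A rough sketch of the staged flow, for illustration only (`atomicCatalog` and `writeToTable` are placeholders here, and method names may differ from the final API):

    // Sketch: stage the new table, write the query output, then atomically publish it.
    val staged = atomicCatalog.stageCreate(
      ident, query.schema, partitioning.toArray, properties.asJava)
    try {
      writeToTable(staged, query)      // append rows through the normal V2 write path
      staged.commitStagedChanges()     // the table only becomes visible here
    } catch {
      case t: Throwable =>
        staged.abortStagedChanges()    // discard anything that was staged
        throw t
    }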

How was this patch tested?

Unit tests - multiple additions to DataSourceV2SQLSuite.

@SparkQA

SparkQA commented Jun 4, 2019

Test build #106170 has finished for PR 24798 at commit 266784e.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ReplaceTable(
  • case class ReplaceTableAsSelect(
  • case class ReplaceTableStatement(
  • case class ReplaceTableAsSelectStatement(
  • case class ReplaceTableExec(
  • case class ReplaceTableAsSelectExec(
  • trait StagedTableWriteExec extends V2TableWriteExec

ident, query.schema, partitioning.toArray, properties.asJava)
writeToStagedTable(stagedTable, writeOptions, ident)
case _ =>
// Note that this operation is potentially unsafe, but these are the strict semantics of
Contributor Author

I think we talked about this, and we concluded that this is the appropriate behavior - but I'm still not sure it is wise to support an inherently unsafe and potentially inconsistent operation. It's worth considering if we should throw UnsupportedOperationException here.

Contributor

Yeah, I'm still on the fence about this, too.


public interface StagedTable extends Table {

void commitStagedChanges();
Contributor Author

It's not immediately obvious if this API belongs in StagedTable, or if it should be tied to the BatchWrite's commit() operation. The idea I had with tying it to StagedTable is:

  1. Make the atomic swap part more explicit from the perspective of the physical plan execution, and
  2. Allow both StagedTable and Table to share the same WriteBuilder and BatchWrite implementations that persist the rows, and decouple the atomic swap in this module only.

If we wanted to move the swap implementation behind the BatchWrite#commit and BatchWrite#abort APIs, then it's worth asking if we need the StagedTable interface at all - so TransactionalTableCatalog would return plain Table objects.

Contributor

I like this. So the write's commit stashes changes in the staged table, which can finish or roll back.

Contributor

This also solves the problem of where to document how to complete the changes staged in a StagedTable. Can you add docs that describe what these methods should do, and docs for the StagedTable interface itself?

import org.apache.spark.sql.sources.v2.StagedTable;
import org.apache.spark.sql.types.StructType;

public interface TransactionalTableCatalog {
Contributor Author

TransactionalTableCatalog is proposed in the SPIP, but we don't really encode any formal notion of transactions in these APIs. Transactionality has a particular connotation in the DBMS nomenclature, e.g. START TRANSACTION statements. Perhaps we can rename this to AtomicTableCatalog or SupportsAtomicOperations?

Contributor

I think renaming this is a good idea. How about StagingTableCatalog? The main capability it introduces is staging a table so that it can be used for a write even though it doesn't yet exist.
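For illustration, an outline of that shape, written as a Scala trait for brevity (the actual mix-in in this PR is a Java interface, and the signatures may differ):

    trait StagingTableCatalog extends TableCatalog {

      // Stage creation of a table; it only becomes visible on commitStagedChanges().
      def stageCreate(
          ident: Identifier,
          schema: StructType,
          partitions: Array[Transform],
          properties: java.util.Map[String, String]): StagedTable

      // Stage replacement of an existing table; the swap happens on commitStagedChanges().
      def stageReplace(
          ident: Identifier,
          schema: StructType,
          partitions: Array[Transform],
          properties: java.util.Map[String, String]): StagedTable
    }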

@@ -111,6 +111,14 @@ statement
(AS? query)? #createHiveTable
| CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
LIKE source=tableIdentifier locationSpec? #createTableLike
| replaceTableHeader ('(' colTypeList ')')? tableProvider
Contributor Author

Are there other flavors of REPLACE TABLE that we need to support?

Contributor

I'm not sure that we should support all of what's already here, at least not to begin with.

I think that the main use of REPLACE TABLE as an atomic operation is REPLACE TABLE ... AS SELECT. That's because the replacement should only happen if the write succeeds and the write could easily fail for a lot of reasons. Without a write, this is just syntactic sugar for a combined drop and create.

I think the initial PR should focus on just the RTAS case. That simplifies this because it no longer needs the type list. What do you think?

Contributor

I think this should support the USING clause that is used to pass the provider name.

Contributor Author

Because of the tableProvider field at the end, I think USING is still supported, right? As mentioned elsewhere, this is copied from CTAS.

Contributor

Is there a test for it?

@SparkQA

SparkQA commented Jun 5, 2019

Test build #106171 has finished for PR 24798 at commit baeabc8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 5, 2019

Test build #106178 has finished for PR 24798 at commit bc8d3b5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class InMemoryTable(
  • class BufferedRows extends WriterCommitMessage with InputPartition with Serializable

@mccheah mccheah changed the title [SPARK-27724][WIP] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 [SPARK-27724] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 Jun 5, 2019
@mccheah
Contributor Author

mccheah commented Jun 5, 2019

@rdblue @gatorsmile @HyukjinKwon this should be ready to go now, modulo the questions I've posted inline.

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-27724] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 [SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2 Jun 5, 2019
@SparkQA

SparkQA commented Jun 5, 2019

Test build #106213 has finished for PR 24798 at commit 6c958b9.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 6, 2019

Test build #106216 has finished for PR 24798 at commit 8c0270f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

| replaceTableHeader ('(' colTypeList ')')? tableProvider
((OPTIONS options=tablePropertyList) |
(PARTITIONED BY partitioning=transformList) |
bucketSpec |
Contributor

Should bucketing be added using BUCKET BY? Or should we rely on bucket as a transform in the PARTITIONED BY clause?

Contributor Author

Same as #24798 (comment) - this is copied from the create table spec.

@@ -111,6 +111,14 @@ statement
(AS? query)? #createHiveTable
| CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
LIKE source=tableIdentifier locationSpec? #createTableLike
| replaceTableHeader ('(' colTypeList ')')? tableProvider
((OPTIONS options=tablePropertyList) |
Contributor

Should OPTIONS be supported in v2? Right now, we copy options into table properties because v2 has no separate options. I also think it is confusing to users that there are table properties and options.

Contributor Author

In general I copied this entirely from the equivalent create table statement. How does the syntax for REPLACE TABLE differ from that of the existing CREATE TABLE? My understanding is REPLACE TABLE is exactly equivalent to CREATE TABLE with the exception of not having an IF NOT EXISTS option.
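For example (illustrative only; catalog, table, and provider names are made up):

    // The two statements accept the same clauses; only IF NOT EXISTS differs.
    spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo")
    spark.sql("REPLACE TABLE testcat.table_name (id bigint, data string) USING foo")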

Contributor

True, it should be the same as CREATE TABLE. That's a good reason to carry this forward.

@@ -258,6 +266,10 @@ createTableHeader
: CREATE TEMPORARY? EXTERNAL? TABLE (IF NOT EXISTS)? multipartIdentifier
;

replaceTableHeader
: REPLACE TEMPORARY? TABLE multipartIdentifier
Contributor

I'd probably remove TEMPORARY to begin with. What is the behavior of a temporary table? I think it used to be a view.

Contributor

Actually, it looks fine since this is not allowed in the AST builder.

Contributor

Let's get rid of TEMPORARY TABLE. It was a mistake, and we've almost removed everything about TEMPORARY TABLE in Spark; only a few parser rules are left for backward compatibility reasons.

To clarify, there is no TEMPORARY TABLE in Spark and there never was. Spark only has TABLE, VIEW and TEMP VIEW.

location: Option[String],
comment: Option[String]) extends ParsedStatement {

override def output: Seq[Attribute] = Seq.empty
Contributor

ParsedStatement now defaults these methods, so you can remove them.

s"got ${other.getClass.getName}: $sql")
test("create/replace table using - schema") {
val createSql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet"
val replaceSql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet"
Contributor

REPLACE?

@@ -76,25 +79,94 @@ case class CreateTableAsSelectExec(

throw new TableAlreadyExistsException(ident)
}
catalog match {
case txnCatalog: TransactionalTableCatalog =>
Contributor

Because so little is shared between the two implementations, I think I would probably separate them into different exec nodes. An added benefit of that is that the physical plan would tell users whether Spark is going to use an atomic operation. That way I could check EXPLAIN and run the query if the operation is atomic, or do more testing if it is not.
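For example (hypothetical exec node and catalog names):

    // With separate exec nodes, the plan itself reveals whether the operation is atomic.
    spark.sql("EXPLAIN REPLACE TABLE testcat_atomic.table_name USING foo AS SELECT id FROM source")
    // -> would show e.g. AtomicReplaceTableAsSelectExec instead of ReplaceTableAsSelectExec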

stagedTable.commitStagedChanges()
writtenRows
case _ =>
// table does not support writes
Contributor

Could you also note that the catch block will abort the staged changes?

@@ -36,6 +36,8 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn

before {
spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName)
spark.conf.set(
"spark.sql.catalog.testcatatomic", classOf[TestTransactionalInMemoryCatalog].getName)
Contributor

Nit: consider adding an underscore to make the catalog name more readable.

@@ -170,6 +173,85 @@ class DataSourceV2SQLSuite extends QueryTest with SharedSQLContext with BeforeAn
checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source"))
}

test("ReplaceTableAsSelect: basic v2 implementation using atomic catalog.") {
Contributor

Nit: these test cases are dense because they have no blank lines. Blank lines between tasks - creating the original table, replacing it, and the assertions - would help readability.

checkAnswer(
spark.internalCreateDataFrame(rdd, replacedTable.schema),
spark.table("source").select("id"))
}
Contributor

All of the success cases should be applied to both atomic and non-atomic catalogs because we expect a difference in behavior only in failure cases.
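One way to do that without duplicating the test bodies (a sketch; the atomic catalog name assumes the underscore suggestion above):

    // Register the same success case against both the non-atomic and atomic catalogs.
    Seq("testcat", "testcat_atomic").foreach { catalogName =>
      test(s"ReplaceTableAsSelect: basic v2 implementation using $catalogName.") {
        // create the original table, run REPLACE TABLE ... AS SELECT against
        // s"$catalogName.table_name", then check the replaced contents
      }
    }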

Contributor Author

I modified some of the success cases; I don't know if there are more that need to be adjusted. I think we don't have to be completely exhaustive here.

s" AS SELECT id FROM source")
}
val replacedTable = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
assert(replacedTable != table, "Table should have been replaced.")
@rdblue rdblue Jun 6, 2019

I think a better test assertion is that the schema matches the new table. This check could pass even with the same underlying metadata, if two separate instances of a table are loaded from the catalog.
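e.g. (a hypothetical assertion, assuming the usual org.apache.spark.sql.types imports and an RTAS query that keeps only a long id column):

    assert(replacedTable.schema == new StructType().add("id", LongType),
      "Replaced table should have the schema of the RTAS query")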

}

test("ReplaceTableAsSelect: Non-atomic catalog creates the empty table, but leaves the" +
" table empty if the write fails.") {
Contributor

Why isn't the table dropped in this case? I would expect this to have the behavior of non-atomic CTAS after the initial delete.

Contributor Author

I think the behavior here is ambiguous. Suppose then another user went and started writing to the table concurrently - should this job drop the table that the other job is writing to?

@rdblue rdblue Jun 7, 2019

I think the intent of RTAS is to run a combined DROP TABLE and CREATE TABLE ... AS SELECT .... CTAS doesn't worry about concurrent writes because the table doesn't "exist" until the write completes. That's why we delete after a CTAS if the write fails, even though it also has a non-atomic case where the table exists and could technically be written to concurrently.

@mccheah mccheah Jun 7, 2019

Hm so I took a closer look at this. It turns out that using Utils.tryWithSafeFinallyAndFailureCallbacks is risky in all of these code paths as it is currently implemented.

That method, when it tries to run the catch block, will first try to set the failure reason on the task context via TaskContext.get().markTaskFailed. But since we're running this try...finally block on the driver, there is no such task context to get via TaskContext.get.

What happens in this case is that this test passes when it should fail - the table should indeed be dropped, but the catch block that drops it never runs, because TaskContext.get().markTaskFailed NPEs before the catch block can run.

I think there are a few ways forward:

  1. Don't use the Utils method to do try-catch-finally
  2. Patch the Utils method to check for null on the current task context before trying to mark the task failure reason on it.

I'm going with 2) for now, but 1) is very reasonable as well.

Either way, yeah the table should end up being dropped at the end, so this test also has to be patched.
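A minimal sketch of option 2, showing only the guard inside Utils.tryWithSafeFinallyAndFailureCallbacks (block/catchBlock/finallyBlock stand for the method's by-name parameters; the real method has more error handling):

    try {
      block
    } catch {
      case t: Throwable =>
        // TaskContext.get() returns null on the driver, so only mark the task failed when
        // a task context exists; catchBlock (e.g. dropping the table) then still runs.
        Option(TaskContext.get()).foreach(_.markTaskFailed(t))
        catchBlock
        throw t
    } finally {
      finallyBlock
    }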

@brkyvz
Contributor

brkyvz commented Jul 16, 2019

Approach and interface LGTM! +9000 on "keep[ing] commits smaller and more focused" in the future. Would really help speed up the development cycle.

@SparkQA

SparkQA commented Jul 16, 2019

Test build #107756 has finished for PR 24798 at commit 0b5c029.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 17, 2019

Test build #107764 has finished for PR 24798 at commit 581dba2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* @param partitions transforms to use for partitioning data in the table
* @param properties a string map of table properties
* @return metadata for the new table
* @throws TableAlreadyExistsException If a table or view already exists for the identifier
Contributor

I'm a little confused here: REPLACE TABLE does require that the table exists, right?

*
* A new table will be created using the schema of the query, and rows from the query are appended.
* If the table exists, its contents and schema should be replaced with the schema and the contents
* of the query. This is a non-atomic implementation that drops the table and then runs non-atomic
Contributor

According to https://github.com/apache/spark/pull/24798/files#r302746896, this is a broken implementation. RTAS should be able to query any existing tables, including the one that is being replaced. If we do want to have a non-atomic version, how about the following (sketched below):

  1. create a table with a random but unique name (like a UUID), and insert data into it
  2. drop the target table
  3. rename the table created in step 1 to the target table.
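Roughly (a sketch; `writeToTable` is a hypothetical stand-in for the V2 write path):

    import java.util.UUID

    // 1. create a uniquely named staging table and insert the query output into it
    val stagingIdent = Identifier.of(ident.namespace(), ident.name() + "_" + UUID.randomUUID())
    val stagingTable = catalog.createTable(
      stagingIdent, query.schema, partitioning.toArray, properties.asJava)
    writeToTable(stagingTable, query)
    // 2. drop the target table
    catalog.dropTable(ident)
    // 3. rename the staging table to the target name
    catalog.renameTable(stagingIdent, ident)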

Contributor

IMO the non-atomic version is allowed to have undefined behavior when a failure happens midway. But it should behave like the atomic version if no failure happens.

Contributor

That comment applies only to RTAS queries that read the table that will be replaced. We can fix that in a follow-up.

def maybeSimulateFailedTableCreation(tableProperties: util.Map[String, String]): Unit = {
if (tableProperties.containsKey(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY)
&& tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY)
.equalsIgnoreCase("true")) {
Contributor

nit: we can just write "true".equalsIgnoreCase(tableProperties.get(TestInMemoryTableCatalog.SIMULATE_FAILED_CREATE_PROPERTY))

if the key doesn't exist, "true".equalsIgnoreCase(null) returns false.


override def commitStagedChanges(): Unit = {
if (replaceIfExists) {
tables.put(ident, delegateTable)
Contributor

nit: when committing REPLACE TABLE, we should fail if the table is already dropped by others.

@cloud-fan
Contributor

Agree with @brkyvz that it's too late to split, as this PR has already received many reviews. Please try to keep PRs smaller and more focused next time.

Generally looks good, only a few comments.

extends StagedTable with SupportsWrite with SupportsRead {

override def commitStagedChanges(): Unit = {
if (droppedTables.contains(ident)) {
Contributor

It's weird to record all the dropped tables in the history. I think a simpler version is:

if (replaceIfExists) {
  if (!tables.containsKey(ident)) {
      throw new RuntimeException("table already dropped")
  }
  tables.put(ident, delegateTable)
}

Contributor Author

That doesn't work because the implementation of stageCreate doesn't actually put the table in the tables map at all. So you can't necessarily say the table was dropped just because the table is not in the tables map.

Contributor Author

It's unclear to me if this is the correct behavior - if something dropped the table from underneath this, the subsequent commit of the replace or atomic-create operation should have the final say, right?

@cloud-fan cloud-fan Jul 18, 2019

Think about a REPLACE TABLE and a DROP TABLE happening at the same time. It doesn't matter which one gets executed first, but the final result must correspond to some serial execution order.

If REPLACE TABLE executes first, then there should be no table at the end as it's dropped.
If DROP TABLE executes first, then REPLACE TABLE should fail and there is still no table at the end.

@cloud-fan cloud-fan Jul 18, 2019

BTW, I think my proposal works for REPLACE TABLE, right? stageCreate is for CTAS, and I think your current code (without tracking dropped tables) already works.

@mccheah mccheah Jul 18, 2019

I thought about this a bit more and chatted with @rdblue and I realized the confusion is that the staging catalog API doesn't even support passing through the orCreate flag to the catalog. I think we need to pass this information along to the catalog, otherwise the catalog won't know that the user wanted CREATE OR REPLACE semantics.

I'm more inclined to add an extra method to StagingTableCatalog called stageCreateOrReplace, in addition to the other methods we have here already. Then the behavior of commitStagedChanges depends on whether or not the table was instantiated via stageCreateOrReplace vs. stageReplace vs. stageCreate.

@SparkQA

SparkQA commented Jul 18, 2019

Test build #107806 has finished for PR 24798 at commit be04476.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mccheah
Contributor Author

mccheah commented Jul 19, 2019

Latest patch adds stageCreateOrReplace to the staging catalog API.

@SparkQA

SparkQA commented Jul 19, 2019

Test build #107871 has finished for PR 24798 at commit 2f6e0b6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

*
* Expected format:
* {{{
* REPLACE TABLE [IF NOT EXISTS] [db_name.]table_name
Contributor

this doesn't match the actual syntax now.

* @param properties a string map of table properties
* @return metadata for the new table
* @throws UnsupportedOperationException If a requested partition transform is not supported
* @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
Contributor

I think the implementation should throw TableNotFoundException if the table to replace doesn't exist.

orCreate: Boolean) extends AtomicTableWriteExec {

override protected def doExecute(): RDD[InternalRow] = {
val stagedTable = if (catalog.tableExists(ident)) {
@cloud-fan cloud-fan Jul 19, 2019

I think we can simplify this to

      val stagedTable = if (orCreate) {
        catalog.stageCreateOrReplace(
          ident, query.schema, partitioning.toArray, properties.asJava)
      } else {
        catalog.stageReplace(
          ident, query.schema, partitioning.toArray, properties.asJava)
      }

stageReplace should throw the exception itself if the table doesn't exist. The implementation already needs to do it before committing; it doesn't hurt to also do it at the beginning.

@rdblue rdblue Jul 19, 2019

I disagree that there is no need to check whether the table exists. We had a similar discussion on CREATE TABLE. Spark should check existence to ensure that the error is consistently thrown. If the table does not exist and orCreate is false, then Spark should throw an exception and not rely on the source to do it.

That said, I think it would be simpler to update the logic a little:

      if (orCreate) {
        catalog.stageCreateOrReplace(
          ident, query.schema, partitioning.toArray, properties.asJava)
      } else if (catalog.tableExists(ident)) {
        catalog.stageReplace(
          ident, query.schema, partitioning.toArray, properties.asJava)
      } else {
        throw new CannotReplaceMissingTableException(ident)
      }

Contributor

It's minor, so I don't want to block this PR on it, but Spark is unable to make sure the error is consistently thrown, because anything can happen after you check the table's existence and before you do the actual operation.

That said, this is just best-effort, which is not that useful since it's not a guarantee.

@mccheah
Contributor Author

mccheah commented Jul 19, 2019

Updated some docs and cleaned up implementations based on comments.

try {
catalog.stageReplace(
identifier, tableSchema, partitioning.toArray, tableProperties.asJava)
} catch {
@mccheah mccheah Jul 19, 2019

The try...catch here is mostly for flavor and consistency: @cloud-fan suggested that StagingTableCatalog#stageReplace should be able to throw NoSuchTableException, which could theoretically happen if the table is dropped between the tableExists call above and the catalog.stageReplace call. This ensures that the same type of exception is thrown from this code path for the same kind of illegal state.
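i.e., something along these lines (using the names from the snippet above and the CannotReplaceMissingTableException suggested earlier in this review):

    val staged = try {
      catalog.stageReplace(
        identifier, tableSchema, partitioning.toArray, tableProperties.asJava)
    } catch {
      // The table was dropped between the earlier tableExists check and this call;
      // surface the same error the up-front check would have produced.
      case _: NoSuchTableException =>
        throw new CannotReplaceMissingTableException(identifier)
    }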

@SparkQA

SparkQA commented Jul 19, 2019

Test build #107924 has finished for PR 24798 at commit 05a827d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 7ed0088 Jul 22, 2019
yiheng pushed a commit to yiheng/spark that referenced this pull request Jul 24, 2019
[SPARK-27724][SQL] Implement REPLACE TABLE and REPLACE TABLE AS SELECT with V2

## What changes were proposed in this pull request?

Implements the `REPLACE TABLE` and `REPLACE TABLE AS SELECT` logical plans. `REPLACE TABLE` is now a valid operation in spark-sql provided that the tables being modified are managed by V2 catalogs.

This also introduces an atomic mix-in that table catalogs can choose to implement. Table catalogs can now implement `TransactionalTableCatalog`. The semantics of this API are that table creation and replacement can be "staged" and then "committed".

On the execution of `REPLACE TABLE AS SELECT`, `REPLACE TABLE`, and `CREATE TABLE AS SELECT`, if the catalog implements transactional operations, the physical plan will use said functionality. Otherwise, these operations fall back on non-atomic variants. For `REPLACE TABLE` in particular, the usage of non-atomic operations can unfortunately lead to inconsistent state.

## How was this patch tested?

Unit tests - multiple additions to `DataSourceV2SQLSuite`.

Closes apache#24798 from mccheah/spark-27724.

Authored-by: mcheah <mcheah@palantir.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>