
[SPARK-18087] [SQL] Optimize insert to not require REPAIR TABLE #15633


Closed
wants to merge 3 commits

Conversation


@ericl ericl commented Oct 26, 2016

What changes were proposed in this pull request?

When inserting into datasource tables with partitions managed by the Hive metastore, we need to notify the metastore of newly added partitions. Previously this was implemented via `msck repair table`, but that is more expensive than needed.

This optimizes the insertion path to add only the updated partitions.

How was this patch tested?

Existing tests (I verified manually that tests fail if the repair operation is omitted).
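The core idea can be sketched outside of Spark (all names below are illustrative, not the actual Spark internals): rather than asking the metastore to rescan every table directory with `MSCK REPAIR TABLE`, the write path records each partition spec it actually writes and registers only those.

```scala
// Illustrative sketch only; names do not correspond to Spark internals.
object PartitionTracking {
  // Render a partition spec (column -> value) as a Hive-style path fragment.
  // Note: real Hive orders by partition column position; alphabetical order
  // here is a simplification for the sketch.
  def partitionPath(spec: Map[String, String]): String =
    spec.toSeq.sortBy(_._1).map { case (k, v) => s"$k=$v" }.mkString("/")

  // Distinct partitions touched by an insert; only these need to be
  // reported to the metastore, instead of a full table rescan.
  def updatedPartitions(written: Seq[Map[String, String]]): Set[String] =
    written.map(partitionPath).toSet
}
```

For example, writing into `year=2016/month=10` twice and `year=2016/month=11` once yields just two partitions to register.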


SparkQA commented Oct 26, 2016

Test build #67543 has finished for PR 15633 at commit fa91e39.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Oct 26, 2016

Test build #67567 has finished for PR 15633 at commit a361583.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Oct 26, 2016

Test build #67595 has finished for PR 15633 at commit 01e73bb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Oct 26, 2016

Test build #67601 has finished for PR 15633 at commit 6f8a3a0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ericl ericl changed the title [SPARK-18087] [SQL] [WIP] Optimize insert to not require REPAIR TABLE [SPARK-18087] [SQL] Optimize insert to not require REPAIR TABLE Oct 27, 2016

ericl commented Oct 27, 2016

cc @cloud-fan @davies

```
@@ -386,13 +390,18 @@ object WriteOutput extends Logging {
      logDebug(s"Writing partition: $currentKey")

      currentWriter = newOutputWriter(currentKey, getPartitionString)
      val partitionStr = getPartitionString(currentKey).getString(0)
```
Contributor

`partitionStr` => `partitionPath`?

Contributor Author

Done

```
@@ -375,6 +378,7 @@ object WriteOutput extends Logging {

      // If anything below fails, we should abort the task.
      var currentKey: UnsafeRow = null
      var updatedPartitions: List[String] = Nil
```
Contributor

In the case of bucketing, there are multiple files (writers) per partition, so `partitionPath` will contain duplicate values. Should we use `Set[String]` here?

Contributor Author

Done
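The point above can be seen with plain collections (a standalone sketch, not the Spark code): with bucketing, several writers in the same partition report the same partition path, and a `List` keeps the repeats while a `Set` deduplicates them.

```scala
// Standalone sketch: several bucketed writers in one partition each report
// the same partition path.
object DedupSketch {
  val reported = Seq("year=2016/month=10", "year=2016/month=10", "year=2016/month=11")

  val asList: List[String] = reported.toList // keeps duplicates: 3 entries
  val asSet: Set[String]   = reported.toSet  // deduplicated: 2 entries
}
```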


SparkQA commented Oct 27, 2016

Test build #67671 has finished for PR 15633 at commit 8c4ae5e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Oct 28, 2016

Test build #67682 has finished for PR 15633 at commit 4d96725.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

should we also fix `DataFrameWriter.saveAsTable`?


ericl commented Oct 28, 2016

I think that one is OK since we have to scan the full table anyway. If it becomes a performance issue, we can add this optimization there as well.

ghost pushed a commit to dbtsai/spark that referenced this pull request Oct 30, 2016
…nd repair partition commands

## What changes were proposed in this pull request?

The behavior of union is not well defined here. It is safer to explicitly execute these commands in order. The other use of `Union` in this way will be removed by apache#15633

## How was this patch tested?

Existing tests.

cc yhuai cloud-fan

Author: Eric Liang <ekhliang@gmail.com>
Author: Eric Liang <ekl@databricks.com>

Closes apache#15665 from ericl/spark-18146.
```
@@ -179,24 +180,30 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
          "Cannot overwrite a path that is also being read from.")
      }

      def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = {
        if (l.catalogTable.isDefined &&
```
Contributor

@cloud-fan cloud-fan Oct 30, 2016

shall we move this `if` out of the function? e.g.

```
val refreshPartitionsCallback = if (...) {
  ...
} else {
  _ => ()
}
```

Contributor Author

imo that is a little harder to read, since you have two anonymous function declarations instead of one.
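The two shapes under discussion can be sketched with hypothetical names (neither is the actual Spark code; returning a count of registered partitions is a tweak added here so the styles can be compared):

```scala
object CallbackStyles {
  // Style A (kept in the PR): one function, with the guard inside it.
  def refreshA(isHivePartitioned: Boolean)(updated: Set[String]): Int =
    if (isHivePartitioned) updated.size else 0

  // Style B (suggested): choose the callback up front, with a no-op branch,
  // at the cost of two anonymous function declarations.
  def refreshB(isHivePartitioned: Boolean): Set[String] => Int =
    if (isHivePartitioned) (updated => updated.size)
    else (_ => 0)
}
```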

```
        if (l.catalogTable.isDefined &&
            l.catalogTable.get.partitionColumnNames.nonEmpty &&
            l.catalogTable.get.partitionProviderIsHive) {
          val metastoreUpdater = AlterTableAddPartitionCommand(
```
Contributor

shall we just copy the main logic of `AlterTableAddPartitionCommand` here? Otherwise we have to fetch the table metadata from the metastore every time.

Contributor Author

I'd rather keep it, since the fetch overhead is pretty small.

@cloud-fan
Contributor

LGTM, pending jenkins

rxin added a commit to rxin/spark that referenced this pull request Nov 1, 2016
[SPARK-18087] [SQL] Optimize insert to not require REPAIR TABLE

SparkQA commented Nov 1, 2016

Test build #3381 has finished for PR 15633 at commit 4d96725.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


rxin commented Nov 1, 2016

Merging in master.

@asfgit asfgit closed this in efc254a Nov 1, 2016
robert3005 pushed a commit to palantir/spark that referenced this pull request Nov 1, 2016

rxin commented Nov 3, 2016

I took a look at this again tonight, in the context of consolidating this with Hive. I think doing it through a callback is actually not ideal, as callbacks are harder to trace. In 2.2 we should make this an explicit action rather than a callback.
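The distinction being drawn here can be sketched abstractly (hypothetical names, not a proposal for the actual API): with a callback, metastore registration happens invisibly inside the write; as an explicit action, the write returns the touched partitions and the caller performs registration as a visible, traceable step.

```scala
object ExplicitVsCallback {
  // Callback style: registration is invoked from inside the write, so the
  // call site does not show it.
  def writeWithCallback(parts: Set[String], refresh: Set[String] => Int): Int =
    refresh(parts)

  // Explicit style: the write returns what it touched; the caller then
  // registers those partitions as a separate, visible step.
  def write(parts: Set[String]): Set[String] = parts
  def register(parts: Set[String]): Int = parts.size
}
```

In the explicit style the two steps appear in order at the call site: `val touched = ExplicitVsCallback.write(...)` followed by `ExplicitVsCallback.register(touched)`.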

uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
Author: Eric Liang <ekl@databricks.com>

Closes apache#15633 from ericl/spark-18087.