-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-18087] [SQL] Optimize insert to not require REPAIR TABLE #15633
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Test build #67543 has finished for PR 15633 at commit
|
Test build #67567 has finished for PR 15633 at commit
|
Test build #67595 has finished for PR 15633 at commit
|
Test build #67601 has finished for PR 15633 at commit
|
@@ -386,13 +390,18 @@ object WriteOutput extends Logging { | |||
logDebug(s"Writing partition: $currentKey") | |||
|
|||
currentWriter = newOutputWriter(currentKey, getPartitionString) | |||
val partitionStr = getPartitionString(currentKey).getString(0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
partitionStr => partitionPath ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -375,6 +378,7 @@ object WriteOutput extends Logging { | |||
|
|||
// If anything below fails, we should abort the task. | |||
var currentKey: UnsafeRow = null | |||
var updatedPartitions: List[String] = Nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In case of bucketing, There are multiple files (writer) per partition, so partitionPath will have duplicated value, should we use Set[String] here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Test build #67671 has finished for PR 15633 at commit
|
Test build #67682 has finished for PR 15633 at commit
|
should we also fix |
I think that one is ok since we have to scan the full table anyways. If it becomes a performance issue we can also add this optimization. |
…nd repair partition commands ## What changes were proposed in this pull request? The behavior of union is not well defined here. It is safer to explicitly execute these commands in order. The other use of `Union` in this way will be removed by apache#15633 ## How was this patch tested? Existing tests. cc yhuai cloud-fan Author: Eric Liang <ekhliang@gmail.com> Author: Eric Liang <ekl@databricks.com> Closes apache#15665 from ericl/spark-18146.
@@ -179,24 +180,30 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { | |||
"Cannot overwrite a path that is also being read from.") | |||
} | |||
|
|||
def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = { | |||
if (l.catalogTable.isDefined && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we move this if
out of the function? e.g.
val refreshPartitionsCallback = if (...) {
...
} else {
_ => ()
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
imo that is a little harder to read, since you have two anonymous function declarations instead of one.
if (l.catalogTable.isDefined && | ||
l.catalogTable.get.partitionColumnNames.nonEmpty && | ||
l.catalogTable.get.partitionProviderIsHive) { | ||
val metastoreUpdater = AlterTableAddPartitionCommand( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we just copy the main logic of AlterTableAddPartitionCommand
here? or we have to fetch the table metadata from metastore everytime.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd rather keep it, since the fetch overhead is pretty small
LGTM, pending jenkins |
[SPARK-18087] [SQL] Optimize insert to not require REPAIR TABLE
Test build #3381 has finished for PR 15633 at commit
|
Merging in master. |
…nd repair partition commands ## What changes were proposed in this pull request? The behavior of union is not well defined here. It is safer to explicitly execute these commands in order. The other use of `Union` in this way will be removed by apache#15633 ## How was this patch tested? Existing tests. cc yhuai cloud-fan Author: Eric Liang <ekhliang@gmail.com> Author: Eric Liang <ekl@databricks.com> Closes apache#15665 from ericl/spark-18146.
I took a look at this again tonight, in the context of consolidating this with Hive. I think doing it through callback is actually not ideal, as callbacks are harder to trace. In 2.2 we should make this an explicit action, rather than callbacks. |
…nd repair partition commands ## What changes were proposed in this pull request? The behavior of union is not well defined here. It is safer to explicitly execute these commands in order. The other use of `Union` in this way will be removed by apache#15633 ## How was this patch tested? Existing tests. cc yhuai cloud-fan Author: Eric Liang <ekhliang@gmail.com> Author: Eric Liang <ekl@databricks.com> Closes apache#15665 from ericl/spark-18146.
## What changes were proposed in this pull request? When inserting into datasource tables with partitions managed by the hive metastore, we need to notify the metastore of newly added partitions. Previously this was implemented via `msck repair table`, but this is more expensive than needed. This optimizes the insertion path to add only the updated partitions. ## How was this patch tested? Existing tests (I verified manually that tests fail if the repair operation is omitted). Author: Eric Liang <ekl@databricks.com> Closes apache#15633 from ericl/spark-18087.
What changes were proposed in this pull request?
When inserting into datasource tables with partitions managed by the hive metastore, we need to notify the metastore of newly added partitions. Previously this was implemented via
msck repair table
, but this is more expensive than needed.This optimizes the insertion path to add only the updated partitions.
How was this patch tested?
Existing tests (I verified manually that tests fail if the repair operation is omitted).