Skip to content

[SPARK-41959][SQL] Improve v1 writes with empty2null #39475

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -206,13 +206,8 @@ object FileFormatWriter extends Logging {
partitionColumns: Seq[Attribute],
sortColumns: Seq[Attribute],
orderingMatched: Boolean): Set[String] = {
val hasEmpty2Null = plan.exists(p => V1WritesUtils.hasEmptyToNull(p.expressions))
val empty2NullPlan = if (hasEmpty2Null) {
plan
} else {
val projectList = V1WritesUtils.convertEmptyToNull(plan.output, partitionColumns)
if (projectList.nonEmpty) ProjectExec(projectList, plan) else plan
}
val projectList = V1WritesUtils.convertEmptyToNull(plan.output, partitionColumns)
val empty2NullPlan = if (projectList.nonEmpty) ProjectExec(projectList, plan) else plan

writeAndCommit(job, description, committer) {
val (planToExecute, concurrentOutputWriterSpec) = if (orderingMatched) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,8 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
}

private def prepareQuery(write: V1WriteCommand, query: LogicalPlan): LogicalPlan = {
val hasEmpty2Null = query.exists(p => hasEmptyToNull(p.expressions))
val empty2NullPlan = if (hasEmpty2Null) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this check has no meaning now since we have already checked WriteFiles for rule idempotency.

query
} else {
val projectList = convertEmptyToNull(query.output, write.partitionColumns)
if (projectList.isEmpty) query else Project(projectList, query)
}
val projectList = convertEmptyToNull(query.output, write.partitionColumns)
val empty2NullPlan = if (projectList.isEmpty) query else Project(projectList, query)
assert(empty2NullPlan.output.length == query.output.length)
val attrMap = AttributeMap(query.output.zip(empty2NullPlan.output))

Expand All @@ -108,7 +103,6 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
case a: Attribute => attrMap.getOrElse(a, a)
}.asInstanceOf[SortOrder])
val outputOrdering = query.outputOrdering
// Check if the ordering is already matched to ensure the idempotency of the rule.
val orderingMatched = isOrderingMatched(requiredOrdering, outputOrdering)
if (orderingMatched) {
empty2NullPlan
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if we don't add the sort here, the caller will rewrite the attributes in v1 command which will update required orderng expression as well, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that's the root reason in FileFormatWriter why the previous isOrderingMatched does not work when planned write enabled. The attributes in required ordering have been rewritten but the acutal ordering have not.

Expand Down