-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Spark 3.4: Implement rewrite position deletes #7389
Conversation
cb101a5
to
4cf9cae
Compare
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
Show resolved
Hide resolved
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java
Outdated
Show resolved
Hide resolved
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/FileRewriteCoordinator.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteGroup.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
Show resolved
Hide resolved
fec93e9
to
0fc63d1
Compare
* @param filesToAdd files that will be added, cannot be null or empty. | ||
* @return this for method chaining | ||
*/ | ||
RewriteFiles rewriteDeleteFiles(Set<DeleteFile> filesToDelete, Set<DeleteFile> filesToAdd); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably, there is a problem in RewriteFiles
right now. I think this API would assign new delete files a brand new data sequence number while we should use the max data sequence number of all rewritten position deletes.
On a side note, I am not sure we can ever rewrite equality deletes across sequence numbers. Let me think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After thinking more about, we can't rewrite equality deletes across sequence numbers.
I created #7452 to add validation to RewriteFiles
.
api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
Outdated
Show resolved
Hide resolved
api/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteFiles.java
Outdated
Show resolved
Hide resolved
api/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteFiles.java
Outdated
Show resolved
Hide resolved
api/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteFiles.java
Outdated
Show resolved
Hide resolved
api/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteFiles.java
Outdated
Show resolved
Hide resolved
api/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteFiles.java
Outdated
Show resolved
Hide resolved
api/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteFiles.java
Outdated
Show resolved
Hide resolved
api/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteFiles.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteGroup.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteGroup.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteGroup.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteGroup.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteGroup.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteGroup.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java
Outdated
Show resolved
Hide resolved
.../spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeletesSparkAction.java
Outdated
Show resolved
Hide resolved
07f5e75
to
4bc6b21
Compare
Failure is more related to #7422 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems close. My biggest question is about usage of StructLike
in maps.
api/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteFiles.java
Outdated
Show resolved
Hide resolved
api/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteFiles.java
Show resolved
Hide resolved
api/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteFiles.java
Outdated
Show resolved
Hide resolved
api/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteFiles.java
Outdated
Show resolved
Hide resolved
.../spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeletesSparkAction.java
Outdated
Show resolved
Hide resolved
.../spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeletesSparkAction.java
Outdated
Show resolved
Hide resolved
.../spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeletesSparkAction.java
Outdated
Show resolved
Hide resolved
.../spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeletesSparkAction.java
Outdated
Show resolved
Hide resolved
.../spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeletesSparkAction.java
Outdated
Show resolved
Hide resolved
6ce8fae
to
9b3709e
Compare
api/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteFiles.java
Outdated
Show resolved
Hide resolved
api/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteFiles.java
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteGroup.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/actions/RewritePositionDeleteGroup.java
Outdated
Show resolved
Hide resolved
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
Show resolved
Hide resolved
.../spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeletesSparkAction.java
Outdated
Show resolved
Hide resolved
.../spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeletesSparkAction.java
Outdated
Show resolved
Hide resolved
.../spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeletesSparkAction.java
Outdated
Show resolved
Hide resolved
...park/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackPositionDeletesRewriter.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are a few comments but nothing blocking. This is a huge change so I'll go ahead and merge it. We can address the last comments in a follow-up PR.
Thanks a lot, @szehon-ho! It has been pending for so long!
import org.slf4j.LoggerFactory; | ||
|
||
/** Spark implementation of {@link RewritePositionDeleteFiles}. */ | ||
public class RewritePositionDeleteSparkAction |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be called RewritePositionDeleteFilesSparkAction
? This is public facing and we usually name it as the interface name + SparkAction
.
} | ||
|
||
@VisibleForTesting | ||
RewritePositionDeletesGroup rewriteDeleteFiles( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just making sure that this is indeed used for testing.
filesByPartition.put(coerced, partitionTasks); | ||
} | ||
|
||
StructLikeMap<List<List<PositionDeletesScanTask>>> fileGroupsByPartition = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for future: Can we explore the idea of having 2 helper methods: one for computing files by partition and another file groups by partition. Not in this PR.
} | ||
|
||
@VisibleForTesting | ||
RewritePositionDeletesCommitManager commitManager() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here about visibility.
RewritePositionDeletesCommitManager commitManager) { | ||
ExecutorService rewriteService = rewriteService(); | ||
|
||
// Start Commit Service |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: Inconsistent usage of capital letters across 3 comments in this method.
// Start Commit Service
// Start rewrite tasks
// Stop Commit service
Map<StructLike, List<List<PositionDeletesScanTask>>> groupsByPartition) { | ||
Stream<RewritePositionDeletesGroup> rewriteFileGroupStream = | ||
groupsByPartition.entrySet().stream() | ||
.flatMap( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for future: Can we try refactoring this using some helper methods cause Spotless formats this in a weird way. Not in this PR.
|
||
RewriteExecutionContext( | ||
Map<StructLike, List<List<PositionDeletesScanTask>>> groupsByPartition) { | ||
this.numGroupsByPartition = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, I think we should use StructLikeMap
here too.
StructLike partition = group.get(0).partition(); | ||
|
||
// read the deletes packing them into splits of the required size | ||
Dataset<Row> posDeletes = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: We frequently add DF
suffix for variables referring to Dataset<Row>
.
posDeleteDF
dataFileDF
validPosDeleteDF
.../spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeletesSparkAction.java
Outdated
Show resolved
Hide resolved
.../spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeletesSparkAction.java
Outdated
Show resolved
Hide resolved
@szehon-ho: Are you already working on the CALL procedure for the same? If not, I would like to work on it. |
Hi @ajantha-bhat , thanks for asking, i havent started yet but I had planned to work on it probably late this week or next week, after some cleanup of this patch. Will you have something in next few days on that? |
Hi @ajantha-bhat sorry , actually @aokolnychyi pinged me and this should be part of the next release. So I will work on the procedure tomorrow at the first priority, if it is ok |
Ok. |
This change backports #7389 to Spark 3.3.
when i backport to spark 3.1 with error: |
This implements the RewritePositionDeleteFiles Interface (already existing) with a Spark action
This action will compact or split position delete files, based on input parameters. Most of the logic is re-used from RewriteDataFiles, via new Rewriter classes added in #7175 . The additional logic here is sorting position deletes locally by 'file_path' and 'pos', as defined in Iceberg spec.
This action will also notably remove 'dangling deletes', ie remove position deletes that no longer have a live data file. Previously this was not possible in any Iceberg action. This is implemented via a left semi-join on 'data_files' table, before the rewrite.
Remaining items: filter() is not yet supported. As the position deletes rewrite is done against the position_deletes metadata table, the filter of data table does not apply. Some work is needed to transform this.