
Spark 3.4: Implement rewrite position deletes #7389

Merged: 7 commits merged into apache:master from rewrite_position_deletes on May 4, 2023

Conversation

@szehon-ho (Collaborator) commented Apr 20, 2023

This implements the existing RewritePositionDeleteFiles interface with a Spark action.

This action will compact or split position delete files based on input parameters. Most of the logic is reused from RewriteDataFiles, via the new rewriter classes added in #7175. The additional logic here sorts position deletes locally by 'file_path' and 'pos', as defined in the Iceberg spec.

This action also notably removes 'dangling deletes', i.e., position deletes that no longer reference a live data file. Previously this was not possible in any Iceberg action. It is implemented via a left semi-join against the data_files metadata table before the rewrite.

Remaining items: filter() is not yet supported. Because the position delete rewrite runs against the position_deletes metadata table, a filter expressed on the data table does not apply directly; some work is needed to transform it.
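For reference, a minimal usage sketch of the new action. The entry point, option name, and result accessor shown here are assumptions for illustration and may differ from what this PR finally exposes.

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewritePositionDeleteFiles;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().getOrCreate();
Table table = loadIcebergTable(spark, "db.events"); // hypothetical helper to load the Iceberg table

// Compact the table's position delete files; dangling deletes are removed as part of the rewrite.
RewritePositionDeleteFiles.Result result =
    SparkActions.get(spark)
        .rewritePositionDeletes(table)
        .option("rewrite-all", "true") // assumed option name: force rewriting every delete file
        .execute();

System.out.println("Rewritten delete files: " + result.rewrittenDeleteFilesCount());
```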

@szehon-ho force-pushed the rewrite_position_deletes branch 2 times, most recently from fec93e9 to 0fc63d1 on April 21, 2023 07:36
* @param filesToAdd files that will be added, cannot be null or empty.
* @return this for method chaining
*/
RewriteFiles rewriteDeleteFiles(Set<DeleteFile> filesToDelete, Set<DeleteFile> filesToAdd);
Contributor:

There is probably a problem in RewriteFiles right now: I think this API would assign new delete files a brand-new data sequence number, whereas we should use the max data sequence number of all rewritten position deletes.

On a side note, I am not sure we can ever rewrite equality deletes across sequence numbers. Let me think.
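To illustrate the sequence-number point above, a hypothetical helper (not code from this PR) that picks the data sequence number the replacement files should carry:

```java
import java.util.Set;
import org.apache.iceberg.DeleteFile;

// Hypothetical helper: rewritten position delete files should inherit the max
// data sequence number of the files they replace, not a brand-new one.
static long replacementSequenceNumber(Set<DeleteFile> rewrittenDeletes) {
  return rewrittenDeletes.stream()
      .mapToLong(DeleteFile::dataSequenceNumber) // assumes sequence numbers are already assigned (non-null)
      .max()
      .orElseThrow(() -> new IllegalArgumentException("no delete files to rewrite"));
}
```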

@aokolnychyi (Contributor) commented Apr 27, 2023:

After thinking more about it, we can't rewrite equality deletes across sequence numbers.
I created #7452 to add validation to RewriteFiles.

@szehon-ho (Collaborator, Author):

The failure is more related to #7422.

@aokolnychyi (Contributor) left a comment:

This seems close. My biggest question is about the usage of StructLike in maps.

@aokolnychyi (Contributor) left a comment:

There are a few comments but nothing blocking. This is a huge change so I'll go ahead and merge it. We can address the last comments in a follow-up PR.

Thanks a lot, @szehon-ho! It has been pending for so long!

import org.slf4j.LoggerFactory;

/** Spark implementation of {@link RewritePositionDeleteFiles}. */
public class RewritePositionDeleteSparkAction
Contributor:

Shouldn't this be called RewritePositionDeleteFilesSparkAction? This is public-facing and we usually name it as the interface name + SparkAction.

}

@VisibleForTesting
RewritePositionDeletesGroup rewriteDeleteFiles(
Contributor:

Just making sure that this is indeed used for testing.

filesByPartition.put(coerced, partitionTasks);
}

StructLikeMap<List<List<PositionDeletesScanTask>>> fileGroupsByPartition =
Contributor:

For the future: can we explore the idea of having two helper methods, one for computing files by partition and another for file groups by partition (a sketch follows below)? Not in this PR.
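A rough sketch of what that split could look like. These are hypothetical helpers: the partition coercion to a common partition type done in the PR is omitted for brevity, and planFileGroups stands in for the rewriter's group-planning step.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeMap;

// Helper 1: bucket scan tasks by their partition value.
static StructLikeMap<List<PositionDeletesScanTask>> filesByPartition(
    Iterable<PositionDeletesScanTask> tasks, Types.StructType partitionType) {
  StructLikeMap<List<PositionDeletesScanTask>> filesByPartition = StructLikeMap.create(partitionType);
  for (PositionDeletesScanTask task : tasks) {
    filesByPartition.computeIfAbsent(task.partition(), ignored -> new ArrayList<>()).add(task);
  }
  return filesByPartition;
}

// Helper 2: plan file groups within each partition (planFileGroups is a stand-in).
static StructLikeMap<List<List<PositionDeletesScanTask>>> fileGroupsByPartition(
    StructLikeMap<List<PositionDeletesScanTask>> filesByPartition, Types.StructType partitionType) {
  StructLikeMap<List<List<PositionDeletesScanTask>>> fileGroups = StructLikeMap.create(partitionType);
  filesByPartition.forEach((partition, tasks) -> fileGroups.put(partition, planFileGroups(tasks)));
  return fileGroups;
}
```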

}

@VisibleForTesting
RewritePositionDeletesCommitManager commitManager() {
Contributor:

Same here about visibility.

RewritePositionDeletesCommitManager commitManager) {
ExecutorService rewriteService = rewriteService();

// Start Commit Service
Contributor:

minor: Inconsistent usage of capital letters across 3 comments in this method.

// Start Commit Service
// Start rewrite tasks
// Stop Commit service

Map<StructLike, List<List<PositionDeletesScanTask>>> groupsByPartition) {
Stream<RewritePositionDeletesGroup> rewriteFileGroupStream =
groupsByPartition.entrySet().stream()
.flatMap(
Contributor:

For the future: can we try refactoring this using some helper methods, because Spotless formats this in a weird way (see the sketch below)? Not in this PR.
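A hypothetical shape of that refactor, for illustration only; the group construction is not this PR's actual code.

```java
import java.util.List;
import java.util.stream.Stream;
import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.StructLike;

// Extracting the flatMap body into a named helper keeps the stream pipeline short
// enough that Spotless can format it one step per line.
private Stream<RewritePositionDeletesGroup> groupsForPartition(
    StructLike partition, List<List<PositionDeletesScanTask>> taskGroups) {
  return taskGroups.stream().map(tasks -> newRewriteGroup(partition, tasks)); // newRewriteGroup: assumed factory
}

// usage:
// groupsByPartition.entrySet().stream()
//     .flatMap(entry -> groupsForPartition(entry.getKey(), entry.getValue()));
```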


RewriteExecutionContext(
Map<StructLike, List<List<PositionDeletesScanTask>>> groupsByPartition) {
this.numGroupsByPartition =
Contributor:

Hm, I think we should use StructLikeMap here too.
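A sketch of that suggestion; partitionType and the surrounding names are assumptions based on the visible code, not the PR's final implementation.

```java
import java.util.List;
import java.util.Map;
import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeMap;

// Keying the per-partition group counts with StructLikeMap (instead of a plain
// Map of StructLike) makes lookups depend on partition field values rather than
// on the concrete StructLike implementation class.
static StructLikeMap<Integer> countGroupsByPartition(
    Map<StructLike, List<List<PositionDeletesScanTask>>> groupsByPartition, Types.StructType partitionType) {
  StructLikeMap<Integer> numGroupsByPartition = StructLikeMap.create(partitionType);
  groupsByPartition.forEach((partition, groups) -> numGroupsByPartition.put(partition, groups.size()));
  return numGroupsByPartition;
}
```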

StructLike partition = group.get(0).partition();

// read the deletes packing them into splits of the required size
Dataset<Row> posDeletes =
Contributor:

minor: We frequently add a DF suffix to variables referring to Dataset<Row>, e.g.:

posDeleteDF
dataFileDF
validPosDeleteDF

@aokolnychyi merged commit 667fd86 into apache:master on May 4, 2023
@ajantha-bhat (Member):

@szehon-ho: Are you already working on the CALL procedure for this? If not, I would like to work on it.

@szehon-ho (Collaborator, Author):

Hi @ajantha-bhat, thanks for asking. I haven't started yet, but I had planned to work on it probably late this week or next week, after some cleanup of this patch. Will you have something on that in the next few days?

@szehon-ho (Collaborator, Author) commented May 9, 2023:

Hi @ajantha-bhat, sorry; @aokolnychyi pinged me and this should actually be part of the next release. So I will work on the procedure tomorrow as the first priority, if that is OK.

manisin pushed a commit to Snowflake-Labs/iceberg that referenced this pull request May 9, 2023
@ajantha-bhat (Member):


Ok.

@chenwyi2:
When I backport this to Spark 3.1, I get the following error:
java.lang.IllegalArgumentException: Cannot parse path or identifier: 9a439584-c00f-45a5-9df7-31adfe182900
at org.apache.iceberg.spark.Spark3Util.catalogAndIdentifier(Spark3Util.java:722)
at org.apache.iceberg.spark.Spark3Util.catalogAndIdentifier(Spark3Util.java:713)
at org.apache.iceberg.spark.source.IcebergSource.catalogAndIdentifier(IcebergSource.java:141)
at org.apache.iceberg.spark.source.IcebergSource.extractIdentifier(IcebergSource.java:167)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:288)
I think the problem is:

Dataset<Row> posDeletes =
    spark
        .read()
        .format("iceberg")
        .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupId)
        .option(SparkReadOptions.SPLIT_SIZE, splitSize(inputSize(group)))
        .option(SparkReadOptions.FILE_OPEN_COST, "0")
        .load(groupId);

It seems like Spark 3.1 cannot read the table based on groupId?
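For comparison, a hedged guess at why the backport breaks (not verified against the 3.1 code): the Spark 3.1-era rewrite path loaded staged scan tasks using the table name as the path, while this PR loads by the group id, which newer Spark modules appear to resolve through an internal table cache. A 3.1 backport would likely need something closer to:

```java
// Approximate Spark 3.1-era shape, shown for comparison; option and method
// names are taken from the snippet above and may differ across Iceberg versions.
Dataset<Row> posDeletes =
    spark
        .read()
        .format("iceberg")
        .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupId)
        .option(SparkReadOptions.SPLIT_SIZE, splitSize(inputSize(group)))
        .option(SparkReadOptions.FILE_OPEN_COST, "0")
        .load(table.name()); // load by table name rather than by groupId
```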
