Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark 3.4: Implement rewrite position deletes #7389

Merged
merged 7 commits into from
May 4, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Review comments
  • Loading branch information
szehon-ho committed May 4, 2023
commit 1c27cb26006693f015e88fec2913bf5987d89e33
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public interface RewritePositionDeleteFiles
interface Result {
List<FileGroupRewriteResult> rewriteResults();

/** Returns the count of the position delete files that been rewritten. */
/** Returns the count of the position delete files that have been rewritten. */
default int rewrittenDeleteFilesCount() {
return rewriteResults().stream()
.mapToInt(FileGroupRewriteResult::rewrittenDeleteFilesCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Table;

/** A strategy for an action to rewrite position delete files. */
/**
* A strategy for an action to rewrite position delete files.
*
* @deprecated since 1.3.0, will be removed in 1.4.0; Use {@link SizeBasedFileRewriter} instead
*/
@Deprecated
public interface RewritePositionDeleteStrategy {

/** Returns the name of this rewrite deletes strategy */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
package org.apache.iceberg.actions;

import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,14 +48,14 @@ public RewritePositionDeletesCommitManager(Table table) {

/**
* Perform a commit operation on the table adding and removing files as required for this set of
* file groups
* file groups.
*
* @param fileGroups file sets to commit
* @param fileGroups file groups to commit
*/
public void commitFileGroups(Set<RewritePositionDeleteGroup> fileGroups) {
public void commit(Set<RewritePositionDeletesGroup> fileGroups) {
Set<DeleteFile> rewrittenDeleteFiles = Sets.newHashSet();
Set<DeleteFile> addedDeleteFiles = Sets.newHashSet();
for (RewritePositionDeleteGroup group : fileGroups) {
for (RewritePositionDeletesGroup group : fileGroups) {
rewrittenDeleteFiles.addAll(group.rewrittenDeleteFiles());
addedDeleteFiles.addAll(group.addedDeleteFiles());
}
Expand All @@ -73,20 +73,18 @@ public void commitFileGroups(Set<RewritePositionDeleteGroup> fileGroups) {
*
* @param fileGroup group of files which has already been rewritten
*/
public void abortFileGroup(RewritePositionDeleteGroup fileGroup) {
public void abort(RewritePositionDeletesGroup fileGroup) {
Preconditions.checkState(
fileGroup.addedDeleteFiles() != null, "Cannot abort a fileGroup that was not rewritten");

Set<String> filePaths =
fileGroup.addedDeleteFiles().stream()
.map(f -> f.path().toString())
.collect(Collectors.toSet());
Iterable<String> filePaths =
Iterables.transform(fileGroup.addedDeleteFiles(), f -> f.path().toString());
CatalogUtil.deleteFiles(table.io(), filePaths, "position delete", true);
}

public void commitOrClean(Set<RewritePositionDeleteGroup> rewriteGroups) {
public void commitOrClean(Set<RewritePositionDeletesGroup> rewriteGroups) {
try {
commitFileGroups(rewriteGroups);
commit(rewriteGroups);
} catch (CommitStateUnknownException e) {
LOG.error(
"Commit state unknown for {}, cannot clean up files because they may have been committed successfully.",
Expand All @@ -95,7 +93,7 @@ public void commitOrClean(Set<RewritePositionDeleteGroup> rewriteGroups) {
throw e;
} catch (Exception e) {
LOG.error("Cannot commit groups {}, attempting to clean up written files", rewriteGroups, e);
rewriteGroups.forEach(this::abortFileGroup);
rewriteGroups.forEach(this::abort);
throw e;
}
}
Expand All @@ -112,20 +110,20 @@ public CommitService service(int rewritesPerCommit) {
return new CommitService(rewritesPerCommit);
}

public class CommitService extends BaseCommitService<RewritePositionDeleteGroup> {
public class CommitService extends BaseCommitService<RewritePositionDeletesGroup> {

CommitService(int rewritesPerCommit) {
super(table, rewritesPerCommit);
}

@Override
protected void commitOrClean(Set<RewritePositionDeleteGroup> batch) {
protected void commitOrClean(Set<RewritePositionDeletesGroup> batch) {
RewritePositionDeletesCommitManager.this.commitOrClean(batch);
}

@Override
protected void abortFileGroup(RewritePositionDeleteGroup group) {
RewritePositionDeletesCommitManager.this.abortFileGroup(group);
protected void abortFileGroup(RewritePositionDeletesGroup group) {
RewritePositionDeletesCommitManager.this.abort(group);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,15 @@
* Container class representing a set of position delete files to be rewritten by a {@link
* RewritePositionDeleteFiles} and the new files which have been written by the action.
*/
public class RewritePositionDeleteGroup {
public class RewritePositionDeletesGroup {
private final FileGroupInfo info;
private final List<PositionDeletesScanTask> tasks;

private Set<DeleteFile> addedDeleteFiles = Collections.emptySet();

public RewritePositionDeleteGroup(
FileGroupInfo info, List<PositionDeletesScanTask> fileScanTasks) {
public RewritePositionDeletesGroup(FileGroupInfo info, List<PositionDeletesScanTask> tasks) {
this.info = info;
this.tasks = fileScanTasks;
this.tasks = tasks;
}

public FileGroupInfo info() {
Expand Down Expand Up @@ -107,18 +106,18 @@ public int numRewrittenDeleteFiles() {
return tasks.size();
}

public static Comparator<RewritePositionDeleteGroup> comparator(RewriteJobOrder order) {
public static Comparator<RewritePositionDeletesGroup> comparator(RewriteJobOrder order) {
switch (order) {
case BYTES_ASC:
return Comparator.comparing(RewritePositionDeleteGroup::rewrittenBytes);
return Comparator.comparing(RewritePositionDeletesGroup::rewrittenBytes);
case BYTES_DESC:
return Comparator.comparing(
RewritePositionDeleteGroup::rewrittenBytes, Comparator.reverseOrder());
RewritePositionDeletesGroup::rewrittenBytes, Comparator.reverseOrder());
case FILES_ASC:
return Comparator.comparing(RewritePositionDeleteGroup::numRewrittenDeleteFiles);
return Comparator.comparing(RewritePositionDeletesGroup::numRewrittenDeleteFiles);
case FILES_DESC:
return Comparator.comparing(
RewritePositionDeleteGroup::numRewrittenDeleteFiles, Comparator.reverseOrder());
RewritePositionDeletesGroup::numRewrittenDeleteFiles, Comparator.reverseOrder());
default:
return (fileGroupOne, fileGroupTwo) -> 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@

class SparkBinPackDataRewriter extends SparkSizeBasedDataRewriter {

private static final long SPLIT_OVERHEAD = 5 * 1024;

SparkBinPackDataRewriter(SparkSession spark, Table table) {
super(spark, table);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.ImmutableRewritePositionDeleteFiles;
import org.apache.iceberg.actions.RewritePositionDeleteFiles;
import org.apache.iceberg.actions.RewritePositionDeleteGroup;
import org.apache.iceberg.actions.RewritePositionDeletesCommitManager;
import org.apache.iceberg.actions.RewritePositionDeletesCommitManager.CommitService;
import org.apache.iceberg.actions.RewritePositionDeletesGroup;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
Expand All @@ -68,19 +68,19 @@
import org.slf4j.LoggerFactory;

/** Spark implementation of {@link RewritePositionDeleteFiles}. */
public class RewritePositionDeletesSparkAction
extends BaseSnapshotUpdateSparkAction<RewritePositionDeletesSparkAction>
public class RewritePositionDeleteSparkAction
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be called RewritePositionDeleteFilesSparkAction? This is public facing and we usually name it as the interface name + SparkAction.

extends BaseSnapshotUpdateSparkAction<RewritePositionDeleteSparkAction>
implements RewritePositionDeleteFiles {

private static final Logger LOG =
LoggerFactory.getLogger(RewritePositionDeletesSparkAction.class);
private static final Logger LOG = LoggerFactory.getLogger(RewritePositionDeleteSparkAction.class);
private static final Set<String> VALID_OPTIONS =
ImmutableSet.of(
MAX_CONCURRENT_FILE_GROUP_REWRITES,
PARTIAL_PROGRESS_ENABLED,
PARTIAL_PROGRESS_MAX_COMMITS,
REWRITE_JOB_ORDER);
private static final Result EMPTY = ImmutableRewritePositionDeleteFiles.Result.builder().build();
private static final Result EMPTY_RESULT =
ImmutableRewritePositionDeleteFiles.Result.builder().build();

private final Table table;
private final SparkBinPackPositionDeletesRewriter rewriter;
Expand All @@ -90,27 +90,27 @@ public class RewritePositionDeletesSparkAction
private boolean partialProgressEnabled;
private RewriteJobOrder rewriteJobOrder;

RewritePositionDeletesSparkAction(SparkSession spark, Table table) {
RewritePositionDeleteSparkAction(SparkSession spark, Table table) {
super(spark);
this.table = table;
this.rewriter = new SparkBinPackPositionDeletesRewriter(spark(), table);
}

@Override
protected RewritePositionDeletesSparkAction self() {
protected RewritePositionDeleteSparkAction self() {
return this;
}

@Override
public RewritePositionDeletesSparkAction filter(Expression expression) {
public RewritePositionDeleteSparkAction filter(Expression expression) {
throw new UnsupportedOperationException("Regular filters not supported yet.");
}

@Override
public RewritePositionDeleteFiles.Result execute() {
if (table.currentSnapshot() == null) {
LOG.info("Nothing found to rewrite in empty table {}", table.name());
return EMPTY;
return EMPTY_RESULT;
}

validateAndInitOptions();
Expand All @@ -120,10 +120,10 @@ public RewritePositionDeleteFiles.Result execute() {

if (ctx.totalGroupCount() == 0) {
LOG.info("Nothing found to rewrite in {}", table.name());
return EMPTY;
return EMPTY_RESULT;
}

Stream<RewritePositionDeleteGroup> groupStream = toGroupStream(ctx, fileGroupsByPartition);
Stream<RewritePositionDeletesGroup> groupStream = toGroupStream(ctx, fileGroupsByPartition);

RewritePositionDeletesCommitManager commitManager = commitManager();
if (partialProgressEnabled) {
Expand All @@ -147,9 +147,7 @@ private Map<StructLike, List<List<PositionDeletesScanTask>>> planFileGroups() {
StructLikeMap.create(partitionType);

for (PositionDeletesScanTask task : scanTasks) {
StructLike taskPartition = task.file().partition();
StructLike coerced =
PartitionUtil.coercePartition(partitionType, task.spec(), taskPartition);
StructLike coerced = coercePartition(task, partitionType);

List<PositionDeletesScanTask> partitionTasks = filesByPartition.get(coerced);
if (partitionTasks == null) {
Expand All @@ -168,7 +166,6 @@ private Map<StructLike, List<List<PositionDeletesScanTask>>> planFileGroups() {
rewriter.planFileGroups(partitionTasks);
List<List<PositionDeletesScanTask>> groups = ImmutableList.copyOf(plannedFileGroups);
if (groups.size() > 0) {
// use coerced partition for map key uniqueness, but return original partition
fileGroupsByPartition.put(partition, groups);
}
});
Expand All @@ -184,8 +181,8 @@ private Map<StructLike, List<List<PositionDeletesScanTask>>> planFileGroups() {
}

@VisibleForTesting
RewritePositionDeleteGroup rewriteDeleteFiles(
RewriteExecutionContext ctx, RewritePositionDeleteGroup fileGroup) {
RewritePositionDeletesGroup rewriteDeleteFiles(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just making sure that this is indeed used for testing.

RewriteExecutionContext ctx, RewritePositionDeletesGroup fileGroup) {
String desc = jobDesc(fileGroup, ctx);
Set<DeleteFile> addedFiles =
withJobGroupInfo(
Expand Down Expand Up @@ -214,14 +211,14 @@ RewritePositionDeletesCommitManager commitManager() {

private Result doExecute(
RewriteExecutionContext ctx,
Stream<RewritePositionDeleteGroup> groupStream,
Stream<RewritePositionDeletesGroup> groupStream,
RewritePositionDeletesCommitManager commitManager) {
ExecutorService rewriteService = rewriteService();

ConcurrentLinkedQueue<RewritePositionDeleteGroup> rewrittenGroups =
ConcurrentLinkedQueue<RewritePositionDeletesGroup> rewrittenGroups =
Queues.newConcurrentLinkedQueue();

Tasks.Builder<RewritePositionDeleteGroup> rewriteTaskBuilder =
Tasks.Builder<RewritePositionDeletesGroup> rewriteTaskBuilder =
Tasks.foreach(groupStream)
.executeWith(rewriteService)
.stopOnFailure()
Expand All @@ -248,9 +245,7 @@ private Result doExecute(
rewrittenGroups.size(),
e);

Tasks.foreach(rewrittenGroups)
.suppressFailureWhenFinished()
.run(commitManager::abortFileGroup);
Tasks.foreach(rewrittenGroups).suppressFailureWhenFinished().run(commitManager::abort);
throw e;
} finally {
rewriteService.shutdown();
Expand All @@ -273,7 +268,7 @@ private Result doExecute(

List<FileGroupRewriteResult> rewriteResults =
rewrittenGroups.stream()
.map(RewritePositionDeleteGroup::asResult)
.map(RewritePositionDeletesGroup::asResult)
.collect(Collectors.toList());

return ImmutableRewritePositionDeleteFiles.Result.builder()
Expand All @@ -283,7 +278,7 @@ private Result doExecute(

private Result doExecuteWithPartialProgress(
RewriteExecutionContext ctx,
Stream<RewritePositionDeleteGroup> groupStream,
Stream<RewritePositionDeletesGroup> groupStream,
RewritePositionDeletesCommitManager commitManager) {
ExecutorService rewriteService = rewriteService();

Expand All @@ -305,7 +300,7 @@ private Result doExecuteWithPartialProgress(

// Stop Commit service
commitService.close();
List<RewritePositionDeleteGroup> commitResults = commitService.results();
List<RewritePositionDeletesGroup> commitResults = commitService.results();
if (commitResults.size() == 0) {
LOG.error(
"{} is true but no rewrite commits succeeded. Check the logs to determine why the individual "
Expand All @@ -317,17 +312,17 @@ private Result doExecuteWithPartialProgress(

List<FileGroupRewriteResult> rewriteResults =
commitResults.stream()
.map(RewritePositionDeleteGroup::asResult)
.map(RewritePositionDeletesGroup::asResult)
.collect(Collectors.toList());
return ImmutableRewritePositionDeleteFiles.Result.builder()
.rewriteResults(rewriteResults)
.build();
}

Stream<RewritePositionDeleteGroup> toGroupStream(
Stream<RewritePositionDeletesGroup> toGroupStream(
RewriteExecutionContext ctx,
Map<StructLike, List<List<PositionDeletesScanTask>>> groupsByPartition) {
Stream<RewritePositionDeleteGroup> rewriteFileGroupStream =
Stream<RewritePositionDeletesGroup> rewriteFileGroupStream =
groupsByPartition.entrySet().stream()
.flatMap(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for future: Can we try refactoring this using some helper methods, because Spotless formats this in a weird way? Not in this PR.

e -> {
Expand All @@ -338,18 +333,17 @@ Stream<RewritePositionDeleteGroup> toGroupStream(
tasks -> {
int globalIndex = ctx.currentGlobalIndex();
int partitionIndex = ctx.currentPartitionIndex(partition);
// as coerced partition used for map, return original partition
FileGroupInfo info =
ImmutableRewritePositionDeleteFiles.FileGroupInfo.builder()
.globalIndex(globalIndex)
.partitionIndex(partitionIndex)
.partition(partition)
.build();
return new RewritePositionDeleteGroup(info, tasks);
return new RewritePositionDeletesGroup(info, tasks);
});
});

return rewriteFileGroupStream.sorted(RewritePositionDeleteGroup.comparator(rewriteJobOrder));
return rewriteFileGroupStream.sorted(RewritePositionDeletesGroup.comparator(rewriteJobOrder));
}

private void validateAndInitOptions() {
Expand Down Expand Up @@ -399,7 +393,7 @@ private void validateAndInitOptions() {
PARTIAL_PROGRESS_ENABLED);
}

private String jobDesc(RewritePositionDeleteGroup group, RewriteExecutionContext ctx) {
private String jobDesc(RewritePositionDeletesGroup group, RewriteExecutionContext ctx) {
StructLike partition = group.info().partition();
if (partition.size() > 0) {
return String.format(
Expand Down Expand Up @@ -455,4 +449,8 @@ public int totalGroupCount() {
return totalGroupCount;
}
}

private StructLike coercePartition(PositionDeletesScanTask task, StructType partitionType) {
return PartitionUtil.coercePartition(partitionType, task.spec(), task.partition());
}
}
Loading