Skip to content

Commit

Permalink
Spark 3.4: Action for rewriting position deletes (#7389)
Browse files Browse the repository at this point in the history
  • Loading branch information
szehon-ho authored May 4, 2023
1 parent d8142d1 commit 667fd86
Show file tree
Hide file tree
Showing 17 changed files with 1,915 additions and 29 deletions.
4 changes: 4 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,10 @@ acceptedBreaks:
- code: "java.method.addedToInterface"
new: "method java.lang.String org.apache.iceberg.view.ViewVersion::operation()"
justification: "Add operation API to view version"
- code: "java.method.addedToInterface"
new: "method java.util.List<org.apache.iceberg.actions.RewritePositionDeleteFiles.FileGroupRewriteResult>\
\ org.apache.iceberg.actions.RewritePositionDeleteFiles.Result::rewriteResults()"
justification: "New method added to un-implemented interface"
- code: "java.method.removed"
old: "method java.lang.Integer org.apache.iceberg.view.ImmutableSQLViewRepresentation::schemaId()"
justification: "Moving SchemaID to ViewVersion. View Spec implementation has\
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,10 @@ default DeleteReachableFiles deleteReachableFiles(String metadataLocation) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement deleteReachableFiles");
}

/**
 * Creates an action that rewrites position delete files.
 *
 * <p>This default implementation does not create anything: it always fails with an exception
 * whose message names the concrete class it was invoked on.
 *
 * @param table the table passed by the caller (unused by this default implementation)
 * @return nothing from this default implementation, which never returns normally
 * @throws UnsupportedOperationException always
 */
default RewritePositionDeleteFiles rewritePositionDeletes(Table table) {
  String providerName = this.getClass().getName();
  throw new UnsupportedOperationException(
      providerName + " does not implement rewritePositionDeletes");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,68 @@
*/
package org.apache.iceberg.actions;

import java.util.List;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.expressions.Expression;
import org.immutables.value.Value;

/**
* An action for rewriting position delete files.
*
* <p>Generally used for optimizing the size and layout of position delete files within a table.
*/
@Value.Enclosing
public interface RewritePositionDeleteFiles
extends SnapshotUpdate<RewritePositionDeleteFiles, RewritePositionDeleteFiles.Result> {

/**
* Enable committing groups of files (see max-file-group-size-bytes) prior to the entire rewrite
* completing. This will produce additional commits but allow for progress even if some groups
* fail to commit. This setting will not change the correctness of the rewrite operation as file
* groups can be compacted independently.
*
* <p>The default is false, which produces a single commit when the entire job has completed.
*/
String PARTIAL_PROGRESS_ENABLED = "partial-progress.enabled";

/** Default for {@link #PARTIAL_PROGRESS_ENABLED}: partial progress is disabled. */
boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;

/**
* The maximum number of Iceberg commits that this rewrite is allowed to produce if partial
* progress is enabled. This setting has no effect if partial progress is disabled.
*/
String PARTIAL_PROGRESS_MAX_COMMITS = "partial-progress.max-commits";

int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;

/**
* The max number of file groups to be simultaneously rewritten by the rewrite strategy. The
* structure and contents of the group are determined by the rewrite strategy. Each file group
* will be rewritten independently and asynchronously.
*/
String MAX_CONCURRENT_FILE_GROUP_REWRITES = "max-concurrent-file-group-rewrites";

int MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT = 5;

/**
* Forces the rewrite job order based on the value.
*
* <ul>
* <li>If rewrite-job-order=bytes-asc, then rewrite the smallest job groups first.
* <li>If rewrite-job-order=bytes-desc, then rewrite the largest job groups first.
* <li>If rewrite-job-order=files-asc, then rewrite the job groups with the least files first.
* <li>If rewrite-job-order=files-desc, then rewrite the job groups with the most files first.
* <li>If rewrite-job-order=none, then rewrite job groups in the order they were planned (no
* specific ordering).
* </ul>
*
* <p>Defaults to none.
*/
String REWRITE_JOB_ORDER = "rewrite-job-order";

/** Default for {@link #REWRITE_JOB_ORDER}: the order name of {@link RewriteJobOrder#NONE}. */
String REWRITE_JOB_ORDER_DEFAULT = RewriteJobOrder.NONE.orderName();

/**
* A filter for finding deletes to rewrite.
*
Expand All @@ -41,11 +93,80 @@ public interface RewritePositionDeleteFiles
RewritePositionDeleteFiles filter(Expression expression);

/**
 * The action result: one entry per rewritten file group, plus totals that are derived by summing
 * over those entries.
 */
@Value.Immutable
interface Result {
  /** Returns the individual results of the position delete file groups. */
  List<FileGroupRewriteResult> rewriteResults();

  /** Returns the number of rewritten position delete files, summed over all file groups. */
  default int rewrittenDeleteFilesCount() {
    int total = 0;
    for (FileGroupRewriteResult groupResult : rewriteResults()) {
      total += groupResult.rewrittenDeleteFilesCount();
    }
    return total;
  }

  /** Returns the number of added position delete files, summed over all file groups. */
  default int addedDeleteFilesCount() {
    int total = 0;
    for (FileGroupRewriteResult groupResult : rewriteResults()) {
      total += groupResult.addedDeleteFilesCount();
    }
    return total;
  }

  /** Returns the number of bytes of rewritten position delete files, summed over all groups. */
  default long rewrittenBytesCount() {
    long totalBytes = 0L;
    for (FileGroupRewriteResult groupResult : rewriteResults()) {
      totalBytes += groupResult.rewrittenBytesCount();
    }
    return totalBytes;
  }

  /** Returns the number of bytes of newly added position delete files, summed over all groups. */
  default long addedBytesCount() {
    long totalBytes = 0L;
    for (FileGroupRewriteResult groupResult : rewriteResults()) {
      totalBytes += groupResult.addedBytesCount();
    }
    return totalBytes;
  }
}

/**
* For a particular position delete file group, the number of position delete files which are
* newly created and the number of files which were formerly part of the table but have been
* rewritten, together with the corresponding byte counts.
*/
@Value.Immutable
interface FileGroupRewriteResult {
/** Description of this position delete file group. */
FileGroupInfo info();

/** Returns the count of the position delete files that have been rewritten in this group. */
int rewrittenDeleteFilesCount();

/** Returns the count of the added position delete files in this group. */
int addedDeleteFilesCount();

/** Returns the number of bytes of rewritten position delete files in this group. */
long rewrittenBytesCount();

/** Returns the number of bytes of newly added position delete files in this group. */
long addedBytesCount();
}

/**
* A description of a position delete file group, when it was processed, and within which
* partition. For use tracking rewrite operations and for returning results.
*/
@Value.Immutable
interface FileGroupInfo {
/**
* Returns which position delete file group this is out of the total set of file groups for this
* rewrite.
*/
int globalIndex();

/**
* Returns which position delete file group this is out of the set of file groups for this
* partition.
*/
int partitionIndex();

/**
* Returns which partition this position delete file group contains files from. This will be of
* the type of the table's unified partition type considering all specs in a table.
*/
StructLike partition();
}
}
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/CatalogUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
* @param type type of files being deleted
* @param concurrent controls concurrent deletion. Only applicable for non-bulk FileIO
*/
private static void deleteFiles(
public static void deleteFiles(
FileIO io, Iterable<String> files, String type, boolean concurrent) {
if (io instanceof SupportsBulkOperations) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Table;

/** A strategy for an action to rewrite position delete files. */
/**
* A strategy for an action to rewrite position delete files.
*
* @deprecated since 1.3.0, will be removed in 1.4.0; Use {@link SizeBasedFileRewriter} instead
*/
@Deprecated
public interface RewritePositionDeleteStrategy {

/** Returns the name of this rewrite deletes strategy */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;

import java.util.Set;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Functionality used by {@link RewritePositionDeleteFiles} from different platforms to handle
 * commits.
 *
 * <p>The id of the snapshot that is current when the manager is created is remembered and passed
 * to {@code validateFromSnapshot} on every commit made through this manager.
 */
public class RewritePositionDeletesCommitManager {
  private static final Logger LOG =
      LoggerFactory.getLogger(RewritePositionDeletesCommitManager.class);

  private final Table table;
  private final long startingSnapshotId;

  /**
   * Creates a commit manager for the given table.
   *
   * @param table table to commit to; its current snapshot is dereferenced here to capture the
   *     starting snapshot id (NOTE(review): presumably this requires the table to already have a
   *     snapshot - confirm against callers)
   */
  public RewritePositionDeletesCommitManager(Table table) {
    this.table = table;
    this.startingSnapshotId = table.currentSnapshot().snapshotId();
  }

  /**
   * Perform a commit operation on the table adding and removing files as required for this set of
   * file groups.
   *
   * <p>The rewritten and added delete files of all groups are combined and committed in a single
   * rewrite operation that is validated from the starting snapshot.
   *
   * @param fileGroups file groups to commit
   */
  public void commit(Set<RewritePositionDeletesGroup> fileGroups) {
    Set<DeleteFile> rewrittenDeleteFiles = Sets.newHashSet();
    Set<DeleteFile> addedDeleteFiles = Sets.newHashSet();
    for (RewritePositionDeletesGroup group : fileGroups) {
      rewrittenDeleteFiles.addAll(group.rewrittenDeleteFiles());
      addedDeleteFiles.addAll(group.addedDeleteFiles());
    }

    table
        .newRewrite()
        .validateFromSnapshot(startingSnapshotId)
        .rewriteFiles(ImmutableSet.of(), rewrittenDeleteFiles, ImmutableSet.of(), addedDeleteFiles)
        .commit();
  }

  /**
   * Clean up a specified file group by deleting the delete files that were added for it.
   *
   * @param fileGroup group of files which has already been rewritten
   * @throws IllegalStateException if the group has no added delete files, i.e. it was not
   *     rewritten
   */
  public void abort(RewritePositionDeletesGroup fileGroup) {
    Preconditions.checkState(
        fileGroup.addedDeleteFiles() != null, "Cannot abort a fileGroup that was not rewritten");

    Iterable<String> filePaths =
        Iterables.transform(fileGroup.addedDeleteFiles(), f -> f.path().toString());
    CatalogUtil.deleteFiles(table.io(), filePaths, "position delete", true);
  }

  /**
   * Commits the given groups and, if the commit fails, deletes the files written for them.
   *
   * <p>When the commit state is unknown nothing is cleaned up, because the files may have been
   * committed successfully. For any other failure every group is aborted; a failure while
   * aborting one group is logged and attached to the commit failure as a suppressed exception so
   * that the remaining groups are still cleaned up and the original failure is what propagates.
   *
   * @param rewriteGroups groups to commit together
   */
  public void commitOrClean(Set<RewritePositionDeletesGroup> rewriteGroups) {
    try {
      commit(rewriteGroups);
    } catch (CommitStateUnknownException e) {
      LOG.error(
          "Commit state unknown for {}, cannot clean up files because they may have been committed successfully.",
          rewriteGroups,
          e);
      throw e;
    } catch (Exception e) {
      LOG.error("Cannot commit groups {}, attempting to clean up written files", rewriteGroups, e);
      for (RewritePositionDeletesGroup group : rewriteGroups) {
        try {
          abort(group);
        } catch (RuntimeException cleanupFailure) {
          // A cleanup problem must neither hide the commit failure nor stop the remaining cleanup.
          LOG.warn("Failed to clean up files of group {}", group, cleanupFailure);
          e.addSuppressed(cleanupFailure);
        }
      }
      throw e;
    }
  }

  /**
   * An async service which allows for committing multiple file groups as their rewrites complete.
   * The service also allows for partial-progress since commits can fail. Once the service has been
   * closed no new file groups should be offered.
   *
   * @param rewritesPerCommit number of file groups to include in a commit
   * @return the service for handling commits
   */
  public CommitService service(int rewritesPerCommit) {
    return new CommitService(rewritesPerCommit);
  }

  /** Commit service that delegates committing and aborting to the enclosing commit manager. */
  public class CommitService extends BaseCommitService<RewritePositionDeletesGroup> {

    CommitService(int rewritesPerCommit) {
      super(table, rewritesPerCommit);
    }

    @Override
    protected void commitOrClean(Set<RewritePositionDeletesGroup> batch) {
      RewritePositionDeletesCommitManager.this.commitOrClean(batch);
    }

    @Override
    protected void abortFileGroup(RewritePositionDeletesGroup group) {
      RewritePositionDeletesCommitManager.this.abort(group);
    }
  }
}
Loading

0 comments on commit 667fd86

Please sign in to comment.