Skip to content

Commit

Permalink
Implement rewrite position deletes
Browse files Browse the repository at this point in the history
  • Loading branch information
szehon-ho committed Apr 20, 2023
1 parent 37ec2df commit 4cf9cae
Show file tree
Hide file tree
Showing 19 changed files with 1,984 additions and 23 deletions.
17 changes: 17 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,23 @@ acceptedBreaks:
- code: "java.method.addedToInterface"
new: "method java.lang.String org.apache.iceberg.view.ViewVersion::operation()"
justification: "Add operation API to view version"
- code: "java.method.addedToInterface"
new: "method java.util.List<org.apache.iceberg.actions.RewritePositionDeleteFiles.PositionDeleteGroupRewriteResult>\
\ org.apache.iceberg.actions.RewritePositionDeleteFiles.Result::rewriteResults()"
justification: "Adding new methods to internal interfaces"
- code: "java.method.addedToInterface"
new: "method long org.apache.iceberg.actions.RewritePositionDeleteFiles.Result::addedBytesCount()"
justification: "Adding new methods to internal interfaces"
- code: "java.method.addedToInterface"
new: "method long org.apache.iceberg.actions.RewritePositionDeleteFiles.Result::rewrittenBytesCount()"
justification: "Adding new methods to internal interfaces"
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.RewriteFiles org.apache.iceberg.RewriteFiles::rewriteDeleteFiles(java.util.Set<org.apache.iceberg.DeleteFile>,\
\ java.util.Set<org.apache.iceberg.DeleteFile>)"
justification: "Adding new methods to internal interfaces"
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.actions.RewritePositionDeleteFiles org.apache.iceberg.actions.RewritePositionDeleteFiles::filterPositionDeletes(org.apache.iceberg.expressions.Expression)"
justification: "Adding new methods to internal interfaces"
org.apache.iceberg:iceberg-core:
- code: "java.class.removed"
old: "class org.apache.iceberg.actions.BaseExpireSnapshotsActionResult"
Expand Down
10 changes: 10 additions & 0 deletions api/src/main/java/org/apache/iceberg/RewriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ default RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> fil
RewriteFiles rewriteFiles(
Set<DataFile> filesToDelete, Set<DataFile> filesToAdd, long sequenceNumber);

/**
* Add a rewrite that replaces one set of delete files with another set that contains the same
* data.
*
* @param filesToDelete delete files that will be replaced (deleted), cannot be null or empty.
* @param filesToAdd delete files that will be added, cannot be null or empty.
* @return this for method chaining
*/
RewriteFiles rewriteDeleteFiles(Set<DeleteFile> filesToDelete, Set<DeleteFile> filesToAdd);

/**
* Add a rewrite that replaces one set of files with another set that contains the same data.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,10 @@ default DeleteReachableFiles deleteReachableFiles(String metadataLocation) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement deleteReachableFiles");
}

/**
 * Instantiates an action to rewrite position delete files.
 *
 * <p>This default implementation does not support the action and always throws.
 *
 * @param table the table passed by the caller; unused by this default implementation
 * @return never returns normally from this default implementation
 * @throws UnsupportedOperationException always, naming the concrete provider class
 */
default RewritePositionDeleteFiles rewritePositionDeletes(Table table) {
  String providerName = this.getClass().getName();
  throw new UnsupportedOperationException(
      providerName + " does not implement rewritePositionDeletes");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
*/
package org.apache.iceberg.actions;

import java.util.List;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.expressions.Expression;
import org.immutables.value.Value;

/**
* An action for rewriting position delete files.
Expand All @@ -28,6 +32,80 @@
public interface RewritePositionDeleteFiles
extends SnapshotUpdate<RewritePositionDeleteFiles, RewritePositionDeleteFiles.Result> {

/**
* Enable committing groups of files (see max-file-group-size-bytes) prior to the entire rewrite
* completing. This will produce additional commits but allow for progress even if some groups
* fail to commit. This setting will not change the correctness of the rewrite operation as file
* groups can be compacted independently.
*
* <p>The default is false, which produces a single commit when the entire job has completed.
*/
String PARTIAL_PROGRESS_ENABLED = "partial-progress.enabled";

boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;

/**
* The maximum number of Iceberg commits that this rewrite is allowed to produce if partial
* progress is enabled. This setting has no effect if partial progress is disabled.
*/
String PARTIAL_PROGRESS_MAX_COMMITS = "partial-progress.max-commits";

int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;

/**
* The entire rewrite operation is broken down into pieces based on partitioning and within
* partitions based on size into groups. These sub-units of the rewrite are referred to as file
* groups. The largest amount of data that should be compacted in a single group is controlled by
* {@link #MAX_FILE_GROUP_SIZE_BYTES}. This helps with breaking down the rewriting of very large
* partitions which may not be rewritable otherwise due to the resource constraints of the
* cluster. For example, a sort-based rewrite may not scale to terabyte-sized partitions; those
* partitions need to be worked on in small subsections to avoid exhaustion of resources.
*
* <p>When grouping files, the underlying rewrite strategy will use this value to limit the
* files which will be included in a single file group. A group will be processed by a single
* framework "action". For example, in Spark this means that each group would be rewritten in its
* own Spark action. A group will never contain files for multiple output partitions.
*/
String MAX_FILE_GROUP_SIZE_BYTES = "max-file-group-size-bytes";

long MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes

/**
* The max number of file groups to be simultaneously rewritten by the rewrite strategy. The
* structure and contents of the group is determined by the rewrite strategy. Each file group will
* be rewritten independently and asynchronously.
*/
String MAX_CONCURRENT_FILE_GROUP_REWRITES = "max-concurrent-file-group-rewrites";

int MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT = 5;

/**
* The output file size that this rewrite strategy will attempt to generate when rewriting files.
* By default this will use the "write.target-file-size-bytes" value in the table properties of
* the table being updated.
*/
String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";

/**
* Forces the rewrite job order based on the value.
*
* <ul>
* <li>If rewrite-job-order=bytes-asc, then rewrite the smallest job groups first.
* <li>If rewrite-job-order=bytes-desc, then rewrite the largest job groups first.
* <li>If rewrite-job-order=files-asc, then rewrite the job groups with the least files first.
* <li>If rewrite-job-order=files-desc, then rewrite the job groups with the most files first.
* <li>If rewrite-job-order=none, then rewrite job groups in the order they were planned (no
* specific ordering).
* </ul>
*
* <p>Defaults to none.
*/
String REWRITE_JOB_ORDER = "rewrite-job-order";

String REWRITE_JOB_ORDER_DEFAULT = RewriteJobOrder.NONE.orderName();

/**
* A filter for finding deletes to rewrite.
*
Expand All @@ -41,11 +119,60 @@ public interface RewritePositionDeleteFiles
RewritePositionDeleteFiles filter(Expression expression);

/** The action result that contains a summary of the execution. */
@Value.Immutable
interface Result {
/** Returns the rewrite results of the individual position delete file groups. */
List<PositionDeleteGroupRewriteResult> rewriteResults();

/** Returns the count of the position delete files that have been rewritten. */
int rewrittenDeleteFilesCount();

/** Returns the count of the added delete files. */
int addedDeleteFilesCount();

/** Returns the number of bytes of position deletes that have been rewritten. */
long rewrittenBytesCount();

/** Returns the number of bytes of newly added position deletes. */
long addedBytesCount();
}

/**
* For a particular position delete file group, the number of position delete files which are
* newly created and the number of files which were formerly part of the table but have been
* rewritten.
*/
@Value.Immutable
interface PositionDeleteGroupRewriteResult {
/** Returns the description of the position delete file group this result belongs to. */
PositionDeleteGroupInfo info();

/** Returns the count of the delete files added for this group. */
int addedDeleteFilesCount();

/** Returns the count of the delete files of this group that have been rewritten. */
int rewrittenDeleteFilesCount();

/** Returns the number of bytes of position deletes of this group that have been rewritten. */
long rewrittenBytesCount();

/** Returns the number of bytes of position deletes newly added for this group. */
long addedBytesCount();
}

/**
* A description of a position delete file group, when it was processed, and within which
* partition. For use tracking rewrite operations and for returning results.
*/
@Value.Immutable
interface PositionDeleteGroupInfo {
/**
* Returns which position delete file group this is out of the total set of file groups for this
* rewrite.
*/
int globalIndex();

/**
* Returns which position delete file group this is out of the set of file groups for this
* partition.
*/
int partitionIndex();

/** Returns which partition this position delete file group contains files from. */
StructLike partition();
}
}
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ public RewriteFiles rewriteFiles(
return rewriteFiles(filesToDelete, ImmutableSet.of(), filesToAdd, ImmutableSet.of());
}

/**
 * Replaces one set of delete files with another by delegating to the four-argument {@code
 * rewriteFiles} overload with empty data-file sets, so no data files are replaced or added.
 *
 * @param filesToDelete delete files to be replaced
 * @param filesToAdd delete files to be added
 * @return the result of the delegated {@code rewriteFiles} call
 */
@Override
public RewriteFiles rewriteDeleteFiles(
    Set<DeleteFile> filesToDelete, Set<DeleteFile> filesToAdd) {
  Set<DataFile> noDataFilesToReplace = ImmutableSet.of();
  Set<DataFile> noDataFilesToAdd = ImmutableSet.of();
  return rewriteFiles(noDataFilesToReplace, filesToDelete, noDataFilesToAdd, filesToAdd);
}

@Override
public RewriteFiles rewriteFiles(
Set<DataFile> dataFilesToReplace,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/**
 * Container class representing a set of position delete files to be rewritten by a RewriteAction
 * and the new files which have been written by the action.
 *
 * <p>The set of added delete files starts out empty and is replaced through {@link
 * #setOutputFiles(Set)}.
 */
public class RewritePositionDeleteGroup {
  private final RewritePositionDeleteFiles.PositionDeleteGroupInfo info;
  private final List<PositionDeletesScanTask> positionDeletesScanTasks;

  private Set<DeleteFile> addedDeleteFiles = Collections.emptySet();

  /**
   * Creates a group for the given scan tasks.
   *
   * @param info description of this group
   * @param fileScanTasks the position delete scan tasks that make up this group
   */
  public RewritePositionDeleteGroup(
      RewritePositionDeleteFiles.PositionDeleteGroupInfo info,
      List<PositionDeletesScanTask> fileScanTasks) {
    this.info = info;
    this.positionDeletesScanTasks = fileScanTasks;
  }

  /** Returns the description of this group. */
  public RewritePositionDeleteFiles.PositionDeleteGroupInfo info() {
    return info;
  }

  /** Returns the position delete scan tasks of this group. */
  public List<PositionDeletesScanTask> scans() {
    return positionDeletesScanTasks;
  }

  /**
   * Records the delete files produced for this group.
   *
   * @param files the newly written delete files
   */
  public void setOutputFiles(Set<DeleteFile> files) {
    addedDeleteFiles = files;
  }

  /** Returns the distinct delete files referenced by the scan tasks of this group. */
  public Set<DeleteFile> rewrittenDeleteFiles() {
    return scans().stream().map(PositionDeletesScanTask::file).collect(Collectors.toSet());
  }

  /** Returns the delete files recorded via {@link #setOutputFiles(Set)}; empty by default. */
  public Set<DeleteFile> addedDeleteFiles() {
    return addedDeleteFiles;
  }

  /**
   * Summarizes this group as a rewrite result.
   *
   * <p>The rewritten file count is the number of scan tasks in this group.
   *
   * @return the immutable result for this group
   * @throws IllegalStateException if the output files were set to {@code null}
   */
  public RewritePositionDeleteFiles.PositionDeleteGroupRewriteResult asResult() {
    Preconditions.checkState(
        addedDeleteFiles != null, "Cannot get result, Group was never rewritten");

    // The byte totals come from the accessors below; no separate local sum is needed.
    return ImmutablePositionDeleteGroupRewriteResult.builder()
        .info(info)
        .addedDeleteFilesCount(addedDeleteFiles.size())
        .rewrittenDeleteFilesCount(positionDeletesScanTasks.size())
        .rewrittenBytesCount(rewrittenBytes())
        .addedBytesCount(addedBytes())
        .build();
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this)
        .add("info", info)
        .add("numRewrittenPositionDeleteFiles", positionDeletesScanTasks.size())
        .add(
            "numAddedPositionDeleteFiles",
            addedDeleteFiles == null
                ? "Rewrite Incomplete"
                : Integer.toString(addedDeleteFiles.size()))
        .add("numRewrittenBytes", rewrittenBytes())
        .toString();
  }

  /** Returns the sum of the lengths of all scan tasks in this group. */
  public long rewrittenBytes() {
    return positionDeletesScanTasks.stream().mapToLong(PositionDeletesScanTask::length).sum();
  }

  /** Returns the sum of the file sizes, in bytes, of the added delete files. */
  public long addedBytes() {
    return addedDeleteFiles.stream().mapToLong(DeleteFile::fileSizeInBytes).sum();
  }

  /** Returns the number of scan tasks in this group. */
  public int numDeleteFiles() {
    return positionDeletesScanTasks.size();
  }
}
Loading

0 comments on commit 4cf9cae

Please sign in to comment.