Skip to content

Commit

Permalink
Review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
szehon-ho committed May 3, 2023
1 parent f3d4cb2 commit 6ce8fae
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 157 deletions.
22 changes: 4 additions & 18 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,10 @@ acceptedBreaks:
- code: "java.method.addedToInterface"
new: "method java.lang.String org.apache.iceberg.view.ViewVersion::operation()"
justification: "Add operation API to view version"

- code: "java.method.addedToInterface"
new: "method java.util.List<org.apache.iceberg.actions.RewritePositionDeleteFiles.FileGroupRewriteResult>\
\ org.apache.iceberg.actions.RewritePositionDeleteFiles.Result::rewriteResults()"
justification: "New method added to un-implemented interface"
- code: "java.method.removed"
old: "method java.lang.Integer org.apache.iceberg.view.ImmutableSQLViewRepresentation::schemaId()"
justification: "Moving SchemaID to ViewVersion. View Spec implementation has\
Expand All @@ -435,23 +438,6 @@ acceptedBreaks:
\ org.apache.iceberg.view.ImmutableSQLViewRepresentation.Builder::schemaId(java.lang.Integer)"
justification: "Moving SchemaID to ViewVersion. View Spec implementation has\
\ not been voted on yet so this break is acceptable"
- code: "java.method.addedToInterface"
new: "method java.util.List<org.apache.iceberg.actions.RewritePositionDeleteFiles.PositionDeleteGroupRewriteResult>\
\ org.apache.iceberg.actions.RewritePositionDeleteFiles.Result::rewriteResults()"
justification: "Adding new methods to internal interfaces"
- code: "java.method.addedToInterface"
new: "method long org.apache.iceberg.actions.RewritePositionDeleteFiles.Result::addedBytesCount()"
justification: "Adding new methods to internal interfaces"
- code: "java.method.addedToInterface"
new: "method long org.apache.iceberg.actions.RewritePositionDeleteFiles.Result::rewrittenBytesCount()"
justification: "Adding new methods to internal interfaces"
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.RewriteFiles org.apache.iceberg.RewriteFiles::rewriteDeleteFiles(java.util.Set<org.apache.iceberg.DeleteFile>,\
\ java.util.Set<org.apache.iceberg.DeleteFile>)"
justification: "Adding new methods to internal interfaces"
- code: "java.method.addedToInterface"
new: "method org.apache.iceberg.actions.RewritePositionDeleteFiles org.apache.iceberg.actions.RewritePositionDeleteFiles::filterPositionDeletes(org.apache.iceberg.expressions.Expression)"
justification: "Adding new methods to internal interfaces"
org.apache.iceberg:iceberg-core:
- code: "java.class.removed"
old: "class org.apache.iceberg.actions.BaseExpireSnapshotsActionResult"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
*
* <p>Generally used for optimizing the size and layout of position delete files within a table.
*/
@Value.Enclosing
public interface RewritePositionDeleteFiles
extends SnapshotUpdate<RewritePositionDeleteFiles, RewritePositionDeleteFiles.Result> {

Expand Down Expand Up @@ -64,8 +65,6 @@ public interface RewritePositionDeleteFiles
/**
* Forces the rewrite job order based on the value.
*
* <p>
*
* <ul>
* <li>If rewrite-job-order=bytes-asc, then rewrite the smallest job groups first.
* <li>If rewrite-job-order=bytes-desc, then rewrite the largest job groups first.
Expand Down Expand Up @@ -96,34 +95,30 @@ public interface RewritePositionDeleteFiles
/** The action result that contains a summary of the execution. */
@Value.Immutable
interface Result {
List<PositionDeleteGroupRewriteResult> rewriteResults();
List<FileGroupRewriteResult> rewriteResults();

/** Returns the count of the position delete files that have been rewritten. */
default int rewrittenDeleteFilesCount() {
return rewriteResults().stream()
.mapToInt(PositionDeleteGroupRewriteResult::rewrittenDeleteFilesCount)
.mapToInt(FileGroupRewriteResult::rewrittenDeleteFilesCount)
.sum();
}

/** Returns the count of the added position delete files. */
default int addedDeleteFilesCount() {
return rewriteResults().stream()
.mapToInt(PositionDeleteGroupRewriteResult::addedDeleteFilesCount)
.mapToInt(FileGroupRewriteResult::addedDeleteFilesCount)
.sum();
}

/** Returns the number of bytes of position delete files that have been rewritten */
default long rewrittenBytesCount() {
return rewriteResults().stream()
.mapToLong(PositionDeleteGroupRewriteResult::rewrittenBytesCount)
.sum();
return rewriteResults().stream().mapToLong(FileGroupRewriteResult::rewrittenBytesCount).sum();
}

/** Returns the number of bytes of newly added position delete files */
default long addedBytesCount() {
return rewriteResults().stream()
.mapToLong(PositionDeleteGroupRewriteResult::addedBytesCount)
.sum();
return rewriteResults().stream().mapToLong(FileGroupRewriteResult::addedBytesCount).sum();
}
}

Expand All @@ -133,22 +128,20 @@ default long addedBytesCount() {
* rewritten.
*/
@Value.Immutable
interface PositionDeleteGroupRewriteResult {
/** Description of this position delete file group * */
PositionDeleteGroupInfo info();
interface FileGroupRewriteResult {
/** Description of this position delete file group. */
FileGroupInfo info();

/** Returns the count of the position delete files that have been rewritten in this group. */
int rewrittenDeleteFilesCount();

/** Returns the count of the added position delete files in this group. */
int addedDeleteFilesCount();

/**
* Returns the number of bytes of position delete files that have been rewritten in this group
*/
/** Returns the number of bytes of rewritten position delete files in this group. */
long rewrittenBytesCount();

/** Returns the number of bytes of newly added position delete files in this group */
/** Returns the number of bytes of newly added position delete files in this group. */
long addedBytesCount();
}

Expand All @@ -157,20 +150,23 @@ interface PositionDeleteGroupRewriteResult {
* partition. For use tracking rewrite operations and for returning results.
*/
@Value.Immutable
interface PositionDeleteGroupInfo {
interface FileGroupInfo {
/**
* returns which position delete file group this is out of the total set of file groups for this
* Returns which position delete file group this is out of the total set of file groups for this
* rewrite
*/
int globalIndex();

/**
* returns which position delete file group this is out of the set of file groups for this
* Returns which position delete file group this is out of the set of file groups for this
* partition
*/
int partitionIndex();

/** returns which partition this position delete file group contains files from */
/**
* Returns which partition this position delete file group contains files from. This will be of
* the type of the table's unified partition type considering all specs in a table.
*/
StructLike partition();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
package org.apache.iceberg.actions;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.actions.RewritePositionDeleteFiles.PositionDeleteGroupInfo;
import org.apache.iceberg.actions.RewritePositionDeleteFiles.PositionDeleteGroupRewriteResult;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.actions.RewritePositionDeleteFiles.FileGroupInfo;
import org.apache.iceberg.actions.RewritePositionDeleteFiles.FileGroupRewriteResult;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

Expand All @@ -34,23 +36,23 @@
* RewritePositionDeleteFiles} and the new files which have been written by the action.
*/
public class RewritePositionDeleteGroup {
private final PositionDeleteGroupInfo info;
private final List<PositionDeletesScanTask> positionDeletesScanTasks;
private final FileGroupInfo info;
private final List<PositionDeletesScanTask> tasks;

private Set<DeleteFile> addedDeleteFiles = Collections.emptySet();

public RewritePositionDeleteGroup(
PositionDeleteGroupInfo info, List<PositionDeletesScanTask> fileScanTasks) {
FileGroupInfo info, List<PositionDeletesScanTask> fileScanTasks) {
this.info = info;
this.positionDeletesScanTasks = fileScanTasks;
this.tasks = fileScanTasks;
}

public PositionDeleteGroupInfo info() {
public FileGroupInfo info() {
return info;
}

public List<PositionDeletesScanTask> tasks() {
return positionDeletesScanTasks;
return tasks;
}

public void setOutputFiles(Set<DeleteFile> files) {
Expand All @@ -65,14 +67,14 @@ public Set<DeleteFile> addedDeleteFiles() {
return addedDeleteFiles;
}

public PositionDeleteGroupRewriteResult asResult() {
public FileGroupRewriteResult asResult() {
Preconditions.checkState(
addedDeleteFiles != null, "Cannot get result, Group was never rewritten");

return ImmutablePositionDeleteGroupRewriteResult.builder()
return ImmutableRewritePositionDeleteFiles.FileGroupRewriteResult.builder()
.info(info)
.addedDeleteFilesCount(addedDeleteFiles.size())
.rewrittenDeleteFilesCount(positionDeletesScanTasks.size())
.rewrittenDeleteFilesCount(tasks.size())
.rewrittenBytesCount(rewrittenBytes())
.addedBytesCount(addedBytes())
.build();
Expand All @@ -82,7 +84,7 @@ public PositionDeleteGroupRewriteResult asResult() {
public String toString() {
return MoreObjects.toStringHelper(this)
.add("info", info)
.add("numRewrittenPositionDeleteFiles", positionDeletesScanTasks.size())
.add("numRewrittenPositionDeleteFiles", tasks.size())
.add(
"numAddedPositionDeleteFiles",
addedDeleteFiles == null
Expand All @@ -94,14 +96,31 @@ public String toString() {
}

public long rewrittenBytes() {
return positionDeletesScanTasks.stream().mapToLong(PositionDeletesScanTask::length).sum();
return tasks.stream().mapToLong(PositionDeletesScanTask::length).sum();
}

/** Returns the combined size, in bytes, of the delete files added by rewriting this group. */
public long addedBytes() {
  long totalBytes = 0L;
  for (DeleteFile added : addedDeleteFiles) {
    totalBytes += added.fileSizeInBytes();
  }
  return totalBytes;
}

public int numRewrittenDeleteFiles() {
return positionDeletesScanTasks.size();
return tasks.size();
}

/**
 * Returns a comparator that orders rewrite groups according to the given job order.
 *
 * <p>Byte-based orders compare {@link #rewrittenBytes()}; file-based orders compare {@link
 * #numRewrittenDeleteFiles()}. For any other order the returned comparator treats all groups as
 * equal.
 *
 * @param order the requested rewrite job order
 * @return a comparator over rewrite groups for that order
 */
public static Comparator<RewritePositionDeleteGroup> comparator(RewriteJobOrder order) {
  switch (order) {
    case BYTES_ASC:
      // primitive-specialized comparators avoid boxing the key on every comparison
      return Comparator.comparingLong(RewritePositionDeleteGroup::rewrittenBytes);
    case BYTES_DESC:
      return Comparator.comparingLong(RewritePositionDeleteGroup::rewrittenBytes).reversed();
    case FILES_ASC:
      return Comparator.comparingInt(RewritePositionDeleteGroup::numRewrittenDeleteFiles);
    case FILES_DESC:
      return Comparator.comparingInt(RewritePositionDeleteGroup::numRewrittenDeleteFiles)
          .reversed();
    default:
      // no ordering requested: every pair compares as equal
      return (fileGroupOne, fileGroupTwo) -> 0;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.stream.Collectors;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -61,16 +60,16 @@ public void commitFileGroups(Set<RewritePositionDeleteGroup> fileGroups) {
addedDeleteFiles.addAll(group.addedDeleteFiles());
}

RewriteFiles rewrite = table.newRewrite().validateFromSnapshot(startingSnapshotId);
rewrite.rewriteFiles(
ImmutableSet.of(), rewrittenDeleteFiles, ImmutableSet.of(), addedDeleteFiles);

rewrite.commit();
table
.newRewrite()
.validateFromSnapshot(startingSnapshotId)
.rewriteFiles(ImmutableSet.of(), rewrittenDeleteFiles, ImmutableSet.of(), addedDeleteFiles)
.commit();
}

/**
* Clean up a specified file set by removing any files created for that operation, should not
* throw any exceptions
* throw any exceptions.
*
* @param fileGroup group of files which has already been rewritten
*/
Expand Down
Loading

0 comments on commit 6ce8fae

Please sign in to comment.