Spark 3.4: Rewrite V2 deletes to V3 DVs / Detect dangling DVs properly #12606

Merged

@@ -26,6 +26,7 @@
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.RewriteFiles;
@@ -35,6 +36,7 @@
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.SparkDeleteFile;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DeleteFileSet;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -59,6 +61,7 @@
class RemoveDanglingDeletesSparkAction
extends BaseSnapshotUpdateSparkAction<RemoveDanglingDeletesSparkAction>
implements RemoveDanglingDeleteFiles {

private static final Logger LOG = LoggerFactory.getLogger(RemoveDanglingDeletesSparkAction.class);
private final Table table;

@@ -79,21 +82,27 @@ public Result execute() {
.removedDeleteFiles(Collections.emptyList())
.build();
}

String desc = String.format("Removing dangling delete files in %s", table.name());
JobGroupInfo info = newJobGroupInfo("REMOVE-DELETES", desc);
return withJobGroupInfo(info, this::doExecute);
}

Result doExecute() {
RewriteFiles rewriteFiles = table.newRewrite();
List<DeleteFile> danglingDeletes = findDanglingDeletes();
DeleteFileSet danglingDeletes = DeleteFileSet.create();
danglingDeletes.addAll(findDanglingDeletes());
danglingDeletes.addAll(findDanglingDvs());

for (DeleteFile deleteFile : danglingDeletes) {
LOG.debug("Removing dangling delete file {}", deleteFile.location());
rewriteFiles.deleteFile(deleteFile);
}

if (!danglingDeletes.isEmpty()) {
commit(rewriteFiles);
}

return ImmutableRemoveDanglingDeleteFiles.Result.builder()
.removedDeleteFiles(danglingDeletes)
.build();
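
The change from a List<DeleteFile> to a DeleteFileSet is what lets the two scans overlap safely: DeleteFileSet keys its entries by file location, so a delete file reported by both findDanglingDeletes() and findDanglingDvs() is committed for removal only once. A minimal sketch of that dedup behavior, using Iceberg's builder API with made-up file metadata:

import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.util.DeleteFileSet;

DeleteFile posDelete =
    FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned())
        .ofPositionDeletes()
        .withPath("/warehouse/db/t/deletes/delete-0001.parquet") // hypothetical path
        .withFormat(FileFormat.PARQUET)
        .withFileSizeInBytes(128)
        .withRecordCount(4)
        .build();

DeleteFileSet deletes = DeleteFileSet.create();
deletes.add(posDelete);
deletes.add(posDelete); // the same file reported by a second scan
// deletes.size() == 1: entries are keyed by file location, not object identity
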
@@ -124,10 +133,12 @@ private List<DeleteFile> findDanglingDeletes() {
.groupBy("partition", "spec_id")
.agg(min("sequence_number"))
.toDF("grouped_partition", "grouped_spec_id", "min_data_sequence_number");

Dataset<Row> deleteEntries =
loadMetadataTable(table, MetadataTableType.ENTRIES)
// find live delete files
.filter("data_file.content != 0 AND status < 2");

Column joinOnPartition =
deleteEntries
.col("data_file.spec_id")
@@ -136,9 +147,10 @@ private List<DeleteFile> findDanglingDeletes() {
deleteEntries
.col("data_file.partition")
.equalTo(minSequenceNumberByPartition.col("grouped_partition")));

Column filterOnDanglingDeletes =
col("min_data_sequence_number")
// delete fies without any data files in partition
// delete files without any data files in partition
.isNull()
// position delete files without any applicable data files in partition
.or(
@@ -150,6 +162,7 @@
col("data_file.content")
.equalTo("2")
.and(col("sequence_number").$less$eq(col("min_data_sequence_number"))));

Dataset<Row> danglingDeletes =
deleteEntries
.join(minSequenceNumberByPartition, joinOnPartition, "left")
@@ -161,6 +174,27 @@
.collect(Collectors.toList());
}
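
To make the sequence-number rule concrete: if the oldest live data file in a partition has data sequence number 5, an equality delete with sequence number 5 or lower is dangling, because equality deletes apply only to data files with strictly smaller sequence numbers, while a position delete is dangling only with a sequence number below 5, since position deletes also apply to data files committed at the same sequence number.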

private List<DeleteFile> findDanglingDvs() {
Dataset<Row> dvs =
loadMetadataTable(table, MetadataTableType.DELETE_FILES)
.where(col("file_format").equalTo(FileFormat.PUFFIN.name()));
Dataset<Row> dataFiles = loadMetadataTable(table, MetadataTableType.DATA_FILES);

// a DV not pointing to a valid data file path is implicitly a dangling delete
List<Row> danglingDvs =
dvs.join(
dataFiles,
dvs.col("referenced_data_file").equalTo(dataFiles.col("file_path")),
"leftouter")
.filter(dataFiles.col("file_path").isNull())
.select(dvs.col("*"))
.collectAsList();
return danglingDvs.stream()
// map on driver because SparkDeleteFile is not serializable
.map(row -> deleteFileWrapper(dvs.schema(), row))
.collect(Collectors.toList());
}
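
The leftouter join followed by the isNull filter is an anti-join in disguise. A sketch of the same lookup written with Spark's left_anti join type, assuming the same two metadata-table Datasets as in the method above (left_anti keeps only left-side columns, so the explicit select falls away):

// Sketch only, not the PR's code: an equivalent anti-join formulation
private Dataset<Row> danglingDvs(Dataset<Row> dvs, Dataset<Row> dataFiles) {
  // keep each Puffin DV whose referenced_data_file matches no live data file path
  return dvs.join(
      dataFiles,
      dvs.col("referenced_data_file").equalTo(dataFiles.col("file_path")),
      "left_anti");
}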

private DeleteFile deleteFileWrapper(StructType sparkFileType, Row row) {
int specId = row.getInt(row.fieldIndex("spec_id"));
Types.StructType combinedFileType = DataFile.getType(Partitioning.partitionType(table));
@@ -31,10 +31,12 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.PositionDeletesTable;
import org.apache.iceberg.PositionDeletesTable.PositionDeletesBatchScan;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.StructLike;
@@ -50,6 +52,7 @@
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -123,6 +126,11 @@ public RewritePositionDeleteFiles.Result execute() {

validateAndInitOptions();

if (TableUtil.formatVersion(table) >= 3 && !requiresRewriteToDVs()) {
LOG.info("v2 deletes in {} have already been rewritten to v3 DVs", table.name());
return EMPTY_RESULT;
}

StructLikeMap<List<List<PositionDeletesScanTask>>> fileGroupsByPartition = planFileGroups();
RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition);

@@ -140,6 +148,29 @@ public RewritePositionDeleteFiles.Result execute() {
}
}

private boolean requiresRewriteToDVs() {
PositionDeletesBatchScan scan =
(PositionDeletesBatchScan)
MetadataTableUtils.createMetadataTableInstance(
table, MetadataTableType.POSITION_DELETES)
.newBatchScan();
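// plan the position_deletes metadata table, keeping only non-Puffin (v2)
// delete files; any surviving task means a rewrite to DVs is still required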
try (CloseableIterator<PositionDeletesScanTask> it =
CloseableIterable.filter(
CloseableIterable.transform(
scan.baseTableFilter(filter)
.caseSensitive(caseSensitive)
.select(PositionDeletesTable.DELETE_FILE_PATH)
.ignoreResiduals()
.planFiles(),
task -> (PositionDeletesScanTask) task),
t -> t.file().format() != FileFormat.PUFFIN)
.iterator()) {
return it.hasNext();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
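
For context, a minimal usage sketch of the action, assuming an active SparkSession named spark and a loaded Iceberg Table named table (both hypothetical here): on a v3 table whose v2 deletes were already rewritten, the early-out above returns EMPTY_RESULT before any file groups are planned.

import org.apache.iceberg.actions.RewritePositionDeleteFiles;
import org.apache.iceberg.spark.actions.SparkActions;

RewritePositionDeleteFiles.Result result =
    SparkActions.get(spark).rewritePositionDeleteFiles(table).execute();
// reports how many v2 position delete files were compacted or rewritten to DVs
System.out.println(result.rewrittenDeleteFilesCount());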

private StructLikeMap<List<List<PositionDeletesScanTask>>> planFileGroups() {
Table deletesTable =
MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
@@ -161,7 +192,6 @@ private StructLikeMap<List<List<PositionDeletesScanTask>>> planFileGroups() {

private CloseableIterable<PositionDeletesScanTask> planFiles(Table deletesTable) {
PositionDeletesBatchScan scan = (PositionDeletesBatchScan) deletesTable.newBatchScan();

return CloseableIterable.transform(
scan.baseTableFilter(filter).caseSensitive(caseSensitive).ignoreResiduals().planFiles(),
task -> (PositionDeletesScanTask) task);
@@ -404,9 +434,6 @@ private void validateAndInitOptions() {
PARTIAL_PROGRESS_MAX_COMMITS,
maxCommits,
PARTIAL_PROGRESS_ENABLED);

Preconditions.checkArgument(
TableUtil.formatVersion(table) <= 2, "Cannot rewrite position deletes for V3 table");
}

private String jobDesc(RewritePositionDeletesGroup group, RewriteExecutionContext ctx) {
@@ -73,6 +73,7 @@ protected Stream<ContentFile<?>> referencedFiles(PositionDeletesScanTask task) {
return Stream.of(task.file());
}

@SuppressWarnings("resource") // handled by BaseReader
@Override
protected CloseableIterator<InternalRow> open(PositionDeletesScanTask task) {
String filePath = task.file().location();