diff --git a/api/src/main/java/org/apache/iceberg/ContentScanTask.java b/api/src/main/java/org/apache/iceberg/ContentScanTask.java index 00a6a1b6300a..8b461d0db747 100644 --- a/api/src/main/java/org/apache/iceberg/ContentScanTask.java +++ b/api/src/main/java/org/apache/iceberg/ContentScanTask.java @@ -38,6 +38,11 @@ default StructLike partition() { return file().partition(); } + @Override + default long sizeBytes() { + return length(); + } + /** * The starting position of this scan range in the file. * diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java index 4ca2b553d884..26b3c8a6f525 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java @@ -73,9 +73,12 @@ protected BaseMetadataTable(Table table, String name) { */ static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec) { PartitionSpec.Builder identitySpecBuilder = - PartitionSpec.builderFor(metadataTableSchema).checkConflicts(false); + PartitionSpec.builderFor(metadataTableSchema) + .withSpecId(spec.specId()) + .checkConflicts(false); for (PartitionField field : spec.fields()) { - identitySpecBuilder.add(field.fieldId(), field.name(), Transforms.identity()); + identitySpecBuilder.add( + field.fieldId(), field.fieldId(), field.name(), Transforms.identity()); } return identitySpecBuilder.build(); } diff --git a/core/src/main/java/org/apache/iceberg/BasePositionDeletesScanTask.java b/core/src/main/java/org/apache/iceberg/BasePositionDeletesScanTask.java new file mode 100644 index 000000000000..b111ff92f2a8 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/BasePositionDeletesScanTask.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import org.apache.iceberg.expressions.ResidualEvaluator; + +/** Base implementation of {@link PositionDeletesScanTask} */ +class BasePositionDeletesScanTask extends BaseContentScanTask<PositionDeletesScanTask, DeleteFile> + implements PositionDeletesScanTask, SplittableScanTask<PositionDeletesScanTask> { + + BasePositionDeletesScanTask( + DeleteFile file, String schemaString, String specString, ResidualEvaluator evaluator) { + super(file, schemaString, specString, evaluator); + } + + @Override + protected BasePositionDeletesScanTask self() { + return this; + } + + @Override + protected PositionDeletesScanTask newSplitTask( + PositionDeletesScanTask parentTask, long offset, long length) { + return new SplitPositionDeletesScanTask(parentTask, offset, length); + } +} diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java b/core/src/main/java/org/apache/iceberg/BaseScan.java index e8c76072364b..304dae2514b3 100644 --- a/core/src/main/java/org/apache/iceberg/BaseScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseScan.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import org.apache.iceberg.expressions.Binder; @@ -59,6 +60,23 @@ abstract class BaseScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>> private static final List<String> SCAN_WITH_STATS_COLUMNS = ImmutableList.<String>builder().addAll(SCAN_COLUMNS).addAll(STATS_COLUMNS).build(); + protected static final List<String> DELETE_SCAN_COLUMNS = + ImmutableList.of( + "snapshot_id", + "content", + "file_path", + "file_ordinal", + "file_format", + "block_size_in_bytes", + "file_size_in_bytes", + "record_count", + "partition", + "key_metadata", + "split_offsets"); + + protected static final List<String> DELETE_SCAN_WITH_STATS_COLUMNS = + ImmutableList.<String>builder().addAll(DELETE_SCAN_COLUMNS).addAll(STATS_COLUMNS).build(); + private static final boolean PLAN_SCANS_WITH_WORKER_POOL = SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED, true); @@ -84,7 +102,7 @@ protected TableOperations tableOps() { return null; } - protected Table table() { + public Table table() { return table; } @@ -96,6 +114,10 @@ protected TableScanContext context() { return context; } + protected Map<String, String> options() { + return context().options(); + } + protected List<String> scanColumns() { return context.returnColumnStats() ?
SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS; } diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java index f829dc019287..317e50e22e5c 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java @@ -18,63 +18,17 @@ */ package org.apache.iceberg; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import org.apache.iceberg.events.Listeners; -import org.apache.iceberg.events.ScanEvent; -import org.apache.iceberg.expressions.ExpressionUtil; import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.metrics.DefaultMetricsContext; -import org.apache.iceberg.metrics.ImmutableScanReport; -import org.apache.iceberg.metrics.ScanMetrics; -import org.apache.iceberg.metrics.ScanMetricsResult; -import org.apache.iceberg.metrics.ScanReport; -import org.apache.iceberg.metrics.Timer; -import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.types.TypeUtil; -import org.apache.iceberg.util.DateTimeUtil; -import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.TableScanUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** Base class for {@link TableScan} implementations. */ -abstract class BaseTableScan extends BaseScan +abstract class BaseTableScan extends SnapshotScan implements TableScan { - private static final Logger LOG = LoggerFactory.getLogger(BaseTableScan.class); - private ScanMetrics scanMetrics; protected BaseTableScan(Table table, Schema schema, TableScanContext context) { super(table, schema, context); } - protected Long snapshotId() { - return context().snapshotId(); - } - - protected Map options() { - return context().options(); - } - - protected abstract CloseableIterable doPlanFiles(); - - protected ScanMetrics scanMetrics() { - if (scanMetrics == null) { - this.scanMetrics = ScanMetrics.of(new DefaultMetricsContext()); - } - - return scanMetrics; - } - - @Override - public Table table() { - return super.table(); - } - @Override public TableScan appendsBetween(long fromSnapshotId, long toSnapshotId) { throw new UnsupportedOperationException("Incremental scan is not supported"); @@ -85,77 +39,6 @@ public TableScan appendsAfter(long fromSnapshotId) { throw new UnsupportedOperationException("Incremental scan is not supported"); } - @Override - public TableScan useSnapshot(long scanSnapshotId) { - Preconditions.checkArgument( - snapshotId() == null, "Cannot override snapshot, already set snapshot id=%s", snapshotId()); - Preconditions.checkArgument( - table().snapshot(scanSnapshotId) != null, - "Cannot find snapshot with ID %s", - scanSnapshotId); - return newRefinedScan(table(), tableSchema(), context().useSnapshotId(scanSnapshotId)); - } - - @Override - public TableScan useRef(String name) { - Preconditions.checkArgument( - snapshotId() == null, "Cannot override ref, already set snapshot id=%s", snapshotId()); - Snapshot snapshot = table().snapshot(name); - Preconditions.checkArgument(snapshot != null, "Cannot find ref %s", name); - return newRefinedScan(table(), tableSchema(), context().useSnapshotId(snapshot.snapshotId())); - } - - @Override - public TableScan asOfTime(long timestampMillis) { - 
Preconditions.checkArgument( - snapshotId() == null, "Cannot override snapshot, already set snapshot id=%s", snapshotId()); - - return useSnapshot(SnapshotUtil.snapshotIdAsOfTime(table(), timestampMillis)); - } - - @Override - public CloseableIterable planFiles() { - Snapshot snapshot = snapshot(); - if (snapshot != null) { - LOG.info( - "Scanning table {} snapshot {} created at {} with filter {}", - table(), - snapshot.snapshotId(), - DateTimeUtil.formatTimestampMillis(snapshot.timestampMillis()), - ExpressionUtil.toSanitizedString(filter())); - - Listeners.notifyAll(new ScanEvent(table().name(), snapshot.snapshotId(), filter(), schema())); - List projectedFieldIds = Lists.newArrayList(TypeUtil.getProjectedIds(schema())); - List projectedFieldNames = - projectedFieldIds.stream().map(schema()::findColumnName).collect(Collectors.toList()); - - Timer.Timed planningDuration = scanMetrics().totalPlanningDuration().start(); - - return CloseableIterable.whenComplete( - doPlanFiles(), - () -> { - planningDuration.stop(); - Map metadata = Maps.newHashMap(context().options()); - metadata.putAll(EnvironmentContext.get()); - ScanReport scanReport = - ImmutableScanReport.builder() - .schemaId(schema().schemaId()) - .projectedFieldIds(projectedFieldIds) - .projectedFieldNames(projectedFieldNames) - .tableName(table().name()) - .snapshotId(snapshot.snapshotId()) - .filter(ExpressionUtil.sanitize(filter())) - .scanMetrics(ScanMetricsResult.fromScanMetrics(scanMetrics())) - .metadata(metadata) - .build(); - context().metricsReporter().report(scanReport); - }); - } else { - LOG.info("Scanning empty table {}", table()); - return CloseableIterable.empty(); - } - } - @Override public CloseableIterable planTasks() { CloseableIterable fileScanTasks = planFiles(); @@ -164,20 +47,4 @@ public CloseableIterable planTasks() { return TableScanUtil.planTasks( splitFiles, targetSplitSize(), splitLookback(), splitOpenFileCost()); } - - @Override - public Snapshot snapshot() { - return snapshotId() != null ? 
table().snapshot(snapshotId()) : table().currentSnapshot(); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("table", table()) - .add("projection", schema().asStruct()) - .add("filter", filter()) - .add("ignoreResiduals", shouldIgnoreResiduals()) - .add("caseSensitive", isCaseSensitive()) - .toString(); - } } diff --git a/core/src/main/java/org/apache/iceberg/MetadataColumns.java b/core/src/main/java/org/apache/iceberg/MetadataColumns.java index 39777c2936ed..537f6dd9a42d 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataColumns.java +++ b/core/src/main/java/org/apache/iceberg/MetadataColumns.java @@ -30,12 +30,11 @@ public class MetadataColumns { private MetadataColumns() {} // IDs Integer.MAX_VALUE - (1-100) are used for metadata columns + public static final int FILE_PATH_COLUMN_ID = Integer.MAX_VALUE - 1; + public static final String FILE_PATH_COLUMN_DOC = "Path of the file in which a row is stored"; public static final NestedField FILE_PATH = NestedField.required( - Integer.MAX_VALUE - 1, - "_file", - Types.StringType.get(), - "Path of the file in which a row is stored"); + FILE_PATH_COLUMN_ID, "_file", Types.StringType.get(), FILE_PATH_COLUMN_DOC); public static final NestedField ROW_POSITION = NestedField.required( Integer.MAX_VALUE - 2, @@ -48,12 +47,11 @@ private MetadataColumns() {} "_deleted", Types.BooleanType.get(), "Whether the row has been deleted"); + public static final int SPEC_ID_COLUMN_ID = Integer.MAX_VALUE - 4; + public static final String SPEC_ID_COLUMN_DOC = "Spec ID used to track the file containing a row"; public static final NestedField SPEC_ID = NestedField.required( - Integer.MAX_VALUE - 4, - "_spec_id", - Types.IntegerType.get(), - "Spec ID used to track the file containing a row"); + SPEC_ID_COLUMN_ID, "_spec_id", Types.IntegerType.get(), SPEC_ID_COLUMN_DOC); // the partition column type is not static and depends on all specs in the table public static final int PARTITION_COLUMN_ID = Integer.MAX_VALUE - 5; public static final String PARTITION_COLUMN_NAME = "_partition"; diff --git a/core/src/main/java/org/apache/iceberg/MetadataTableType.java b/core/src/main/java/org/apache/iceberg/MetadataTableType.java index df6263848b20..733a11d900f1 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataTableType.java +++ b/core/src/main/java/org/apache/iceberg/MetadataTableType.java @@ -35,7 +35,8 @@ public enum MetadataTableType { ALL_DELETE_FILES, ALL_FILES, ALL_MANIFESTS, - ALL_ENTRIES; + ALL_ENTRIES, + POSITION_DELETES; public static MetadataTableType from(String name) { try { diff --git a/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java b/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java index 36e2479a2f6f..d045bdbe83c1 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java +++ b/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java @@ -77,6 +77,8 @@ private static Table createMetadataTableInstance( return new AllManifestsTable(baseTable, metadataTableName); case ALL_ENTRIES: return new AllEntriesTable(baseTable, metadataTableName); + case POSITION_DELETES: + return new PositionDeletesTable(baseTable, metadataTableName); default: throw new NoSuchTableException( "Unknown metadata table type: %s for %s", type, metadataTableName); diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesScanTask.java b/core/src/main/java/org/apache/iceberg/PositionDeletesScanTask.java new file mode 100644 index 000000000000..e8ae8a08cd68 --- /dev/null +++ 
b/core/src/main/java/org/apache/iceberg/PositionDeletesScanTask.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +/** A {@link ScanTask} for position delete files */ +public interface PositionDeletesScanTask extends ContentScanTask {} diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java new file mode 100644 index 000000000000..3a3babaf4f39 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.ManifestEvaluator; +import org.apache.iceberg.expressions.ResidualEvaluator; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ParallelIterable; +import org.apache.iceberg.util.TableScanUtil; + +/** + * A {@link Table} implementation whose {@link Scan} provides {@link PositionDeletesScanTask}, for + * reading of position delete files. 
+ */ +public class PositionDeletesTable extends BaseMetadataTable { + + private final Schema schema; + + PositionDeletesTable(Table table) { + super(table, table.name() + ".position_deletes"); + this.schema = calculateSchema(); + } + + PositionDeletesTable(Table table, String name) { + super(table, name); + this.schema = calculateSchema(); + } + + @Override + MetadataTableType metadataTableType() { + return MetadataTableType.POSITION_DELETES; + } + + @Override + public TableScan newScan() { + throw new UnsupportedOperationException( + "Cannot create TableScan from table of type POSITION_DELETES"); + } + + @Override + public BatchScan newBatchScan() { + return new PositionDeletesBatchScan(table(), schema()); + } + + @Override + public Schema schema() { + return schema; + } + + private Schema calculateSchema() { + Types.StructType partitionType = Partitioning.partitionType(table()); + Schema result = + new Schema( + MetadataColumns.DELETE_FILE_PATH, + MetadataColumns.DELETE_FILE_POS, + Types.NestedField.optional( + MetadataColumns.DELETE_FILE_ROW_FIELD_ID, + "row", + table().schema().asStruct(), + MetadataColumns.DELETE_FILE_ROW_DOC), + Types.NestedField.required( + MetadataColumns.PARTITION_COLUMN_ID, + "partition", + partitionType, + "Partition that position delete row belongs to"), + Types.NestedField.required( + MetadataColumns.SPEC_ID_COLUMN_ID, + "spec_id", + Types.IntegerType.get(), + MetadataColumns.SPEC_ID_COLUMN_DOC), + Types.NestedField.required( + MetadataColumns.FILE_PATH_COLUMN_ID, + "delete_file_path", + Types.StringType.get(), + MetadataColumns.FILE_PATH_COLUMN_DOC)); + + if (partitionType.fields().size() > 0) { + return result; + } else { + // avoid returning an empty struct, which is not always supported. + // instead, drop the partition field + return TypeUtil.selectNot(result, Sets.newHashSet(MetadataColumns.PARTITION_COLUMN_ID)); + } + } + + public static class PositionDeletesBatchScan + extends SnapshotScan> implements BatchScan { + + protected PositionDeletesBatchScan(Table table, Schema schema) { + super(table, schema, new TableScanContext()); + } + + protected PositionDeletesBatchScan(Table table, Schema schema, TableScanContext context) { + super(table, schema, context); + } + + @Override + protected PositionDeletesBatchScan newRefinedScan( + Table newTable, Schema newSchema, TableScanContext newContext) { + return new PositionDeletesBatchScan(newTable, newSchema, newContext); + } + + @Override + public CloseableIterable> planTasks() { + return TableScanUtil.planTaskGroups( + planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost()); + } + + @Override + protected List scanColumns() { + return context().returnColumnStats() ? DELETE_SCAN_WITH_STATS_COLUMNS : DELETE_SCAN_COLUMNS; + } + + @Override + protected CloseableIterable doPlanFiles() { + String schemaString = SchemaParser.toJson(tableSchema()); + + // prepare transformed partition specs and caches + Map transformedSpecs = + table().specs().values().stream() + .map(spec -> transformSpec(tableSchema(), spec)) + .collect(Collectors.toMap(PartitionSpec::specId, spec -> spec)); + + LoadingCache residualCache = + partitionCacheOf( + transformedSpecs, + spec -> + ResidualEvaluator.of( + spec, + shouldIgnoreResiduals() ? 
Expressions.alwaysTrue() : filter(), + isCaseSensitive())); + + LoadingCache specStringCache = + partitionCacheOf(transformedSpecs, PartitionSpecParser::toJson); + + LoadingCache evalCache = + partitionCacheOf( + transformedSpecs, + spec -> ManifestEvaluator.forRowFilter(filter(), spec, isCaseSensitive())); + + // iterate through delete manifests + CloseableIterable deleteManifests = + CloseableIterable.withNoopClose(snapshot().deleteManifests(table().io())); + CloseableIterable filteredManifests = + CloseableIterable.filter( + deleteManifests, + manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest)); + Iterable> results = + CloseableIterable.transform( + filteredManifests, + manifest -> { + // Filter partitions + CloseableIterable> deleteFileEntries = + ManifestFiles.readDeleteManifest(manifest, tableOps().io(), transformedSpecs) + .caseSensitive(isCaseSensitive()) + .select(scanColumns()) + .filterRows(filter()) + .liveEntries(); + + // Filter delete file type + CloseableIterable> positionDeleteEntries = + CloseableIterable.filter( + deleteFileEntries, + entry -> entry.file().content().equals(FileContent.POSITION_DELETES)); + + return CloseableIterable.transform( + positionDeleteEntries, + entry -> { + int specId = entry.file().specId(); + return new BasePositionDeletesScanTask( + entry.file().copy(context().returnColumnStats()), + schemaString, + specStringCache.get(specId), + residualCache.get(specId)); + }); + }); + + if (planExecutor() != null) { + return new ParallelIterable<>(results, planExecutor()); + } else { + return CloseableIterable.concat(results); + } + } + + private LoadingCache partitionCacheOf( + Map specs, Function constructor) { + return Caffeine.newBuilder() + .build( + specId -> { + PartitionSpec spec = specs.get(specId); + return constructor.apply(spec); + }); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java index 4f5ddef0c61d..cf8e1b3fbaa7 100644 --- a/core/src/main/java/org/apache/iceberg/SerializableTable.java +++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java @@ -257,6 +257,11 @@ public TableScan newScan() { return lazyTable().newScan(); } + @Override + public BatchScan newBatchScan() { + return lazyTable().newBatchScan(); + } + @Override public Snapshot currentSnapshot() { return lazyTable().currentSnapshot(); @@ -375,6 +380,10 @@ protected SerializableMetadataTable(BaseMetadataTable metadataTable) { protected Table newTable(TableOperations ops, String tableName) { return MetadataTableUtils.createMetadataTableInstance(ops, baseTableName, tableName, type); } + + public MetadataTableType type() { + return type; + } } // captures the current state of a Hadoop configuration in a serializable manner diff --git a/core/src/main/java/org/apache/iceberg/SnapshotScan.java b/core/src/main/java/org/apache/iceberg/SnapshotScan.java new file mode 100644 index 000000000000..d4c2352cd8f6 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/SnapshotScan.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.iceberg.events.Listeners; +import org.apache.iceberg.events.ScanEvent; +import org.apache.iceberg.expressions.ExpressionUtil; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.metrics.DefaultMetricsContext; +import org.apache.iceberg.metrics.ImmutableScanReport; +import org.apache.iceberg.metrics.ScanMetrics; +import org.apache.iceberg.metrics.ScanMetricsResult; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.iceberg.metrics.Timer; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.SnapshotUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is a common base class to share code between different BaseScan implementations that handle + * scans of a particular snapshot. + * + * @param <ThisT> actual BaseScan implementation class type + * @param <T> type of ScanTask returned + * @param <G> type of ScanTaskGroup returned + */ +public abstract class SnapshotScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>> + extends BaseScan<ThisT, T, G> { + + private static final Logger LOG = LoggerFactory.getLogger(SnapshotScan.class); + + private ScanMetrics scanMetrics; + + protected SnapshotScan(Table table, Schema schema, TableScanContext context) { + super(table, schema, context); + } + + protected Long snapshotId() { + return context().snapshotId(); + } + + protected abstract CloseableIterable<T> doPlanFiles(); + + protected ScanMetrics scanMetrics() { + if (scanMetrics == null) { + this.scanMetrics = ScanMetrics.of(new DefaultMetricsContext()); + } + + return scanMetrics; + } + + @Override + public Table table() { + return super.table(); + } + + public ThisT useSnapshot(long scanSnapshotId) { + Preconditions.checkArgument( + snapshotId() == null, "Cannot override snapshot, already set snapshot id=%s", snapshotId()); + Preconditions.checkArgument( + table().snapshot(scanSnapshotId) != null, + "Cannot find snapshot with ID %s", + scanSnapshotId); + TableScanContext newContext = context().useSnapshotId(scanSnapshotId); + return newRefinedScan(table(), tableSchema(), newContext); + } + + public ThisT useRef(String name) { + Preconditions.checkArgument( + snapshotId() == null, "Cannot override ref, already set snapshot id=%s", snapshotId()); + Snapshot snapshot = table().snapshot(name); + Preconditions.checkArgument(snapshot != null, "Cannot find ref %s", name); + return newRefinedScan(table(), tableSchema(), context().useSnapshotId(snapshot.snapshotId())); + } + + public ThisT asOfTime(long timestampMillis) { + Preconditions.checkArgument( + snapshotId() == null, "Cannot override snapshot, already set snapshot id=%s", snapshotId()); + + return
useSnapshot(SnapshotUtil.snapshotIdAsOfTime(table(), timestampMillis)); + } + + @Override + public CloseableIterable planFiles() { + Snapshot snapshot = snapshot(); + + if (snapshot == null) { + LOG.info("Scanning empty table {}", table()); + return CloseableIterable.empty(); + } + + LOG.info( + "Scanning table {} snapshot {} created at {} with filter {}", + table(), + snapshot.snapshotId(), + DateTimeUtil.formatTimestampMillis(snapshot.timestampMillis()), + ExpressionUtil.toSanitizedString(filter())); + + Listeners.notifyAll(new ScanEvent(table().name(), snapshot.snapshotId(), filter(), schema())); + List projectedFieldIds = Lists.newArrayList(TypeUtil.getProjectedIds(schema())); + List projectedFieldNames = + projectedFieldIds.stream().map(schema()::findColumnName).collect(Collectors.toList()); + + Timer.Timed planningDuration = scanMetrics().totalPlanningDuration().start(); + + return CloseableIterable.whenComplete( + doPlanFiles(), + () -> { + planningDuration.stop(); + Map metadata = Maps.newHashMap(context().options()); + metadata.putAll(EnvironmentContext.get()); + ScanReport scanReport = + ImmutableScanReport.builder() + .schemaId(schema().schemaId()) + .projectedFieldIds(projectedFieldIds) + .projectedFieldNames(projectedFieldNames) + .tableName(table().name()) + .snapshotId(snapshot.snapshotId()) + .filter(ExpressionUtil.sanitize(filter())) + .scanMetrics(ScanMetricsResult.fromScanMetrics(scanMetrics())) + .metadata(metadata) + .build(); + context().metricsReporter().report(scanReport); + }); + } + + public Snapshot snapshot() { + return snapshotId() != null ? table().snapshot(snapshotId()) : table().currentSnapshot(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("table", table()) + .add("projection", schema().asStruct()) + .add("filter", filter()) + .add("ignoreResiduals", shouldIgnoreResiduals()) + .add("caseSensitive", isCaseSensitive()) + .toString(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/SplitPositionDeletesScanTask.java b/core/src/main/java/org/apache/iceberg/SplitPositionDeletesScanTask.java new file mode 100644 index 000000000000..68c7d5f9fd88 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/SplitPositionDeletesScanTask.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; + +/** A split of a {@link PositionDeletesScanTask} that is mergeable. 
*/ +class SplitPositionDeletesScanTask + implements PositionDeletesScanTask, MergeableScanTask { + + private final PositionDeletesScanTask parentTask; + private final long offset; + private final long length; + + protected SplitPositionDeletesScanTask( + PositionDeletesScanTask parentTask, long offset, long length) { + this.parentTask = parentTask; + this.offset = offset; + this.length = length; + } + + @Override + public DeleteFile file() { + return parentTask.file(); + } + + @Override + public PartitionSpec spec() { + return parentTask.spec(); + } + + @Override + public long start() { + return offset; + } + + @Override + public long length() { + return length; + } + + @Override + public Expression residual() { + return parentTask.residual(); + } + + @Override + public boolean canMerge(org.apache.iceberg.ScanTask other) { + if (other instanceof SplitPositionDeletesScanTask) { + SplitPositionDeletesScanTask that = (SplitPositionDeletesScanTask) other; + return file().equals(that.file()) && offset + length == that.start(); + } else { + return false; + } + } + + @Override + public SplitPositionDeletesScanTask merge(org.apache.iceberg.ScanTask other) { + SplitPositionDeletesScanTask that = (SplitPositionDeletesScanTask) other; + return new SplitPositionDeletesScanTask(parentTask, offset, length + that.length()); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("file", file().path()) + .add("partition_data", file().partition()) + .add("offset", offset) + .add("length", length) + .add("residual", residual()) + .toString(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java index af2f79c3c6f9..b3a5e85d3383 100644 --- a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java @@ -91,7 +91,7 @@ private PartitionUtil() {} } // adapts the provided partition data to match the table partition type - private static StructLike coercePartition( + public static StructLike coercePartition( Types.StructType partitionType, PartitionSpec spec, StructLike partition) { StructProjection projection = StructProjection.createAllowMissing(spec.partitionType(), partitionType); diff --git a/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java b/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java index 623d08f36b8e..ea7b2cfc95c2 100644 --- a/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java +++ b/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java @@ -19,6 +19,7 @@ package org.apache.iceberg; import java.io.IOException; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -26,6 +27,8 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.PartitionUtil; import org.junit.Assert; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -96,4 +99,9 @@ protected void validateIncludesPartitionScan( return partition.get(position, Object.class).equals(partValue); })); } + + protected Map constantsMap( + PositionDeletesScanTask task, Types.StructType partitionType) { + return PartitionUtil.constantsMap(task, partitionType, (type, constant) -> constant); + } 
} diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java index 90d23e2fa54d..6f3dee2cec86 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java @@ -20,9 +20,12 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; import java.io.File; import java.io.IOException; +import java.util.Comparator; +import java.util.List; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; @@ -30,12 +33,16 @@ import java.util.stream.StreamSupport; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.expressions.UnboundPredicate; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Iterators; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.StructLikeWrapper; import org.junit.Assert; import org.junit.Assume; import org.junit.Test; @@ -1040,4 +1047,172 @@ public void testAllManifestsTableSnapshotNot() { expectedManifestListPaths(table.snapshots(), 1L, 3L, 4L), actualManifestListPaths(manifestsTableScan)); } + + @Test + public void testPositionDeletesWithFilter() { + Assume.assumeTrue("Position deletes supported only for v2 tables", formatVersion == 2); + preparePartitionedTable(); + + PositionDeletesTable positionDeletesTable = new PositionDeletesTable(table); + + Expression expression = + Expressions.and( + Expressions.equal("partition.data_bucket", 1), Expressions.greaterThan("pos", 0)); + BatchScan scan = positionDeletesTable.newBatchScan().filter(expression); + + assertThat(scan).isExactlyInstanceOf(PositionDeletesTable.PositionDeletesBatchScan.class); + + List tasks = Lists.newArrayList(scan.planFiles()); + assertThat(tasks).hasSize(1); + + ScanTask task = tasks.get(0); + assertThat(task).isInstanceOf(PositionDeletesScanTask.class); + + Types.StructType partitionType = Partitioning.partitionType(table); + PositionDeletesScanTask posDeleteTask = (PositionDeletesScanTask) task; + + Assert.assertEquals( + "Expected correct partition on task", + 1, + (int) posDeleteTask.file().partition().get(0, Integer.class)); + Assert.assertEquals( + "Expected correct partition on constant column", + 1, + (int) + ((StructLike) + constantsMap(posDeleteTask, partitionType) + .get(MetadataColumns.PARTITION_COLUMN_ID)) + .get(0, Integer.class)); + + Assert.assertEquals( + "Expected correct partition spec id on task", 0, posDeleteTask.file().specId()); + Assert.assertEquals( + "Expected correct partition spec id on constant column", + 0, + (constantsMap(posDeleteTask, partitionType).get(MetadataColumns.SPEC_ID.fieldId()))); + + Assert.assertEquals( + "Expected correct delete file on task", FILE_B_DELETES.path(), posDeleteTask.file().path()); + Assert.assertEquals( + "Expected correct delete file on constant column", + FILE_B_DELETES.path(), + (constantsMap(posDeleteTask, 
partitionType).get(MetadataColumns.FILE_PATH.fieldId()))); + } + + @Test + public void testPositionDeletesResiduals() { + Assume.assumeTrue("Position deletes supported only for v2 tables", formatVersion == 2); + preparePartitionedTable(); + + PositionDeletesTable positionDeletesTable = new PositionDeletesTable(table); + + Expression expression = + Expressions.and( + Expressions.equal("partition.data_bucket", 1), Expressions.greaterThan("pos", 1)); + BatchScan scan = positionDeletesTable.newBatchScan().filter(expression); + + assertThat(scan).isExactlyInstanceOf(PositionDeletesTable.PositionDeletesBatchScan.class); + + List tasks = Lists.newArrayList(scan.planFiles()); + assertThat(tasks).hasSize(1); + + ScanTask task = tasks.get(0); + assertThat(task).isInstanceOf(PositionDeletesScanTask.class); + + PositionDeletesScanTask posDeleteTask = (PositionDeletesScanTask) task; + + Expression residual = posDeleteTask.residual(); + UnboundPredicate residualPred = + TestHelpers.assertAndUnwrap(residual, UnboundPredicate.class); + Assert.assertEquals( + "Expected partition residual to be evaluated", Expression.Operation.GT, residualPred.op()); + Assert.assertEquals( + "Expected partition residual to be evaluated", Literal.of(1), residualPred.literal()); + } + + @Test + public void testPositionDeletesUnpartitioned() { + Assume.assumeTrue("Position deletes supported only for v2 tables", formatVersion == 2); + table.updateSpec().removeField(Expressions.bucket("data", BUCKETS_NUMBER)).commit(); + + Assert.assertEquals("Table should now be unpartitioned", 0, table.spec().fields().size()); + + DataFile dataFile1 = + DataFiles.builder(table.spec()) + .withPath("/path/to/data1.parquet") + .withFileSizeInBytes(10) + .withRecordCount(1) + .build(); + DataFile dataFile2 = + DataFiles.builder(table.spec()) + .withPath("/path/to/data2.parquet") + .withFileSizeInBytes(10) + .withRecordCount(1) + .build(); + table.newAppend().appendFile(dataFile1).appendFile(dataFile2).commit(); + + DeleteFile delete1 = + FileMetadata.deleteFileBuilder(table.spec()) + .ofPositionDeletes() + .withPath("/path/to/delete1.parquet") + .withFileSizeInBytes(10) + .withRecordCount(1) + .build(); + DeleteFile delete2 = + FileMetadata.deleteFileBuilder(table.spec()) + .ofPositionDeletes() + .withPath("/path/to/delete2.parquet") + .withFileSizeInBytes(10) + .withRecordCount(1) + .build(); + table.newRowDelta().addDeletes(delete1).addDeletes(delete2).commit(); + + PositionDeletesTable positionDeletesTable = new PositionDeletesTable(table); + BatchScan batchScan = positionDeletesTable.newBatchScan(); + List scanTasks = + Lists.newArrayList( + Iterators.transform( + batchScan.planFiles().iterator(), + task -> { + assertThat(task).isInstanceOf(PositionDeletesScanTask.class); + return (PositionDeletesScanTask) task; + })); + + Assert.assertEquals("Expected 2 tasks", 2, scanTasks.size()); + scanTasks.sort(Comparator.comparing(f -> f.file().path().toString())); + Assert.assertEquals("/path/to/delete1.parquet", scanTasks.get(0).file().path().toString()); + Assert.assertEquals("/path/to/delete2.parquet", scanTasks.get(1).file().path().toString()); + + Types.StructType partitionType = Partitioning.partitionType(table); + + Assert.assertEquals( + "/path/to/delete1.parquet", + constantsMap(scanTasks.get(0), partitionType).get(MetadataColumns.FILE_PATH.fieldId())); + Assert.assertEquals( + "/path/to/delete2.parquet", + constantsMap(scanTasks.get(1), partitionType).get(MetadataColumns.FILE_PATH.fieldId())); + + Assert.assertEquals( + 1, 
constantsMap(scanTasks.get(0), partitionType).get(MetadataColumns.SPEC_ID.fieldId())); + Assert.assertEquals( + 1, constantsMap(scanTasks.get(1), partitionType).get(MetadataColumns.SPEC_ID.fieldId())); + + StructLikeWrapper wrapper = StructLikeWrapper.forType(Partitioning.partitionType(table)); + PartitionData partitionData = + new PartitionData(Partitioning.partitionType(table)); // null part values + StructLikeWrapper expected = wrapper.set(partitionData); + StructLikeWrapper scanTask1Partition = + wrapper.copyFor( + (StructLike) + (constantsMap(scanTasks.get(0), partitionType) + .get(MetadataColumns.PARTITION_COLUMN_ID))); + StructLikeWrapper scanTask2Partition = + wrapper.copyFor( + (StructLike) + (constantsMap(scanTasks.get(1), partitionType) + .get(MetadataColumns.PARTITION_COLUMN_ID))); + + Assert.assertEquals(expected, scanTask1Partition); + Assert.assertEquals(expected, scanTask2Partition); + } } diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java index 363df4d37581..b02fad283134 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java @@ -20,15 +20,21 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; import java.io.File; import java.io.IOException; +import java.util.List; import java.util.stream.Stream; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.types.Types; import org.assertj.core.api.Assertions; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Test; @@ -157,6 +163,70 @@ public void testPartitionsTableScanWithAddPartitionOnNestedField() throws IOExce } } + @Test + public void testPositionDeletesPartitionSpecRemoval() { + Assume.assumeTrue("Position deletes supported only for v2 tables", formatVersion == 2); + + table.updateSpec().removeField("id").commit(); + + DeleteFile deleteFile = newDeleteFile(table.ops().current().spec().specId(), "nested.id=1"); + table.newRowDelta().addDeletes(deleteFile).commit(); + + PositionDeletesTable positionDeletesTable = new PositionDeletesTable(table); + + Expression expression = + Expressions.and( + Expressions.equal("partition.nested.id", 1), Expressions.greaterThan("pos", 0)); + BatchScan scan = positionDeletesTable.newBatchScan().filter(expression); + + assertThat(scan).isInstanceOf(PositionDeletesTable.PositionDeletesBatchScan.class); + + List tasks = Lists.newArrayList(scan.planFiles()); + assertThat(tasks).hasSize(1); + + ScanTask task = tasks.get(0); + assertThat(task).isInstanceOf(PositionDeletesScanTask.class); + + Types.StructType partitionType = Partitioning.partitionType(table); + PositionDeletesScanTask posDeleteTask = (PositionDeletesScanTask) task; + + Assert.assertEquals( + "Expected correct partition on task", + 1, + (int) posDeleteTask.file().partition().get(0, Integer.class)); + + // Constant partition struct is common struct that includes even deleted partition column + Assert.assertEquals( + 
"Expected correct partition on constant column", + 1, + (int) + ((StructLike) + constantsMap(posDeleteTask, partitionType) + .get(MetadataColumns.PARTITION_COLUMN_ID)) + .get(1, Integer.class)); + + Assert.assertEquals( + "Expected correct partition field id on task's spec", + table.ops().current().spec().partitionType().fields().get(0).fieldId(), + posDeleteTask.spec().fields().get(0).fieldId()); + + Assert.assertEquals( + "Expected correct partition spec id on task", + table.ops().current().spec().specId(), + posDeleteTask.file().specId()); + Assert.assertEquals( + "Expected correct partition spec id on constant column", + table.ops().current().spec().specId(), + constantsMap(posDeleteTask, partitionType).get(MetadataColumns.SPEC_ID.fieldId())); + + Assert.assertEquals( + "Expected correct delete file on task", deleteFile.path(), posDeleteTask.file().path()); + Assert.assertEquals( + "Expected correct delete file on constant column", + deleteFile.path(), + constantsMap(posDeleteTask, partitionType).get(MetadataColumns.FILE_PATH.fieldId())); + } + private Stream allRows(Iterable tasks) { return Streams.stream(tasks).flatMap(task -> Streams.stream(task.asDataTask().rows())); } diff --git a/core/src/test/java/org/apache/iceberg/TestSplitPlanning.java b/core/src/test/java/org/apache/iceberg/TestSplitPlanning.java index 89beb352c1c7..d4a2a08d7fcc 100644 --- a/core/src/test/java/org/apache/iceberg/TestSplitPlanning.java +++ b/core/src/test/java/org/apache/iceberg/TestSplitPlanning.java @@ -234,6 +234,21 @@ public void testSplitPlanningWithOffsetsUnableToSplit() { "We should still only get 2 tasks per file", 32, Iterables.size(scan.planTasks())); } + @Test + public void testBasicSplitPlanningDeleteFiles() { + table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit(); + List files128Mb = newDeleteFiles(4, 128 * 1024 * 1024); + appendDeleteFiles(files128Mb); + + PositionDeletesTable posDeletesTable = new PositionDeletesTable(table); + // we expect 4 bins since split size is 128MB and we have 4 files 128MB each + Assert.assertEquals(4, Iterables.size(posDeletesTable.newBatchScan().planTasks())); + List files32Mb = newDeleteFiles(16, 32 * 1024 * 1024); + appendDeleteFiles(files32Mb); + // we expect 8 bins after we add 16 files 32MB each as they will form additional 4 bins + Assert.assertEquals(8, Iterables.size(posDeletesTable.newBatchScan().planTasks())); + } + private void appendFiles(Iterable files) { AppendFiles appendFiles = table.newAppend(); files.forEach(appendFiles::appendFile); @@ -281,4 +296,34 @@ private DataFile newFile(long sizeInBytes, FileFormat fileFormat, int numOffsets return builder.build(); } + + private void appendDeleteFiles(List files) { + RowDelta rowDelta = table.newRowDelta(); + files.forEach(rowDelta::addDeletes); + rowDelta.commit(); + } + + private List newDeleteFiles(int numFiles, long sizeInBytes) { + return newDeleteFiles(numFiles, sizeInBytes, FileFormat.PARQUET); + } + + private List newDeleteFiles(int numFiles, long sizeInBytes, FileFormat fileFormat) { + List files = Lists.newArrayList(); + for (int fileNum = 0; fileNum < numFiles; fileNum++) { + files.add(newDeleteFile(sizeInBytes, fileFormat)); + } + return files; + } + + private DeleteFile newDeleteFile(long sizeInBytes, FileFormat fileFormat) { + String fileName = UUID.randomUUID().toString(); + FileMetadata.Builder builder = + FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned()) + .ofPositionDeletes() + .withPath(fileFormat.addExtension(fileName)) + 
.withFileSizeInBytes(sizeInBytes) + .withRecordCount(2); + + return builder.build(); + } } diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestStaticTable.java b/core/src/test/java/org/apache/iceberg/hadoop/TestStaticTable.java index 9a73ef9e1500..235f7a32b660 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestStaticTable.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestStaticTable.java @@ -71,11 +71,20 @@ public void testCannotDoIncrementalScanOnMetadataTable() { for (MetadataTableType type : MetadataTableType.values()) { Table staticTable = getStaticTable(type); - AssertHelpers.assertThrows( - "Static tables do not support incremental scans", - UnsupportedOperationException.class, - String.format("Cannot incrementally scan table of type %s", type), - () -> staticTable.newScan().appendsAfter(1)); + + if (type.equals(MetadataTableType.POSITION_DELETES)) { + AssertHelpers.assertThrows( + "POSITION_DELETES table does not support TableScan", + UnsupportedOperationException.class, + "Cannot create TableScan from table of type POSITION_DELETES", + staticTable::newScan); + } else { + AssertHelpers.assertThrows( + "Static tables do not support incremental scans", + UnsupportedOperationException.class, + String.format("Cannot incrementally scan table of type %s", type), + () -> staticTable.newScan().appendsAfter(1)); + } } } diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java b/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java index 284b6c86e542..380ebcac6df6 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java @@ -29,6 +29,9 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.PositionDeletesTable; +import org.apache.iceberg.ScanTask; import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; @@ -157,12 +160,23 @@ private static Table getMetaDataTable(Table table, MetadataTableType type) { private static Set getFiles(Table table) throws IOException { Set files = Sets.newHashSet(); - try (CloseableIterable tasks = table.newScan().planFiles()) { - for (FileScanTask task : tasks) { - files.add(task.file().path()); + if (table instanceof PositionDeletesTable + || (table instanceof SerializableTable.SerializableMetadataTable + && ((SerializableTable.SerializableMetadataTable) table) + .type() + .equals(MetadataTableType.POSITION_DELETES))) { + try (CloseableIterable tasks = table.newBatchScan().planFiles()) { + for (ScanTask task : tasks) { + files.add(((PositionDeletesScanTask) task).file().path()); + } + } + } else { + try (CloseableIterable tasks = table.newScan().planFiles()) { + for (FileScanTask task : tasks) { + files.add(task.file().path()); + } } } - return files; }
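
Usage sketch (not part of the patch): the snippet below plans a filtered scan over the new position_deletes metadata table, using only APIs added or exposed in this diff plus the existing MetadataTableUtils factory. The `table` handle, its v2 format, and the `partition.data_bucket` filter column are assumptions for illustration.

import org.apache.iceberg.BatchScan;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;

public class PositionDeletesScanExample {
  // `table` is an assumed handle to an existing v2 Iceberg table with position deletes
  static void scanPositionDeletes(Table table) throws Exception {
    // resolve the position_deletes metadata table through the public factory
    Table posDeletesTable =
        MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);

    // POSITION_DELETES supports only BatchScan; newScan() throws UnsupportedOperationException
    BatchScan scan =
        posDeletesTable.newBatchScan().filter(Expressions.equal("partition.data_bucket", 1));

    try (CloseableIterable<ScanTask> tasks = scan.planFiles()) {
      for (ScanTask task : tasks) {
        PositionDeletesScanTask deleteTask = (PositionDeletesScanTask) task;
        System.out.printf(
            "delete file=%s specId=%d start=%d length=%d%n",
            deleteTask.file().path(),
            deleteTask.file().specId(),
            deleteTask.start(),
            deleteTask.length());
      }
    }
  }
}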
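
The PositionDeletesBatchScan.planTasks() override delegates to TableScanUtil.planTaskGroups, so delete files split and bin-pack the same way data files do (this is what testBasicSplitPlanningDeleteFiles exercises above). A sketch continuing the example, with ScanTaskGroup imported from org.apache.iceberg:

  // plan bin-packed task groups honoring the target split size
  // (e.g. four 128 MB delete files yield four groups at the 128 MB default)
  static void planGroups(Table posDeletesTable) throws Exception {
    try (CloseableIterable<ScanTaskGroup<ScanTask>> groups =
        posDeletesTable.newBatchScan().planTasks()) {
      for (ScanTaskGroup<ScanTask> group : groups) {
        // each group bin-packs whole delete files and file splits
        System.out.printf("group with %d task(s)%n", group.tasks().size());
      }
    }
  }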
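
The tests above use PartitionUtil.constantsMap to verify the constant metadata columns a reader projects for each task: the partition struct coerced to the table-wide partition type (via the now-public coercePartition), the spec id, and the delete file path. A sketch of that lookup, reusing `deleteTask` from the first example; the casts mirror the value types asserted in the tests:

import java.util.Map;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PartitionUtil;

  static void inspectConstants(Table table, PositionDeletesScanTask deleteTask) {
    Types.StructType partitionType = Partitioning.partitionType(table);
    Map<Integer, ?> constants =
        PartitionUtil.constantsMap(deleteTask, partitionType, (type, constant) -> constant);

    // the partition value is coerced to the common partition type across all specs
    StructLike partition = (StructLike) constants.get(MetadataColumns.PARTITION_COLUMN_ID);
    int specId = (Integer) constants.get(MetadataColumns.SPEC_ID.fieldId());
    CharSequence path = (CharSequence) constants.get(MetadataColumns.FILE_PATH.fieldId());
    System.out.printf("partition=%s specId=%d deleteFile=%s%n", partition, specId, path);
  }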