Skip to content

Commit

Permalink
More review comments, use existing MetadataColumn ids
Browse files Browse the repository at this point in the history
  • Loading branch information
szehon-ho committed Jan 19, 2023
1 parent e2c03aa commit c326236
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 155 deletions.
5 changes: 5 additions & 0 deletions api/src/main/java/org/apache/iceberg/ContentScanTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ default StructLike partition() {
return file().partition();
}

@Override
default long sizeBytes() {
return length();
}

/**
* The starting position of this scan range in the file.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,15 @@
*/
package org.apache.iceberg;

import static org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_FILE_PATH_ID;
import static org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_PARTITION_FIELD_ID;
import static org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_SPEC_ID;

import java.util.Map;
import java.util.function.BiFunction;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PartitionUtil;

/** Base implememntation of {@link PositionDeletesScanTask} */
class BasePositionDeletesScanTask extends BaseContentScanTask<PositionDeletesScanTask, DeleteFile>
implements PositionDeletesScanTask, SplittableScanTask<PositionDeletesScanTask> {

private final Types.StructType partitionType;

BasePositionDeletesScanTask(
DeleteFile file,
String schemaString,
String specString,
ResidualEvaluator evaluator,
Types.StructType partitionType) {
DeleteFile file, String schemaString, String specString, ResidualEvaluator evaluator) {
super(file, schemaString, specString, evaluator);
this.partitionType = partitionType;
}

@Override
public long sizeBytes() {
return length();
}

@Override
Expand All @@ -61,40 +39,4 @@ protected PositionDeletesScanTask newSplitTask(
PositionDeletesScanTask parentTask, long offset, long length) {
return new SplitPositionDeletesScanTask(parentTask, offset, length);
}

@Override
public Map<Integer, ?> constantsMap() {
return constantsMap((type, constant) -> constant);
}

@Override
public Map<Integer, ?> constantsMap(BiFunction<Type, Object, Object> convertConstant) {
Map<Integer, Object> idToConstant = Maps.newHashMap();
StructLike partitionData = file().partition();

idToConstant.put(
POSITION_DELETE_TABLE_FILE_PATH_ID,
convertConstant.apply(Types.StringType.get(), file().path()));

// add spec_id constant column
idToConstant.put(
POSITION_DELETE_TABLE_SPEC_ID,
convertConstant.apply(Types.IntegerType.get(), file().specId()));

// add partition constant column
if (partitionType != null) {
if (partitionType.fields().size() > 0) {
StructLike coercedPartition =
PartitionUtil.coercePartition(partitionType, spec(), partitionData);
idToConstant.put(
POSITION_DELETE_TABLE_PARTITION_FIELD_ID,
convertConstant.apply(partitionType, coercedPartition));
} else {
// use null as some query engines may not be able to handle empty structs
idToConstant.put(POSITION_DELETE_TABLE_PARTITION_FIELD_ID, null);
}
}

return idToConstant;
}
}
4 changes: 0 additions & 4 deletions core/src/main/java/org/apache/iceberg/MetadataColumns.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,6 @@ private MetadataColumns() {}
Types.LongType.get(),
"Commit snapshot ID");

public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 107;
public static final int POSITION_DELETE_TABLE_SPEC_ID = Integer.MAX_VALUE - 108;
public static final int POSITION_DELETE_TABLE_FILE_PATH_ID = Integer.MAX_VALUE - 109;

private static final Map<String, NestedField> META_COLUMNS =
ImmutableMap.of(
FILE_PATH.name(), FILE_PATH,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,5 @@
*/
package org.apache.iceberg;

import java.util.Map;
import java.util.function.BiFunction;
import org.apache.iceberg.types.Type;

/** A {@link ScanTask} for position delete files */
public interface PositionDeletesScanTask extends ContentScanTask<DeleteFile> {

/**
* Utility method to get constant values of rows to be scanned by this task. The following columns
* are constants of each PositionDeletesFileScanTask: * spec_id * partition * delete_file_path
*
* @return a map of column id to constant values returned by this task type
*/
Map<Integer, ?> constantsMap();

/**
* Utility method to get constant values of rows to be scanned by this task. The following columns
* are constants of each PositionDeletesFileScanTask: * spec_id * partition * delete_file_path
*
* @param convertConstant callback to convert from an Iceberg typed object to an engine specific
* object
* @return a map of column id to constant values returned by this task type
*/
Map<Integer, ?> constantsMap(BiFunction<Type, Object, Object> convertConstant);
}
public interface PositionDeletesScanTask extends ContentScanTask<DeleteFile> {}
34 changes: 13 additions & 21 deletions core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@
*/
package org.apache.iceberg;

import static org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_FILE_PATH_ID;
import static org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_PARTITION_FIELD_ID;
import static org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_SPEC_ID;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.util.List;
Expand Down Expand Up @@ -89,27 +85,19 @@ private Schema calculateSchema() {
table().schema().asStruct(),
MetadataColumns.DELETE_FILE_ROW_DOC),
Types.NestedField.required(
POSITION_DELETE_TABLE_PARTITION_FIELD_ID,
MetadataColumns.PARTITION_COLUMN_ID,
"partition",
partitionType,
"Partition that position delete row belongs to"),
Types.NestedField.required(
POSITION_DELETE_TABLE_SPEC_ID,
"spec_id",
Types.IntegerType.get(),
"Spec ID of the file that the position delete row belongs to"),
Types.NestedField.required(
POSITION_DELETE_TABLE_FILE_PATH_ID,
"delete_file_path",
Types.StringType.get(),
"Path of the delete file that the position delete row belongs to"));
MetadataColumns.SPEC_ID,
MetadataColumns.FILE_PATH);

if (partitionType.fields().size() > 0) {
return result;
} else {
// avoid returning an empty struct, which is not always supported.
// instead, drop the partition field
return TypeUtil.selectNot(result, Sets.newHashSet(POSITION_DELETE_TABLE_PARTITION_FIELD_ID));
return TypeUtil.selectNot(result, Sets.newHashSet(MetadataColumns.PARTITION_COLUMN_ID));
}
}

Expand All @@ -136,6 +124,7 @@ public CloseableIterable<ScanTaskGroup<ScanTask>> planTasks() {
planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
}

@Override
protected List<String> scanColumns() {
return context().returnColumnStats() ? DELETE_SCAN_WITH_STATS_COLUMNS : DELETE_SCAN_COLUMNS;
}
Expand Down Expand Up @@ -169,12 +158,12 @@ protected CloseableIterable<ScanTask> doPlanFiles() {

// iterate through delete manifests
CloseableIterable<ManifestFile> deleteManifests =
CloseableIterable.withNoopClose(snapshot().deleteManifests(tableOps().io()));
CloseableIterable.withNoopClose(snapshot().deleteManifests(table().io()));
CloseableIterable<ManifestFile> filteredManifests =
CloseableIterable.filter(
deleteManifests,
manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
CloseableIterable<CloseableIterable<ScanTask>> results =
Iterable<CloseableIterable<ScanTask>> results =
CloseableIterable.transform(
filteredManifests,
manifest -> {
Expand Down Expand Up @@ -202,12 +191,15 @@ protected CloseableIterable<ScanTask> doPlanFiles() {
entry.file().copy(context().returnColumnStats()),
schemaString,
specStringCache.get(specId),
residualCache.get(specId),
partitionType);
residualCache.get(specId));
});
});

return new ParallelIterable<>(results, planExecutor());
if (planExecutor() != null) {
return new ParallelIterable<>(results, planExecutor());
} else {
return CloseableIterable.concat(results);
}
}

private <T> LoadingCache<Integer, T> partitionCacheOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private FileIO fileIO(Table table) {
return table.io();
}

protected Table lazyTable() {
private Table lazyTable() {
if (lazyTable == null) {
synchronized (this) {
if (lazyTable == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,22 @@
*/
package org.apache.iceberg;

import java.util.Map;
import java.util.function.BiFunction;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.types.Type;

/** A split of a {@link PositionDeletesScanTask} that is mergeable. */
class SplitPositionDeletesScanTask
implements PositionDeletesScanTask, MergeableScanTask<PositionDeletesScanTask> {

private final PositionDeletesScanTask parentTask;
private final long offset;
private final long len;
private final long length;

protected SplitPositionDeletesScanTask(
PositionDeletesScanTask parentTask, long offset, long length) {
this.parentTask = parentTask;
this.offset = offset;
this.len = length;
}

protected PositionDeletesScanTask parentTask() {
return parentTask;
}

@Override
public long sizeBytes() {
return length();
this.length = length;
}

@Override
Expand All @@ -65,29 +53,19 @@ public long start() {

@Override
public long length() {
return len;
return length;
}

@Override
public Expression residual() {
return parentTask.residual();
}

@Override
public Map<Integer, ?> constantsMap() {
return parentTask.constantsMap();
}

@Override
public Map<Integer, ?> constantsMap(BiFunction<Type, Object, Object> convertConstant) {
return parentTask.constantsMap(convertConstant);
}

@Override
public boolean canMerge(org.apache.iceberg.ScanTask other) {
if (other instanceof SplitPositionDeletesScanTask) {
SplitPositionDeletesScanTask that = (SplitPositionDeletesScanTask) other;
return file().equals(that.file()) && offset + len == that.start();
return file().equals(that.file()) && offset + length == that.start();
} else {
return false;
}
Expand All @@ -96,7 +74,7 @@ public boolean canMerge(org.apache.iceberg.ScanTask other) {
@Override
public SplitPositionDeletesScanTask merge(org.apache.iceberg.ScanTask other) {
SplitPositionDeletesScanTask that = (SplitPositionDeletesScanTask) other;
return new SplitPositionDeletesScanTask(parentTask, offset, len + that.length());
return new SplitPositionDeletesScanTask(parentTask, offset, length + that.length());
}

@Override
Expand All @@ -105,7 +83,7 @@ public String toString() {
.add("file", file().path())
.add("partition_data", file().partition())
.add("offset", offset)
.add("length", len)
.add("length", length)
.add("residual", residual())
.toString();
}
Expand Down
27 changes: 15 additions & 12 deletions core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
*/
package org.apache.iceberg;

import static org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_FILE_PATH_ID;
import static org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_PARTITION_FIELD_ID;
import static org.apache.iceberg.MetadataColumns.POSITION_DELETE_TABLE_SPEC_ID;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -45,6 +42,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.iceberg.util.StructLikeWrapper;
import org.junit.Assert;
import org.junit.Assume;
Expand Down Expand Up @@ -1082,22 +1080,23 @@ public void testPositionDeletesWithFilter() {
1,
(int)
((StructLike)
posDeleteTask.constantsMap().get(POSITION_DELETE_TABLE_PARTITION_FIELD_ID))
PartitionUtil.constantsMap(posDeleteTask)
.get(MetadataColumns.PARTITION_COLUMN_ID))
.get(0, Integer.class));

Assert.assertEquals(
"Expected correct partition spec id on task", 0, posDeleteTask.file().specId());
Assert.assertEquals(
"Expected correct partition spec id on constant column",
0,
posDeleteTask.constantsMap().get(POSITION_DELETE_TABLE_SPEC_ID));
(PartitionUtil.constantsMap(posDeleteTask).get(MetadataColumns.SPEC_ID.fieldId())));

Assert.assertEquals(
"Expected correct delete file on task", FILE_B_DELETES.path(), posDeleteTask.file().path());
Assert.assertEquals(
"Expected correct delete file on constant column",
FILE_B_DELETES.path(),
posDeleteTask.constantsMap().get(POSITION_DELETE_TABLE_FILE_PATH_ID));
(PartitionUtil.constantsMap(posDeleteTask).get(MetadataColumns.FILE_PATH.fieldId())));
}

@Test
Expand Down Expand Up @@ -1186,13 +1185,15 @@ public void testPositionDeletesUnpartitioned() {

Assert.assertEquals(
"/path/to/delete1.parquet",
scanTasks.get(0).constantsMap().get(POSITION_DELETE_TABLE_FILE_PATH_ID));
PartitionUtil.constantsMap(scanTasks.get(0)).get(MetadataColumns.FILE_PATH.fieldId()));
Assert.assertEquals(
"/path/to/delete2.parquet",
scanTasks.get(1).constantsMap().get(POSITION_DELETE_TABLE_FILE_PATH_ID));
PartitionUtil.constantsMap(scanTasks.get(1)).get(MetadataColumns.FILE_PATH.fieldId()));

Assert.assertEquals(1, scanTasks.get(0).constantsMap().get(POSITION_DELETE_TABLE_SPEC_ID));
Assert.assertEquals(1, scanTasks.get(1).constantsMap().get(POSITION_DELETE_TABLE_SPEC_ID));
Assert.assertEquals(
1, PartitionUtil.constantsMap(scanTasks.get(0)).get(MetadataColumns.SPEC_ID.fieldId()));
Assert.assertEquals(
1, PartitionUtil.constantsMap(scanTasks.get(1)).get(MetadataColumns.SPEC_ID.fieldId()));

StructLikeWrapper wrapper = StructLikeWrapper.forType(Partitioning.partitionType(table));
PartitionData partitionData =
Expand All @@ -1201,11 +1202,13 @@ public void testPositionDeletesUnpartitioned() {
StructLikeWrapper scanTask1Partition =
wrapper.copyFor(
(StructLike)
(scanTasks.get(0).constantsMap().get(POSITION_DELETE_TABLE_PARTITION_FIELD_ID)));
(PartitionUtil.constantsMap(scanTasks.get(0))
.get(MetadataColumns.PARTITION_COLUMN_ID)));
StructLikeWrapper scanTask2Partition =
wrapper.copyFor(
(StructLike)
(scanTasks.get(1).constantsMap().get(POSITION_DELETE_TABLE_PARTITION_FIELD_ID)));
(PartitionUtil.constantsMap(scanTasks.get(1))
.get(MetadataColumns.PARTITION_COLUMN_ID)));

Assert.assertEquals(expected, scanTask1Partition);
Assert.assertEquals(expected, scanTask2Partition);
Expand Down
Loading

0 comments on commit c326236

Please sign in to comment.