
Address review comments
szehon-ho committed Dec 21, 2022
1 parent da54adb commit fb6faab
Showing 9 changed files with 211 additions and 144 deletions.
7 changes: 6 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.apache.iceberg.expressions.Binder;
@@ -78,7 +79,7 @@ protected TableOperations tableOps() {
return ops;
}

-protected Table table() {
+public Table table() {
return table;
}

@@ -90,6 +91,10 @@ protected TableScanContext context() {
return context;
}

+protected Map<String, String> options() {
+return context().options();
+}
+
protected List<String> scanColumns() {
return context.returnColumnStats() ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS;
}
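The new protected options() accessor exposes the scan context's option map to subclasses. Below is a minimal sketch of how a scan implementation might consume it, resolving a split size with a fallback to the table-property default; the wrapper class and method are hypothetical, while PropertyUtil and TableProperties are existing Iceberg utilities:

```java
import java.util.Map;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.util.PropertyUtil;

// Sketch only: the class and method are illustrative, not part of this commit.
class ScanOptionExample {
  // Resolve a split size from the context-backed options() map, falling back
  // to the default defined in TableProperties.
  static long targetSplitSize(Map<String, String> options) {
    return PropertyUtil.propertyAsLong(
        options, TableProperties.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT);
  }
}
```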
22 changes: 11 additions & 11 deletions core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -25,7 +25,7 @@
import org.slf4j.LoggerFactory;

/** Base class for {@link TableScan} implementations. */
-abstract class BaseTableScan extends AbstractTableScan<TableScan, FileScanTask, CombinedScanTask>
+abstract class BaseTableScan extends SnapshotScan<TableScan, FileScanTask, CombinedScanTask>
implements TableScan {
private static final Logger LOG = LoggerFactory.getLogger(BaseTableScan.class);
private ScanMetrics scanMetrics;
@@ -39,16 +39,6 @@ protected BaseTableScan(
super(ops, table, schema, context);
}

-@Override
-public TableScan appendsBetween(long fromSnapshotId, long toSnapshotId) {
-throw new UnsupportedOperationException("Incremental scan is not supported");
-}
-
-@Override
-public TableScan appendsAfter(long fromSnapshotId) {
-throw new UnsupportedOperationException("Incremental scan is not supported");
-}
-
/**
* @return whether column stats are returned.
* @deprecated Will be removed in 1.2.0, use {@link TableScanContext#returnColumnStats()}
@@ -59,6 +49,16 @@ protected boolean colStats() {
return context().returnColumnStats();
}

+@Override
+public TableScan appendsBetween(long fromSnapshotId, long toSnapshotId) {
+throw new UnsupportedOperationException("Incremental scan is not supported");
+}
+
+@Override
+public TableScan appendsAfter(long fromSnapshotId) {
+throw new UnsupportedOperationException("Incremental scan is not supported");
+}
+
@Override
public CloseableIterable<CombinedScanTask> planTasks() {
CloseableIterable<FileScanTask> fileScanTasks = planFiles();
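BaseTableScan still rejects appendsBetween and appendsAfter; this commit only moves the two overrides below the deprecated colStats() accessor. A sketch of the dedicated API a caller would use for incremental reads instead, assuming Table.newIncrementalAppendScan() is available in this version of Iceberg (the wrapper class and snapshot ids are illustrative):

```java
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.Table;

// Sketch only: illustrates the incremental-scan alternative to the
// unsupported appendsBetween/appendsAfter methods above.
class IncrementalScanExample {
  static IncrementalAppendScan appendsBetween(
      Table table, long fromSnapshotId, long toSnapshotId) {
    return table.newIncrementalAppendScan()
        .fromSnapshotExclusive(fromSnapshotId) // appendsBetween's lower bound is exclusive
        .toSnapshot(toSnapshotId);
  }
}
```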
9 changes: 1 addition & 8 deletions core/src/main/java/org/apache/iceberg/MetadataColumns.java
@@ -111,15 +111,8 @@ private MetadataColumns() {}
ROW_POSITION.fieldId(),
IS_DELETED.fieldId(),
SPEC_ID.fieldId(),
-PARTITION_COLUMN_ID,
-POSITION_DELETE_TABLE_PARTITION_FIELD_ID,
-POSITION_DELETE_TABLE_SPEC_ID,
-POSITION_DELETE_TABLE_FILE_PATH);
+PARTITION_COLUMN_ID);

-/**
- * Returns ids of all known constant and metadata columns (to be avoided when projecting from
- * content files)
- */
public static Set<Integer> metadataFieldIds() {
return META_IDS;
}
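This change trims META_IDS back to PARTITION_COLUMN_ID, dropping the position-delete-table ids (and the accessor's javadoc), while metadataFieldIds() itself stays. A sketch of the accessor's stated use case, skipping known metadata and constant ids when projecting from content files; the helper class is hypothetical:

```java
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// Sketch only: filters known metadata/constant field ids out of a projection.
class ProjectionExample {
  static List<Types.NestedField> dataColumns(Schema schema) {
    return schema.columns().stream()
        .filter(field -> !MetadataColumns.metadataFieldIds().contains(field.fieldId()))
        .collect(Collectors.toList());
  }
}
```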
84 changes: 45 additions & 39 deletions core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
@@ -38,23 +38,25 @@
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.TableScanUtil;

-public class PositionDeletesTable extends BaseTable {
+public class PositionDeletesTable extends BaseMetadataTable {

-private final Table table;
+private final Schema schema;

PositionDeletesTable(TableOperations ops, Table table) {
-super(ops, table.name() + ".position_deletes");
-this.table = table;
+super(ops, table, table.name() + ".position_deletes");
+this.schema = calculateSchema();
}

PositionDeletesTable(TableOperations ops, Table table, String name) {
-super(ops, name);
-this.table = table;
+super(ops, table, name);
+this.schema = calculateSchema();
}

-protected Table table() {
-return table;
+@Override
+MetadataTableType metadataTableType() {
+return MetadataTableType.POSITION_DELETES;
}

@Override
@@ -70,18 +72,19 @@ public BatchScan newBatchScan() {

@Override
public Schema schema() {
-return PositionDeletesTable.schema(table(), Partitioning.partitionType(table()));
+return schema;
}

-public static Schema schema(Table table, Types.StructType partitionType) {
+private Schema calculateSchema() {
+Types.StructType partitionType = Partitioning.partitionType(table());
Schema result =
new Schema(
MetadataColumns.DELETE_FILE_PATH,
MetadataColumns.DELETE_FILE_POS,
Types.NestedField.optional(
MetadataColumns.DELETE_FILE_ROW_FIELD_ID,
"row",
-table.schema().asStruct(),
+table().schema().asStruct(),
MetadataColumns.DELETE_FILE_ROW_DOC),
Types.NestedField.required(
POSITION_DELETE_TABLE_PARTITION_FIELD_ID,
@@ -109,9 +112,7 @@ public static Schema schema(Table table, Types.StructType partitionType) {
}

public static class PositionDeletesTableScan
-extends AbstractTableScan<
-BatchScan, org.apache.iceberg.ScanTask, ScanTaskGroup<org.apache.iceberg.ScanTask>>
-implements BatchScan {
+extends SnapshotScan<BatchScan, ScanTask, ScanTaskGroup<ScanTask>> implements BatchScan {

protected PositionDeletesTableScan(TableOperations ops, Table table, Schema schema) {
super(ops, table, schema, new TableScanContext());
@@ -129,7 +130,14 @@ protected PositionDeletesTableScan newRefinedScan(
}

@Override
-protected CloseableIterable<org.apache.iceberg.ScanTask> doPlanFiles() {
+public CloseableIterable<ScanTaskGroup<ScanTask>> planTasks() {
+CloseableIterable<ScanTask> scanTasks = planFiles();
+return TableScanUtil.planTaskGroups(
+scanTasks, targetSplitSize(), splitLookback(), splitOpenFileCost());
+}
+
+@Override
+protected CloseableIterable<ScanTask> doPlanFiles() {
Expression rowFilter = context().rowFilter();
String schemaString = SchemaParser.toJson(tableSchema());

@@ -143,7 +151,7 @@

CloseableIterable<ManifestFile> deleteManifests =
CloseableIterable.withNoopClose(snapshot().deleteManifests(tableOps().io()));
-CloseableIterable<CloseableIterable<org.apache.iceberg.ScanTask>> results =
+CloseableIterable<CloseableIterable<ScanTask>> results =
CloseableIterable.transform(
deleteManifests,
m -> {
@@ -167,12 +175,11 @@
entry -> {
PartitionSpec spec = transformedSpecs.get(entry.file().specId());
String specString = PartitionSpecParser.toJson(spec);
-return new ScanTask(
-entry.file(),
+return new PositionDeleteScanTask(
+entry.file().copy(),
schemaString,
specString,
-ResidualEvaluator.of(
-spec, Expressions.alwaysTrue(), context().caseSensitive()),
+ResidualEvaluator.of(spec, Expressions.alwaysTrue(), isCaseSensitive()),
partitionType);
});
});
@@ -182,14 +189,15 @@
}

/** Scan task for position delete files */
-public static class ScanTask extends BaseContentScanTask<ScanTask, DeleteFile> {
+public static class PositionDeleteScanTask
+extends BaseContentScanTask<PositionDeleteScanTask, DeleteFile> {

private final String schemaString;
private final String specString;
private final ResidualEvaluator evaluator;
private final Types.StructType partitionType;

-public ScanTask(
+public PositionDeleteScanTask(
DeleteFile file,
String schemaString,
String specString,
@@ -224,13 +232,14 @@ Types.StructType partitionType() {
}

@Override
-protected ScanTask self() {
+protected PositionDeleteScanTask self() {
return this;
}

@Override
-protected ScanTask newSplitTask(ScanTask parentTask, long offset, long length) {
-return new SplitScanTask(parentTask, offset, length);
+protected PositionDeleteScanTask newSplitTask(
+PositionDeleteScanTask parentTask, long offset, long length) {
+return new SplitPositionDeleteScanTask(parentTask, offset, length);
}

/**
@@ -284,14 +293,15 @@ protected ScanTask newSplitTask(ScanTask parentTask, long offset, long length) {
}
}

-public static class SplitScanTask extends ScanTask
-implements ContentScanTask<DeleteFile>, MergeableScanTask<SplitScanTask> {
+public static class SplitPositionDeleteScanTask extends PositionDeleteScanTask
+implements ContentScanTask<DeleteFile>, MergeableScanTask<SplitPositionDeleteScanTask> {

-private final ScanTask parentTask;
+private final PositionDeleteScanTask parentTask;
private final long offset;
private final long len;

-protected SplitScanTask(ScanTask parentTask, long offset, long length) {
+protected SplitPositionDeleteScanTask(
+PositionDeleteScanTask parentTask, long offset, long length) {
super(
parentTask.file(),
parentTask.schemaString(),
Expand All @@ -303,7 +313,7 @@ protected SplitScanTask(ScanTask parentTask, long offset, long length) {
this.len = length;
}

-protected ScanTask parentTask() {
+protected PositionDeleteScanTask parentTask() {
return parentTask;
}

@@ -334,18 +344,18 @@ public Expression residual() {

@Override
public boolean canMerge(org.apache.iceberg.ScanTask other) {
-if (other instanceof SplitScanTask) {
-SplitScanTask that = (SplitScanTask) other;
+if (other instanceof SplitPositionDeleteScanTask) {
+SplitPositionDeleteScanTask that = (SplitPositionDeleteScanTask) other;
return file().equals(that.file()) && offset + len == that.start();
} else {
return false;
}
}

@Override
-public SplitScanTask merge(org.apache.iceberg.ScanTask other) {
-SplitScanTask that = (SplitScanTask) other;
-return new SplitScanTask(parentTask, offset, len + that.length());
+public SplitPositionDeleteScanTask merge(org.apache.iceberg.ScanTask other) {
+SplitPositionDeleteScanTask that = (SplitPositionDeleteScanTask) other;
+return new SplitPositionDeleteScanTask(parentTask, offset, len + that.length());
}

@Override
Expand All @@ -359,8 +369,4 @@ public String toString() {
.toString();
}
}

-final Object writeReplace() {
-return SerializableTable.copyOf(this);
-}
}
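With PositionDeletesTable now extending BaseMetadataTable and its scan extending SnapshotScan, planning follows the standard batch-scan flow, grouping split tasks via TableScanUtil.planTaskGroups. A minimal sketch of scanning the position_deletes table end to end; it assumes an existing Table handle, and the wrapper class is hypothetical:

```java
import java.io.IOException;
import org.apache.iceberg.BatchScan;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

// Sketch only: plans grouped scan tasks over the position_deletes metadata table.
class PositionDeletesScanExample {
  static void planDeleteTasks(Table table) throws IOException {
    Table deletesTable =
        MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
    BatchScan scan = deletesTable.newBatchScan();
    try (CloseableIterable<ScanTaskGroup<ScanTask>> taskGroups = scan.planTasks()) {
      for (ScanTaskGroup<ScanTask> group : taskGroups) {
        System.out.println("task group with " + group.tasks().size() + " tasks");
      }
    }
  }
}
```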
32 changes: 9 additions & 23 deletions core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -94,8 +94,6 @@ protected SerializableTable(Table table) {
public static Table copyOf(Table table) {
if (table instanceof BaseMetadataTable) {
return new SerializableMetadataTable((BaseMetadataTable) table);
-} else if (table instanceof PositionDeletesTable) {
-return new SerializablePositionDeletesTable((PositionDeletesTable) table);
} else {
return new SerializableTable(table);
}
@@ -259,6 +257,11 @@ public TableScan newScan() {
return lazyTable().newScan();
}

+@Override
+public BatchScan newBatchScan() {
+return lazyTable().newBatchScan();
+}

@Override
public Snapshot currentSnapshot() {
return lazyTable().currentSnapshot();
@@ -363,27 +366,6 @@ private String errorMsg(String operation) {
return String.format("Operation %s is not supported after the table is serialized", operation);
}

-public static class SerializablePositionDeletesTable extends SerializableTable {
-
-private final String baseTableName;
-
-protected SerializablePositionDeletesTable(PositionDeletesTable deletesTable) {
-super(deletesTable);
-this.baseTableName = deletesTable.table().name();
-}
-
-@Override
-protected Table newTable(TableOperations ops, String tableName) {
-Table baseTable = new BaseTable(ops, baseTableName);
-return new PositionDeletesTable(ops, baseTable, tableName);
-}
-
-@Override
-public BatchScan newBatchScan() {
-return lazyTable().newBatchScan();
-}
-}

public static class SerializableMetadataTable extends SerializableTable {
private final MetadataTableType type;
private final String baseTableName;
@@ -398,6 +380,10 @@ protected SerializableMetadataTable(BaseMetadataTable metadataTable) {
protected Table newTable(TableOperations ops, String tableName) {
return MetadataTableUtils.createMetadataTableInstance(ops, baseTableName, tableName, type);
}

+public MetadataTableType type() {
+return type;
+}
}

// captures the current state of a Hadoop configuration in a serializable manner
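Dropping SerializablePositionDeletesTable works because copyOf now routes PositionDeletesTable through the generic BaseMetadataTable branch, and newBatchScan() is overridden once on SerializableTable itself. A sketch of the resulting round-trip; the wrapper class is hypothetical and `metadataTable` is assumed to be any metadata table handle, including position_deletes:

```java
import org.apache.iceberg.BatchScan;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;

// Sketch only: all metadata tables, including position_deletes, now take the
// single SerializableMetadataTable path inside copyOf.
class SerializableCopyExample {
  static BatchScan scanAfterCopy(Table metadataTable) {
    Table serializableCopy = SerializableTable.copyOf(metadataTable);
    return serializableCopy.newBatchScan(); // delegates to lazyTable().newBatchScan()
  }
}
```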