Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC] Alternate implementation of using snapshot schema when reading snapshot #3314

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 3 additions & 24 deletions core/src/main/java/org/apache/iceberg/BaseTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@

package org.apache.iceberg;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
Expand All @@ -37,6 +33,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -47,8 +44,6 @@
abstract class BaseTableScan implements TableScan {
private static final Logger LOG = LoggerFactory.getLogger(BaseTableScan.class);

private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

private final TableOperations ops;
private final Table table;
private final Schema schema;
Expand Down Expand Up @@ -132,19 +127,7 @@ public TableScan asOfTime(long timestampMillis) {
Preconditions.checkArgument(context.snapshotId() == null,
"Cannot override snapshot, already set to id=%s", context.snapshotId());

Long lastSnapshotId = null;
for (HistoryEntry logEntry : ops.current().snapshotLog()) {
if (logEntry.timestampMillis() <= timestampMillis) {
lastSnapshotId = logEntry.snapshotId();
}
}

// the snapshot ID could be null if no entries were older than the requested time. in that case,
// there is no valid snapshot to read.
Preconditions.checkArgument(lastSnapshotId != null,
"Cannot find a snapshot older than %s", formatTimestampMillis(timestampMillis));

return useSnapshot(lastSnapshotId);
return useSnapshot(SnapshotUtil.snapshotIdAsOfTime(table(), timestampMillis));
}

@Override
Expand Down Expand Up @@ -199,7 +182,7 @@ public CloseableIterable<FileScanTask> planFiles() {
Snapshot snapshot = snapshot();
if (snapshot != null) {
LOG.info("Scanning table {} snapshot {} created at {} with filter {}", table,
snapshot.snapshotId(), formatTimestampMillis(snapshot.timestampMillis()),
snapshot.snapshotId(), SnapshotUtil.formatTimestampMillis(snapshot.timestampMillis()),
context.rowFilter());

Listeners.notifyAll(
Expand Down Expand Up @@ -304,8 +287,4 @@ private Schema lazyColumnProjection() {

return schema;
}

/**
 * Renders an epoch-millisecond timestamp as text using {@code DATE_FORMAT}, interpreted in UTC.
 *
 * @param millis the timestamp in millis since the Unix epoch
 * @return the formatted UTC date-time string
 */
private static String formatTimestampMillis(long millis) {
  Instant instant = Instant.ofEpochMilli(millis);
  LocalDateTime utcDateTime = LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
  return utcDateTime.format(DATE_FORMAT);
}
}
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/iceberg/DataTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.ThreadPools;

public class DataTableScan extends BaseTableScan {
Expand Down Expand Up @@ -62,6 +63,15 @@ public TableScan appendsAfter(long fromSnapshotId) {
return appendsBetween(fromSnapshotId, currentSnapshot.snapshotId());
}

/**
 * Creates a refined scan for the given snapshot ID, using the schema that
 * {@link SnapshotUtil#schemaFor(Table, long)} resolves for that snapshot.
 *
 * @param scanSnapshotId the ID of the snapshot to scan
 * @return a new scan built from the snapshot's schema and a context with the snapshot ID set
 */
@Override
public TableScan useSnapshot(long scanSnapshotId) {
  // The superclass implementation is invoked only so that its argument validation runs;
  // the scan it returns is intentionally discarded.
  super.useSnapshot(scanSnapshotId);

  Schema schemaAtSnapshot = SnapshotUtil.schemaFor(table(), scanSnapshotId);
  TableScanContext pinnedContext = context().useSnapshotId(scanSnapshotId);
  return newRefinedScan(tableOps(), table(), schemaAtSnapshot, pinnedContext);
}

@Override
protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
return new DataTableScan(ops, table, schema, context);
Expand Down
84 changes: 84 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,15 @@

package org.apache.iceberg.util;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.function.Function;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
Expand All @@ -33,6 +39,8 @@ public class SnapshotUtil {
// Private constructor: this class only exposes static helpers and is not meant to be instantiated.
private SnapshotUtil() {
}

// Pattern used by formatTimestampMillis to render timestamps with millisecond precision.
private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

/**
* Returns whether ancestorSnapshotId is an ancestor of snapshotId.
*/
Expand Down Expand Up @@ -144,4 +152,80 @@ public static Snapshot snapshotAfter(Table table, long snapshotId) {
throw new IllegalStateException(
String.format("Cannot find snapshot after %s: not an ancestor of table's current snapshot", snapshotId));
}

/**
 * Returns the ID of the snapshot selected from the table's history for the given timestamp.
 * <p>
 * Every entry of {@link Table#history()} is visited in iteration order; the ID of the last entry whose
 * timestamp is less than or equal to {@code timestampMillis} is the one returned.
 *
 * @param table a {@link Table}
 * @param timestampMillis the timestamp in millis since the Unix epoch
 * @return the snapshot ID of the last history entry at or before {@code timestampMillis}
 * @throws IllegalArgumentException when no history entry has a timestamp at or before
 *                                  {@code timestampMillis}
 */
public static long snapshotIdAsOfTime(Table table, long timestampMillis) {
  Long latestMatch = null;
  for (HistoryEntry entry : table.history()) {
    if (entry.timestampMillis() > timestampMillis) {
      // entry is newer than the requested time; it cannot be the answer
      continue;
    }

    // keep overwriting so that the last qualifying entry wins
    latestMatch = entry.snapshotId();
  }

  // null means no entry qualified, so there is nothing valid to return
  Preconditions.checkArgument(latestMatch != null,
      "Cannot find a snapshot older than %s", formatTimestampMillis(timestampMillis));
  return latestMatch;
}

/**
 * Looks up the schema associated with a snapshot of the table.
 * <p>
 * When the snapshot carries a schema id, the matching entry of {@code table.schemas()} is returned.
 * When it carries none, the result of {@code table.schema()} is returned instead.
 *
 * @param table a {@link Table}
 * @param snapshotId the ID of the snapshot
 * @return the schema
 * @throws IllegalArgumentException if the table has no snapshot with the given ID
 * @throws IllegalStateException if the snapshot's schema id has no entry in {@code table.schemas()}
 */
public static Schema schemaFor(Table table, long snapshotId) {
  Snapshot target = table.snapshot(snapshotId);
  Preconditions.checkArgument(target != null, "Cannot find snapshot with ID %s", snapshotId);

  Integer schemaId = target.schemaId();
  if (schemaId == null) {
    // schemaId could be null, if snapshot was created before Iceberg added schema id to snapshot
    // TODO: recover the schema by reading previous metadata files
    return table.schema();
  }

  Schema snapshotSchema = table.schemas().get(schemaId);
  Preconditions.checkState(snapshotSchema != null,
      "Cannot find schema with schema id %s", schemaId);
  return snapshotSchema;
}

/**
 * Convenience overload that resolves a schema from either a snapshot id or a timestamp.
 * <p>
 * At most one of {@code snapshotId} and {@code timestampMillis} may be non-null:
 * <ul>
 *   <li>both null: the result of {@code table.schema()} is returned;</li>
 *   <li>only {@code snapshotId} set: the schema for that snapshot is returned;</li>
 *   <li>only {@code timestampMillis} set: the snapshot id is first resolved with
 *       {@link #snapshotIdAsOfTime(Table, long)}, then its schema is returned.</li>
 * </ul>
 *
 * @param table a {@link Table}
 * @param snapshotId the ID of the snapshot, or null
 * @param timestampMillis the timestamp in millis since the Unix epoch, or null
 * @return the schema
 * @throws IllegalArgumentException if both snapshotId and timestampMillis are non-null
 */
public static Schema schemaFor(Table table, Long snapshotId, Long timestampMillis) {
  boolean bothGiven = snapshotId != null && timestampMillis != null;
  Preconditions.checkArgument(!bothGiven,
      "Cannot use both snapshot id and timestamp to find a schema");

  if (snapshotId == null && timestampMillis == null) {
    // nothing to pin the lookup to
    return table.schema();
  }

  // exactly one of the two is set at this point; only the chosen branch is unboxed/evaluated
  long resolvedSnapshotId = snapshotId != null ? snapshotId : snapshotIdAsOfTime(table, timestampMillis);
  return schemaFor(table, resolvedSnapshotId);
}

/**
 * Renders an epoch-millisecond timestamp as text using {@code DATE_FORMAT}, interpreted in UTC.
 *
 * @param millis the timestamp in millis since the Unix epoch
 * @return the formatted UTC date-time string
 */
public static String formatTimestampMillis(long millis) {
  Instant instant = Instant.ofEpochMilli(millis);
  LocalDateTime utcDateTime = LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
  return utcDateTime.format(DATE_FORMAT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
Expand All @@ -49,7 +48,6 @@
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.TableTestBase;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestHelpers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public DataSourceReader createReader(StructType readSchema, DataSourceOptions op

Reader reader = new Reader(lazySparkSession(), table, Boolean.parseBoolean(caseSensitive), options);
if (readSchema != null) {
// convert() will fail if readSchema contains fields not in table.schema()
SparkSchemaUtil.convert(table.schema(), readSchema);
// convert() will fail if readSchema contains fields not in reader.snapshotSchema()
SparkSchemaUtil.convert(reader.snapshotSchema(), readSchema);
reader.pruneColumns(readSchema);
}

Expand Down
Loading