[POC] Alternate implementation of using snapshot schema when reading snapshot #3314
Conversation
According to Edwin Choi, in order to get the schema for a snapshot, the only safe option is to scan the metadata files to find the one whose current-snapshot-id matches the target snapshot id.
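A minimal sketch of that scan, assuming access to the table's current TableMetadata and a FileIO; the helper name and error handling are illustrative, not this PR's actual code:

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.io.FileIO;

class SnapshotSchemas {
  // Walk the metadata log until we find the metadata file that was current
  // when the target snapshot was the table's current snapshot.
  static Schema schemaForSnapshot(TableMetadata current, FileIO io, long targetSnapshotId) {
    if (current.currentSnapshot() != null
        && current.currentSnapshot().snapshotId() == targetSnapshotId) {
      return current.schema();
    }
    for (TableMetadata.MetadataLogEntry entry : current.previousFiles()) {
      TableMetadata previous = TableMetadataParser.read(io, entry.file());
      if (previous.currentSnapshot() != null
          && previous.currentSnapshot().snapshotId() == targetSnapshotId) {
        return previous.schema();
      }
    }
    throw new IllegalArgumentException("Cannot find metadata file for snapshot " + targetSnapshotId);
  }
}
```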
The changes are mostly in spark3. They are necessitated by the catalog support introduced in apache#1783. As the spark3 IcebergSource now implements SupportsCatalogOptions, DataFrameReader#load no longer calls IcebergSource#getTable but calls SparkCatalog#loadTable directly. In order for the SparkTable returned by SparkCatalog#loadTable(Identifier) to be aware of the snapshot, the information about the snapshot needs to be present in the Identifier. For this reason, we introduce a SnapshotAwareIdentifier interface extending Identifier. As SupportsCatalogOptions does not allow a schema to be specified (requested), SparkTable no longer needs a requestedSchema field, so some dead code is removed from it.
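For illustration, a SnapshotAwareIdentifier along these lines would carry the selection into SparkCatalog#loadTable; the accessor names here are assumptions, not necessarily the PR's:

```java
import org.apache.spark.sql.connector.catalog.Identifier;

interface SnapshotAwareIdentifier extends Identifier {
  // Snapshot selected via "snapshot-id", or null if none was requested (assumed accessor)
  Long snapshotId();

  // Timestamp in millis selected via "as-of-timestamp", or null (assumed accessor)
  Long asOfTimestamp();
}
```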
Rebased on master. Use constants from SparkReadOptions. Implement snapshotSchema() in SparkFilesScan as it extends SparkBatchScan.
Avoid introducing new methods to BaseTable. Add helper methods to SnapshotUtil instead. Move recovery of the schema from previous metadata files, in the event that the snapshot does not have an associated schema id, to a new PR. Remove the snapshotSchema method from SparkBatchScan and its subclasses, as it is not needed. Adjust schema in BaseTableScan when useSnapshot is called.
Use the existing CatalogAndIdentifier and swap out the Identifier for a snapshot-aware TableIdentifier if snapshotId or asOfTimestamp is set.
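Roughly, the swap could look like the following; the SparkTableIdentifier constructor shown is hypothetical:

```java
import org.apache.spark.sql.connector.catalog.Identifier;

class Identifiers {
  static Identifier maybeSnapshotAware(Identifier ident, Long snapshotId, Long asOfTimestamp) {
    if (snapshotId == null && asOfTimestamp == null) {
      return ident; // no time travel requested; keep the plain Identifier
    }
    // Hypothetical constructor: wraps the original namespace/name plus the snapshot selection
    return new SparkTableIdentifier(ident.namespace(), ident.name(), snapshotId, asOfTimestamp);
  }
}
```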
Fix a bug in BaseTableScan#useSnapshot. Some cleanup in SnapshotUtil. Some streamlining in the added unit tests. Refactor the spark2 Reader to configure the TableScan on construction, and let the TableScan get the schema for the snapshot. Rename the new TableIdentifier to SparkTableIdentifier to avoid confusion with the existing TableIdentifier (in a different package). Add a convenience constructor to PathIdentifier to avoid modifying tests for it.
…ableScan. Use SnapshotUtil.snapshotIdAsOfTime in BaseTableScan#asOfTime. Move formatTimestampMillis from BaseTableScan to SnapshotUtil in order to use it there (BaseTableScan is package-private, not a public class). Fix some error messages.
Incorporate the approach shown in apache#3269 by Ryan Blue. That defines a syntax for selecting a snapshot or timestamp through the table name. Use that instead of a SnapshotAwareIdentifier to load the SparkTable.
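A hedged example of that table-name syntax; the selector spellings below follow what was later merged in #3722, and the POC's exact spelling may differ:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.active();
// Select a snapshot by id, or the snapshot current as of a timestamp (millis)
Dataset<Row> bySnapshot = spark.sql("SELECT * FROM db.table.snapshot_id_10963874102873");
Dataset<Row> byTime = spark.sql("SELECT * FROM db.table.at_timestamp_1633516400000");
```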
Force-pushed from f7fadf3 to 129cd44.
Remove two unused imports left behind by a removed test.
```java
CatalogManager catalogManager = spark.sessionState().catalogManager();

if (path.contains("/")) {
  // contains a path. Return iceberg default catalog and a PathIdentifier
  String newPath = selector.equals("") ? path : path + "#" + selector;
```
This is one area I'm not sure about. I am not too familiar with Hadoop tables and PathIdentifier. Is the only thing that can go after `#` the name of a metadata table (and now the snapshot/timestamp selector)?
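For context, with path-based (Hadoop) tables the fragment after `#` today selects a metadata table; a typical load looks like the following, with an illustrative path:

```java
// Load the snapshots metadata table for a path-based table
Dataset<Row> snapshots = spark.read()
    .format("iceberg")
    .load("hdfs://nn:8020/warehouse/db/table#snapshots");
```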
```java
// If the table is loaded using the Spark DataFrame API, and option("snapshot-id", <snapshot_id>)
// or option("as-of-timestamp", <timestamp>) is applied to the DataFrameReader, SparkTable will be
// constructed with a non-null snapshotId. Subsequently SparkTable#newScanBuilder will be called
// with the options, which will include "snapshot-id" or "as-of-timestamp".
// On the other hand, if the table is loaded using SQL, with the table suffixed with a snapshot
// or timestamp selector, then SparkTable will be constructed with a non-null snapshotId, but
// SparkTable#newScanBuilder will be called without the "snapshot-id" or "as-of-timestamp" option.
// We therefore add a "snapshot-id" option here in this latter case.
// As a consistency check, if "snapshot-id" is in the options, the id must match what we already
// have.
```
This took me some figuring out. We only need to add the snapshot id if the SparkTable is loaded from SQL via the table name syntax, not when it is loaded using the DataFrame API.
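A sketch of the two loading paths described above; the table name, snapshot id, and selector spelling are illustrative:

```java
// 1) DataFrame API: the option reaches SparkTable#newScanBuilder directly
Dataset<Row> viaOption = spark.read()
    .format("iceberg")
    .option("snapshot-id", 10963874102873L)
    .load("db.table");

// 2) SQL with the table-name selector: SparkTable is built with a non-null snapshotId,
//    but newScanBuilder sees no "snapshot-id" option, so one is added internally
Dataset<Row> viaSql = spark.sql("SELECT * FROM db.table.snapshot_id_10963874102873");
```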
Obsolete. Superseded by #3722, which has been merged.
This is an alternate approach to #1508, based on #3269. It handles the support for Spark 3.