[POC] Alternate implementation of using snapshot schema when reading snapshot #3314

Closed
wants to merge 13 commits

Conversation

@wypoon wypoon commented Oct 19, 2021

This is an alternate approach to #1508, based on #3269. It handles the support for Spark 3.

@wypoon wypoon changed the title [POC] Use snapshot schema when reading snapshot [POC] Alternate implementation of using snapshot schema when reading snapshot Oct 19, 2021
wypoon and others added 12 commits October 19, 2021 09:38
According to Edwin Choi, in order to get the schema for a snapshot,
the only safe option is to scan the metadata files to find the one
where the current-snapshot-id matches the target snapshot id.
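A minimal sketch of what that scan could look like, assuming access to the table's TableOperations; the wrapper class and method names here are illustrative, not this PR's code:

import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableOperations;

class SnapshotSchemas {
  // Walk the metadata log to find the metadata file whose current-snapshot-id
  // matches the target snapshot, and return the schema recorded in it.
  static Schema schemaAtSnapshot(TableOperations ops, long targetSnapshotId) {
    TableMetadata current = ops.current();
    if (current.currentSnapshot() != null
        && current.currentSnapshot().snapshotId() == targetSnapshotId) {
      return current.schema();
    }

    // previousFiles() lists the locations of earlier metadata.json files
    for (TableMetadata.MetadataLogEntry entry : current.previousFiles()) {
      TableMetadata past = TableMetadataParser.read(ops.io(), entry.file());
      if (past.currentSnapshot() != null
          && past.currentSnapshot().snapshotId() == targetSnapshotId) {
        return past.schema();
      }
    }

    throw new IllegalArgumentException(
        "Cannot find a metadata file whose current snapshot is " + targetSnapshotId);
  }
}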
The changes are mostly in spark3. They are necessitated by the catalog
support introduced in apache#1783.
As the spark3 IcebergSource now implements SupportsCatalogOptions,
DataFrameReader#load no longer calls IcebergSource#getTable but calls
SparkCatalog#loadTable directly. In order for the SparkTable returned by
SparkCatalog#loadTable(Identifier) to be aware of the snapshot, the
information about the snapshot needs to be present in the Identifier.
For this reason, we introduce a SnapshotAwareIdentifier interface
extending Identifier.
As SupportsCatalogOptions does not allow a schema to be specified
(requested), SparkTable no longer needs a requestedSchema field, so
some dead code is removed from it.
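A minimal sketch of the SnapshotAwareIdentifier idea described above; the accessor names are assumptions, not this PR's exact API:

import org.apache.spark.sql.connector.catalog.Identifier;

// An Identifier that also carries the snapshot selection, so that
// SparkCatalog#loadTable(Identifier) has enough information to construct
// a snapshot-aware SparkTable.
public interface SnapshotAwareIdentifier extends Identifier {
  // id of the snapshot to read, or null if none was requested
  Long snapshotId();

  // point in time (epoch millis) to resolve to a snapshot, or null
  Long asOfTimestamp();
}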
Rebased on master.
Use constants from SparkReadOptions.
Implement snapshotSchema() in SparkFilesScan as it extends SparkBatchScan.
Avoid introducing new methods to BaseTable.
Add helper methods to SnapshotUtil instead.
Move recovery of the schema from previous metadata files, in the event
that the snapshot does not have an associated schema id, to a new PR.
Remove the snapshotSchema method from SparkBatchScan and its subclasses,
as it is not needed.
Adjust schema in BaseTableScan when useSnapshot is called.
Use the existing CatalogAndIdentifier and swap out the Identifier for a
snapshot-aware TableIdentifier if snapshotId or asOfTimestamp is set.
Fix a bug in BaseTableScan#useSnapshot.
Some cleanup in SnapshotUtil.
Some streamlining of the added unit tests.
Refactor spark2 Reader to configure the TableScan on construction,
and let the TableScan get the schema for the snapshot.
Rename the new TableIdentifier to SparkTableIdentifier to avoid confusion
with the existing TableIdentifier (in a different package).
Add convenience constructor to PathIdentifier to avoid modifying tests
for it.
…ableScan.

Use SnapshotUtil.snapshotIdAsOfTime in BaseTableScan#asOfTime.
Move formatTimestampMillis from BaseTableScan to SnapshotUtil in order to
use it there (BaseTableScan is package-private, not a public class).
Fix some error messages.
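For reference, a small usage sketch of that helper, assuming its signature is snapshotIdAsOfTime(Table, long); the wrapper class is illustrative:

import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.util.SnapshotUtil;

class AsOfTimeExample {
  static TableScan scanAsOfTime(Table table, long timestampMillis) {
    // Resolve the snapshot that was current as of the given time, then
    // configure a scan pinned to exactly that snapshot.
    long snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, timestampMillis);
    return table.newScan().useSnapshot(snapshotId);
  }
}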
Incorporate the approach shown in apache#3269 by Ryan Blue.
That defines a syntax for selecting a snapshot or timestamp through
the table name. Use that instead of a SnapshotAwareIdentifier to
load the SparkTable.
@wypoon wypoon force-pushed the schema-for-snapshot2 branch from f7fadf3 to 129cd44 on October 19, 2021 16:40
Remove two unused imports left behind by a removed test.
@github-actions github-actions bot added the flink label Oct 19, 2021
CatalogManager catalogManager = spark.sessionState().catalogManager();

if (path.contains("/")) {
// contains a path. Return iceberg default catalog and a PathIdentifier
String newPath = selector.equals("") ? path : path + "#" + selector;
wypoon (Contributor, Author):

This is one area I'm not sure about. I am not too familiar with Hadoop tables and PathIdentifier. Is the only thing that can go after # the name of a metadata table (and now the snapshot/timestamp selector)?
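For context, a hedged sketch of the path-based loads this branch handles; the paths are made up, and #history is the existing metadata-table suffix that shares the position after # with the new selector:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class PathLoadExample {
  static void load(SparkSession spark) {
    // A load path containing "/" takes the PathIdentifier branch above
    Dataset<Row> data = spark.read().format("iceberg")
        .load("hdfs://nn:8020/warehouse/db/table");

    // A metadata table name can already follow "#"; per the code above, the
    // snapshot/timestamp selector is appended in the same position
    Dataset<Row> history = spark.read().format("iceberg")
        .load("hdfs://nn:8020/warehouse/db/table#history");
  }
}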

Comment on lines +293 to +302
// If the table is loaded using the Spark DataFrame API, and option("snapshot-id", <snapshot_id>)
// or option("as-of-timestamp", <timestamp>) is applied to the DataFrameReader, SparkTable will be
// constructed with a non-null snapshotId. Subsequently SparkTable#newScanBuilder will be called
// with the options, which will include "snapshot-id" or "as-of-timestamp".
// On the other hand, if the table is loaded using SQL, with the table suffixed with a snapshot
// or timestamp selector, then SparkTable will be constructed with a non-null snapshotId, but
// SparkTable#newScanBuilder will be called without the "snapshot-id" or "as-of-timestamp" option.
// We therefore add a "snapshot-id" option here in this latter case.
// As a consistency check, if "snapshot-id" is in the options, the id must match what we already
// have.
wypoon (Contributor, Author):

This took some figuring out: we only need to add the snapshot id if the SparkTable is loaded from SQL via the table name syntax, not when it is loaded using the DataFrame API.
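To make the DataFrame-API path described in the comment above concrete, a small sketch (the table name and values are illustrative; the option keys are the ones the comment names):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class SnapshotReadExample {
  static void timeTravel(SparkSession spark) {
    // Read at a specific snapshot; the option flows into
    // SparkTable#newScanBuilder, so no snapshot-id needs to be re-added there
    Dataset<Row> byId = spark.read().format("iceberg")
        .option("snapshot-id", 10963874102873L)
        .load("db.table");

    // Read the snapshot that was current as of a timestamp (epoch millis)
    Dataset<Row> byTime = spark.read().format("iceberg")
        .option("as-of-timestamp", 1634660000000L)
        .load("db.table");
  }
}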

wypoon commented Feb 2, 2022

Obsolete. Superseded by #3722, which has been merged.

@wypoon wypoon closed this Feb 2, 2022