Spark: Support time travel through table names #3269
Conversation
@wypoon, this is a prototype of how we can cleanly pass snapshot and timestamp selectors through identifiers. Could you take a look?
In Hive we enabled this in HIVE-25344:
Having different ways to do time travel with different engines will cause confusion for users. Would it be good to keep the standard SQL syntax, as we did in Hive?
@pvary, that would be great. But Spark is unlikely to add that syntax any time soon. I proposed it years ago and had people push back against it. I think it is more important to support time travel in SQL soon than to keep SQL syntax uniform. I know that would be ideal, though.
I agree with @rdblue that we should allow time travel queries using the table name. It's not likely for Spark to add the standard syntax any time soon.
@@ -80,6 +87,9 @@
 */
public class SparkCatalog extends BaseCatalog {
  private static final Set<String> DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER);
  private static final Splitter COMMA = Splitter.on(",");
  private static final Pattern AT_TIME = Pattern.compile("at(?:_(?:time(?:stamp)?)?)?_?(\\d+)");
What's the reason for supporting so many different ways to specify the time and snapshot id? My opinion is we should just support 1 shortcut and 1 full name for each, for example at_ and at_timestamp_ for timestamp travel, s_ and snapshot_id_ for snapshot ID travel. Maybe even the full name ones are not necessary.
I may have gone a bit too far here. I think that we want to be able to use the full "at_timestamp_12938471" version because that is the least likely to conflict with existing table names. Similarly, I think it is valuable to have a short version, like at_<timestamp> and snap_<id>, so you don't have to type at_timestamp_ or the full snapshot_id_ every time. Since we were already testing multiple prefixes, I added a few that I thought would be valuable to avoid confusion:
- Make the final _ optional because that's an easy mistake to make
- Allow time and not just timestamp because people may not remember
- Allow omitting _id and allow just snap_ or s_ as shortened forms

The logic made sense at every step but there are quite a few variations. I'm up for defining the full version and one shortening if you think it's best to have just a couple.
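To make the accepted spellings concrete, here is a minimal, self-contained sketch that runs the AT_TIME pattern from the diff above against the variants discussed; the class name and sample timestamp are illustrative only.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AtTimePatternDemo {
  // the pattern from the SparkCatalog diff above
  private static final Pattern AT_TIME =
      Pattern.compile("at(?:_(?:time(?:stamp)?)?)?_?(\\d+)");

  public static void main(String[] args) {
    // the spellings discussed: full form, "time" instead of "timestamp",
    // the short form, and the final underscore made optional
    String[] variants = {
        "at_timestamp_12938471",
        "at_time_12938471",
        "at_12938471",
        "at_timestamp12938471"
    };
    for (String variant : variants) {
      Matcher m = AT_TIME.matcher(variant);
      if (m.matches()) {
        // every variant captures the same timestamp: 12938471
        System.out.println(variant + " -> " + m.group(1));
      }
    }
  }
}
```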
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
  // if the original load didn't work, the identifier may be extended and include a snapshot selector
  TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace()));
So we are also using . as the delimiter for table names, similar to how system tables are parsed today. However, system table parsing logic is in core; should we also move this logic to core so all engines obey the same table name format?
I debated that as well. I'm not sure where it would be most clean to put this, which is why I added it just to Spark for now. I think we will also want to extend this to work for branch or tag names, at which point we may want to reconsider moving everything into core.
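For illustration, a sketch of the dot-delimited fallback described here: if loading the full identifier fails, the last name part is tested as a selector. The class and variable names are hypothetical, not the PR's actual helpers.

```java
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SelectorSplitDemo {
  private static final Pattern AT_TIME =
      Pattern.compile("at(?:_(?:time(?:stamp)?)?)?_?(\\d+)");

  public static void main(String[] args) {
    // Spark hands the catalog "db.table.at_timestamp_12938471" as name parts
    String[] parts = {"db", "table", "at_timestamp_12938471"};

    Matcher m = AT_TIME.matcher(parts[parts.length - 1]);
    if (m.matches()) {
      // the leading parts form the real table identifier
      String table = String.join(".", Arrays.copyOf(parts, parts.length - 1));
      System.out.println("table: " + table);           // db.table
      System.out.println("timestamp: " + m.group(1));  // 12938471
    }
  }
}
```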
I forgot one other thing. This is a more reasonable change if we do it in Spark because SparkTable is a pretty thin wrapper around Iceberg's Table. If we do this in core, then we would have to update BaseTable to select a version -- which is pretty strange with the API -- or to introduce a wrapper table type that only allows reading. So since we already have the wrapper table, it seemed easiest just to update that for now and not over-complicate the core Table API.
I think we may want to support the timestamp as part of the TableIdentifier, as that would align better with what we're planning to do anyway for Snapshot Branching/Tagging, where the TableIdentifier needs to understand what reference we're currently at. Similarly, it would also understand the timestamp.
I'll update https://docs.google.com/document/d/1KSgkVYnIMlWEbAT1qSnnnLS-gc0kdgHlWR6Ud08HOhA/edit#heading=h.zf5ulr1b1ytv to also respect timestamps.
FWIW, Hive and Impala added this feature for Iceberg tables using the standard format. I understand the special way Spark handles features. What other engines have problems with implementing the standard and need to use the dot-based naming format?
For Hive it was much harder to add the metadata table support, where the metadata table name is delimited by a dot after the real table name. I was always envious of the Spark SQL extension feature. Could we add our own extension to implement time travel with the format defined by the standard? I fear that if we start using … If we must use identifier names to express these features, I would prefer to use something specific for the feature, like …
I agree with @pvary that it would be better if Spark supported the same SQL syntax for time travel queries as Hive and Impala, especially if the syntax in question is the SQL standard. I understand the difficulty of getting the support into Spark itself. Is it not possible to implement the support using SQL extensions in Iceberg? Orthogonal to that, I think that metadata tables are conceptually different from snapshots, so while I think something like …
@huaxingao was kind enough to look into supporting the AS OF syntax in Spark.
@aokolnychyi @huaxingao do you mean in the Spark project or in the Iceberg project in spark3-extensions?
As for the suggestion to use …
Yeah, the problem with using …
I am communicating with the Spark folks to see if I can add AS OF syntax in the Spark project. Still waiting for their reply.
Incorporate the approach shown in apache#3269 by Ryan Blue. That defines a syntax for selecting a snapshot or timestamp through the table name. Use that instead of a SnapshotAwareIdentifier to load the SparkTable.
@@ -21,6 +21,7 @@
import java.util.Map;
import java.util.Set;
import org.apache.arrow.util.Preconditions;
Is there a reason to use this instead of com.google.common.base.Preconditions?
No. And this shouldn't be allowed. We have a rule that checks for the wrong one in imports. Looks like we now need to add the Arrow package.
@huaxingao, any update on getting AS OF syntax in Spark?
@rdblue …
@rdblue …
@huaxingao please confirm the SQL syntax that is agreed to in Spark (I'm looking at some Databricks blog for Delta Lake time travel): …
@rdblue given that support for AS OF in Spark will only be in a future Spark release, what do you think we can do to support identifying the snapshot (or timestamp) for loading a table in Iceberg now? We need this to support reading the snapshot using the correct schema (as is now done for Spark 2.4).
@wypoon I plan to do exactly the same as above.
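For reference, the Delta-style clauses being discussed would look roughly like this from Spark (a sketch only; the exact grammar had not been agreed in Spark at this point, and the table name and values are made up):

```java
import org.apache.spark.sql.SparkSession;

public class AsOfSyntaxDemo {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // time travel to a wall-clock time (timestamp value is illustrative)
    spark.sql("SELECT * FROM db.table TIMESTAMP AS OF '2021-10-01 00:00:00'").show();

    // time travel to a specific version/snapshot (id is illustrative)
    spark.sql("SELECT * FROM db.table VERSION AS OF 10963874102873").show();
  }
}
```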
I think adding a method to the catalog that can load different versions is the way to go. Until the change to this interface makes it into a released Spark 3 version, we could add this extension to Iceberg and use this internal extension of the catalog interface.
@wypoon @rdblue @huaxingao was there any consideration of the upcoming branching feature? Based on the current syntax, it looks like we are assuming a linear history for time travel. Users can only: …
There is no way to select a different branch for time travel, unless I do something like: …
Any thoughts?
@jackye1995 I have not been following the proposal/project to add snapshot tagging and branching, so I had not considered this question. I only just read the proposal document and your doc PR #3425. I did not see any discussion in them about the semantics of time travel by timestamp. What do you think the semantics should be? One possible option is: …
This assumes the history is really a tree. (Am I mistaken about it being a tree?) In addition, we need a syntax for specifying a tag or a branch; I don't know if one is proposed -- I saw above an allusion to using … Do you think we need the …
I think that if we use the …
@wypoon Seems I can't change …
@huaxingao I'm guessing that if there is an AS OF clause in the SQL query, you will call … The problem is that we need to support using the DataFrame API with …
@rdblue what are your thoughts on this?
@wypoon I understand the problem, but my proposal to add …
@huaxingao, I agree with the direction to not add a version to Identifier. Adding a method to the catalog that can load different versions is a good idea. If we have two different … I agree that VERSION should be passed as a string (hopefully an arbitrary one!) and TIMESTAMP as a long in micros from epoch, just like the other timestamps.
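A minimal sketch of what two such catalog methods could look like, assuming a hypothetical VersionedTableCatalog name (this is not the final Spark API; it only illustrates one load method per selector, with VERSION as a string and TIMESTAMP as a long in micros):

```java
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;

// hypothetical interface name; sketches the two load methods being discussed
public interface VersionedTableCatalog extends TableCatalog {

  // load a table as of a version string (a snapshot id, or later a tag name)
  Table loadTable(Identifier ident, String version) throws NoSuchTableException;

  // load a table as of a timestamp, in microseconds from epoch
  Table loadTable(Identifier ident, long timestampMicros) throws NoSuchTableException;
}
```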
For the branching and time travel discussion, I think it is valuable to be able to use … Time travel by timestamp would then select from the main branch, as was suggested. To support both time travel by timestamp and branches at the same time, I think it is natural to use another level of multi-part identifier, like … The reason why I think we should consider supporting a branch name in …
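To make the layering concrete, hypothetical identifier spellings might look like the following; the branch_ prefix is an assumption, since the exact form was not settled in this thread:

```java
public class BranchIdentifierDemo {
  public static void main(String[] args) {
    // main-branch time travel, as in the rest of this PR
    String mainAsOf = "db.table.at_timestamp_1633072800000";

    // selecting a branch head (the "branch_" prefix is hypothetical)
    String branchHead = "db.table.branch_audit";

    // another level of multi-part identifier: branch plus time travel
    String branchAsOf = "db.table.branch_audit.at_timestamp_1633072800000";

    System.out.println(mainAsOf + "\n" + branchHead + "\n" + branchAsOf);
  }
}
```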
@wypoon and others: for our next release, should we add something like this PR so that we can fix schemas with time travel in Spark 3.0? We don't have to document that we support time travel through table identifiers, so that we can remove it later and replace it with the …
@huaxingao, one more thing to consider on your PR is how to update …
I was thinking about this a bit more and I think that the read options are still passed through. All we should need to do is to standardize the time travel property names and pass the values through to the new catalog methods.
I think that should work. What do you think, @huaxingao?
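For context, the existing Iceberg read options being referred to look like this from the DataFrame API (option names as documented by Iceberg; the table name and values are illustrative):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadOptionsDemo {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // time travel by timestamp: "as-of-timestamp" is milliseconds since epoch
    Dataset<Row> byTime = spark.read()
        .format("iceberg")
        .option("as-of-timestamp", "499162860000")
        .load("db.table");

    // time travel by snapshot: "snapshot-id" selects a specific snapshot
    Dataset<Row> bySnapshot = spark.read()
        .format("iceberg")
        .option("snapshot-id", "10963874102873")
        .load("db.table");

    byTime.show();
    bySnapshot.show();
  }
}
```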
@rdblue …
@huaxingao, that looks great to me! You might want to just pass options into …
Sure! Sounds good to me.
It is still strange to me why we deviate from the standard SQL for time travel when we are developing a new feature, but I have heard about the Spark community, and I can accept that we have constraints there. If we do not have the same constraints for handling branches, I would go for having …
Thanks,
@rdblue @huaxingao I agree with your core suggestion: …
May I suggest "as-of-version" and "as-of-timestamp" for the options? We already use "as-of-timestamp" in Iceberg. (Side note: in the Iceberg documentation, the "as-of-timestamp" option is shown being used with a String rather than a long value, but we actually use the value as a long.) I support being able to use a tag instead of the snapshot-id for the value of VERSION AS OF, but I'm less convinced of the value of being able to use a branch. Perhaps it is enough to support using the branch as part of the table identifier (whether delimited by …). An orthogonal question for @huaxingao: is it possible to have FOR SYSTEM_TIME AS OF as a synonym for TIMESTAMP AS OF, and FOR SYSTEM_VERSION AS OF as a synonym for VERSION AS OF, in Spark SQL? While they are not what is used in Delta Lake, I'm just suggesting them as synonyms. This would allow compatibility with Hive and Impala.
I'm ok with that idea. (You mean Spark 3.0, 3.1 and 3.2, right?) I would like to get the schema for snapshot done for Spark 3!
I missed that. Can Trino support a new syntax like IN BRANCH? But we might have difficulty supporting IN BRANCH in Spark. Can we do it in an extension in Iceberg?
@wypoon Spark has very strict control over adding new SQL syntax; normally only ANSI-standard SQL syntax can be added. I will give it a try and propose adding …
I like the idea of supporting SYSTEM_TIME and SYSTEM_VERSION for compatibility. That sounds reasonable to me. Do the keywords have underscores in them? If so, is that needed for some reason? The IN BRANCH syntax is fine with me, but it would mean another addition to Spark SQL syntax and I'm not confident that it would be accepted. I also think that it's more natural to use …
Probably. I think it's more likely that Trino supports that than multi-part identifiers, but I'm not sure if they would be open to it.
@huaxingao, can we support both …
@huaxingao thanks. For the synonym proposal, you can cite HIVE-25344. @pvary can you comment on the question about the underscores in SYSTEM_TIME and SYSTEM_VERSION?
@rdblue Yes, I will try to add …
AFAIK the SQL:2011 standard defines them this way: https://sigmodrecord.org/?smd_process_download=1&download_id=3338
@rdblue do you intend to go forward with some form of this PR as a way to enable implementing schema-for-snapshot for Spark 3 (until what Huaxin proposes is implemented in a released Spark version)? Do we have to wait for a decision on @nastra's proposal in https://docs.google.com/document/d/1KSgkVYnIMlWEbAT1qSnnnLS-gc0kdgHlWR6Ud08HOhA/edit# to change …
This has been implemented for Spark 2. For Spark 3, Ryan Blue proposed a syntax for adding the snapshot id or timestamp to the table identifier in apache#3269. Here we implement the Spark 3 support for using the snapshot schema by using the proposed table identifier syntax. This is until a new Spark 3 is released with support for AS OF in Spark SQL. Note: The table identifier syntax is for internal use only (as in this implementation) and not meant to be exposed as a publicly supported syntax in SQL. However, for testing, we do test its use from SQL.
This was replaced by #3722 so I'll close this. Thanks for the great discussion and for all the work on this problem, everyone!
This adds support to Spark for time travel queries using extended table names, like db.table.snapshot_19082734 or db.table.at_109872341. This is in support of #1508, which adds a more narrow feature to pass snapshot selectors through identifiers.
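For example, the extended names from this description could be used wherever a table name is accepted (a sketch; the snapshot id and timestamp are the placeholder values from the description above):

```java
import org.apache.spark.sql.SparkSession;

public class ExtendedNameDemo {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // select a specific snapshot by id through the table name
    spark.sql("SELECT * FROM db.table.snapshot_19082734").show();

    // select the table state as of a timestamp through the table name
    spark.sql("SELECT * FROM db.table.at_109872341").show();
  }
}
```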