-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Spark: Implement RollbackToSnapshotProcedure #1759
Conversation
...ensions/src/test/java/org/apache/iceberg/spark/extensions/TestManageSnapshotsProcedures.java
Outdated
Show resolved
Hide resolved
spark3/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
Show resolved
Hide resolved
...ensions/src/test/java/org/apache/iceberg/spark/extensions/TestManageSnapshotsProcedures.java
Outdated
Show resolved
Hide resolved
...k3-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkExtensionsTestBase.java
Show resolved
Hide resolved
Snapshot secondSnapshot = table.currentSnapshot(); | ||
|
||
List<Object[]> output = sql( | ||
"CALL %s.system.rollback_to_snapshot(snapshot_id => %dL, namespace => '%s', table => '%s')", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is %dL
required or could it be %d
? I think because of the casting that was added, it could be %d
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not required, we have testRollbackToSnapshotWithQuotedIdentifiers
for that. I don't think our cast logic is working there, though. I think Spark is smart enough to parse snapshot id as long since the value is too large to be int.
I'll add a test for the casting logic as soon as it applies.
...ensions/src/test/java/org/apache/iceberg/spark/extensions/TestManageSnapshotsProcedures.java
Outdated
Show resolved
Hide resolved
spark3/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
Outdated
Show resolved
Hide resolved
return Identifier.of(namespace, nameParts.head()); | ||
} | ||
|
||
private Seq<String> parseMultipartIdentifier(String identifierAsString) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even in a private method, I don't think it is a good practice to use Scala Seq
because of compatibility problems. It would be safer to pass this directly into a converter to get a List
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean change the return type to List
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Seq
is not safe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated it to return an array.
spark3/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
Outdated
Show resolved
Hide resolved
spark3/src/main/java/org/apache/iceberg/spark/procedures/RollbackToSnapshotProcedure.java
Outdated
Show resolved
Hide resolved
spark3/src/main/java/org/apache/iceberg/spark/procedures/RollbackToSnapshotProcedure.java
Outdated
Show resolved
Hide resolved
spark3/src/main/java/org/apache/iceberg/spark/procedures/RollbackToSnapshotProcedure.java
Outdated
Show resolved
Hide resolved
spark3/src/main/java/org/apache/iceberg/spark/procedures/RollbackToSnapshotProcedure.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
private interface ProcedureBuilder { | ||
Procedure build(TableCatalog catalog); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems strange that the builder passes catalog
to build, rather than withTableCatalog
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean the overall approach or the method name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm assuming that there will be more options that will be passed to the builder, but there is just TableCatalog for now. I would expect this to support cases that don't need a TableCatalog, so it doesn't seem like something that we should tie to the build method. Instead, I'd use a withTableCatalog
method and always build the procedure with build()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me rework this part.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated. Let me know what you think, @rdblue.
spark3/src/main/java/org/apache/iceberg/spark/procedures/RollbackToSnapshotProcedure.java
Outdated
Show resolved
Hide resolved
...sions/src/test/java/org/apache/iceberg/spark/extensions/TestRollbackToSnapshotProcedure.java
Show resolved
Hide resolved
long snapshotId = args.getLong(2); | ||
|
||
return modifyIcebergTable(namespace, tableName, table -> { | ||
Snapshot previousSnapshot = table.currentSnapshot(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may need to reason about concurrent operations on this table instance if caching is enabled. Here, there is a possibility there will be an operation in between we got previousSnapshot
and before we committed so the output may not be precise. It may be even more important in other procedures later.
@rdblue, thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can add a validation to set requiredCurrentSnapshotId
in the operation. Right now, we always roll back as long as it is an ancestor of the current state. Can be done as a follow-up though.
@Override | ||
public T build() { | ||
return doBuild(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not make build
abstract?
This PR implements a procedure to rollback to a given snapshot id.
Resovles #1592.