Spark 3.3: Add a procedure to generate table changes #6012
Conversation
functions.lit(ChangelogOperation.UPDATE_POSTIMAGE.name()));

// remove the carry-over rows
Dataset<Row> dfWithoutCarryOver = removeCarryOvers(preImageDf.union(postImageDf));

Should we make it optional since it is a heavy operation?
Is there another algorithm we can consider that would make it cheaper? Will something like this work?
- Load DELETEs and INSERTs as a DF
- Repartition the DF by primary key, _change_ordinal and locally sort by primary key, _change_ordinal, _operation_type
- Call mapPartitions with a closure that would look at the previous, current and next rows
  - If the previous, current, and next row keys are all different, output the current row as-is
  - If the next row key is the same, the current row must be DELETE and the next row must be INSERT (if not -> exception)
    - If the other columns beyond the key are the same, it is a copied-over row
      - Output null if unchanged rows should be ignored
      - Output the current row as-is if all rows should be produced
    - If the other columns beyond the key are different, it is an update
      - Output the current row as pre-update
  - If the previous row key is the same as the current one, the current row must be INSERT and the previous row must be DELETE
    - If the other columns beyond the key are the same, it is a copied-over row
      - Output null if unchanged rows should be ignored
      - Output the current row as-is if all rows should be produced
    - If the other columns beyond the key are different, it is an update
      - Output the current row as post-update
That would require reading the changes only once, doing a single hash-based shuffle to co-locate rows for the same key and change ordinal, keeping at most 3 rows in memory at a time. Seems fairly cheap?
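For illustration only, here is a minimal toy sketch of the look-ahead pass described above. All type and helper names are invented for the sketch and are not the PR's actual ChangelogIterator; it assumes the input is already hash-partitioned and locally sorted as described, uses only forward look-ahead (as the follow-up comments below settle on), and treats the "key" as the identifier columns plus the change ordinal.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Toy row model: identifier columns + change ordinal as the key, remaining data columns as
// values, and the change type ("DELETE", "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER").
record ChangeRow(Object[] key, Object[] values, String changeType) {}

final class LookAheadClassifier {
  // Assumes rows are sorted so that a DELETE/INSERT pair for the same key is adjacent,
  // with the DELETE first. Looks one row ahead to decide whether the pair is a
  // carry-over (dropped) or a real update (rewritten as pre/post images).
  static List<ChangeRow> classify(Iterator<ChangeRow> sortedRows) {
    List<ChangeRow> out = new ArrayList<>();
    ChangeRow current = sortedRows.hasNext() ? sortedRows.next() : null;
    while (current != null) {
      ChangeRow next = sortedRows.hasNext() ? sortedRows.next() : null;
      if (next != null && Arrays.equals(current.key(), next.key())) {
        if (Arrays.equals(current.values(), next.values())) {
          // identical non-key values: carry-over pair, drop both rows
        } else {
          out.add(new ChangeRow(current.key(), current.values(), "UPDATE_BEFORE"));
          out.add(new ChangeRow(next.key(), next.values(), "UPDATE_AFTER"));
        }
        current = sortedRows.hasNext() ? sortedRows.next() : null; // pair fully consumed
      } else {
        out.add(current); // unmatched DELETE or INSERT, emit as-is
        current = next;
      }
    }
    return out;
  }
}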
I don't think I understand why we would need the previous row and the next row. If we are iterating over rows, then the current will become the previous, so we should only look forward or backward, right?

Thanks @aokolnychyi for the suggestion. Makes sense to shuffle once. Agreed with @rdblue that looking only forward should be enough; no need to search bidirectionally. Will make the change accordingly.
Made the change, could you take a look?
I agree that we can pre-compute the required state by just looking ahead.

The test failure is not related.
Let me take a look today.
ProcedureParameter.optional("table_change_view", DataTypes.StringType), | ||
ProcedureParameter.optional("identifier_columns", DataTypes.StringType), | ||
ProcedureParameter.optional("start_timestamp", DataTypes.TimestampType), | ||
ProcedureParameter.optional("end_timestamp", DataTypes.TimestampType), |
I am a bit worried about the number of parameters to configure boundaries. What if we replaced all of them with generic options and passed those options along when loading the DataFrame? Then, instead of determining which snapshots match our timestamp range in the procedure, we would do that when scanning the changelog table. That way, users would be able to use timestamp boundaries not only via the procedure but also via the DataFrame API. Right now, we only support snapshot ID boundaries.
Makes sense to allow the DataFrame read to consume a timestamp range. Will create a follow-up PR for that. For this procedure, we still need all these parameters, right? What do you mean by replacing all of them with generic options?
I am not sure. I'd consider having read_options or options as a map that would be passed while loading deletes and inserts as a DataFrame. Then users can specify boundaries directly in the map.
We already respect these options from SparkReadOptions in the changes table:
// Start snapshot ID used in incremental scans (exclusive)
public static final String START_SNAPSHOT_ID = "start-snapshot-id";
// End snapshot ID used in incremental scans (inclusive)
public static final String END_SNAPSHOT_ID = "end-snapshot-id";
We could add start-timestamp, end-timestamp, and start-snapshot-id-inclusive.
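As a rough illustration of what that would look like on the read side (a sketch, not a committed API beyond the two option keys quoted above; the table name, the snapshot IDs, and the .changes suffix are placeholders/assumptions):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().getOrCreate();

// start-snapshot-id is exclusive and end-snapshot-id is inclusive, per SparkReadOptions above;
// "db.tbl" and the snapshot IDs are placeholders
Dataset<Row> changes =
    spark.read()
        .format("iceberg")
        .option("start-snapshot-id", "100")
        .option("end-snapshot-id", "200")
        .load("db.tbl.changes");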
Changed to options in the procedure. Will add the timestamp range in another PR.
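For illustration, a call with the generic options map might then look roughly like this. This is a hedged sketch: the procedure keeps its current name here, the map(...) argument syntax follows the stringMap grammar rule touched later in this PR, and the table name and snapshot IDs are placeholders.

// Hypothetical invocation; option keys mirror SparkReadOptions, values are placeholders
spark.sql(
    "CALL spark_catalog.system.generate_changes("
        + "table => 'db.tbl', "
        + "options => map('start-snapshot-id', '100', 'end-snapshot-id', '200'))");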
ProcedureParameter.optional("table_change_view", DataTypes.StringType), | ||
ProcedureParameter.optional("identifier_columns", DataTypes.StringType), | ||
ProcedureParameter.optional("start_timestamp", DataTypes.TimestampType), | ||
ProcedureParameter.optional("end_timestamp", DataTypes.TimestampType), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure. I'd consider having read_options
or options
as a map that would be passed while loading deletes and inserts as DataFrame
. Then users can specify boundaries directly in the map.
We already respect these options from SparkReadOptions
in the changes
table:
// Start snapshot ID used in incremental scans (exclusive)
public static final String START_SNAPSHOT_ID = "start-snapshot-id";
// End snapshot ID used in incremental scans (inclusive)
public static final String END_SNAPSHOT_ID = "end-snapshot-id";
We could add start-timestamp
and end-timestamp
, start-snapshot-id-inclusive
.
@@ -53,6 +53,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
    mapBuilder.put("ancestors_of", AncestorsOfProcedure::builder);
    mapBuilder.put("register_table", RegisterTableProcedure::builder);
    mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
    mapBuilder.put("generate_changes", GenerateChangesProcedure::builder);
Are there any alternative names? I am not sure the procedure actually generates changes.
Let's think a bit. It is not bad but I wonder whether we can be a bit more specific.
How about create_change_view or generate_change_view?
Other options: register_change_view, create_changelog_view.
    .filter(c -> !c.equals(MetadataColumns.CHANGE_TYPE.name()))
    .map(df::col)
    .toArray(Column[]::new);
return transform(df, repartitionColumns);
Reused the same changelog iterator for removing carry-over rows only. I think we can optimize it here, for example, using a window function. WDYT?
I'm not sure I understand the comment here. Don't we have the iterator so we don't need to do this?
Here we only remove carry-over rows, without computing the update rows. The iterator is built for both: it checks whether rows are update rows and whether they are carry-over rows. The first check is not necessary here. We only need the second one, which checks whether two rows are identical. If they are, they are carry-over rows and we remove them.
The motivation for building the changelog iterator is to combine the two operations in one pass. But if there is only one operation, a window function seems to fit better.
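A minimal sketch of the window-function alternative being discussed (not the PR's implementation): it assumes valueCols holds every column except the change-type metadata column, that the change-type values are DELETE/INSERT as above, and it ignores edge cases such as several identical pairs within one commit.

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.max;
import static org.apache.spark.sql.functions.when;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

final class CarryOverFilter {
  // Rows that agree on every column except the change type form one window partition;
  // a partition that contains both a DELETE and an INSERT is a carry-over pair and is dropped.
  static Dataset<Row> removeCarryOvers(Dataset<Row> df, Column[] valueCols, String changeTypeCol) {
    WindowSpec w = Window.partitionBy(valueCols);
    Column isDelete = when(col(changeTypeCol).equalTo("DELETE"), 1).otherwise(0);
    Column isInsert = when(col(changeTypeCol).equalTo("INSERT"), 1).otherwise(0);
    return df
        .withColumn("_has_delete", max(isDelete).over(w))
        .withColumn("_has_insert", max(isInsert).over(w))
        .where(col("_has_delete").equalTo(0).or(col("_has_insert").equalTo(0)))
        .drop("_has_delete", "_has_insert");
  }
}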
Ready for another look. cc @RussellSpitzer @szehon-ho @aokolnychyi
retest this please
  df = transform(df, repartitionColumns);
} else {
  LOG.warn("Cannot compute the update-rows because identifier columns are not set");
if (removeCarryoverRow) {
Can we pull the if (removeCarryoverRow) outside of this if statement?
Basically:
if (identifier) {
}
if (carryover) {
}
The branch has been removed. Let me know if there is anything I missed.
(k, v) -> {
  if (k.toString().equals(SparkReadOptions.START_TIMESTAMP)
      || k.toString().equals(SparkReadOptions.END_TIMESTAMP)) {
    options.put(k.toString(), toMillis(v.toString()));
I'm a little lost on our conversion here. Don't we already have code to convert this read option from String => millis within the reader itself?
The read option only accepts a string of milliseconds, not a timestamp string like 2019-02-08 03:29:51.215. Here is an example of the read option:
// time travel to October 26, 1986 at 01:21:00
spark.read
    .option("as-of-timestamp", "499162860000")
    .format("iceberg")
    .load("path/to/table")
We shouldn't do any conversion here. If we want to support timestamps, we should add support to the reader. It is possible by adding new functionality to SparkReadConf. For now, I'd avoid any transformations to options in this PR.
Removed the conversion. To support both formats, I will file a separate PR on the reader side.

Getting to this PR soon.
 * <li>(id=1, data='b', op='UPDATE_AFTER')
 * </ul>
 */
public class GenerateChangesProcedure extends BaseProcedure {
I am not sure about the name. I don't have a great alternative but it does not seem to me like we generate changes in this procedure. It seems more like we register a changelog view. Any alternatives?
private static final ProcedureParameter[] PARAMETERS =
    new ProcedureParameter[] {
      ProcedureParameter.required("table", DataTypes.StringType),
      ProcedureParameter.optional("table_change_view", DataTypes.StringType),
What about changelog_view, changelog_view_name, or similar?
Defaulting the name will only work in the session catalog but I am not sure we have to do anything about it. Other catalogs will not support views.
We can add a precondition: "catalog blah does not support views".
Hi @aokolnychyi, could you elaborate a bit? We pass the view name in the output row. I assume it is the same regardless of whether the user gives the view name or the procedure uses a default name.
Got it by checking this comment #6012 (comment). Let me put a precondition check on the catalog type.
Looking a bit more: not only does SparkSessionCatalog work, SparkCatalog works as well. My test TestGenerateChangesProcedure inherits from SparkCatalogTestBase, which covers 3 types of catalogs. They all work well. Am I missing something here?
private static final StructType OUTPUT_TYPE =
    new StructType(
        new StructField[] {
          new StructField("view_name", DataTypes.StringType, false, Metadata.empty())
This name should probably match the input arg name.
  return identifierColumns;
}

private Dataset<Row> changelogRecords(String tableName, InternalRow args) {
nit: To me, it would be better to pull the needed arguments before calling this method rather than passing the row here and doing the extraction within the method itself.
good idea. Fixed in the new commit.
private String[] identifierColumns(InternalRow args, String tableName) {
  String[] identifierColumns = new String[0];
  if (!args.isNullAt(5) && !args.getString(5).isEmpty()) {
What if the provided identifier columns don't match the identifier columns defined on one of the scanned snapshots?
if (identifierColumns.length == 0) {
  Identifier tableIdent = toIdentifier(tableName, PARAMETERS[0].name());
  Table table = loadSparkTable(tableIdent).table();
  identifierColumns = table.schema().identifierFieldNames().toArray(new String[0]);
What if some older snapshots have another set of identifier fields? We don't have to support it, but I wonder whether we can validate in the reader that all snapshots being scanned have the expected identifier columns or that they are undefined. Because if we use a set of identifier columns that is different from the real ones, it will become a problem.
Combined with comment #6012 (comment), we could push it down to the reader to validate the identifier columns of each snapshot.
Column[] repartitionColumns = getRepartitionExpr(df, identifierColumns);
df = transform(df, repartitionColumns);
} else {
  LOG.warn("Cannot compute the update-rows because identifier columns are not set");
I don't think we should proceed with the execution if the user asked to compute pre/post images but that's not possible.
Made the change.
.mapPartitions(
    (MapPartitionsFunction<Row, Row>)
        rowIterator ->
            ChangelogIterator.iterator(rowIterator, changeTypeIdx, repartitionIdx),
I had a few questions about ChangelogIterator, which I left on #6344.
sql("CALL %s.system.generate_changes(table => '%s')", catalogName, tableName); | ||
|
||
String viewName = (String) returns.get(0)[0]; | ||
assertEquals( |
Would this output be different without the identifier columns?
Yes, it will. I also made the change so that computing updates is off by default. The user has to explicitly set it to true to honor the identifier columns; otherwise, they are not used.
 * data='a', op='DELETE') and (id=1, data='a', op='INSERT'), despite it not being an actual change
 * to the table. The iterator finds the carry-over rows and removes them from the result.
 *
 * <p>An update-row is converted from a pair of delete row and insert row. Identifier columns are
pair of a delete row and an insert row
 * to the table. The iterator finds the carry-over rows and removes them from the result.
 *
 * <p>An update-row is converted from a pair of delete row and insert row. Identifier columns are
 * needed for identifying whether they refer to the same row. You can either set Identifier Field
needed => used?
"Identifier columns are used for determining whether an insert and a delete record refer to the same row. If the two records share the same values for the identity columns, they are considered to be the before and after states of the same row."?
Made the change accordingly.
  df = removeCarryoverRows(df);
}

String viewName = viewName(args, tableName);
We could be checking earlier whether or not the catalog specified by this view name is allowed to create views
I think all of my questions are in the comments now, I don't see any major blockers on this for me.
Resolve the comments: 807294e to 41bb85b
Resolved comments. Ready for another look. cc @aokolnychyi @RussellSpitzer
Thanks a lot for the detailed review, @aokolnychyi! Resolved them all and ready for another look.
Final set of comments and should be good to go.
@@ -173,6 +174,10 @@ stringMap
    : MAP '(' constant (',' constant)* ')'
    ;

stringArray
Looks good.
String viewName = (String) returns.get(0)[0];

// the carry-over rows (2, 'e', 12, 'DELETE', 1), (2, 'e', 12, 'INSERT', 1) are removed, even
Is this comment accurate? I thought we were supposed to keep carryovers in this case.
It is still accurate. The default behavior is to NOT compute updates but to remove carry-overs. I can change the command to the following, so that it is clearer we are testing the default behavior here:
"CALL %s.system.create_changelog_view(table => '%s')",
But this test calls remove_carryovers = false, and the carryovers are not removed as far as I see in the check below?
oh, sorry, I'm looking at a different test. I have removed the comment in the new commit. Thanks for catching it.
@Test
public void testNotRemoveCarryOvers() {
  removeTables();
Why do we explicitly call this if we have an After method to clean up tables? Is it because we always create a default table first? If so, can we remove the Before init method and just call the correct create method in each test?
That's a good idea. Made the change in the next commit.
}

@After
public void removeTables() {
nit: We usually have these init methods at the top.
import scala.runtime.BoxedUnit;

/**
 * A procedure that creates a view for changed rows.
Looks accurate now, thanks for updating!
}

@NotNull
private static Column[] getRepartitionExpr(Dataset<Row> df, String[] identifiers) {
If we decide to add computeUpdateImages, I would put this logic there directly, like you did for carryovers.
    : args.getString(CHANGELOG_VIEW_NAME_ORDINAL);
if (viewName == null) {
  String shortTableName =
      tableName.contains(".") ? tableName.substring(tableName.lastIndexOf(".") + 1) : tableName;
We already pass ident.name() to this method. Instead of checking for dots in the name, I think we can use the approach from the snippet above and escape the name using backticks.
args.isNullAt(CHANGELOG_VIEW_NAME_ORDINAL)
    ? null
    : args.getString(CHANGELOG_VIEW_NAME_ORDINAL);
if (viewName == null) {
What about having if/else instead of an extra var and ternary operator above?
if (args.isNullAt(CHANGELOG_VIEW_NAME_ORDINAL)) {
return String.format("`%s_changes`", tableName);
} else {
return args.getString(CHANGELOG_VIEW_NAME_ORDINAL);
}
    .toArray(String[]::new);
}

if (identifierColumns.length == 0) {
I think it should be if/else. If someone provides empty identifier columns, we should complain.
if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {
return ...;
} else {
return ...;
}
.mapPartitions(
    (MapPartitionsFunction<Row, Row>)
        rowIterator -> ChangelogIterator.create(rowIterator, schema, identifierFields),
    RowEncoder.apply(df.schema()));
nit: Use the schema var defined above?
I also added this to our 1.2 milestone. I think we should be able to merge it tomorrow.
Thanks a lot for the review @aokolnychyi. Resolved all of them and ready for another look.
retest this please
The pipeline error is not related.
retest this please
I had a few minor nits but nothing that would stop us from getting this PR in. It is already in pretty good shape. I'll merge it now. We can address the last feedback in a follow-up PR later.
@@ -144,6 +148,12 @@ protected SparkTable loadSparkTable(Identifier ident) {
  }
}

protected Dataset<Row> loadDataSetFromTable(Identifier tableIdent, Map<String, String> options) {
Is there a shorter yet descriptive name? Like loadRows, loadContent, etc.?
@@ -144,6 +148,12 @@ protected SparkTable loadSparkTable(Identifier ident) {
  }
}

protected Dataset<Row> loadDataSetFromTable(Identifier tableIdent, Map<String, String> options) {
  String tableName = Spark3Util.quotedFullIdentifier(tableCatalog().name(), tableIdent);
  // no need to validate the read options here since the reader will validate them
I don't think we need this comment anymore since it is a pretty generic method now.
private Dataset<Row> computeUpdateImages(String[] identifierColumns, Dataset<Row> df) {
  Preconditions.checkArgument(
      identifierColumns.length > 0,
      "Cannot compute the update-rows because identifier columns are not set");
nit: update-rows -> update images?
      identifierColumns.length > 0,
      "Cannot compute the update-rows because identifier columns are not set");

  Column[] repartitionColumns = new Column[identifierColumns.length + 1];
nit: We sometimes call it repartitionColumns and sometimes repartitionSpec. I'd probably use repartitionSpec everywhere since it is shorter (this statement would fit on 1 line?) and matches sortSpec used in other methods.
Thanks, @flyrain! I am excited to test this out in real use cases.
Thanks a lot @aokolnychyi! I will address these comments in a follow-up PR. It is a milestone. I'm also excited to see how people use it. Thanks everybody for the review, @RussellSpitzer @chenjunjiedada @hililiwei @rdblue!
This change backports PR #6012 to Spark 3.2.
This change backports PR apache#6012 to Spark 3.2.
(cherry picked from commit 9cf9ca2)
Add a procedure to generate table changes. Here are the changes in this PR.
cc @aokolnychyi @rdblue @szehon-ho @kbendick @anuragmantri @karuppayya @chenjunjiedada @RussellSpitzer