
Spark 3.3: Add a procedure to generate table changes #6012

Merged: 30 commits into apache:master on Mar 4, 2023

Conversation

flyrain (Contributor, Author) commented Oct 18, 2022

Add a procedure to generate table changes. Here are the changes in this PR:

  1. Defines the user interface (see the example call below).
  2. Generates the update pre-image and post-image when the user provides identifier columns.
  3. Uses a window function instead of a join for better performance.

cc @aokolnychyi @rdblue @szehon-ho @kbendick @anuragmantri @karuppayya @chenjunjiedada @RussellSpitzer
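
For reference, a hedged example of invoking the procedure from Spark SQL, written in the test-helper style used later in this thread (the procedure name matches the create_changelog_view name settled on during review; catalogName, tableName, and the snapshot IDs in the options map are placeholders):

sql(
    "CALL %s.system.create_changelog_view("
        + "table => '%s', "
        + "options => map('start-snapshot-id', '1', 'end-snapshot-id', '2'))",
    catalogName, tableName);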

functions.lit(ChangelogOperation.UPDATE_POSTIMAGE.name()));

// remove the carry-over rows
Dataset<Row> dfWithoutCarryOver = removeCarryOvers(preImageDf.union(postImageDf));
Contributor Author:

Should we make it optional since it is a heavy operation?

Contributor:

Is there another algorithm we can consider that would make it cheaper? Will something like this work?

- Load DELETEs and INSERTs as a DF
- Repartition the DF by primary key and _change_ordinal, and locally sort by primary key, _change_ordinal, _operation_type
- Call mapPartitions with a closure that looks at the previous, current, and next rows
  - If the previous, current, and next row keys are all different, output the current row as-is
  - If the next row key is the same, the current row must be a DELETE and the next row must be an INSERT (if not -> exception)
      - If the other columns beyond the key are the same, it is a copied-over row
          - Output null if unchanged rows should be ignored
          - Output the current row as-is if all rows should be produced
      - If the other columns beyond the key are different, it is an update
          - Output the current row as the pre-update image
  - If the previous row key is the same as the current one, the current row must be an INSERT and the previous row must be a DELETE
      - If the other columns beyond the key are the same, it is a copied-over row
          - Output null if unchanged rows should be ignored
          - Output the current row as-is if all rows should be produced
      - If the other columns beyond the key are different, it is an update
          - Output the current row as the post-update image

That would require reading the changes only once, doing a single hash-based shuffle to co-locate rows for the same key and change ordinal, keeping at most 3 rows in memory at a time. Seems fairly cheap?
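
A minimal sketch of this single-shuffle shape (not the PR's final implementation): one repartition by key and change ordinal, a local sort, and a single mapPartitions pass. The table name, the id key column, and the markAdjacentPairs helper are illustrative placeholders; the sort uses the _change_type metadata column so a DELETE lands right before its matching INSERT.

import java.util.Iterator;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;

public class SingleShuffleChangelogSketch {

  public static Dataset<Row> resolveChanges(SparkSession spark) {
    // load DELETEs and INSERTs from the changelog metadata table (table name is a placeholder)
    Dataset<Row> changes = spark.read().format("iceberg").load("db.tbl.changes");

    // one hash shuffle co-locates all rows with the same key and change ordinal
    Column[] keyCols = {changes.col("id"), changes.col("_change_ordinal")};
    // a local sort puts a DELETE right before its matching INSERT within each group
    Column[] sortCols = {
      changes.col("id"), changes.col("_change_ordinal"), changes.col("_change_type")
    };

    return changes
        .repartition(keyCols)
        .sortWithinPartitions(sortCols)
        .mapPartitions(
            (MapPartitionsFunction<Row, Row>) SingleShuffleChangelogSketch::markAdjacentPairs,
            RowEncoder.apply(changes.schema()));
  }

  // Placeholder for the look-ahead comparison described above: emit unrelated rows as-is,
  // turn DELETE/INSERT pairs whose non-key values differ into pre/post update images,
  // and drop pairs whose values are identical (carry-overs).
  private static Iterator<Row> markAdjacentPairs(Iterator<Row> rows) {
    return rows; // stub: the real logic compares each row with the next one
  }
}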

Contributor:

I don't think I understand why we would need the previous row and the next row. If we are iterating over rows, then the current will become the previous, so we should only look forward or backward right?

Contributor Author:

Thanks @aokolnychyi for the suggestion. Makes sense to shuffle only once. Agreed with @rdblue that just looking forward should be enough; no need to search bidirectionally. Will make the change accordingly.

Contributor Author:

Made the change, could you take a look?

Contributor (@aokolnychyi, Nov 29, 2022):

I agree we can pre-compute the required state by just looking ahead.

flyrain (Contributor, Author) commented Oct 19, 2022

The test failure is not related.

flyrain changed the title from "Add a procedure to generate table changes" to "Spark 3.3: Add a procedure to generate table changes" on Oct 19, 2022
@aokolnychyi (Contributor):

Let me take a look today.


ProcedureParameter.optional("table_change_view", DataTypes.StringType),
ProcedureParameter.optional("identifier_columns", DataTypes.StringType),
ProcedureParameter.optional("start_timestamp", DataTypes.TimestampType),
ProcedureParameter.optional("end_timestamp", DataTypes.TimestampType),
Contributor:

I am a bit worried about the number of parameters used to configure boundaries. What if we replaced all of them with generic options and passed those options along when loading the DataFrame? Then, instead of determining which snapshots match our timestamp range in the procedure, we would do that when scanning the changelog table. That way, users would be able to use timestamp boundaries not only via the procedure but also via the DataFrame. Right now, we only support snapshot ID boundaries.

Contributor Author:

Makes sense to allow the DataFrame to consume a timestamp range. Will create a follow-up PR for that. For this procedure, we still need all of these parameters, right? What do you mean by replacing all of them with generic options?

Contributor:

I am not sure. I'd consider having read_options or options as a map that would be passed while loading deletes and inserts as a DataFrame. Then users can specify boundaries directly in the map.

We already respect these options from SparkReadOptions in the changes table:

// Start snapshot ID used in incremental scans (exclusive)
public static final String START_SNAPSHOT_ID = "start-snapshot-id";

// End snapshot ID used in incremental scans (inclusive)
public static final String END_SNAPSHOT_ID = "end-snapshot-id";

We could add start-timestamp, end-timestamp, and start-snapshot-id-inclusive.
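
For context, a hedged sketch of how a caller passes those existing keys today when loading the changelog table as a DataFrame (assuming a SparkSession named spark; the snapshot IDs and table identifier are placeholders):

Dataset<Row> changes =
    spark
        .read()
        .format("iceberg")
        .option("start-snapshot-id", "1")  // exclusive, placeholder value
        .option("end-snapshot-id", "2")    // inclusive, placeholder value
        .load("db.tbl.changes");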

Contributor Author:

Changed to options in the procedure. Will add the timestamp range in another PR.

ProcedureParameter.optional("table_change_view", DataTypes.StringType),
ProcedureParameter.optional("identifier_columns", DataTypes.StringType),
ProcedureParameter.optional("start_timestamp", DataTypes.TimestampType),
ProcedureParameter.optional("end_timestamp", DataTypes.TimestampType),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure. I'd consider having read_options or options as a map that would be passed while loading deletes and inserts as DataFrame. Then users can specify boundaries directly in the map.

We already respect these options from SparkReadOptions in the changes table:

// Start snapshot ID used in incremental scans (exclusive)
public static final String START_SNAPSHOT_ID = "start-snapshot-id";

// End snapshot ID used in incremental scans (inclusive)
public static final String END_SNAPSHOT_ID = "end-snapshot-id";

We could add start-timestamp and end-timestamp, start-snapshot-id-inclusive.

@@ -53,6 +53,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
mapBuilder.put("ancestors_of", AncestorsOfProcedure::builder);
mapBuilder.put("register_table", RegisterTableProcedure::builder);
mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
mapBuilder.put("generate_changes", GenerateChangesProcedure::builder);
Contributor:

Are there any alternative names? I am not sure the procedure actually generates changes.
Let's think a bit. It is not bad but I wonder whether we can be a bit more specific.

Contributor Author:

How about create_change_view or generate_change_view?

Contributor Author:

Other options: register_change_view, create_changelog_view.

.filter(c -> !c.equals(MetadataColumns.CHANGE_TYPE.name()))
.map(df::col)
.toArray(Column[]::new);
return transform(df, repartitionColumns);
Contributor Author:

Reused the same changelog iterator for removing carry-over rows only. I think we can optimize it here, for example, by using a window function. WDYT?

Member:

I'm not sure I understand the comment here. Don't we have the iterator so we don't need to do this?

Contributor Author:

Here we only remove carry-over rows, without computing the updated rows. The iterator is built for both: it checks whether rows are updated rows and whether they are carry-over rows. The first check is not necessary here; we only need the second one to see if two rows are identical. If they are, they are carry-over rows and we remove them.

Contributor Author:

The motivation for building the changelog iterator is to combine the two operations in one pass. But if there is only one operation, a window function seems to fit better.
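
A hedged sketch of that carry-over-only pass as a window function (not the PR's implementation; it ignores duplicate-row edge cases and assumes the change-type column is named _change_type):

import java.util.Arrays;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.lit;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

public class CarryoverRemovalSketch {

  // A carry-over is a DELETE and an INSERT that agree on every column except _change_type,
  // so partitioning by all other columns turns each carried-over row into one of a pair.
  static Dataset<Row> removeCarryovers(Dataset<Row> df) {
    Column[] partitionSpec =
        Arrays.stream(df.columns())
            .filter(c -> !c.equals("_change_type"))
            .map(df::col)
            .toArray(Column[]::new);

    WindowSpec window = Window.partitionBy(partitionSpec);

    return df.withColumn("_twin_count", count(lit(1)).over(window))
        .filter(col("_twin_count").equalTo(1)) // keep rows that have no identical twin
        .drop("_twin_count");
  }
}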

flyrain (Contributor, Author) commented Jan 12, 2023

Ready for another look. cc @RussellSpitzer @szehon-ho @aokolnychyi

flyrain (Contributor, Author) commented Jan 12, 2023

retest this please

df = transform(df, repartitionColumns);
} else {
LOG.warn("Cannot compute the update-rows because identifier columns are not set");
if (removeCarryoverRow) {
Member:

Can we pull the if (removeCarryoverRow) outside of this if statement?

Member:

Basically:

if (identifier) {

}

if (carryover) {

}

Contributor Author:

The branch has been removed. Let me know if there is anything I missed.

(k, v) -> {
if (k.toString().equals(SparkReadOptions.START_TIMESTAMP)
|| k.toString().equals(SparkReadOptions.END_TIMESTAMP)) {
options.put(k.toString(), toMillis(v.toString()));
Member:

I'm a little lost on our conversion here. Don't we already have code to convert this read option from String => millis within the reader itself?

Contributor Author:

The read option only accepts a string of milliseconds, not a timestamp string like 2019-02-08 03:29:51.215. Here is an example of the read option:

// time travel to October 26, 1986 at 01:21:00
spark.read
    .option("as-of-timestamp", "499162860000")
    .format("iceberg")
    .load("path/to/table")

Contributor:

We shouldn't do any conversion here. If we want to support timestamps, we should add support to the reader. It is possible by adding new functionality to SparkReadConf. For now, I'd avoid any transformations to options in this PR.

Contributor Author:

Removed the conversion. To support both formats, I will file a separate PR on the reader side.

@aokolnychyi (Contributor):

Getting to this PR soon.

* <li>(id=1, data='b', op='UPDATE_AFTER')
* </ul>
*/
public class GenerateChangesProcedure extends BaseProcedure {
Contributor:

I am not sure about the name. I don't have a great alternative but it does not seem to me like we generate changes in this procedure. It seems more like we register a changelog view. Any alternatives?

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", DataTypes.StringType),
ProcedureParameter.optional("table_change_view", DataTypes.StringType),
Contributor:

What about changelog_view, changelog_view_name or similar?

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", DataTypes.StringType),
ProcedureParameter.optional("table_change_view", DataTypes.StringType),
Contributor:

Defaulting the name will only work in the session catalog but I am not sure we have to do anything about it. Other catalogs will not support views.

Member:

We can add in a precondition: "catalog blah does not support views".

Contributor Author:

Hi @aokolnychyi, could you elaborate a bit? We pass the view name in the output row. I assume it is the same whether the user gives the view name or the procedure uses a default name.

Contributor Author:

Got it by checking this comment #6012 (comment). Let me put a precondition check on the catalog type.

Contributor Author:

Looking a bit more: not only does SparkSessionCatalog work, SparkCatalog works as well. My test TestGenerateChangesProcedure inherits from SparkCatalogTestBase, which covers 3 types of catalogs. They all work well. Am I missing something here?

private static final StructType OUTPUT_TYPE =
new StructType(
new StructField[] {
new StructField("view_name", DataTypes.StringType, false, Metadata.empty())
Contributor:

This name should probably match the input arg name.

return identifierColumns;
}

private Dataset<Row> changelogRecords(String tableName, InternalRow args) {
Contributor:

nit: To me, it would be better to pull the needed arguments before calling this method rather than passing the row here and doing the extraction within the method itself.

Contributor Author (@flyrain, Feb 20, 2023):

Good idea. Fixed in the new commit.


private String[] identifierColumns(InternalRow args, String tableName) {
String[] identifierColumns = new String[0];
if (!args.isNullAt(5) && !args.getString(5).isEmpty()) {
Contributor:

What if the provided identifier columns don't match with identifier columns defined on one of the scan snapshots?

if (identifierColumns.length == 0) {
Identifier tableIdent = toIdentifier(tableName, PARAMETERS[0].name());
Table table = loadSparkTable(tableIdent).table();
identifierColumns = table.schema().identifierFieldNames().toArray(new String[0]);
Contributor:

What if some older snapshots have another set of identifier fields? We don't have to support that, but I wonder whether we can validate in the reader that all snapshots being scanned have the expected identifier columns or that those are undefined. Because if we use a set of identifier columns that is different from the real ones, it will become a problem.

Contributor Author:

Combined with comment #6012 (comment), we could push it down to the reader to validate the identifier columns of each snapshot.

Column[] repartitionColumns = getRepartitionExpr(df, identifierColumns);
df = transform(df, repartitionColumns);
} else {
LOG.warn("Cannot compute the update-rows because identifier columns are not set");
Contributor:

I don't think we should proceed with the execution if the user asked to compute pre/post images but that's not possible.

Contributor Author:

Made the change.

.mapPartitions(
(MapPartitionsFunction<Row, Row>)
rowIterator ->
ChangelogIterator.iterator(rowIterator, changeTypeIdx, repartitionIdx),
Contributor:

I had a few questions about ChangelogIterator, which I left on #6344.

sql("CALL %s.system.generate_changes(table => '%s')", catalogName, tableName);

String viewName = (String) returns.get(0)[0];
assertEquals(
Member:

Would this output be different without the identifier columns?

Contributor Author:

Yes, it will. I also made the change that computing updates is off by default, so the user has to explicitly set it to true to honor the identifier columns; otherwise, they are not used.

* data='a', op='DELETE') and (id=1, data='a', op='INSERT'), despite it not being an actual change
* to the table. The iterator finds the carry-over rows and removes them from the result.
*
* <p>An update-row is converted from a pair of delete row and insert row. Identifier columns are
Member:

pair of a delete row and an insert row

* to the table. The iterator finds the carry-over rows and removes them from the result.
*
* <p>An update-row is converted from a pair of delete row and insert row. Identifier columns are
* needed for identifying whether they refer to the same row. You can either set Identifier Field
Member:

needed => used?

Identifier columns are used for determining whether an insert and delete record refer to the same row. If the two records share the same values for the identity columns they are considered to be before and after states of the same row.

?

Contributor Author:

Made the change accordingly.

df = removeCarryoverRows(df);
}

String viewName = viewName(args, tableName);
Member:

We could be checking earlier whether or not the catalog specified by this view name is allowed to create views

Member (@RussellSpitzer) left a comment:

I think all of my questions are in the comments now; I don't see any major blockers on this for me.

flyrain (Contributor, Author) commented Feb 22, 2023

Resolved comments. Ready for another look. cc @aokolnychyi @RussellSpitzer

flyrain (Contributor, Author) commented Feb 24, 2023

Thanks a lot for the detailed review, @aokolnychyi ! Resolved them all and ready for another look.

Contributor (@aokolnychyi) left a comment:

Final set of comments and should be good to go.

@@ -173,6 +174,10 @@ stringMap
: MAP '(' constant (',' constant)* ')'
;

stringArray
Contributor:

Looks good.
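
Presumably this stringArray rule is what allows identifier columns to be passed as an array literal in a CALL statement. A hedged usage example in the same test-helper style (the exact parameter name and array syntax are inferred from this thread; the column name is a placeholder):

sql(
    "CALL %s.system.create_changelog_view("
        + "table => '%s', "
        + "identifier_columns => array('id'))",
    catalogName, tableName);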


String viewName = (String) returns.get(0)[0];

// the carry-over rows (2, 'e', 12, 'DELETE', 1), (2, 'e', 12, 'INSERT', 1) are removed, even
Contributor:

Is this comment accurate? I thought we were supposed to keep carryovers in this case.

Contributor Author:

It is still accurate. The default behavior is NOT computing updates but removing carry-overs. I can change the command to this, so it is clearer that we test the default behavior here:

"CALL %s.system.create_changelog_view(table => '%s')",

Contributor:

But this test calls remove_carryovers = false and the carryovers are not removed as far as I see in the check below?

Contributor Author:

Oh, sorry, I was looking at a different test. I have removed the comment in the new commit. Thanks for catching it.


@Test
public void testNotRemoveCarryOvers() {
removeTables();
Contributor:

Why do we explicitly call this if we have an @After method to clean up tables? Is it because we always create a default table first? If so, can we remove the @Before init method and just call the correct create method in each test?

Contributor Author:

That's a good idea. Made the change in the next commit.

}

@After
public void removeTables() {
Contributor:

nit: We usually have these init methods at the top.

import scala.runtime.BoxedUnit;

/**
* A procedure that creates a view for changed rows.
Contributor:

Looks accurate now, thanks for updating!

}

@NotNull
private static Column[] getRepartitionExpr(Dataset<Row> df, String[] identifiers) {
Contributor:

If we decide to add computeUpdateImages, I would put this logic there directly, like you did for carryovers.

: args.getString(CHANGELOG_VIEW_NAME_ORDINAL);
if (viewName == null) {
String shortTableName =
tableName.contains(".") ? tableName.substring(tableName.lastIndexOf(".") + 1) : tableName;
Contributor:

We already pass ident.name() to this method. Instead of checking for dots in the name, I think we can use the approach from the snippet above and escape the name using backticks.

args.isNullAt(CHANGELOG_VIEW_NAME_ORDINAL)
? null
: args.getString(CHANGELOG_VIEW_NAME_ORDINAL);
if (viewName == null) {
Contributor:

What about having if/else instead of an extra var and ternary operator above?

if (args.isNullAt(CHANGELOG_VIEW_NAME_ORDINAL)) {
  return String.format("`%s_changes`", tableName);
} else {
  return args.getString(CHANGELOG_VIEW_NAME_ORDINAL);
}

.toArray(String[]::new);
}

if (identifierColumns.length == 0) {
Contributor:

I think it should be if/else. If someone provides empty identifier columns, we should complain.

if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {
  return ...;
} else {
  return ...;
}

.mapPartitions(
(MapPartitionsFunction<Row, Row>)
rowIterator -> ChangelogIterator.create(rowIterator, schema, identifierFields),
RowEncoder.apply(df.schema()));
Contributor:

nit: Use schema var defined above?

aokolnychyi added this to the Iceberg 1.2.0 milestone on Mar 2, 2023
@aokolnychyi (Contributor):

I also added this to our 1.2 milestone. I think we should be able to merge it tomorrow.
Thanks for making this happen, @flyrain!

flyrain (Contributor, Author) commented Mar 3, 2023

Thanks a lot for the review @aokolnychyi. Resolved all of them and ready for another look.

flyrain (Contributor, Author) commented Mar 3, 2023

retest this please

flyrain (Contributor, Author) commented Mar 3, 2023

The pipeline error is not related.

> A failure occurred while executing org.gradle.api.plugins.quality.internal.CheckstyleAction
   > An unexpected error occurred configuring and executing Checkstyle.
      > java.lang.Error: Error was thrown while processing /home/runner/work/iceberg/iceberg/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java

flyrain (Contributor, Author) commented Mar 3, 2023

retest this please

Contributor (@aokolnychyi) left a comment:

I had a few minor nits, but nothing that would stop us from getting this PR in. It is already in pretty good shape. I'll merge it now. We can address the last feedback in a follow-up PR later.

@@ -144,6 +148,12 @@ protected SparkTable loadSparkTable(Identifier ident) {
}
}

protected Dataset<Row> loadDataSetFromTable(Identifier tableIdent, Map<String, String> options) {
Contributor:

Is there a shorter yet descriptive name? Like loadRows, loadContent, etc?

@@ -144,6 +148,12 @@ protected SparkTable loadSparkTable(Identifier ident) {
}
}

protected Dataset<Row> loadDataSetFromTable(Identifier tableIdent, Map<String, String> options) {
String tableName = Spark3Util.quotedFullIdentifier(tableCatalog().name(), tableIdent);
// no need to validate the read options here since the reader will validate them
Contributor:

I don't think we need this comment anymore since it is a pretty generic method now.

private Dataset<Row> computeUpdateImages(String[] identifierColumns, Dataset<Row> df) {
Preconditions.checkArgument(
identifierColumns.length > 0,
"Cannot compute the update-rows because identifier columns are not set");
Contributor:

nit: update-rows -> update images?

identifierColumns.length > 0,
"Cannot compute the update-rows because identifier columns are not set");

Column[] repartitionColumns = new Column[identifierColumns.length + 1];
Contributor:

nit: We sometimes call it repartitionColumns and sometimes repartitionSpec.
I'd probably use repartitionSpec everywhere since it is shorter (this statement would fit on 1 line?) and matches sortSpec used in other methods.

aokolnychyi merged commit 9cf9ca2 into apache:master on Mar 4, 2023
@aokolnychyi (Contributor):

Thanks, @flyrain! I am excited to test this out in real use cases.

flyrain (Contributor, Author) commented Mar 4, 2023

Thanks a lot @aokolnychyi! I will address these comments in a follow-up PR. It is a milestone. I'm also excited to see how people use it. Thanks everybody for the review, @RussellSpitzer @chenjunjiedada @hililiwei @rdblue!

slfan1989 pushed a commit to slfan1989/iceberg that referenced this pull request Mar 4, 2023
flyrain added a commit to flyrain/iceberg that referenced this pull request Mar 8, 2023
aokolnychyi pushed a commit that referenced this pull request Mar 8, 2023
krvikash pushed a commit to krvikash/iceberg that referenced this pull request Mar 16, 2023
krvikash pushed a commit to krvikash/iceberg that referenced this pull request Mar 16, 2023
sunchao pushed a commit to sunchao/iceberg that referenced this pull request May 10, 2023