Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark 3.3: Introduce the changelog iterator #6344

Merged
merged 15 commits into from
Jan 5, 2023
Merged

Conversation

flyrain
Copy link
Contributor

@flyrain flyrain commented Dec 2, 2022

Introduce an iterator to handle changelog rows within a task. It will be used by the changelog procedure #6012.
cc @aokolnychyi @RussellSpitzer @szehon-ho

@flyrain flyrain requested a review from aokolnychyi December 2, 2022 00:27
@flyrain flyrain changed the title [Spark3.3] Introduce the changelog iterator Spark3.3: Introduce the changelog iterator Dec 2, 2022
@flyrain
Copy link
Contributor Author

flyrain commented Dec 2, 2022

The error is unrelated

> A failure occurred while executing org.gradle.api.plugins.quality.internal.CheckstyleAction
See https://docs.gradle.org/7.6/userguide/command_line_interface.html#sec:command_line_warnings
   > An unexpected error occurred configuring and executing Checkstyle.
472 actionable tasks: 472 executed
      > java.lang.Error: Error was thrown while processing /home/runner/work/iceberg/iceberg/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java

@flyrain flyrain changed the title Spark3.3: Introduce the changelog iterator Spark 3.3: Introduce the changelog iterator Dec 2, 2022
Copy link
Collaborator

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I understand the core algorithm, but had some basic questions about some of these flags.


// set the change_type to the same value
deletedRow.update(changeTypeIndex, "");
insertedRow.update(changeTypeIndex, "");
Copy link
Collaborator

@szehon-ho szehon-ho Dec 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit hard to read (modify and recover the values). Could we instead do comparison of deletedRow and insertedRow without the changeTypeIndex? (ie, make the vars immutable/stateless)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed the recover part. Is it cleaner now?

insertedRow.update(changeTypeIndex, "");

if (deletedRow.equals(insertedRow)) {
// remove two carry-over rows
Copy link
Collaborator

@szehon-ho szehon-ho Dec 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment is not so clear to me. Im still a bit confused the concept of carry-over, can't find where its defined but I see it some places in your other pr. What about making the comment: "clear cached state"? Unless carry-over has some significance.

Copy link
Contributor Author

@flyrain flyrain Dec 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added Java doc in the class like this to explain what are carry-over rows.

 * Carry-over rows are unchanged
 * rows in a snapshot but showed as delete-rows and insert-rows in a changelog table due to the
 * copy-on-write(COW) mechanism. For example, there are row1 (id=1, data='a') and row2 (id=2,
 * data='b') in a data file, if we only delete row2, the COW will copy row1 to a new data file and
 * delete the whole old data file. The changelog table will have two delete-rows(row1 and row2), and
 * one insert-row(row1). Row1 is a carry-over row.

}

private boolean withinPartition(Row currentRow, Row nextRow) {
for (int i = 0; i < partitionIdx.size(); i++) {
Copy link
Collaborator

@szehon-ho szehon-ho Dec 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm missing some context here. The rows have partition values directly? (No need to transform?). And the partition field ids are always the same for all rows?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The partition columns here are identifier columns, which will be provided by the CDC procedure in #6012.

this.cachedRow = RowFactory.create(insertedRow.values());
} else {
// recover the values of change type
deletedRow.update(changeTypeIndex, DELETE);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is markUpdated just the underlying iterator? (Wonder if we can short circuit). Or what is the difference. Not sure I understand the flag.

Copy link
Contributor Author

@flyrain flyrain Dec 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed the flag markUpdated, more details are in #6344 (comment).

@Override
public Row next() {
// if there is a processed cached row, return it directly
if (cachedRow != null
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it may help readability to change this to something like

if (nonUpdate(cachedRow)) {
   Row row = cachedRow;
   cachedRow = null;
   return row
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or is it "if (updateRow"? the negations are confusing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made the change, thanks for the suggestion.

}
}

private boolean withinPartition(Row currentRow, Row nextRow) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this "sameRow" basically? Since we are basically checking are these two rows are and pre and post update correct?

Copy link
Contributor Author

@flyrain flyrain Dec 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. This iterator will only be used in CDC procedure, so it is safe to assume that the partition columns here are identifier columns.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two rows can be a update row or a carry-over row.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we ok with the name 'partition'? Wont it be overloaded in Iceberg?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is overloaded. Here the partition refer to the one generated by DataFrame::repartition(). I'm not sure what would be a better name. Open for suggestions.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea tbh it threw me off a bit initially. How about just 'equivalent'? And partitionFieldIdx => identifierFieldIdx? Not sure what you guys think.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think something like "sameLogicalRow(Row currentRow, Row nextRow)"

if (rowIterator.hasNext()) {
GenericRowWithSchema nextRow = (GenericRowWithSchema) rowIterator.next();

if (withinPartition(currentRow, nextRow)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would bundle this whole clause into the same helper

If (updateRecord(currentRow, nextRow)) {

}

@flyrain
Copy link
Contributor Author

flyrain commented Dec 10, 2022

Thanks @szehon-ho @RussellSpitzer for the review. Resolve all comments. Particularly, I have removed the flag markUpdated. I tried to use the flag to cover more use cases like following:

  1. Users want to remove carry-over rows without generating the update rows.
  2. Users want to remove carry-over rows and generate the update rows.

I think it may make more sense to have different iterators. The PR is only for the case 2. The PR is ready for another look.

Copy link
Collaborator

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for changes, had some more style/naming suggestion for code clarity for newcomers like me.


private boolean isCarryoverRecord(GenericInternalRow deletedRow, GenericInternalRow insertedRow) {
// set the change_type to the same value
deletedRow.update(changeTypeIndex, "");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style: I still don't prefer to do a side-effect like this (mutate the variable in the method) just for comparison. Caller may be taken by surprise.

One way is probably to construct deletedRow and insertedRow with the "" values to begin with. But I guess Java will be a bit verbose:

    new GenericInternalRow(IntStream.range(0, row.values().length)
        .mapToObj(i -> (i == changeTypeIndex) ? "" : row.values()[i])
        .toArray())

Another option is to move the update("") back to the base method?

GenericInternalRow deletedRow = new GenericInternalRow(currentRow.values());
deletedRow.update(changeTypeIndex, "");

At least in this case when you read the update() method, it is more clear what the state is without having to scroll to the helper methods.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would just define an equals method that ignores ChangeTypeIndex

return currentRow;
}

private Row[] update(GenericRowWithSchema currentRow, GenericRowWithSchema nextRow) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about something like createUpdateChangelog() to be more descriptive? "update" sounds like it is updating something.

*
* <p>It marks the carry-over rows to null to for filtering out later. Carry-over rows are unchanged
* rows in a snapshot but showed as delete-rows and insert-rows in a changelog table due to the
* copy-on-write(COW) mechanism. For example, there are row1 (id=1, data='a') and row2 (id=2,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for comment. Just wanted to check, why is it not possible for MOR? Data file + position delete of same row?

Copy link
Contributor Author

@flyrain flyrain Dec 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may not be precise to use COW here. Basically any metadata delete of a data file may have the carry-over rows. I will make the change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking a bit more, MOR shouldn't have this carry-over rows. MOR will always delete a row which supposed to be deleted due to a delete or update operation. MOR won't delete other unrelated rows like COW does.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm.. so if I do appendFile(dataFile) with a certain row, and then I do rowDelta() and add a position delete for that data file? In the same snapshot (I think thats allowed?) Is that considered a carry-over row? Or is that taken care of some other way?

Copy link
Contributor Author

@flyrain flyrain Dec 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should consider to remove them since nothing really changes, but that's not the scope of this iterator. We haven't started CDC for MOR yet. We probably don't call them carry-over rows since they exist within one snapshot.

}
}

private boolean withinPartition(Row currentRow, Row nextRow) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we ok with the name 'partition'? Wont it be overloaded in Iceberg?

@Override
public Row next() {
// if there is an updated cached row, return it directly
if (updated(cachedRow)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about 'cachedUpdateChangelog' ?

return deletedRow.equals(insertedRow);
}

private boolean updated(Row row) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also one more, variable may make more sense as cachedRow or cachedChangelog. Have to always remember that this method is only for checking the cache.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, I was thinking whether or not it should be like cachedUpdateRecord or something like that

So we have "rows" and "records", rows are the input and Records are the output, even though they are all actually "Row" type

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made the change.


if (isCarryoverRecord(deletedRow, insertedRow)) {
// set carry-over rows to null for filtering out later
return new Row[] {null, null};
Copy link
Collaborator

@szehon-ho szehon-ho Dec 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall question, is the null filter in Spark later, just for this case? If so have you considered taking care of this in the iterator, something like (very pseudo):

do {
rows = nextInternal();
} while (row[0] == null and row[1] == null)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will filter out the null row by concatenating this iterator with another one like what we did in the test.

Iterators.filter(iterator, Objects::nonNull)

Here is another code example, https://github.com/apache/iceberg/blob/master/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java#L476-L476

I'm fine with either one, but the iterator concatenation would be a nice solution to reduce the complexity of this iterator.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. Maybe we can have this iterator just wrap Iterators.filter(...)? If this is kind of a standalone class, not sure if there is anyone is interested in having nulls? wdyt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was planning to concatenate them in the CDC procedure. I don't think anyone would be interested in the nulls. Let me think about how to merge them within a class.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a very expensive way to pass back 2 nulls

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would probably just return "null" and do an "if null { currentRow = null , cachedRow = null }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was trying to make it more readable by return 2 nulls. Will make the change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A static method is used for concatenating two iterators.

insertedRow.update(changeTypeIndex, UPDATE_AFTER);

return new Row[] {
RowFactory.create(deletedRow.values()), RowFactory.create(insertedRow.values())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we making new rows here? Can we not just use our GenericInternalRows here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GenericInternalRow is not a subclass of Row, we need to return a Row instance here.

private final List<Integer> partitionIdx = Lists.newArrayList(0, 1);

@Test
public void testUpdatedRows() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need more exhaustive tests here. I would probably permute our 4 types of records here.

Update - CarryOver - Insert - Delete

We should make sure this works in any permutation of ordering.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made the change.

@flyrain
Copy link
Contributor Author

flyrain commented Dec 16, 2022

Thanks @szehon-ho and @RussellSpitzer for the review. Resolved all comments. Ready for the another look.

Copy link
Collaborator

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code looks cleaner to me now. Added some style/doc comments and maybe wait for Russell to take a look

* <p>It removes the carry-over rows. Carry-over rows are unchanged rows in a snapshot but showed as
* delete-rows and insert-rows in a changelog table due to the copy-on-write(COW) mechanism. For
* example, there are row1 (id=1, data='a') and row2 (id=2, data='b') in a data file, if we only
* delete row2, the COW will copy row1 to a new data file and delete the whole old data file. The
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIt: 'whole' old file sounds strange, I would just omit it.

}

public static Iterator<Row> iterator(
Iterator<Row> rowIterator, int changeTypeIndex, List<Integer> partitionIdx) {
Copy link
Collaborator

@szehon-ho szehon-ho Dec 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we still change name from partitionIdx to identifierFieldIdx? Be better not to mention partition at all to avoid confusion with real partition fields.

Also a javadoc here on arguments will be useful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good idea. Thanks for the suggesiton.

@@ -112,11 +112,11 @@ protected List<Object[]> sql(String query, Object... args) {
return rowsToJava(rows);
}

protected List<Object[]> rowsToJava(List<Row> rows) {
return rows.stream().map(this::toJava).collect(Collectors.toList());
public static List<Object[]> rowsToJava(List<Row> rows) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style: I feel its messy now to have this class now as inherited and a util class. Especially some methods are changed to static and other methods are not, even though they call the static ones.

What do you think? Maybe we can make a separate base class for the helper methods like 'SparkTestHelperBase' and have both SparkTestBase and your test inherit from it (to avoid changing all the tests)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made the change per suggestion.

@flyrain
Copy link
Contributor Author

flyrain commented Dec 20, 2022

Thanks @szehon-ho's suggestion. Made the changes.

* An iterator that transforms rows from changelog tables within a single Spark task. It assumes
* that rows are sorted by identifier columns and change type.
*
* <p>It removes the carry-over rows. Carry-over rows are unchanged rows in a snapshot but showed as
Copy link
Member

@RussellSpitzer RussellSpitzer Dec 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was working a little on re-wording this a bit? Feel free to use any of this if you like

Carry-over rows are the result of a removal and addition of the same row within an operation because of the copy-on-write mechanism. The iterator uses the "identifier fields" to determine of two rows are identical. For example, given a file which contains row1 (id=1, data='a') and row2 (id=2, data='b'). in a data file, a copy-on-write delete of row2 would require erasing this file. To preserve row1 a new file is written with row1' which is identical to row1. The change-log table would report this as (row1 deleted, row1' added), since this row was not actually modified it is not an actual change in the table. One function of this iterator is to remove these carry-over rows.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adopted it in a new commit. I'm using insert instead of add to be consistent with our change type.

* <p>It removes the carry-over rows. Carry-over rows are the result of a removal and insertion of
* the same row within an operation because of the copy-on-write mechanism. For example, given a
* file which contains row1 (id=1, data='a') and row2 (id=2, data='b'). A copy-on-write delete of
* row2 would require erasing this file and preserving row1 in a new file written with row1' which
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea I think its hard to read now, because we use both row1' and row1 (id=1, data='a'). I think either we can just use row, row1' notation, or actual value notation throughout, and be consistent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good points. Changed to actual value.

* file which contains row1 (id=1, data='a') and row2 (id=2, data='b'). A copy-on-write delete of
* row2 would require erasing this file and preserving row1 in a new file written with row1' which
* is identical to row1. The change-log table would report this as (row1 deleted) and (row1'
* inserted), since this row was not actually modified it is not an actual change in the table. The
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The last part of the sentence does not seem gramatically correct (it's two sentences without any conjunction). Maybe:
since this row was not actually modified it is not an actual change in the table
=>
despite it not being an actual change to the table

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is much better than what I had

}

/**
* Creates a new {@link ChangelogIterator} instance concatenated with the null-removal iterator.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can avoid having so much implementation details, as it may change in future? Also not sure the use of linking to this class itself.

How about something more logical (feel free to elaborate a bit): Creates an iterator for records of changelog table

assertEquals("Rows should match", expectedRows, rowsToJava(result));
}

private List<Row> toOriginalRows(RowType rowType, int order) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: for the variable name, maybe index is better than order, for a value that increases in a collection? Order to me seems a property of whole collection.

@flyrain
Copy link
Contributor Author

flyrain commented Dec 22, 2022

Thanks @szehon-ho. Made the change per suggestions.

*
* @param rowIterator the iterator of rows from a changelog table
* @param changeTypeIndex the index of the change type column
* @param identifierFieldIdx the indices of the identifier columns
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

columns to be considered for row equality?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made the change

@flyrain
Copy link
Contributor Author

flyrain commented Jan 3, 2023

Rebased and resolved the comment. Hi @RussellSpitzer @szehon-ho, would you take another look?

* file which contains row1 (id=1, data='a') and row2 (id=2, data='b'). A copy-on-write delete of
* row2 would require erasing this file and preserving row1 in a new file. The change-log table
* would report this as (id=1, data='a', op='DELETE') and (id=1, data='a', op='INSERT'), despite it
* not being an actual change to the table. The iterator finds out the carry-over rows and removes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"finds out" -> "finds"

* not being an actual change to the table. The iterator finds out the carry-over rows and removes
* them from the result.
*
* <p>The iterator marks the delete-row and insert-row to be the update-rows. For example, these two
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This iterator also finds delete/insert rows which represent an update and converts them into update records.

?

}

private boolean isCarryoverRecord(Row currentRow, Row nextRow) {
int length = currentRow.length();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could probably be optimized a bit. I think the easiest thing to do off the bat is just save row length for the whole iterator rather than checking for every row.

In the future I guess you could do something like
ColumnsToCompare = [0,1,2,4,5]
But i'm not sure if that's actually faster. I assume eventually JIT would optimize around the

if (i == changeTypeIndex) {
  continue;
}

But skipping the branch is always nice

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is faster. Tested with a simulation of 100 columns comparison with two options

  1. Using continue
  2. Using a int[] to store the index

The later always takes only 60-70% time of the former, which is surprisingly faster. I didn't expected so much difference. I made the change anyway in this class.
Here is my test code.

    var left = new ArrayList<Integer>();
    var right = new ArrayList<Integer>();
    for(int i = 0; i < 100; i++) {
      left.add(i);
      right.add(i);
    }

    var start = System.currentTimeMillis();
    var match = false;
    for(int i = 0; i < 1000000; i++) {
      for(int j = 0; j < 100; j++) {
        if( j == 50){
          continue;
        }

        match = left.get(j) == right.get(j);
      }
    }
    var end = System.currentTimeMillis();
    System.out.println(String.format("duration: %s, match %s",  end - start, match));

    var indices = new int[99];
    int k = 0;
    for(int i = 0; i < 100; i++) {
      if(i == 50) {
        continue;
      }
      indices[k++] = i;
    }

    start = System.currentTimeMillis();
    for(int i = 0; i < 1000000; i++) {
      for(int j : indices) {
        match = left.get(j) == right.get(j);
      }
    }
    end = System.currentTimeMillis();
    System.out.println(String.format("duration: %s, match: %s",  end - start, match));

Here are results:

shell:>loop-perf
duration: 119, match true
duration: 92, match: true
shell:>loop-perf
duration: 138, match true
duration: 86, match: true
shell:>loop-perf
duration: 125, match true
duration: 85, match: true
shell:>loop-perf
duration: 125, match true
duration: 85, match: true
shell:>loop-perf
duration: 126, match true
duration: 85, match: true
shell:>loop-perf
duration: 124, match true
duration: 85, match: true
shell:>loop-perf
duration: 125, match true
duration: 85, match: true
shell:>loop-perf
duration: 126, match true
duration: 85, match: true
shell:>loop-perf
duration: 124, match true
duration: 85, match: true

}

private static boolean isColumnSame(Row currentRow, Row nextRow, int idx) {
if (currentRow.isNullAt(idx) && nextRow.isNullAt(idx)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could simplify this to

Objects.equals(nextRow.get(idx), currentRow.get(idx))

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for Objects.equals

public void testIterator() {
List<Object[]> permutations = Lists.newArrayList();
// generate 24 permutations
permute(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks :)

Copy link
Member

@RussellSpitzer RussellSpitzer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Few last comments, but this is looking good to me. Thanks for adding the permutations that gives me a lot of confidence that the orderings all work.

Copy link
Collaborator

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me, agree with Russell's suggestion on column comparison method

}

private static boolean isColumnSame(Row currentRow, Row nextRow, int idx) {
if (currentRow.isNullAt(idx) && nextRow.isNullAt(idx)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for Objects.equals

@flyrain flyrain merged commit 3706825 into apache:master Jan 5, 2023
@flyrain
Copy link
Contributor Author

flyrain commented Jan 5, 2023

Thanks for the reviews, @RussellSpitzer @szehon-ho !

* <li>(id=1, data='b', op='UPDATE_AFTER')
* </ul>
*/
public class ChangelogIterator implements Iterator<Row>, Serializable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this class really have to be Serializable? If I understand correctly, it won't be serialized as it is constructed within a closure called on executors. The closure will be serialized but I don't think the iterator itself will be serialized. If we mark the iterator as serializable, then we have to ensure it is actually serializable. I am not sure it actually is in the current implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did another test. You are right, Serializable isn't needed. I assume the previous error I hit may due to the different parameter types.

private int[] indicesForIdentifySameRow = null;

private ChangelogIterator(
Iterator<Row> rowIterator, int changeTypeIndex, List<Integer> identifierFieldIdx) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: I'd probably pass a schema into this method and make it responsible for computing indices rather than relying on input. This would allow us to compute things like indicesForIdentifySameRow right away instead of having an if condition for every row. It is optional but it seems safer and would make this class more independent. The downside is the need to compute the indices on each creation but I don't think it matters cause it is done once per Spark partition.

* the same
* @return a new {@link ChangelogIterator} instance concatenated with the null-removal iterator
*/
public static Iterator<Row> iterator(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I am not a big fan of this name cause invocations look repetitive ChangelogIterator.iterator(). I'd consider something similar to what we have in Guava's Iterables, where the name is inspired by a verb. Something like wrap, transform or even create. This is optional, though. Keep this name if you think it is better.


Row currentRow = currentRow();

if (indicesForIdentifySameRow == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing a schema into the constructor would allow to pre-compute these values right away.


private Row[] createUpdateChangelog(
GenericRowWithSchema currentRow, GenericRowWithSchema nextRow) {
GenericInternalRow deletedRow = new GenericInternalRow(currentRow.values());
Copy link
Contributor

@aokolnychyi aokolnychyi Jan 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid introducing InternalRows just to call update on it because the internal representation is completely different (e.g. UTF8String vs String, int vs timestamp). We can't mix internal and generic rows.

Also, we shouldn't limit ourselves to GenericRowWithSchema. We can have a more optimal implementation if we encounter it but we can't assume we will always get GenericRowWithSchema.

deletedRow.update(changeTypeIndex, UPDATE_BEFORE);
insertedRow.update(changeTypeIndex, UPDATE_AFTER);

return new Row[] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this method can be more efficient. We create 4 new rows and an array. Can we modify generic rows in place?

private Row modify(Row row, int valueIndex, Object value) {
  if (row instanceof GenericRow) {
    GenericRow genericRow = (GenericRow) row;
    genericRow.values()[valueIndex] = value;
    return genericRow;

  } else {
    Object[] values = new Object[row.size()];
    for (int index = 0; index < row.size(); index++) {
      values[index] = row.get(index);
    }
    values[valueIndex] = value;
    return RowFactory.create(values);
  }
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This way we will also handle all Row implementations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot @aokolnychyi for the suggestion! Looks much better now!

@aokolnychyi
Copy link
Contributor

@flyrain, sorry it took me so long to get to this PR. Great work!

I noticed a few things we should follow up on. Could you check?


private final Iterator<Row> rowIterator;
private final int changeTypeIndex;
private final List<Integer> identifierFieldIdx;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen if the input rows violate the assumption of uniqueness?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The strategy I took is to leave them as is.

@flyrain
Copy link
Contributor Author

flyrain commented Feb 21, 2023

Thanks a lot for the review @aokolnychyi. Filed #6898 for your comments.

aokolnychyi pushed a commit that referenced this pull request Feb 22, 2023
This PR is a follow-up change to PR #6344.
krvikash pushed a commit to krvikash/iceberg that referenced this pull request Mar 16, 2023
sunchao pushed a commit to sunchao/iceberg that referenced this pull request May 10, 2023
sunchao pushed a commit to sunchao/iceberg that referenced this pull request May 10, 2023
This PR is a follow-up change to PR apache#6344.

(cherry picked from commit 49f3948)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants