Spark 3.3: Introduce the changelog iterator #6344
Conversation
The error is unrelated
I think I understand the core algorithm, but had some basic questions about some of these flags.
// set the change_type to the same value
deletedRow.update(changeTypeIndex, "");
insertedRow.update(changeTypeIndex, "");
It's a bit hard to read (modify and recover the values). Could we instead compare deletedRow and insertedRow without the changeTypeIndex? (i.e., make the vars immutable/stateless)
I have removed the recover part. Is it cleaner now?
insertedRow.update(changeTypeIndex, "");

if (deletedRow.equals(insertedRow)) {
  // remove two carry-over rows
The comment is not so clear to me. I'm still a bit confused about the concept of carry-over; I can't find where it's defined, but I see it in some places in your other PR. What about making the comment "clear cached state"? Unless carry-over has some significance.
I have added Javadoc like this to the class to explain what carry-over rows are.
* Carry-over rows are unchanged
* rows in a snapshot but showed as delete-rows and insert-rows in a changelog table due to the
* copy-on-write(COW) mechanism. For example, there are row1 (id=1, data='a') and row2 (id=2,
* data='b') in a data file, if we only delete row2, the COW will copy row1 to a new data file and
* delete the whole old data file. The changelog table will have two delete-rows(row1 and row2), and
* one insert-row(row1). Row1 is a carry-over row.
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestChangelogIterator.java
}

private boolean withinPartition(Row currentRow, Row nextRow) {
  for (int i = 0; i < partitionIdx.size(); i++) {
I'm missing some context here. The rows have partition values directly? (No need to transform?). And the partition field ids are always the same for all rows?
Yes. The partition columns here are identifier columns, which will be provided by the CDC procedure in #6012.
this.cachedRow = RowFactory.create(insertedRow.values());
} else {
  // recover the values of change type
  deletedRow.update(changeTypeIndex, DELETE);
Is markUpdated just the underlying iterator? (Wonder if we can short circuit). Or what is the difference. Not sure I understand the flag.
I have removed the flag markUpdated; more details are in #6344 (comment).
@Override
public Row next() {
  // if there is a processed cached row, return it directly
  if (cachedRow != null
I think it may help readability to change this to something like
if (nonUpdate(cachedRow)) {
  Row row = cachedRow;
  cachedRow = null;
  return row;
}
Or is it "if (updateRow..."? The negations are confusing.
Made the change, thanks for the suggestion.
  }
}

private boolean withinPartition(Row currentRow, Row nextRow) {
Isn't this basically "sameRow"? Since we are basically checking whether these two rows are the pre- and post-update versions of the same row, correct?
Yes. This iterator will only be used in CDC procedure, so it is safe to assume that the partition columns here are identifier columns.
These two rows can be an update row or a carry-over row.
Are we ok with the name 'partition'? Won't it be overloaded in Iceberg?
It is overloaded. Here the partition refers to the one generated by DataFrame::repartition(). I'm not sure what would be a better name. Open for suggestions.
Yea tbh it threw me off a bit initially. How about just 'equivalent'? And partitionFieldIdx => identifierFieldIdx? Not sure what you guys think.
I think something like "sameLogicalRow(Row currentRow, Row nextRow)"
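For illustration, a minimal sketch of such a helper, assuming it only compares the identifier columns (the field name identifierFieldIdx follows the later renaming in this PR; java.util.Objects is assumed imported):

// Sketch: two rows describe the same logical row when all identifier columns match.
private boolean sameLogicalRow(Row currentRow, Row nextRow) {
  for (int idx : identifierFieldIdx) {
    if (!Objects.equals(currentRow.get(idx), nextRow.get(idx))) {
      return false;
    }
  }
  return true;
}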
if (rowIterator.hasNext()) {
  GenericRowWithSchema nextRow = (GenericRowWithSchema) rowIterator.next();

  if (withinPartition(currentRow, nextRow)
I would bundle this whole clause into the same helper:
if (updateRecord(currentRow, nextRow)) {
}
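A rough sketch of what such a bundled helper could look like; the exact change-type checks (and the INSERT constant) are assumptions here, not taken from the PR:

// Sketch: a DELETE immediately followed by an INSERT of the same logical row is treated as one update record.
private boolean updateRecord(Row currentRow, Row nextRow) {
  return withinPartition(currentRow, nextRow)
      && DELETE.equals(currentRow.getString(changeTypeIndex))
      && INSERT.equals(nextRow.getString(changeTypeIndex));
}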
Thanks @szehon-ho @RussellSpitzer for the review. Resolved all comments. In particular, I have removed the flag markUpdated.
I think it may make more sense to have different iterators. The PR is only for case 2. The PR is ready for another look.
Thanks for the changes, had some more style/naming suggestions for code clarity for newcomers like me.
private boolean isCarryoverRecord(GenericInternalRow deletedRow, GenericInternalRow insertedRow) {
  // set the change_type to the same value
  deletedRow.update(changeTypeIndex, "");
Style: I'd still prefer not to have a side effect like this (mutating the variable in the method) just for a comparison. The caller may be taken by surprise.
One way is probably to construct deletedRow and insertedRow with the "" values to begin with. But I guess Java will be a bit verbose:
new GenericInternalRow(IntStream.range(0, row.values().length)
.mapToObj(i -> (i == changeTypeIndex) ? "" : row.values()[i])
.toArray())
Another option is to move the update("") back to the base method?
GenericInternalRow deletedRow = new GenericInternalRow(currentRow.values());
deletedRow.update(changeTypeIndex, "");
At least in this case when you read the update() method, it is more clear what the state is without having to scroll to the helper methods.
I would just define an equals method that ignores ChangeTypeIndex
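A minimal sketch of such a comparison, assuming generic column access by index (the method name is hypothetical; java.util.Objects assumed imported):

// Sketch: compare all columns except the change-type column, so neither row needs to be mutated.
private boolean sameRowIgnoringChangeType(Row deletedRow, Row insertedRow) {
  for (int idx = 0; idx < deletedRow.size(); idx++) {
    if (idx != changeTypeIndex && !Objects.equals(deletedRow.get(idx), insertedRow.get(idx))) {
      return false;
    }
  }
  return true;
}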
  return currentRow;
}

private Row[] update(GenericRowWithSchema currentRow, GenericRowWithSchema nextRow) {
How about something like createUpdateChangelog() to be more descriptive? "update" sounds like it is updating something.
*
* <p>It marks the carry-over rows to null for filtering out later. Carry-over rows are unchanged
* rows in a snapshot but showed as delete-rows and insert-rows in a changelog table due to the
* copy-on-write(COW) mechanism. For example, there are row1 (id=1, data='a') and row2 (id=2,
Thanks for comment. Just wanted to check, why is it not possible for MOR? Data file + position delete of same row?
It may not be precise to use COW here. Basically any metadata delete of a data file may have the carry-over rows. I will make the change.
Thinking a bit more, MOR shouldn't have these carry-over rows. MOR will always delete a row that is supposed to be deleted due to a delete or update operation. MOR won't delete other unrelated rows like COW does.
Hm.. so if I do appendFile(dataFile) with a certain row, and then I do rowDelta() and add a position delete for that data file, in the same snapshot (I think that's allowed?), is that considered a carry-over row? Or is that taken care of some other way?
We should consider removing them since nothing really changes, but that's not in the scope of this iterator. We haven't started CDC for MOR yet. We probably wouldn't call them carry-over rows since they exist within one snapshot.
  }
}

private boolean withinPartition(Row currentRow, Row nextRow) {
Are we ok with the name 'partition'? Won't it be overloaded in Iceberg?
@Override
public Row next() {
  // if there is an updated cached row, return it directly
  if (updated(cachedRow)) {
How about 'cachedUpdateChangelog' ?
  return deletedRow.equals(insertedRow);
}

private boolean updated(Row row) {
Also one more: the variable may make more sense as cachedRow or cachedChangelog. You have to keep remembering that this method is only for checking the cache.
I agree; I was thinking whether or not it should be something like cachedUpdateRecord.
So we have "rows" and "records": rows are the input and records are the output, even though they are all actually of the "Row" type.
Made the change.
if (isCarryoverRecord(deletedRow, insertedRow)) {
  // set carry-over rows to null for filtering out later
  return new Row[] {null, null};
Overall question, is the null filter in Spark later, just for this case? If so have you considered taking care of this in the iterator, something like (very pseudo):
do {
  rows = nextInternal();
} while (rows[0] == null && rows[1] == null);
We will filter out the null row by concatenating this iterator with another one like what we did in the test.
Iterators.filter(iterator, Objects::nonNull)
Here is another code example, https://github.com/apache/iceberg/blob/master/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java#L476-L476
I'm fine with either one, but the iterator concatenation would be a nice solution to reduce the complexity of this iterator.
Right. Maybe we can have this iterator just wrap Iterators.filter(...)? If this is kind of a standalone class, I'm not sure anyone is interested in having nulls? wdyt?
I was planning to concatenate them in the CDC procedure. I don't think anyone would be interested in the nulls. Let me think about how to merge them within a class.
This seems like a very expensive way to pass back 2 nulls
I would probably just return "null" and do an "if null { currentRow = null; cachedRow = null }".
Was trying to make it more readable by returning 2 nulls. Will make the change.
A static method is used for concatenating two iterators.
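Roughly, based on the Iterators.filter approach mentioned above, the static factory could look like this sketch (parameter names mirror the ones used elsewhere in the PR; Guava's Iterators and java.util.Objects assumed imported):

// Sketch: wrap the raw changelog iterator and drop the nulls left behind by carry-over rows.
public static Iterator<Row> iterator(
    Iterator<Row> rowIterator, int changeTypeIndex, List<Integer> identifierFieldIdx) {
  ChangelogIterator changelogIterator =
      new ChangelogIterator(rowIterator, changeTypeIndex, identifierFieldIdx);
  return Iterators.filter(changelogIterator, Objects::nonNull);
}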
insertedRow.update(changeTypeIndex, UPDATE_AFTER);

return new Row[] {
  RowFactory.create(deletedRow.values()), RowFactory.create(insertedRow.values())
Why are we making new rows here? Can we not just use our GenericInternalRows here?
GenericInternalRow is not a subclass of Row; we need to return a Row instance here.
private final List<Integer> partitionIdx = Lists.newArrayList(0, 1);

@Test
public void testUpdatedRows() {
I think we need more exhaustive tests here. I would probably permute our 4 types of records here.
Update - CarryOver - Insert - Delete
We should make sure this works in any permutation of ordering.
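For example, the test could enumerate every ordering of the four record kinds before checking the iterator output; a sketch (the enum and helper names are hypothetical, not the ones in the PR; java.util.Collections and Guava's Lists assumed imported):

// Sketch: collect all orderings of the four record kinds so each can drive one test case.
enum RecordKind { UPDATE, CARRY_OVER, INSERT, DELETE }

private void permute(List<RecordKind> kinds, int start, List<List<RecordKind>> out) {
  if (start == kinds.size()) {
    out.add(Lists.newArrayList(kinds));
    return;
  }
  for (int i = start; i < kinds.size(); i++) {
    Collections.swap(kinds, start, i);
    permute(kinds, start + 1, out);
    Collections.swap(kinds, start, i);
  }
}

Each of the 24 orderings would then be turned into input rows, and the iterator output compared against the expected changelog records.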
Made the change.
Thanks @szehon-ho and @RussellSpitzer for the review. Resolved all comments. Ready for another look.
Code looks cleaner to me now. Added some style/doc comments and maybe wait for Russell to take a look
* <p>It removes the carry-over rows. Carry-over rows are unchanged rows in a snapshot but showed as
* delete-rows and insert-rows in a changelog table due to the copy-on-write(COW) mechanism. For
* example, there are row1 (id=1, data='a') and row2 (id=2, data='b') in a data file, if we only
* delete row2, the COW will copy row1 to a new data file and delete the whole old data file. The
Nit: 'whole' old file sounds strange, I would just omit it.
}

public static Iterator<Row> iterator(
    Iterator<Row> rowIterator, int changeTypeIndex, List<Integer> partitionIdx) {
Can we still change the name from partitionIdx to identifierFieldIdx? It'd be better not to mention partition at all, to avoid confusion with real partition fields.
Also, a javadoc here on the arguments would be useful.
That's a good idea. Thanks for the suggestion.
@@ -112,11 +112,11 @@ protected List<Object[]> sql(String query, Object... args) {
     return rowsToJava(rows);
   }

-  protected List<Object[]> rowsToJava(List<Row> rows) {
+  public static List<Object[]> rowsToJava(List<Row> rows) {
     return rows.stream().map(this::toJava).collect(Collectors.toList());
Style: I feel it's messy to have this class be both an inherited base class and a util class, especially when some methods are changed to static and others are not, even though they call the static ones.
What do you think? Maybe we can make a separate base class for the helper methods like 'SparkTestHelperBase' and have both SparkTestBase and your test inherit from it (to avoid changing all the tests)?
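A minimal sketch of that split, assuming the conversion helpers move up into a new base class (the toJava body here is simplified for illustration):

// SparkTestHelperBase.java - sketch: shared row-conversion helpers, no Spark session required.
public class SparkTestHelperBase {
  protected List<Object[]> rowsToJava(List<Row> rows) {
    return rows.stream().map(this::toJava).collect(Collectors.toList());
  }

  protected Object[] toJava(Row row) {
    // simplified: the real helper converts nested rows/collections as well
    return IntStream.range(0, row.size()).mapToObj(row::get).toArray();
  }
}

// SparkTestBase.java - sketch: keeps the session/SQL helpers, inherits the conversion helpers.
public abstract class SparkTestBase extends SparkTestHelperBase {
  // sql(...), Spark session setup, etc. stay here
}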
Made the change per suggestion.
Thanks for @szehon-ho's suggestion. Made the changes.
* An iterator that transforms rows from changelog tables within a single Spark task. It assumes
* that rows are sorted by identifier columns and change type.
*
* <p>It removes the carry-over rows. Carry-over rows are unchanged rows in a snapshot but showed as
I was working a little on re-wording this a bit. Feel free to use any of this if you like:
Carry-over rows are the result of a removal and addition of the same row within an operation because of the copy-on-write mechanism. The iterator uses the "identifier fields" to determine if two rows are identical. For example, given a data file which contains row1 (id=1, data='a') and row2 (id=2, data='b'), a copy-on-write delete of row2 would require erasing this file. To preserve row1, a new file is written with row1', which is identical to row1. The change-log table would report this as (row1 deleted, row1' added); since this row was not actually modified, it is not an actual change in the table. One function of this iterator is to remove these carry-over rows.
Adopted it in a new commit. I'm using insert instead of add to be consistent with our change type.
* <p>It removes the carry-over rows. Carry-over rows are the result of a removal and insertion of
* the same row within an operation because of the copy-on-write mechanism. For example, given a
* file which contains row1 (id=1, data='a') and row2 (id=2, data='b'). A copy-on-write delete of
* row2 would require erasing this file and preserving row1 in a new file written with row1' which
Yea, I think it's hard to read now, because we use both row1' and row1 (id=1, data='a'). I think we can either use just the row1/row1' notation, or the actual value notation, throughout and be consistent.
Good points. Changed to actual value.
* file which contains row1 (id=1, data='a') and row2 (id=2, data='b'). A copy-on-write delete of
* row2 would require erasing this file and preserving row1 in a new file written with row1' which
* is identical to row1. The change-log table would report this as (row1 deleted) and (row1'
* inserted), since this row was not actually modified it is not an actual change in the table. The
The last part of the sentence does not seem grammatically correct (it's two sentences without any conjunction). Maybe:
since this row was not actually modified it is not an actual change in the table
=>
despite it not being an actual change to the table
This is much better than what I had
}

/**
 * Creates a new {@link ChangelogIterator} instance concatenated with the null-removal iterator.
Maybe we can avoid having so many implementation details, as they may change in the future? Also, I'm not sure about the use of linking to this class itself.
How about something more logical (feel free to elaborate a bit): "Creates an iterator for records of the changelog table"?
  assertEquals("Rows should match", expectedRows, rowsToJava(result));
}

private List<Row> toOriginalRows(RowType rowType, int order) {
Nit: for the variable name, maybe index is better than order, for a value that increases in a collection? Order to me seems a property of the whole collection.
Thanks @szehon-ho. Made the change per suggestions.
*
* @param rowIterator the iterator of rows from a changelog table
* @param changeTypeIndex the index of the change type column
* @param identifierFieldIdx the indices of the identifier columns
columns to be considered for row equality?
Made the change
Rebased and resolved the comment. Hi @RussellSpitzer @szehon-ho, would you take another look?
* file which contains row1 (id=1, data='a') and row2 (id=2, data='b'). A copy-on-write delete of
* row2 would require erasing this file and preserving row1 in a new file. The change-log table
* would report this as (id=1, data='a', op='DELETE') and (id=1, data='a', op='INSERT'), despite it
* not being an actual change to the table. The iterator finds out the carry-over rows and removes
"finds out" -> "finds"
* not being an actual change to the table. The iterator finds out the carry-over rows and removes
* them from the result.
*
* <p>The iterator marks the delete-row and insert-row to be the update-rows. For example, these two
This iterator also finds delete/insert rows which represent an update and converts them into update records.
?
}

private boolean isCarryoverRecord(Row currentRow, Row nextRow) {
  int length = currentRow.length();
This could probably be optimized a bit. I think the easiest thing to do off the bat is just save row length for the whole iterator rather than checking for every row.
In the future I guess you could do something like
ColumnsToCompare = [0,1,2,4,5]
But I'm not sure if that's actually faster. I assume eventually JIT would optimize around the
if (i == changeTypeIndex) {
continue;
}
But skipping the branch is always nice
It is faster. Tested with a simulation of a 100-column comparison with two options:
- Using continue
- Using an int[] to store the indices
The latter always takes only 60-70% of the time of the former, which is surprisingly faster. I didn't expect so much of a difference. I made the change anyway in this class.
Here is my test code.
var left = new ArrayList<Integer>();
var right = new ArrayList<Integer>();
for(int i = 0; i < 100; i++) {
left.add(i);
right.add(i);
}
var start = System.currentTimeMillis();
var match = false;
for(int i = 0; i < 1000000; i++) {
for(int j = 0; j < 100; j++) {
if( j == 50){
continue;
}
match = left.get(j) == right.get(j);
}
}
var end = System.currentTimeMillis();
System.out.println(String.format("duration: %s, match %s", end - start, match));
var indices = new int[99];
int k = 0;
for(int i = 0; i < 100; i++) {
if(i == 50) {
continue;
}
indices[k++] = i;
}
start = System.currentTimeMillis();
for(int i = 0; i < 1000000; i++) {
for(int j : indices) {
match = left.get(j) == right.get(j);
}
}
end = System.currentTimeMillis();
System.out.println(String.format("duration: %s, match: %s", end - start, match));
Here are results:
shell:>loop-perf
duration: 119, match true
duration: 92, match: true
shell:>loop-perf
duration: 138, match true
duration: 86, match: true
shell:>loop-perf
duration: 125, match true
duration: 85, match: true
shell:>loop-perf
duration: 125, match true
duration: 85, match: true
shell:>loop-perf
duration: 126, match true
duration: 85, match: true
shell:>loop-perf
duration: 124, match true
duration: 85, match: true
shell:>loop-perf
duration: 125, match true
duration: 85, match: true
shell:>loop-perf
duration: 126, match true
duration: 85, match: true
shell:>loop-perf
duration: 124, match true
duration: 85, match: true
}

private static boolean isColumnSame(Row currentRow, Row nextRow, int idx) {
  if (currentRow.isNullAt(idx) && nextRow.isNullAt(idx)) {
Could simplify this to
Objects.equals(nextRow.get(idx), currentRow.get(idx))
+1 for Objects.equals
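With that, the whole helper reduces to something like this sketch:

// Sketch: Objects.equals already covers the both-null and one-null cases.
private static boolean isColumnSame(Row currentRow, Row nextRow, int idx) {
  return Objects.equals(currentRow.get(idx), nextRow.get(idx));
}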
public void testIterator() {
  List<Object[]> permutations = Lists.newArrayList();
  // generate 24 permutations
  permute(
Thanks :)
Few last comments, but this is looking good to me. Thanks for adding the permutations that gives me a lot of confidence that the orderings all work.
Looks good to me, agree with Russell's suggestion on column comparison method
Thanks for the reviews, @RussellSpitzer @szehon-ho!
*   <li>(id=1, data='b', op='UPDATE_AFTER')
* </ul>
*/
public class ChangelogIterator implements Iterator<Row>, Serializable {
Does this class really have to be Serializable? If I understand correctly, it won't be serialized as it is constructed within a closure called on executors. The closure will be serialized but I don't think the iterator itself will be serialized. If we mark the iterator as serializable, then we have to ensure it is actually serializable. I am not sure it actually is in the current implementation.
Did another test. You are right, Serializable isn't needed. I assume the previous error I hit may have been due to the different parameter types.
private int[] indicesForIdentifySameRow = null;

private ChangelogIterator(
    Iterator<Row> rowIterator, int changeTypeIndex, List<Integer> identifierFieldIdx) {
Optional: I'd probably pass a schema into this method and make it responsible for computing indices rather than relying on input. This would allow us to compute things like indicesForIdentifySameRow right away instead of having an if condition for every row. It is optional but it seems safer and would make this class more independent. The downside is the need to compute the indices on each creation, but I don't think it matters because it is done once per Spark partition.
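A sketch of that idea, assuming the constructor receives the Spark row schema; the change-type column name and the identifierFields parameter are assumptions here:

// Sketch: derive the change-type index and the columns to compare once, from the schema,
// instead of recomputing or null-checking them for every row.
private ChangelogIterator(Iterator<Row> rowIterator, StructType rowType, List<String> identifierFields) {
  this.rowIterator = rowIterator;
  this.changeTypeIndex = rowType.fieldIndex("_change_type"); // assumed column name
  this.identifierFieldIdx =
      identifierFields.stream().map(rowType::fieldIndex).collect(Collectors.toList());
  this.indicesForIdentifySameRow =
      IntStream.range(0, rowType.fields().length)
          .filter(i -> i != this.changeTypeIndex)
          .toArray();
}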
*     the same
* @return a new {@link ChangelogIterator} instance concatenated with the null-removal iterator
*/
public static Iterator<Row> iterator(
nit: I am not a big fan of this name because invocations look repetitive: ChangelogIterator.iterator(). I'd consider something similar to what we have in Guava's Iterables, where the name is inspired by a verb. Something like wrap, transform or even create. This is optional, though. Keep this name if you think it is better.
Row currentRow = currentRow();

if (indicesForIdentifySameRow == null) {
Passing a schema into the constructor would allow to pre-compute these values right away.
private Row[] createUpdateChangelog(
    GenericRowWithSchema currentRow, GenericRowWithSchema nextRow) {
  GenericInternalRow deletedRow = new GenericInternalRow(currentRow.values());
We should avoid introducing InternalRows just to call update on them, because the internal representation is completely different (e.g. UTF8String vs String, int vs timestamp). We can't mix internal and generic rows.
Also, we shouldn't limit ourselves to GenericRowWithSchema. We can have a more optimal implementation if we encounter it, but we can't assume we will always get GenericRowWithSchema.
deletedRow.update(changeTypeIndex, UPDATE_BEFORE);
insertedRow.update(changeTypeIndex, UPDATE_AFTER);

return new Row[] {
I think this method can be more efficient. We create 4 new rows and an array. Can we modify generic rows in place?
private Row modify(Row row, int valueIndex, Object value) {
  if (row instanceof GenericRow) {
    GenericRow genericRow = (GenericRow) row;
    genericRow.values()[valueIndex] = value;
    return genericRow;
  } else {
    Object[] values = new Object[row.size()];
    for (int index = 0; index < row.size(); index++) {
      values[index] = row.get(index);
    }
    values[valueIndex] = value;
    return RowFactory.create(values);
  }
}
This way we will also handle all Row implementations.
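For instance, with that helper the update method could reduce to a sketch like this (constant and method names follow the ones used earlier in the PR):

// Sketch: reuse the incoming rows and only rewrite the change-type column.
private Row[] createUpdateChangelog(Row currentRow, Row nextRow) {
  return new Row[] {
    modify(currentRow, changeTypeIndex, UPDATE_BEFORE),
    modify(nextRow, changeTypeIndex, UPDATE_AFTER)
  };
}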
Thanks a lot @aokolnychyi for the suggestion! Looks much better now!
@flyrain, sorry it took me so long to get to this PR. Great work! I noticed a few things we should follow up on. Could you check?
private final Iterator<Row> rowIterator;
private final int changeTypeIndex;
private final List<Integer> identifierFieldIdx;
What will happen if the input rows violate the assumption of uniqueness?
The strategy I took is to leave them as is.
Thanks a lot for the review @aokolnychyi. Filed #6898 for your comments.
This PR is a follow-up change to PR #6344.
This PR is a follow-up change to PR apache#6344.
(cherry picked from commit 3706825)
This PR is a follow-up change to PR apache#6344. (cherry picked from commit 49f3948)
Introduce an iterator to handle changelog rows within a task. It will be used by the changelog procedure #6012.
cc @aokolnychyi @RussellSpitzer @szehon-ho