-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Read deleted rows with metadata column IS_DELETED #4683
Conversation
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorHolder.java
Outdated
Show resolved
Hide resolved
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
Outdated
Show resolved
Hide resolved
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
Outdated
Show resolved
Hide resolved
Do we still need class |
* @param fieldId a column id in this schema | ||
* @return the index of the field in the schema, or -1 if one wasn't found | ||
*/ | ||
public int idToIndex(Integer fieldId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
idToPosition
? position
might be more clear.
Also we are using Integer
arg type, which means it can be null. Probably following the pattern from the idToAlias
above. But I am wondering if the fieldId
arg should be primitive int
type. If we stay with Integer
type, we probably need null check here.
Also, should the return type be Integer
and nullable just to be consistent with other methods in this class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the name suggestion. There are always performance concerns about non-primitive type. I'm kind of OK with both primitive type and non-primitive type here. In terms of consistency, these two public methods also return int
. I will leave the return type to int
unless we have a strong reason to use Integer.
public int schemaId()
public int highestFieldId()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even this place uses int
, I'd prefer to use int
for the parameter as well. It is unnecessary to do this boxing and unboxing.
public int fieldId() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even this place uses
int
, I'd prefer to useint
for the parameter as well. It is unnecessary to do this boxing and unboxing.
public int fieldId() {
That is different. For NestedField, fieldId will never be null. hence it returns primitive int, which makes sense. But here, we may not find the field with the provided id, we are returning a special value -1
. If we follow the style of other find methods in the Schema
class, we can see aliasToId
returns null if no matching field is found.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the comment. With @szehon-ho's suggestion #4683 (comment), I removed the the new method.
@@ -94,6 +95,12 @@ protected DeleteFilter(String filePath, List<DeleteFile> deletes, Schema tableSc | |||
this.eqDeletes = eqDeleteBuilder.build(); | |||
this.requiredSchema = fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes); | |||
this.posAccessor = requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId()); | |||
this.hasColumnIsDeleted = requestedSchema.findField(MetadataColumns.IS_DELETED.fieldId()) != null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I saw a lot of places in Iceberg just use null
to indicate the condition. should we use null for columnIsDeletedIndex
too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A boolean variable should be expressive.
@@ -227,6 +237,39 @@ public void close() { | |||
} | |||
} | |||
|
|||
private static class PositionStreamDeleteMarker<T> extends PositionStreamDeleteFilter<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as mentioned in the draft PR, this is not actually a filter as it always returns true. It is only to use the iteration part of the filter. Maybe use CloseableIterable#transform
instead for iteration?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I've replaced filter
with transform
for all places except this one. This is trickier to change. Let me think a bit more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactored
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Do you think it will be cleaner to have most of the logic at:
abstract class PositionStreamDeleteIterable {
CloseableIterator createPosDeleteIterator(CloseableIterator<T> items);
}
and have the two concrete subclass (PositionStreamDeleteMarker and PositionStreamDeleteFilter) extend it?
I think having the Marker extend the Filter still seems a bit strange, though the logic is correctly refactored now.
@flyrain , I agree the implementation of |
We don't plan for that utilities in this PR. Let's not change anything about |
|
||
PositionSetDeleteFilter<T> filter = new PositionSetDeleteFilter<>(rowToPosition, deleteSet); | ||
public static <T> CloseableIterable<T> filter(CloseableIterable<T> rows, Predicate<T> shouldKeep) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the new filter method API. only concern is for compatibility. this is a public static method from a public class. Probably very few users call this API directly. if we want to be safe, we can keep the old API and mark it as deprecated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good point. I had the same concern. However, we've discussed what should be considered as APIs in one of the community sync. we agreed that only consider the public things in the API module as APIs. cc @rdblue. With that, it should be OK to change this public method?
...k/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
Outdated
Show resolved
Hide resolved
...2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/IcebergArrowColumnVector.java
Outdated
Show resolved
Hide resolved
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
Outdated
Show resolved
Hide resolved
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me
Let me take another look too. Sorry for the delay. |
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
Show resolved
Hide resolved
keep = false; | ||
} | ||
@Override | ||
protected CloseableIterator createPosDeleteIterator(CloseableIterator<T> items) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: raw type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd say it's a bit more than a nit! Iceberg PRs should always use the type system.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed it in a new commit.
return isDeleted; | ||
} | ||
|
||
protected abstract CloseableIterator createPosDeleteIterator(CloseableIterator<T> items); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: raw type
} catch (IOException e) { | ||
throw new UncheckedIOException("Failed to close delete positions iterator", e); | ||
@Override | ||
protected CloseableIterator createPosDeleteIterator(CloseableIterator<T> items) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: raw type
return isDeleted; | ||
} | ||
|
||
protected abstract CloseableIterator createPosDeleteIterator(CloseableIterator<T> items); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really create a position delete iterator? Don't we iterate over records here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do iterate the pos delete records, for example, line 191, this.nextDeletePos = deletePosIterator.next();
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do iterate over position deletes but I think this iterator is for remaining data records, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed offline with @aokolnychyi, I've changed the name to applyDelete
@@ -67,6 +66,8 @@ | |||
private final List<DeleteFile> eqDeletes; | |||
private final Schema requiredSchema; | |||
private final Accessor<StructLike> posAccessor; | |||
private final boolean hasColumnIsDeleted; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: hasIsDeletedColumn
?
@@ -67,6 +66,8 @@ | |||
private final List<DeleteFile> eqDeletes; | |||
private final Schema requiredSchema; | |||
private final Accessor<StructLike> posAccessor; | |||
private final boolean hasColumnIsDeleted; | |||
private final int columnIsDeletedPosition; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: isDeletedColumnPosition
?
this.columnIsDeletedPosition = requiredSchema.columns().indexOf(MetadataColumns.IS_DELETED); | ||
} | ||
|
||
protected int columnIsDeletedPosition() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: isDeletedColumnPosition()
?
Deletes.streamingFilter(records, this::pos, Deletes.deletePositions(filePath, deletes)); | ||
} | ||
|
||
private CloseableIterable<T> createDeleteIterable(CloseableIterable<T> records, Predicate<T> isDeleted) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a better name? Is it an iterable of remaining rows?
This looks close to me. I'd switch to an explicit call to set the |
this.deletePosIterator = deletePositions; | ||
// consume delete positions until the next is past the current position | ||
boolean isDeleted = currentPos == nextDeletePos; | ||
while (deletePosIterator.hasNext() && nextDeletePos <= currentPos) { | ||
this.nextDeletePos = deletePosIterator.next(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is now the only place in the code which refers to it as "this.nextDeletePos"
protected PositionFilterIterator(CloseableIterator<T> items, CloseableIterator<Long> deletePositions) { | ||
super(items); | ||
this.deletePosIterator = deletePositions; | ||
// consume delete positions until the next is past the current position |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if this is any simpler but we can remove one if statement, not sure if this is more clear.
// Consume nextDeletePos till past currentPos, if currentPos equals any consumed nextDeletePos the current row has been deleted
boolean isDeleted = currentPos == nextDeletePos;
while (deletePosIterator.hasNext() && nextDeletePos <= currentPos) {
this.nextDeletePos = deletePosIterator.next();
isDeleted |= currentPos == nextDeletePos
}
return isDeleted;
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm OK with either one. I kind of think if
is easier to read. If we will make it simpler, here is another way, which we don't have to check on isDeleted
. What do you think?
while (deletePosIterator.hasNext() && nextDeletePos <= currentPos) {
this.nextDeletePos = deletePosIterator.next();
if (currentPos == nextDeletePos) {
isDeleted = true;
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's keep it as-is to avoid last-minute changes in this tricky place.
} | ||
|
||
@Test | ||
public void testIsDeletedColumnWithoutDeleteFile() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a test to project is_deleted column when there is no delete file.
Thanks, @flyrain! Great to have this done. Thanks everyone who reviewed! |
Thank @aokolnychyi! Thanks everyone for the review. |
Per the discussion in #4539, created this new PR. With the change in #2538 and this PR, we can read deleted rows from row-level deletes.
Please note that there is no IS_DELETED column filter push down at this moment. This will be done as a followup PR.Think a bit more, we may not need pushdown, especially for non-vectorized read. It shouldn't have any difference between Spark filters out rows or Iceberg filters out rows. We iterate through all rows anyway.cc @aokolnychyi @RussellSpitzer @chenjunjiedada @stevenzwu @Reo-LEI @hameizi @singhpk234 @rajarshisarkar @kbendick @rdblue