Spark: Apply row-level delete files when reading #1444
Conversation
FYI @openinx, @JingsongLi, @prodeezy, @rymurr
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;

public class GenericDeleteFilter extends DeleteFilter<Record> {
@shardulm94, @rdsr, this is a separate public class so that we can use it in IcebergInputFormat. It should be fairly easy to apply the deletes when using generics to read.
case ORC:
default:
  throw new UnsupportedOperationException(String.format(
      "Cannot read %s file: %s", deleteFile.format().name(), deleteFile.path()));
nit: Cannot read %s delete file: %s
Just to note that the problem isn't that the file can't be read, but that the file type is not supported
Updated to "Cannot read deletes: %s is not a supported format: %s"
}

public CloseableIterable<T> filter(CloseableIterable<T> records) {
  return applyEqDeletes(applyPosDeletes(records));
Is there anything important about the order of applying deletes here? Is the assumption that there will be more position deletes than equality deletes?
My thought here is just that equality deletes always use a set check, so they're probably cheaper than position deletes, which might have to fall back to the streaming check. But maybe I'm thinking about it wrong.
We might want to do minor compaction to transform equality delete files into position delete files, so I guess there will be more position deletes than equality deletes as time goes on.
The records in position deletes are ordered, so a sorted merge-based check shouldn't be more expensive than a set-based check, considering the cost of building the hash set. Right?
I think that equality deletes are more expensive to apply because they require a projection and a set lookup (hash and maybe equality check), and there could be multiple equality deletes to apply. So the idea is to do the cheapest operation first and the most expensive operation last to do fewer expensive filter checks.
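A standalone Java sketch of that ordering argument (plain JDK types, not the PR's code; Row, deletedPositions, and deletedKeys are made-up names): the cheap positional check runs on every row, while the costlier equality-style check only runs on the rows that survive it.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class FilterOrderSketch {
  static class Row {
    final long pos;    // position of the row in its data file
    final String key;  // column(s) used by equality deletes
    Row(long pos, String key) { this.pos = pos; this.key = key; }
  }

  static List<Row> filter(List<Row> rows, Set<Long> deletedPositions, Set<String> deletedKeys) {
    Predicate<Row> notPosDeleted = row -> !deletedPositions.contains(row.pos);  // cheap lookup on a long
    Predicate<Row> notEqDeleted = row -> !deletedKeys.contains(row.key);        // in practice: projection + struct hash/equality
    return rows.stream()
        .filter(notPosDeleted)  // applied to every row
        .filter(notEqDeleted)   // applied only to rows that survive the positional check
        .collect(Collectors.toList());
  }
}
```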
do minor compaction to transform the equality delete files to position delete files
@chenjunjiedada do we really need to transform equality deletes to position deletes when doing minor compaction? Let's take the case of a single bucket:
txn-0: insert-file0, pos-delete-file0, equality-delete-file0;
txn-1: insert-file1, pos-delete-file1, equality-delete-file1;
txn-2: insert-file2, pos-delete-file2, equality-delete-file2;
The insert-file0's posDeletes are [pos-delete-file0, pos-delete-file1, pos-delete-file2] and its eqDeletes are [equality-delete-file0, equality-delete-file1, equality-delete-file2].
The insert-file1's posDeletes are [pos-delete-file1, pos-delete-file2] and its eqDeletes are [equality-delete-file1, equality-delete-file2].
The insert-file2's posDeletes are [pos-delete-file2] and its eqDeletes are [equality-delete-file2].
You mean we would transform equality-delete-file2 and equality-delete-file1 into a new pos-delete-file3 when doing minor compaction for txn-2?
@openinx, since the cost of merging equality delete files is higher than merging position deletes, it could be an option for minor compaction. I haven't thought through how we would do that compaction yet. I guess we should consider the sequence number when compacting equality delete files, for example, converting equality delete files with the same sequence number into a new position delete file.
import org.apache.parquet.Preconditions;

public abstract class DeleteFilter<T> {
  private static final long DEFAULT_SET_FILTER_THRESHOLD = 100_000L;
Can this be a table property, so that users can tune it according to the executor memory?
I'd also add: since this is based on the reader's memory constraints, shouldn't this also be a reader (data source option) property passed down to the scan?
Yes, eventually. I just want to keep these commits small and more focused. We can add more plumbing for config in parallel.
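For what it's worth, a minimal sketch of what that plumbing could look like once it is added. The property name "read.delete.set-filter-threshold" is hypothetical, not an existing Iceberg property, and this is not part of this PR:

```java
import java.util.Map;

public class SetFilterThresholdSketch {
  private static final long DEFAULT_SET_FILTER_THRESHOLD = 100_000L;
  // Hypothetical property name; the real key would be defined alongside other table properties.
  private static final String SET_FILTER_THRESHOLD_PROP = "read.delete.set-filter-threshold";

  // Read the threshold from table properties (or a reader option map), falling back to the default.
  static long setFilterThreshold(Map<String, String> properties) {
    String value = properties.get(SET_FILTER_THRESHOLD_PROP);
    return value != null ? Long.parseLong(value) : DEFAULT_SET_FILTER_THRESHOLD;
  }
}
```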
StructLikeSet expected = rowSetWithoutIds(29, 89, 122);
StructLikeSet actual = rowSet(table, "id"); // data is added by the reader to apply the eq deletes
StructLikeSet expected = selectColumns(rowSetWithoutIds(29, 89, 122), "id");
// data is added by the reader to apply the eq deletes, use StructProjection to remove it from comparison
StructLikeSet actual = selectColumns(rowSet(table, "id"), "id");
Any specific reason why we changed this test to remove "data" from comparison?
Yes. That column is not in the requested projection. In Spark, the column is not in the returned row, so I had to add selectColumns to remove it from the expected rows. After doing that, I realized that this test was validating both id and data, when the only column it should be validating for correctness is id. While we get the same result, the test was more specific than it needed to be. If we were to remove data from the rows produced by the scan, this test would have broken.
public abstract class DeleteFilter<T> {
  private static final long DEFAULT_SET_FILTER_THRESHOLD = 100_000L;
  private static final Schema POS_DELETE_SCHEMA = new Schema(
Q: I saw that many other classes also define their own POS_DELETE_SCHEMA. Is it possible to move it to a common class, similar to the MetadataColumn?
The schema is not fixed because it can contain data rows. This is the projection schema used to read, which ignores any row data, and it doesn't need to be shared right now. I think this should be the only place where we need this, outside of tests.
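For readers following along, this is roughly the projection being described. It is a sketch based on this discussion, assuming the MetadataColumns path/pos constants, not necessarily the exact constant defined in DeleteFilter:

```java
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;

class PosDeleteProjectionSketch {
  // Read only the file path and position columns of position delete files,
  // ignoring any copied row data that a delete file may also carry.
  static final Schema POS_DELETE_SCHEMA =
      new Schema(MetadataColumns.DELETE_FILE_PATH, MetadataColumns.DELETE_FILE_POS);
}
```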
  requiredIds.addAll(eqDelete.equalityFieldIds());
}

Set<Integer> missingIds = Sets.newLinkedHashSet(
It seems there's no need to create another new LinkedHashSet? We won't modify this missingIds set, right?
This is mostly to avoid diffing the sets twice, once in isEmpty and once to iterate.
Okay, that makes sense.
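A small sketch of the point above, assuming the two sets being diffed are the required field ids and the ids already present in the read schema (method and parameter names are illustrative): Guava's Sets.difference returns a lazy view, so materializing it once avoids recomputing the difference for both the isEmpty check and the later iteration.

```java
import java.util.Set;
import com.google.common.collect.Sets;

class MissingIdsSketch {
  // Copy the lazy difference view into a concrete LinkedHashSet once, so checking
  // isEmpty() and iterating over the missing ids don't each re-walk the difference.
  static Set<Integer> missingIds(Set<Integer> requiredIds, Set<Integer> presentIds) {
    return Sets.newLinkedHashSet(Sets.difference(requiredIds, presentIds));
  }
}
```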
records = applyPosDeletes(records, fileProjection, task.file().path(), posDeletes, task.file());
records = applyEqDeletes(records, fileProjection, eqDeletes, task.file());
records = applyResidual(records, fileProjection, task.residual());
CloseableIterable<Record> records = openFile(task, readSchema);
Q: since we've added a few columns to the readSchema that may not be included in the projection schema, will the records from this iterator have more columns than the user expects?
For example, for table test=(a,b,c), the user queries the data with select b from test where b > 10, while we use a as the equality field id set for the equality delete files. This CloseableIterable<Record> will return records with columns (a, b), while users would actually expect records with only column (b)?
Yes, the records will have more columns than requested. That's why we project the added columns at the end of the record, (b, a) in your example. @shardulm94 pointed this out on the original PR for generics.
For Spark, this is okay because Spark ignores the extra columns if the schema associated with a row doesn't have them. You can see the check here: https://github.com/apache/iceberg/pull/1444/files#diff-7600f4d25cfdef7f5da70e12126b55c7R137-R138
For generics, we just return the larger row to avoid needing to make a copy right now. I don't think it is worth the cost of a copy to remove the columns, so we would need to add the ability to truncate the columns of a GenericRecord. I think that adding that feature to GenericRecord should be done in a separate PR, if we decide that it should be done.
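To make the (b, a) example concrete, here is a hedged sketch of how such a wider read schema could be assembled, assuming TypeUtil.join and made-up field ids; this is not the PR's exact code:

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

class ProjectedReadSchemaSketch {
  // The requested column comes first and the equality-delete column is appended,
  // so an engine that trims rows by schema (like Spark) only exposes b to users.
  static Schema readSchema() {
    Schema requested = new Schema(Types.NestedField.required(2, "b", Types.LongType.get()));
    Schema equalityColumns = new Schema(Types.NestedField.required(1, "a", Types.StringType.get()));
    return TypeUtil.join(requested, equalityColumns); // columns: (b, a)
  }
}
```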
Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
    delete -> openDeletes(delete, deleteSchema));
StructLikeSet deleteSet = Deletes.toEqualitySet(
Q: do we need to consider maintaining the deleteSet in an LRU cache if several FileScanTasks are located on the same task node in the future?
Reusing the delete set is a good future optimization. We will need to be careful with that, though. I wouldn't want to keep them around any longer than needed because the set could be fairly large. For Spark, we would not want to keep these sets across tasks and may even want to discard sets as they are no longer needed.
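One hedged sketch of what such reuse might look like, using a bounded Guava cache keyed by delete file path. All names here are hypothetical and this is not part of the PR; it only illustrates the trade-off of reusing sets without pinning them in memory:

```java
import java.util.Set;
import java.util.concurrent.Callable;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

class DeleteSetCacheSketch<T> {
  // Bounded, GC-friendly cache so overlapping tasks on the same executor can reuse a
  // delete set without keeping large sets alive longer than needed.
  private final Cache<String, Set<T>> cache = CacheBuilder.newBuilder()
      .maximumSize(8)     // bound memory by the number of cached delete sets
      .softValues()       // let the GC drop sets under memory pressure
      .build();

  Set<T> getOrLoad(String deleteFilePath, Callable<Set<T>> loader) throws Exception {
    return cache.get(deleteFilePath, loader); // load once, reuse across tasks on this node
  }
}
```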
Let me do a pass right now.
This looks great to me.
I think it would be essential to cache overlapping deletes in a task for reasonable performance. Right now, we open and scan every delete file for each data file. For example, if we have data files F1 (D1, D2), F2 (D1), F3 (D1), we will read D1 three times. Ideally, we should detect this and cache that delete file.
@rdblue, do we plan to also add the merge fallback for equality deletes once we have sort order ids assigned to data and delete files? What if the sort order does not match? Do we plan to re-sort records locally or simply assume it must fit into memory?
Yes, I think once we have delete and data file metadata tracking the sort order of a file, we should add the merge optimization for equality deletes.
As for fallback, I'm reluctant to provide tools in Iceberg itself for cases where deletes are larger than available memory and must be sorted. Implementing a sort that will spill to disk is an area where we expect processing engines to be much better than Iceberg, so I would hope that we can defer that implementation to the processing engines. This is also an area where a table should be maintained. If sort orders don't match, then data or deletes can be rewritten to compact, convert to position deletes, or re-sort to make the application of deletes mergeable. In the short term, I suspect this will not be a problem in practice because tables will be maintained. We can revisit the fallback case if we need it.
Thanks for the reviews, everyone! I've merged this. Now we should be unblocked to add support to MR and Flink using the
Schema deleteSchema = TypeUtil.select(requiredSchema, ids);

// a projection to select and reorder fields of the file schema to match the delete rows
StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema);
Q: I saw that StructProjection says it does not support list or map, so that means we don't support having any list or map in the equality fields? For now I think it's OK.
That's right. We should extend it later, but I don't think that many deletes will be by list or map.
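As a usage illustration, a sketch of how the projection above is typically combined with an equality delete set; it uses the Iceberg types visible in this diff, but the class and method names here are made up:

```java
import java.util.function.Predicate;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.StructProjection;

class EqualityDeleteCheckSketch {
  // Project each row down to the equality columns and probe the delete set;
  // rows that match an equality delete are filtered out of the scan.
  static Predicate<StructLike> isDeleted(Schema requiredSchema, Schema deleteSchema, StructLikeSet deleteSet) {
    StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema);
    return row -> deleteSet.contains(projectRow.wrap(row));
  }
}
```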
@Override
protected StructLike asStructLike(Record record) {
  return asStructLike.wrap(record);
Reconsidering this again, I see that the record argument is actually read with the requiredSchema, so do we need to wrap this record with the same requiredSchema again? Please see https://github.com/apache/iceberg/pull/1444/files#diff-a8a025276b1d93b0830f2ee6c91118efR76.
If the record already matches the requiredSchema, then could we just return the record in the asStructLike method? Then we also wouldn't have to override the pos method in this class again?
The purpose of the wrapper here is to translate from Iceberg's generic representation to the internal representation for values. For example, generics will pass timestamptz as an OffsetDateTime to users, but internally Iceberg uses microseconds from the epoch as a long.
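A standalone sketch of that translation for timestamptz, mirroring the microseconds-from-epoch representation described above; it uses only the JDK and is not Iceberg's internal utility:

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

class TimestamptzConversionSketch {
  private static final OffsetDateTime EPOCH = Instant.EPOCH.atOffset(ZoneOffset.UTC);

  // Generic readers hand users an OffsetDateTime; internally the value is a long
  // holding microseconds from the epoch, which is what this conversion produces.
  static long microsFromTimestamptz(OffsetDateTime dateTime) {
    return ChronoUnit.MICROS.between(EPOCH, dateTime);
  }
}
```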
This applies row-level delete files while reading in Spark, and refactors the generic read code to share the filter code between Spark and generics:
- Moves the filter code from GenericReader to DeleteFilter
- Adds SparkDeleteFilter to adapt InternalRow to StructLike for deletes
- Adds GenericDeleteFilter for generic rows, which can be reused for IcebergInputFormat