Spark 3.4: Support distributed planning #8123
Conversation
public static final String PLANNING_MODE_DEFAULT = PlanningMode.AUTO.modeName();

public static final String DELETE_PLANNING_MODE = "read.delete.planning-mode";
public static final String DELETE_PLANNING_MODE_DEFAULT = PlanningMode.LOCAL.modeName();
I am debating whether these configs make sense at the table level. I am using LOCAL for deletes by default because those are less likely to be filtered using min/max filters and we will need to load stats, so the cost of bringing them to the driver will be higher.
I think that it does make sense to have these as table config because this is dependent on the table structure. If a table has a ton of deletes, you'd probably want to set this to AUTO to override. I'm not sure whether we'd want this to also be a default in Spark though. Seems like we may want to override, but not actually get the default value from here.
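To illustrate the table-level override being discussed, here is a minimal sketch using the table API. The property key comes from the diff above; the "auto" value string is an assumption about what PlanningMode.AUTO.modeName() returns.

```java
import org.apache.iceberg.Table;

public class PlanningModeConfigSketch {
  // Switch delete planning for a delete-heavy table away from the LOCAL default.
  public static void enableAutoDeletePlanning(Table table) {
    table
        .updateProperties()
        .set("read.delete.planning-mode", "auto") // value string is an assumption
        .commit();
  }
}
```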
}

@Override
protected List<DataFile> planDataRemotely(List<ManifestFile> manifests) {
My initial thought was to query the data_files metadata table. However, that approach faced a number of issues:
- Need to translate a predicate on the main table into a predicate on the metadata table.
- Need to implement Iceberg to Spark filter conversion.
- Hard to populate scan metrics.

It was also expensive, as DataFile is wrapped as GenericInternalRow, then converted to UnsafeRow, then bytes are collected to the driver, deserialized on the driver, then converted to public Row, then wrapped into DataFile again. Not to mention the required complexity.

Right now, I am using an RDD, so there is only one round of serialization (either Java or Kryo) and no conversion. The current approach performs exceptionally well for selective queries. The cost to serialize the entire content of a 10 MB manifest is around 0.2-0.3 s and is not an issue. Full table scan performance depends on how quickly the driver can fetch the result from other nodes. The most critical part is the size of the serialized data.
Option 1: Java serialization of DataFile and DeleteFile.
Option 2: Kryo serialization of DataFile and DeleteFile.
Option 3: Converting DataFile and DeleteFile to UnsafeRow and using Java/Kryo serialization on top.

Option 2 (serializing files with Kryo) produced around 15% smaller chunks compared to Option 1 (serializing files with Java) and did not require any extra logic to convert to and from UnsafeRow. Option 3 does not require Kryo to be efficient but requires the conversion logic and gave only a 3-5% size reduction compared to Option 2. I was surprised how well Kryo worked on top of files, but I still debate whether conversion to UnsafeRow makes sense to avoid depending on Kryo. That said, using UnsafeRow would yield only marginally smaller chunks. The cost of the serialization is not an issue, as I said earlier.
The most efficient approach was to implement a custom Kryo serializer for DataFile[] and apply dictionary encoding for partitions. That gave a 15% size reduction when a large number of files belonged to the same partition, but it is extremely complicated and requires the user to register a custom serializer.
All in all, if someone expects to query 20 PB in one job, I doubt 30 seconds to plan the job will be their biggest issue. That's why I opted for the simplest solution that works really well for the first two use cases mentioned in the PR description (still faster in the third use case but not drastically). If we want to support such use cases, we need to change how Spark plans jobs and not collect data files back to the driver.
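For illustration only, here is a hedged sketch (not necessarily this PR's implementation; names and structure are assumptions) of the RDD-based approach described above: read manifests on executors and collect plain DataFile objects to the driver with a single round of Java/Kryo serialization and no row conversion.

```java
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;

public class RemotePlanningSketch {
  public static List<DataFile> planDataRemotely(
      SparkSession spark, Table table, List<ManifestFile> manifests) {
    JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
    // broadcast a serializable view of the table so executors can access its FileIO
    Broadcast<Table> tableBroadcast = context.broadcast(SerializableTable.copyOf(table));
    return context
        .parallelize(manifests, manifests.size())
        // read each manifest on an executor; filtering, copying each DataFile out of
        // the reader, and closing readers are omitted in this sketch
        .flatMap(manifest -> ManifestFiles.read(manifest, tableBroadcast.value().io()).iterator())
        .collect();
  }
}
```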
Another choice is that we could customize the DataFile/DeleteFile serialization/deserialization with readObject() and writeObject() if we don't want to depend on Kryo. The customized implementation should provide better performance as well.
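A rough sketch of that suggestion, using a hypothetical wrapper class (not an existing Iceberg type): writeObject()/readObject() hooks let us control exactly what bytes Java serialization emits.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical wrapper; a compact encoding of a DataFile/DeleteFile would be
// produced elsewhere and carried here as raw bytes.
class CompactContentFileHolder implements Serializable {
  private transient byte[] payload;

  CompactContentFileHolder(byte[] payload) {
    this.payload = payload;
  }

  private void writeObject(ObjectOutputStream out) throws IOException {
    out.defaultWriteObject();
    out.writeInt(payload.length); // length-prefix the compact encoding
    out.write(payload);
  }

  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    int length = in.readInt();
    payload = new byte[length];
    in.readFully(payload);
  }
}
```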
Correct, both the Java and Kryo serialization can be further improved but it won't be drastic.
I'd rather not depend on Kryo, but good to know that it works with both Kryo and Java serialization.
Then we have two options:
- Look into optimizing Java serialization for DataFile/DeleteFile to bridge that 15% gap in size.
- Develop a conversion utility to and from UnsafeRow, which could potentially be used in metadata scans.

For instance, UnsafeRow implements both Externalizable and KryoSerializable to customize both.
Another idea could be to have an extra data structure per group of data files so that we would serialize each group's partition info only once instead of serializing the same partition tuple for many data files. That said, it is only applicable to full table scans and I am not sure it is worth the complexity.
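A rough sketch of that idea, with hypothetical names: group files that share a partition so the partition tuple is written once per group. Rebuilding per-file partitions on deserialization, and making the partition tuple itself serializable, are left out of this sketch.

```java
import java.io.Serializable;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.StructLike;

// Hypothetical container; in practice the partition tuple would need a
// serializable representation and the files would be stored without their
// own partition copies.
class PartitionFileGroup implements Serializable {
  private final StructLike partition; // shared partition tuple, serialized once per group
  private final List<DataFile> files; // files that belong to this partition

  PartitionFileGroup(StructLike partition, List<DataFile> files) {
    this.partition = partition;
    this.files = files;
  }

  StructLike partition() {
    return partition;
  }

  List<DataFile> files() {
    return files;
  }
}
```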
For instance, UnsafeRow implements both Externalizable and KryoSerializable to customize both.
Do we still need the customized serializer if we use UnsafeRow? From my understanding, the object converted to/from UnsafeRow should already implement serialization/deserialization. Or maybe I misunderstood something.
I am inclined to try out optimizing Java serialization for GenericDataFile first but I can be convinced otherwise.
This is indeed a good direction to try. Maybe it benefits other engines as well?
Another idea could be to have an extra data structure per group of data files so that we would serialize each group's partition info only once instead of serializing the same partition tuple for many data files. That said, it is only applicable to full table scans and I am not sure it is worth the complexity.
Is the reason for this because the biggest overhead of serialization and deserialization comes from partition data?
Do we still need the customized serializer if we use UnsafeRow?
No, I meant UnsafeRow should be efficiently handled by both Java and Kryo serialization as it provides custom logic for both.
Is the reason for this because the biggest overhead of serialization and deserialization comes from partition data?
It is a substantial part but does not dominate. Based on my tests, it was around 15%. It is the only repetitive part, though.
Thanks for the explanation.
Another option: the DataFile and DeleteFile instances that we use are actually Avro records with an Avro schema. I think you could directly use Avro serialization, which would save quite a bit of overhead and wouldn't require custom Java serialization. Check out IcebergEncoder and IcebergDecoder for the efficient code to produce byte[].
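A hedged usage sketch of those single-object encoders, shown with Iceberg generic Records and a made-up one-field schema; whether DataFile can be pushed through the same path directly is an assumption to verify.

```java
import java.nio.ByteBuffer;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.IcebergDecoder;
import org.apache.iceberg.data.avro.IcebergEncoder;
import org.apache.iceberg.types.Types;

public class AvroEncoderSketch {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema(Types.NestedField.required(1, "path", Types.StringType.get()));

    Record record = GenericRecord.create(schema);
    record.setField("path", "s3://bucket/data/file.parquet");

    IcebergEncoder<Record> encoder = new IcebergEncoder<>(schema);
    ByteBuffer bytes = encoder.encode(record); // compact Avro bytes, no row conversion

    IcebergDecoder<Record> decoder = new IcebergDecoder<>(schema);
    Record roundTripped = decoder.decode(bytes);
    System.out.println(roundTripped.getField("path"));
  }
}
```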
String modeName =
    confParser
        .stringConf()
        .sessionConf(SparkSQLProperties.PLANNING_MODE)
Shall we add an options parameter as well?
Yeah, why not?
Is there a use case for it, or is this speculative?
Mostly speculative, I guess: if a query touches multiple tables, we may want to override the mode in only one of them?
Yeah, that provides fine-grained control when there are multiple tables.
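For example, something along these lines; the option key and value names here are hypothetical and not taken from this PR.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PerTableOverrideSketch {
  public static Dataset<Row> load(SparkSession spark) {
    return spark
        .read()
        .format("iceberg")
        .option("planning-mode", "distributed") // hypothetical read option key/value
        .load("db.large_table"); // only this table gets the override
  }
}
```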
return CloseableIterable.transform(
    dataFiles,
    dataFile -> {
      DeleteFile[] deleteFiles = deletes.forDataFile(dataFile);
If this step could be done on executors, that would be amazing. We have noticed some v2 tables blocking here.
Could you please provide stats for this use case? Size and number of data/delete files, type of delete files, compaction, etc. Where does it block?
Hi @aokolnychyi, here is an example. The data is ingested into Iceberg with a Flink CDC job, and the following comes from a full table rewrite job for that table.
ScanMetricsResult{
  totalPlanningDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT45M35.528362027S, count=1},
  resultDataFiles=CounterResult{unit=COUNT, value=4241121},
  resultDeleteFiles=CounterResult{unit=COUNT, value=21491991},
  totalDataManifests=CounterResult{unit=COUNT, value=138},
  totalDeleteManifests=CounterResult{unit=COUNT, value=57},
  scannedDataManifests=CounterResult{unit=COUNT, value=138},
  skippedDataManifests=CounterResult{unit=COUNT, value=0},
  totalFileSizeInBytes=CounterResult{unit=BYTES, value=70668583180805},
  totalDeleteFileSizeInBytes=CounterResult{unit=BYTES, value=361014841132536},
  skippedDataFiles=CounterResult{unit=COUNT, value=29},
  skippedDeleteFiles=CounterResult{unit=COUNT, value=0},
  scannedDeleteManifests=CounterResult{unit=COUNT, value=57},
  skippedDeleteManifests=CounterResult{unit=COUNT, value=0},
  indexedDeleteFiles=CounterResult{unit=COUNT, value=4181955},
  equalityDeleteFiles=CounterResult{unit=COUNT, value=4181955},
  positionalDeleteFiles=CounterResult{unit=COUNT, value=0},
  skippedMetaBlobs=null}}
That is a lot of equality deletes (4 million data files vs 21 million delete files, 350 TB of deletes to apply). Since we can't compact equality deletes across sequence numbers, would this use case benefit from a regular job that would convert equality deletes into position deletes? Is there a time interval where that would be possible on a regular basis to avoid rewriting the entire set of data files? @szehon-ho and I had a few discussions on how to build that. How quickly do you accumulate 21 million delete files?
Have you profiled it? Is most of the time spent on looking up the index? I wonder whether using an external system to hold the delete index would also make sense (which could be populated in a distributed manner).
Does anyone have any other thoughts on this or faced similar issues?
cc @RussellSpitzer @stevenzwu @jackye1995 @amogh-jahagirdar @rdblue @szehon-ho
Maybe we can do something for partition-scoped deletes, but it may not be that easy if we have global deletes or spec evolution. One alternative that I quickly discarded was to wait until deletes are planned and then broadcast the delete index. I feel that would be a bad decision, as the size of that index may not be trivial, there would be more back-and-forth communication, and there would be no way to plan data and deletes concurrently.
I will explore this a bit more. I am not yet convinced using a thread pool on the driver to look up deletes would not be enough.
I think this is something we should skip for now. It's extremely helpful to take this step forward, without worrying about problems introduced by tables that are using a write strategy like Flink, where basically all work is deferred until later, and then is never done.
I will measure the time it takes to perform this step on a table in a reasonable state. If it is not that bad, I'll probably skip for now. If it is a substantial bottleneck, we can consider a distributed assignment of deletes.
After all optimizations for local delete assignment that went in, it is fairly cheap to assign position deletes. It is more expensive for equality deletes. I'd probably go with local assignment for now until we figure out the final vision for assignment of deletes.
when it comes to
Thanks @aokolnychyi for the awesome work. I think there are many people expecting this.
@zinking, I know that paper and there are a few ideas in it that may be applicable to us. At the same time, Iceberg metadata already forms a system table which we query in a distributed manner. At this point, we are not storing large blobs in the manifests, so we will come back to that paper while discussing how to support secondary indexes and how to integrate that into planning. That would be the point when the metadata becomes too big. Right now, we are talking about 2-4 GB of metadata to cover 20-40 PB of data.
That is exactly what I am talking about. I am OK with the status quo.
Schema snapshotSchema = SnapshotUtil.schemaFor(table(), scanSnapshotId);
return newRefinedScan(table(), snapshotSchema, context().useSnapshotId(scanSnapshotId));

protected boolean useSnapshotSchema() {
  return true;
Big difference from the previous implementation here? I assume I'll see later that you moved the validation out?
The validation is in the parent so that I can reuse it in both scans now.
    + manifest.deletedFilesCount();
}

private ManifestEvaluator newManifestEvaluator(PartitionSpec spec) {
Looks like these are primarily based on ManifestEvaluator. I wonder if a shared util method would be helpful?
Could you elaborate a bit?
Just that this had quite a bit of copied code. I was wondering if we should share it between here and ManifestGroup. But my comment was confusing since I said ManifestEvaluator instead of ManifestGroup.
I have spent all this time profiling our local planning and
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;

/**
This would eventually be part of the public docs.
+1
 *
 * @return an iterable of file groups
 */
public Iterable<CloseableIterable<DataFile>> fileGroups() {
Why is this called fileGroups? It looks like it produces the file from every entry, not a group.
I called it fileGroups because instead of combining entries from all manifests into a single iterable, it returns an iterable of iterables where each element represents the content of one manifest. Let me know if that makes sense.
 *
 * <p>Note that this class is evolving and is subject to change even in minor releases.
 */
abstract class DistributedDataBatchScan
Should this be called BaseDistributedDataScan? Here's my rationale:
- Add Base because this must be extended
- Remove Batch because this is new and we aren't adding any more non-batch scan classes
Agreed, changed.
private boolean shouldCopyDataFiles(boolean planDataLocally, boolean loadColumnStats) {
  return planDataLocally
      || shouldCopyRemotelyPlannedDataFiles()
      || (loadColumnStats && !shouldReturnColumnStats());
I'm not sure I understand why the column stats matter in this last case. Can you explain the rationale?
The assumption is that keeping stats in memory is expensive. When we load stats for equality deletes but don't need to return them, we can drop them upon assigning deletes so that they can be garbage collected. That way, we are less likely to hit OOM.
It is similar to what we do in local planning. Does it make sense?
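A minimal sketch of that trade-off, under the assumption that the decision sits next to the scan's shouldReturnColumnStats() flag:

```java
import org.apache.iceberg.DataFile;

class StatsAwareCopies {
  // Keep stats only when the caller asked for them; otherwise copy without
  // stats so they can be garbage collected after delete assignment.
  static DataFile copyOf(DataFile dataFile, boolean returnColumnStats) {
    return returnColumnStats ? dataFile.copy() : dataFile.copyWithoutStats();
  }
}
```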
TestSparkDistributedDataBatchScan.spark =
    SparkSession.builder()
        .master("local[2]")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
No need to test with Java serialization?
Added separate suites for both. It was harder to parameterize because the Spark session is static.
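A sketch of what the Java-serialization counterpart setup might look like (the suite name is an assumption; only the serializer config differs from the snippet above):

```java
import org.apache.spark.sql.SparkSession;

public class TestSparkDistributedDataScanJavaSerializationSketch {
  static SparkSession startSpark() {
    return SparkSession.builder()
        .master("local[2]")
        .config("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
        .getOrCreate();
  }
}
```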
Looks good to me. Thanks for getting this ready!
Thanks a lot everyone for reviewing. I merged this change as it got pretty large with all the tests. I'll follow up with a few minor things like benchmarks.
This PR adds support for distributed planning to cover the following use cases on top of tables with tens of millions of files:
Iceberg planning uses manifest partition info to prune entire manifests, which allows the format to plan queries with partition filters blazingly fast even when the table contains 50+ million files (as long as the metadata is properly clustered, which can be achieved by calling the rewrite manifests action). At the same time, use cases mentioned above may benefit from distributed planning as the cluster parallelism can be much higher than the number of cores on the driver.
This logic has been tested on a table with 20 million files (400+ manifests) and enabled planning a scan with a min/max filter (no partition predicate) within 3 seconds, compared to 30 s with 24 driver cores or 1.5 min with 4 driver cores for local planning. Full table scans also see an improvement (35-50%), but the cost of bringing the result to the driver becomes a bottleneck. Queries with partition predicates can be planned in around 1 second even with 20+ million files (thanks to manifest filtering). Such a number of files can easily cover from 10 to 40 PB of data in a single table.
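To keep metadata clustered as described above, the rewrite manifests action can be run periodically; a minimal sketch with the Spark action API, where the table identifier is a placeholder:

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

public class RewriteManifestsSketch {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().appName("rewrite-manifests").getOrCreate();
    Table table = Spark3Util.loadIcebergTable(spark, "db.table"); // placeholder identifier

    // Rewrite manifests so metadata clusters by partition and planning can prune effectively.
    RewriteManifests.Result result = SparkActions.get(spark).rewriteManifests(table).execute();
    result.rewrittenManifests().forEach(manifest -> System.out.println("rewrote " + manifest.path()));
  }
}
```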