API, Core, Spark 3.5: Parallelize reading of deletes and cache them on executors #8755
Conversation
new ConfigEntry<>(
    "iceberg.worker.delete-num-threads",
    "ICEBERG_WORKER_DELETE_NUM_THREADS",
    4 * Runtime.getRuntime().availableProcessors(),
This value may sound ridiculous but here is my thought process: there is one such thread pool per JVM, each core in an executor can get a data task that may need to load 1 to many delete files, these tasks are I/O intensive. This value essentially means we can try to load 4 delete files concurrently per each data task. The cache is also blocking to prevent reading the same files twice.
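For context, a rough sketch of the kind of per-JVM, I/O-bound pool this default implies (the class and helper below are illustrative, not the PR's code):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative only: one shared pool per JVM sized at 4x the core count, so each
// data task running on a core can fetch several delete files concurrently.
class DeleteWorkerPoolSketch {
  private static final int POOL_SIZE = 4 * Runtime.getRuntime().availableProcessors();
  private static final ExecutorService POOL = Executors.newFixedThreadPool(POOL_SIZE);

  // Hypothetical helper: submit one load per delete file and block until all finish.
  static void loadAll(List<Runnable> deleteFileLoads) {
    CompletableFuture.allOf(
            deleteFileLoads.stream()
                .map(load -> CompletableFuture.runAsync(load, POOL))
                .toArray(CompletableFuture[]::new))
        .join();
  }
}
```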
There is really no good way to pick the default for this since `thread_pool_size = fn(cores, io_wait_time / compute_time)`, and your guess is as good as anyone else's whether 4 is a good number for the environment the code is going to run on. Assuming `io_wait_time / compute_time == 5`, a factor of 4 above would give you a utilization of 80%, which sounds pretty good.
Is this end user configurable? If not then it probably needs to be.
Yeah, it is configurable, we just need to make sure the default value is reasonable.
@@ -236,6 +236,8 @@ private TableProperties() {}
public static final String DELETE_PLANNING_MODE = "read.delete-planning-mode";
public static final String PLANNING_MODE_DEFAULT = PlanningMode.AUTO.modeName();

public static final String SPARK_EXECUTOR_CACHE_ENABLED = "read.spark.executor-cache.enabled";
We will need to discuss how to enable/disable and configure this cache.
Passed in as a hadoop conf property, or a catalog property since we want this to be end user configurable? So probably not as part of table properties?
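A hedged sketch of the resolution order being debated here, layering a hypothetical session conf over the table property from the diff (the final mechanism was still undecided at this point):

```java
import java.util.Map;

// Sketch only: an end-user session conf (hypothetical name) wins over the table
// property from the diff above; the default shown is an assumption, not the PR's value.
class ExecutorCacheConfSketch {
  static final String TABLE_PROP = "read.spark.executor-cache.enabled";
  static final String SESSION_CONF = "spark.sql.iceberg.executor-cache.enabled"; // hypothetical

  static boolean cacheEnabled(Map<String, String> sessionConf, Map<String, String> tableProps) {
    String value =
        sessionConf.getOrDefault(SESSION_CONF, tableProps.getOrDefault(TABLE_PROP, "true"));
    return Boolean.parseBoolean(value);
  }
}
```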
@@ -125,6 +126,25 @@ public static StructLikeSet toEqualitySet(
}
}

public static <T extends StructLike> CharSequenceMap<PositionDeleteIndex> toPositionIndexes(
Unlike `toPositionIndex` below, this one does not filter the deletes for a particular data file. Instead, it builds an index for each referenced data file and returns a map. It is useful when the entire delete file can be cached.
This seems useful as a javadoc
Added.
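A hedged sketch of how the two helpers differ at the call site (simplified; the real methods are built from delete records, and `CharSequenceMap` handles path equality across `CharSequence` implementations):

```java
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.util.CharSequenceMap;

// Simplified illustration of the distinction described above; not the exact Deletes API.
class PositionIndexSketch {
  // toPositionIndex-style: the deletes were already filtered down to one data file.
  static boolean isDeleted(PositionDeleteIndex indexForFile, long position) {
    return indexForFile.isDeleted(position);
  }

  // toPositionIndexes-style: one index per referenced data file, so an entire delete
  // file can be cached once and each reader looks up the index for its own data file.
  static PositionDeleteIndex indexFor(
      CharSequenceMap<PositionDeleteIndex> indexes, CharSequence dataFilePath) {
    return indexes.get(dataFilePath);
  }
}
```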
* @return an {@link ExecutorService} that uses the delete worker pool
* @see SystemConfigs#DELETE_WORKER_THREAD_POOL_SIZE
*/
public static ExecutorService getDeleteWorkerPool() {
We don't usually use `get` but we have `getWorkerPool()` above, so I matched it for consistency.
You might have to. And maybe will need to pass in the configured thread pool size as an argument.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BaseDeleteLoader implements DeleteLoader {
I added this abstraction because it was too much to put into `DeleteFilter`. Take a look at the parent interface.
I questioned this a bit and do believe it is better to have this logic separate from `DeleteFilter`. That said, the exact API and whether it should be a top-level class are still open questions.
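A plausible shape for the abstraction, inferred from the imports shown in the diff below; the exact signatures in the PR may differ:

```java
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Schema;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.util.StructLikeSet;

// Sketch only: a loader that turns delete files into in-memory structures,
// leaving DeleteFilter focused on applying them to rows.
interface DeleteLoaderSketch {
  // Load equality deletes projected onto the equality columns.
  StructLikeSet loadEqualityDeletes(Iterable<DeleteFile> deleteFiles, Schema projection);

  // Load position deletes that reference the given data file.
  PositionDeleteIndex loadPositionDeletes(Iterable<DeleteFile> deleteFiles, CharSequence filePath);
}
```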
return createDeleteIterable(records, isDeleted);
}

return hasIsDeletedColumn
This is a separate discussion but I think we should drop streaming position deletes. We originally added them when we used a set, not bitmaps.
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.util.StructLikeSet;

public interface DeleteLoader {
See here.
Should we apply some intelligence to how we distribute the tasks so that we could get the most out of the executor cache? For example, we could prefer sending sets of data files that have a lot of overlapping delete files, or that belong to the same partition (e.g., position deletes)?
@singhpk234, I have a follow-up change to do that. Unfortunately, it is a bit controversial. There is no way to express task affinity in Spark, only locality. The best option for us is to implement what `KafkaRDD` does.
Thanks @aokolnychyi, looking forward to it :)!
I tested this PR on a cluster a bit. It would be nice if someone could also play around with it in their environment.
public static final int DELETE_WORKER_THREAD_POOL_SIZE =
    SystemConfigs.DELETE_WORKER_THREAD_POOL_SIZE.value();

private static final ExecutorService DELETE_WORKER_POOL =
If the size of the thread pool is end-user configurable, this will not work. But you could initialize the thread pool lazily in `getDeleteWorkerPool()` and presumably there will be some way to read the end user configured value at that point.
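A minimal sketch of the lazy initialization suggested above, assuming the configured size can be read at first use (names are illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative lazy holder: the pool is created on first call, once the end-user
// configured size is known, instead of in a static initializer.
class LazyDeleteWorkerPool {
  private static volatile ExecutorService pool;

  static ExecutorService getDeleteWorkerPool(int configuredPoolSize) {
    if (pool == null) {
      synchronized (LazyDeleteWorkerPool.class) {
        if (pool == null) {
          pool = Executors.newFixedThreadPool(configuredPoolSize);
        }
      }
    }
    return pool;
  }
}
```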
@@ -452,6 +454,59 @@ private static void checkSchemaCompatibility(
}
}

public static long defaultSize(Types.NestedField field) {
I think we have 2 options to limit the size of the cache:
- Add ways to limit the size of equality and position delete files to be loaded.
- Add ways to indicate the maximum cache size in bytes.
The first one is easy, but we need to account for the fact that delete files may store extra columns, and it will be hard to configure these options correctly because users have no idea how much the cached representation will actually occupy. That's why I went with the estimation approach and let users configure the max cache size in bytes.
Why do we need to cache the extra columns? We wouldn't be using them?
We don't cache them, only equality columns. However, `fileSizeInBytes` includes the total size, not the projection.
So to understand: the user configures the size of equality deletes (but based on whole row sizes), while position deletes are based on 2 * record count? Would it be easier to consider having two configured cache sizes (position delete and equality delete cache sizes)?
The idea is to ask the user for the total size of the cache in memory and estimate the actual size to honor those values. For equality deletes, we will rely on the number of records and types of equality columns.
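A rough sketch of that estimation approach (the weights below are illustrative assumptions, not the PR's exact numbers):

```java
import java.util.List;
import org.apache.iceberg.types.Types;

// Illustrative: equality deletes are weighted by record count and the projected
// equality columns only (not fileSizeInBytes); position deletes by position count.
class CacheWeightSketch {
  static long estimateEqualityDeletesSize(long recordCount, List<Types.NestedField> equalityColumns) {
    long rowSize = 0;
    for (Types.NestedField column : equalityColumns) {
      rowSize += defaultSize(column);
    }
    return recordCount * rowSize;
  }

  static long estimatePositionDeletesSize(long positionCount) {
    return 2 * positionCount; // assumption: ~2 bytes per position, echoing the "2 * record count" above
  }

  private static long defaultSize(Types.NestedField field) {
    return 16; // placeholder; the PR estimates a per-type default size instead
  }
}
```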
*/
public static final ConfigEntry<Integer> DELETE_WORKER_THREAD_POOL_SIZE =
    new ConfigEntry<>(
        "iceberg.worker.delete-num-threads",
        "ICEBERG_WORKER_DELETE_NUM_THREADS",
        Math.max(2, Runtime.getRuntime().availableProcessors()),
        Math.max(2, 4 * Runtime.getRuntime().availableProcessors()),
Not a huge deal, but we are avoiding the RevCheck here by putting our multiplier in a constant expression. We should probably move the 4 into a field so future modifications trigger the Rev checker.
I can do that, but is it something we want to expose to others? Would the goal be to bring attention to it or to prohibit future modifications?
You can keep it private, it's to prohibit future mods
If we keep it private, it won't break revapi. I am not sure about a public one. I'll check what we do in other places.
Looks like we don't do that for other properties either. I'd be open to exploring that, but for all properties in a separate PR.
@@ -27,6 +27,15 @@ class BitmapPositionDeleteIndex implements PositionDeleteIndex {
roaring64Bitmap = new Roaring64Bitmap();
}

void merge(PositionDeleteIndex other) {
Why not just allow `BitmapPositionDeleteIndex` here? Do we not have the type when we call merge?
There is also `EmptyPositionDeleteIndex`. I actually started with `BitmapPositionDeleteIndex`. I may need to go back and check with fresh eyes.
I switched to accepting `BitmapPositionDeleteIndex` here and moved the casts to the utility class.
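A minimal sketch of that merge, assuming the index wraps a `Roaring64Bitmap` the way `BitmapPositionDeleteIndex` does (not the PR's exact code):

```java
import org.roaringbitmap.longlong.Roaring64Bitmap;

// Sketch: merging another bitmap-backed index is just a bitmap OR; callers holding
// an EmptyPositionDeleteIndex special-case or cast it before reaching this point.
class BitmapIndexSketch {
  private final Roaring64Bitmap roaring64Bitmap = new Roaring64Bitmap();

  void merge(BitmapIndexSketch other) {
    roaring64Bitmap.or(other.roaring64Bitmap);
  }
}
```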
long start = System.currentTimeMillis();
V value = valueSupplier.get();
long end = System.currentTimeMillis();
LOG.info("Loaded value for {} with size {} in {} ms", key, valueSize, (end - start));
I am going back and forth on the log level here. I'd say it is a fragile place and it is better to always have more logs for now. I don't expect a huge number of these lines. We do have pretty detailed logs for broadcasts in Spark, for instance.
That said, we can switch to debug if everyone thinks it would be better.
Left one comment, but the rest looks good to me now. Thanks for the changes.
long entrySize = OBJECT_HEADER + defaultSize(map.keyType()) + defaultSize(map.valueType());
return OBJECT_HEADER + 5 * entrySize;
default:
return 16;
I see in a lot of places in the code (SparkValueConverters, ExpressionUtil) that if we don't match the type, we throw UnsupportedOperationException. I just felt it's better to realize we missed a type here than to give a random estimate and never know to fix it.
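For illustration, the alternative suggested here would look roughly like this (a sketch with assumed per-type sizes, not the merged code):

```java
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

// Sketch: fail loudly on unhandled types instead of silently returning a blanket 16.
class DefaultSizeSketch {
  static long defaultSize(Types.NestedField field) {
    Type type = field.type();
    switch (type.typeId()) {
      case BOOLEAN:
        return 1;
      case INTEGER:
      case FLOAT:
        return 4;
      case LONG:
      case DOUBLE:
        return 8;
      default:
        throw new UnsupportedOperationException("Unsupported type: " + type);
    }
  }
}
```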
I gave this PR a round of testing on the cluster and it seems to work as expected.
Thanks for reviewing, @szehon-ho @RussellSpitzer!
… (apache#9603) This change backports PR apache#8755 and PR apache#9583 to Spark 3.4.
This PR has code to parallelize reading of deletes and enable caching them on executors.
I also have a follow-up change to assign tasks for one partition to the same executor, similar to `KafkaRDD`. There is no way to express task affinity so we can only rely on task locality. The solution in `KafkaRDD` is simple to implement but won't work well if dynamic allocation is enabled (so it should be hidden under a flag). More thoughts in this doc.
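For reference on the locality point: Spark lets a source hint locality (but not affinity) through `InputPartition#preferredLocations`. A minimal sketch of that mechanism (the partition class and the choice of host are hypothetical, not the follow-up implementation):

```java
import org.apache.spark.sql.connector.read.InputPartition;

// Hypothetical partition that asks Spark to schedule tasks for the same table partition
// on the same executor host, so cached deletes are more likely to be reused there.
class LocalityHintedPartition implements InputPartition {
  private final String preferredHost;

  LocalityHintedPartition(String preferredHost) {
    this.preferredHost = preferredHost;
  }

  @Override
  public String[] preferredLocations() {
    // A hint only: Spark may still schedule the task elsewhere, which is why this is
    // weaker than true task affinity.
    return new String[] {preferredHost};
  }
}
```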