
API, Core, Spark 3.5: Parallelize reading of deletes and cache them on executors #8755

Merged (5 commits) on Jan 16, 2024

Conversation

@aokolnychyi (Contributor, author) commented Oct 9, 2023:

This PR has code to parallelize reading of deletes and enable caching them on executors.

I also have a follow-up change to assign tasks for one partition to the same executor, similar to KafkaRDD. There is no way to express task affinity so we can only rely on task locality. The solution in KafkaRDD is simple to implement but won't work well if dynamic allocation is enabled (so it should be hidden under a flag).

More thoughts in this doc.

new ConfigEntry<>(
"iceberg.worker.delete-num-threads",
"ICEBERG_WORKER_DELETE_NUM_THREADS",
4 * Runtime.getRuntime().availableProcessors(),
@aokolnychyi (author) commented Oct 9, 2023:

This value may sound ridiculous, but here is my thought process: there is one such thread pool per JVM, and each core in an executor can get a data task that may need to load one to many delete files. These tasks are I/O intensive. This value essentially means we can try to load 4 delete files concurrently per data task. The cache is also blocking to prevent reading the same files twice.

A reviewer commented:

There is really no good way to pick the default for this, since thread_pool_size = fn(cores, io_wait_time / compute_time), and your guess is as good as anyone else's whether 4 is a good number for the environment the code is going to run on. Assuming io_wait_time / compute_time == 5, a factor of 4 above would give you a utilization of 80%, which sounds pretty good.
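The sizing heuristic the reviewer alludes to can be sketched as follows. This is illustrative, not Iceberg code; the class and method names are made up, and the formula is the classic threads = cores * (1 + waitTime / computeTime) rule of thumb:

```java
// Minimal sketch of the pool-sizing heuristic (illustrative names, not Iceberg code).
public class PoolSizing {
    // waitOverCompute is the assumed io_wait_time / compute_time ratio.
    static int poolSize(int cores, double waitOverCompute) {
        return (int) Math.max(2, Math.round(cores * (1 + waitOverCompute)));
    }

    public static void main(String[] args) {
        // With 8 cores and I/O wait 5x compute, the heuristic suggests 48 threads;
        // a fixed 4x multiplier would allocate 32 threads on the same machine.
        System.out.println(poolSize(8, 5.0)); // prints 48
    }
}
```

The point of the heuristic is exactly the reviewer's: without knowing the wait/compute ratio of the target environment, any fixed multiplier is a guess.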

Another reviewer commented:

Is this end user configurable? If not then it probably needs to be.

@aokolnychyi (author):

Yeah, it is configurable, we just need to make sure the default value is reasonable.

@@ -236,6 +236,8 @@ private TableProperties() {}
public static final String DELETE_PLANNING_MODE = "read.delete-planning-mode";
public static final String PLANNING_MODE_DEFAULT = PlanningMode.AUTO.modeName();

public static final String SPARK_EXECUTOR_CACHE_ENABLED = "read.spark.executor-cache.enabled";
@aokolnychyi (author):

We will need to discuss how to enable/disable and configure this cache.

A reviewer commented:

Passed in as a hadoop conf property, or a catalog property since we want this to be end user configurable? So probably not as part of table properties?

@@ -125,6 +126,25 @@ public static StructLikeSet toEqualitySet(
}
}

public static <T extends StructLike> CharSequenceMap<PositionDeleteIndex> toPositionIndexes(
@aokolnychyi (author) commented Oct 9, 2023:

Unlike toPositionIndex below, this one does not filter the deletes for a particular data file. Instead, it builds an index for each referenced data file and returns a map. It is useful when the entire delete file can be cached.
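The shape of that operation can be sketched like this. This is a hedged approximation, not the actual Iceberg API: a sorted set of longs stands in for the bitmap-backed PositionDeleteIndex, and the input is simplified to (dataFilePath, position) pairs:

```java
// Illustrative sketch: group position-delete records into one index per referenced
// data file, with no per-file filtering up front (names are not the Iceberg API).
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

public class PositionIndexSketch {
    static Map<String, SortedSet<Long>> toPositionIndexes(List<Map.Entry<String, Long>> deletes) {
        Map<String, SortedSet<Long>> indexes = new HashMap<>();
        for (Map.Entry<String, Long> delete : deletes) {
            // one index per referenced data file path
            indexes.computeIfAbsent(delete.getKey(), k -> new TreeSet<>()).add(delete.getValue());
        }
        return indexes;
    }
}
```

Returning the whole map is what makes caching the entire delete file possible: a later task for a different data file can hit the same cached entry.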

A collaborator commented:

This seems useful as a javadoc

@aokolnychyi (author):

Added.

* @return an {@link ExecutorService} that uses the delete worker pool
* @see SystemConfigs#DELETE_WORKER_THREAD_POOL_SIZE
*/
public static ExecutorService getDeleteWorkerPool() {
@aokolnychyi (author):

We don't usually use get but we have getWorkerPool() above so I matched it for consistency.

A reviewer commented:

You might have to. And maybe will need to pass in the configured thread pool size as an argument.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BaseDeleteLoader implements DeleteLoader {
@aokolnychyi (author):

I added this abstraction because it was too much to put into DeleteFilter. Take a look at the parent interface.

@aokolnychyi (author):

I questioned this a bit and do believe it is better to have this logic separately from DeleteFilter. That said, the exact API and whether it should be a top-level class are still open questions.

return createDeleteIterable(records, isDeleted);
}

return hasIsDeletedColumn
@aokolnychyi (author):

This is a separate discussion but I think we should drop streaming position deletes. We originally added them when we used a set, not bitmaps.

import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.util.StructLikeSet;

public interface DeleteLoader {
@aokolnychyi (author) commented Oct 9, 2023:

See here.

@singhpk234 (Contributor) commented Oct 10, 2023:

Should we apply some intelligence to how we distribute the tasks so that we can get the most out of the executor cache? For example, we could prefer sending sets of data files that have a lot of overlapping delete files, or that belong to the same partition (for example, for position deletes), to the same executor.

@aokolnychyi (author) commented Oct 10, 2023:

> Should we apply some intelligence on how we are distributing the tasks so that we could utilize the max from the executor cache? For ex: lets say we could prefer sending those set of data files which have a lot of overlapping delete files or may be belong to some partition (for ex: position deletes)?

@singhpk234, I have a follow-up change to do that. Unfortunately, it is a bit controversial. There is no way to express task affinity in Spark, only locality. The best option for us is to implement what KafkaRDD does. The problem is that it only works well if dynamic allocation is disabled. Even without that, this feature should be useful. The change to assign tasks for the same partition to one executor is around 20 lines of code.
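The KafkaRDD-style approach mentioned here can be sketched in a few lines. This is illustrative only, not Spark or Iceberg code: the executor list is a stand-in for what Spark would provide, and the whole trick depends on that list being stable, which is why dynamic allocation breaks it:

```java
// Illustrative sketch: hash each partition key onto a stable, sorted executor list
// so every task for that partition reports the same preferred location.
import java.util.List;

public class PartitionLocality {
    static String preferredExecutor(String partitionKey, List<String> sortedExecutors) {
        // floorMod keeps the index non-negative for negative hash codes
        int index = Math.floorMod(partitionKey.hashCode(), sortedExecutors.size());
        return sortedExecutors.get(index);
    }
}
```

Because Spark only honors locality preferences best-effort, this expresses "tasks for the same partition should land together" without any hard affinity guarantee, matching the comment above.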

@singhpk234 (Contributor):

Thanks @aokolnychyi looking forward to it :) !

@aokolnychyi (author):

I tested this PR on a cluster a bit. It would be nice if someone could also play around with it in their environment.




public static final int DELETE_WORKER_THREAD_POOL_SIZE =
SystemConfigs.DELETE_WORKER_THREAD_POOL_SIZE.value();

private static final ExecutorService DELETE_WORKER_POOL =

A reviewer commented:

If the size of the thread pool is end user configurable this will not work. But you could initialize the thread pool lazily in getDeleteWorkerPool() and presumably there will be some way to read the end user configured value at that point.
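The lazy initialization the reviewer suggests could look roughly like this. A hedged sketch with illustrative names, not the actual Iceberg code: the point is only that pool creation is deferred to first use, when a user-configured size is available, via double-checked locking on a volatile field:

```java
// Illustrative sketch of a lazily initialized delete worker pool.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LazyDeleteWorkerPool {
    private static volatile ExecutorService pool;

    static ExecutorService getDeleteWorkerPool(int configuredSize) {
        if (pool == null) {
            synchronized (LazyDeleteWorkerPool.class) {
                if (pool == null) {
                    // configuredSize is only consulted on first use,
                    // so it can come from end-user configuration
                    pool = Executors.newFixedThreadPool(configuredSize);
                }
            }
        }
        return pool;
    }
}
```

One consequence worth noting: whichever caller wins the race fixes the size for the JVM's lifetime, so later callers passing a different size still get the original pool.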


@aokolnychyi aokolnychyi force-pushed the executor-delete-cache branch from 5ec9097 to 30802cc Compare November 19, 2023 20:46
@aokolnychyi aokolnychyi changed the title [WIP] API, Core, Spark 3.5: Parallelize reading of deletes and cache them on executors API, Core, Spark 3.5: Parallelize reading of deletes and cache them on executors Nov 19, 2023
@@ -452,6 +454,59 @@ private static void checkSchemaCompatibility(
}
}

public static long defaultSize(Types.NestedField field) {
@aokolnychyi (author):

I think we have 2 options to limit the size of the cache:

  • Add ways to limit the size of equality and position delete files to be loaded.
  • Add ways to indicate the maximum cache size in bytes.

The first one is easy, but we need to account for the fact that delete files may store extra columns, and it will be hard to configure these options correctly because users have no idea how much the cached representation will actually occupy. That's why I went with the estimation approach and let users configure the max cache size in bytes.

A member commented:

Why do we need to cache the extra columns? We wouldn't be using them?

@aokolnychyi (author):

We don't cache them. Only equality columns. However, fileSizeInBytes includes the total size, not the projection.

A collaborator commented:

So, to understand: the user configures the size for equality deletes (but based on whole row sizes), and for position deletes based on 2 * record count? Would it be easier to consider having two configured cache sizes (one for position deletes, one for equality deletes)?

@aokolnychyi (author):

The idea is to ask the user for the total size of the cache in memory and estimate the actual size to honor those values. For equality deletes, we will rely on the number of records and types of equality columns.
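The estimation approach described above can be sketched as follows. All numbers and names here are illustrative assumptions, not the actual Iceberg estimates: the point is only that the cache is charged an estimated in-memory size per loaded delete file, so users reason about one total byte budget instead of on-disk file sizes:

```java
// Illustrative sketch of per-entry size estimation for a byte-bounded cache.
public class CacheSizeEstimate {
    static final long OBJECT_HEADER = 16; // assumed per-object overhead

    // position deletes: roughly proportional to the number of deleted positions
    static long estimatePositionIndex(long deletedPositions) {
        return OBJECT_HEADER + 8 * deletedPositions; // ~8 bytes per position, ignoring bitmap compression
    }

    // equality deletes: record count times the summed default width
    // of the equality columns only (extra stored columns are not cached)
    static long estimateEqualitySet(long recordCount, long[] equalityColumnWidths) {
        long rowWidth = OBJECT_HEADER;
        for (long width : equalityColumnWidths) {
            rowWidth += width;
        }
        return recordCount * rowWidth;
    }
}
```

Estimating from record counts and column types, rather than fileSizeInBytes, matches the earlier point that the on-disk size includes columns that never reach the cache.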

*/
public static final ConfigEntry<Integer> DELETE_WORKER_THREAD_POOL_SIZE =
new ConfigEntry<>(
"iceberg.worker.delete-num-threads",
"ICEBERG_WORKER_DELETE_NUM_THREADS",
Math.max(2, Runtime.getRuntime().availableProcessors()),
Math.max(2, 4 * Runtime.getRuntime().availableProcessors()),
A member commented:

Not a huge deal, but we are sidestepping the revapi check here by keeping the multiplier inline. We should probably move the 4 into a field so future modifications trigger the revapi checker.

@aokolnychyi (author):

I can do that, but is it something we want to expose to others? Would the goal be to bring attention to changes or to prohibit future modifications?

A member commented:

You can keep it private, it's to prohibit future mods

@aokolnychyi (author):

If we keep it private, it won't break revapi. I am not sure about a public one. I'll check other places we have.

@aokolnychyi (author):

Looks like we don't do that for other properties either. I'd be open to exploring that, but for all properties in a separate PR.

@@ -27,6 +27,15 @@ class BitmapPositionDeleteIndex implements PositionDeleteIndex {
roaring64Bitmap = new Roaring64Bitmap();
}

void merge(PositionDeleteIndex other) {
A member commented:

Why not just only allow BitmapPositionDeleteIndex here? Do we not have the type when we call merge?

@aokolnychyi (author):

There is also EmptyPositionDeleteIndex. I actually started with BitmapPositionDeleteIndex. I may need to go back and check with fresh eyes.

@aokolnychyi (author):

I switched to accepting BitmapPositionDeleteIndex here and moved casts to the utility class.
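The final shape described here can be sketched like this. A hedged approximation, not the actual Iceberg classes: java.util.BitSet stands in for Roaring64Bitmap to keep the sketch dependency-free, and the key point is that merge() takes the concrete bitmap-backed type so the union goes straight to the underlying bitmap:

```java
// Illustrative sketch: a bitmap-backed position delete index with a concrete-typed merge.
import java.util.BitSet;

public class BitmapIndexSketch {
    private final BitSet bitmap = new BitSet();

    void delete(int position) {
        bitmap.set(position);
    }

    void merge(BitmapIndexSketch other) {
        bitmap.or(other.bitmap); // in-place union of the two sets of deleted positions
    }

    boolean isDeleted(int position) {
        return bitmap.get(position);
    }
}
```

Accepting the concrete type avoids an instanceof check inside merge; callers that hold an EmptyPositionDeleteIndex or another implementation do the cast (or skip the merge) at the call site, as the comment describes.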

@aokolnychyi aokolnychyi force-pushed the executor-delete-cache branch 6 times, most recently from c497048 to 5e0392b Compare January 9, 2024 21:51
long start = System.currentTimeMillis();
V value = valueSupplier.get();
long end = System.currentTimeMillis();
LOG.info("Loaded value for {} with size {} in {} ms", key, valueSize, (end - start));
@aokolnychyi (author):

I am going back and forth on the log level here. I'd say it is a fragile place and it is better to always have more logs for now. I don't expect a huge number of these lines. We do have pretty detailed logs for broadcasts in Spark, for instance.

That said, we can switch to debug if everyone thinks it would be better.

@aokolnychyi aokolnychyi force-pushed the executor-delete-cache branch 2 times, most recently from f0615aa to d743936 Compare January 10, 2024 09:25
@szehon-ho (Collaborator) left a comment:

Left one comment, but rest looks good to me now. Thanks for the changes

long entrySize = OBJECT_HEADER + defaultSize(map.keyType()) + defaultSize(map.valueType());
return OBJECT_HEADER + 5 * entrySize;
default:
return 16;
A collaborator commented:

I see in a lot of places in the code (SparkValueConverters, ExpressionUtil) that if we don't match the type, we throw UnsupportedOperationException. I just felt it's better to realize we missed a type here than to give a random estimate and never know to fix it.
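The fail-fast alternative the reviewer is arguing for looks like this in miniature. The type names and byte widths are illustrative, not the actual Iceberg type system:

```java
// Illustrative sketch: fail fast on an unhandled type instead of returning a
// catch-all estimate that silently hides a missing case.
public class DefaultSizes {
    static long defaultSize(String typeName) {
        switch (typeName) {
            case "int":
            case "float":
                return 4;
            case "long":
            case "double":
                return 8;
            default:
                // surfaces a missed type immediately rather than mis-estimating forever
                throw new UnsupportedOperationException("No size estimate for type: " + typeName);
        }
    }
}
```

The trade-off is that a thrown exception fails the read path outright, whereas the catch-all `return 16` degrades only the cache accounting, which is presumably why the PR kept a default.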

@aokolnychyi aokolnychyi force-pushed the executor-delete-cache branch from d743936 to d5d7f79 Compare January 12, 2024 22:03
@aokolnychyi aokolnychyi force-pushed the executor-delete-cache branch from d5d7f79 to 16486d3 Compare January 14, 2024 23:10
@aokolnychyi aokolnychyi reopened this Jan 16, 2024
@aokolnychyi (author):

I gave this PR a round of testing on the cluster and it seems to work as expected.

@aokolnychyi aokolnychyi merged commit 684f7a7 into apache:main Jan 16, 2024
@aokolnychyi (author):

Thanks for reviewing, @szehon-ho @RussellSpitzer!

aokolnychyi added a commit that referenced this pull request Feb 2, 2024
devangjhabakh pushed a commit to cdouglas/iceberg that referenced this pull request Apr 22, 2024
5 participants