Member

@pan3793 pan3793 commented Sep 18, 2025

What changes were proposed in this pull request?

Core ideas:

  1. Change the log replay thread pool to have a bounded queue, and block task submission when the queue is full.

    Currently, the log replay thread pool uses an unbounded queue. When there are a large number (e.g., millions) of event logs under spark.history.fs.logDirectory, all tasks are queued in the thread pool's queue without blocking the scanning thread, and the next scheduled scan enqueues them again ...

    https://stackoverflow.com/questions/4521983/java-executorservice-that-blocks-on-submission-after-a-certain-queue-size

  2. Move log compaction to a dedicated thread pool.

    Replay and compaction are different types of workloads; isolating them from each other could improve resource utilization.
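
For readers unfamiliar with the pattern in idea 1, here is a minimal sketch of "block on submission when the queue is full", assuming a semaphore-based design like the one discussed in the linked Stack Overflow question. `BoundedSubmitter` and its methods are hypothetical names for illustration, not the class added by this PR.

```scala
import java.util.concurrent.{ExecutorService, Executors, Semaphore, TimeUnit}

// Hypothetical helper (not the PR's class): a fixed pool whose submit()
// blocks the caller once `threads + queueSize` tasks are running or queued.
final class BoundedSubmitter(threads: Int, queueSize: Int) {
  private val pool: ExecutorService = Executors.newFixedThreadPool(threads)
  private val permits = new Semaphore(threads + queueSize)

  def submit(task: () => Unit): Unit = {
    permits.acquire() // blocks the submitting (scanning) thread when full
    try {
      pool.execute(() => {
        // Release the permit whether the task succeeds or throws.
        try task() finally permits.release()
      })
    } catch {
      case e: Throwable =>
        permits.release() // the task was never accepted, so return its permit
        throw e
    }
  }

  // Stop accepting tasks and wait for the accepted ones to finish.
  def shutdownAndWait(timeoutSec: Long): Boolean = {
    pool.shutdown()
    pool.awaitTermination(timeoutSec, TimeUnit.SECONDS)
  }
}
```

With this shape, the number of pending tasks is capped by the semaphore, so the scanning thread slows down to the pace of the workers instead of piling up millions of queued tasks.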

Why are the changes needed?

Improve performance and reduce memory usage when the SHS bootstraps with an empty KV cache and there are a very large number of event logs.

Does this PR introduce any user-facing change?

No functional changes, but it introduces a new config, spark.history.fs.numCompactThreads.

How was this patch tested?

Tested on an internal cluster by starting the SHS with an empty spark.history.store.path and ~650k event logs under spark.history.fs.logDirectory. The related configs are:

spark.history.fs.cleaner.maxNum 650000
spark.history.fs.logDirectory hdfs://foo/spark2-history
spark.history.fs.update.interval 5s
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.store.maxDiskUsage 100GB
spark.history.store.path /foo/bar/historyStore
spark.history.fs.numReplayThreads 64
spark.history.fs.numCompactThreads 4
spark.history.store.hybridStore.enabled true
spark.history.store.hybridStore.maxMemoryUsage 16g
spark.history.store.hybridStore.diskBackend ROCKSDB
  • spark.history.store.path is configured to an HDD path
  • spark.eventLog.rolling.enabled is disabled, so the compaction threads (numCompactThreads) have no heavy work

It's much faster than before, and metrics show better CPU utilization and lower memory usage.

[Figure: metrics before vs. after; the 3rd figure is "memory used"]

Was this patch authored or co-authored using generative AI tooling?

No.

  val notStale = mutable.HashSet[String]()
  val updated = Option(fs.listStatus(new Path(logDir)))
-   .map(_.toImmutableArraySeq).getOrElse(Nil)
+   .map(_.toImmutableArraySeq).getOrElse(Seq.empty)
Member Author

IntelliJ IDEA cannot highlight this code correctly without this change.

  val newLastScanTime = clock.getTimeMillis()
- logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
+ logInfo(log"Scanning ${MDC(HISTORY_DIR, logDir)} with " +
+   log"lastScanTime=${MDC(LAST_SCAN_TIME, lastScanTime)}")
Member Author

@pan3793 pan3793 Sep 18, 2025

I raised this log level to info. It's not noisy, and it's relatively important for helping the user understand the scanning frequency.

  // triggering another task for compaction task only if it succeeds
  if (succeeded) {
-   submitLogProcessTask(rootPath) { () => compact(reader) }
+   submitLogProcessTask(rootPath, compactExecutor) { () => compact(reader) }
Member Author

@pan3793 pan3793 Sep 18, 2025

This change is actually a MUST. Otherwise, a replay task that submits a follow-up task to its own pool will cause a deadlock when the queue is full.
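
To illustrate the deadlock concern, here is a small sketch, assuming a semaphore-bounded submit similar to the blocking pool. If every replay worker blocked while submitting to its own full pool, nobody would be left to free a slot. Sending the follow-up to a second pool breaks that cycle. `TwoPoolsSketch`, `blockingSubmit`, and the pool sizes are hypothetical and only for illustration.

```scala
import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, Semaphore, TimeUnit}

object TwoPoolsSketch {
  // Hypothetical helper: run `task` on `pool`, blocking the caller while no permit is free.
  private def blockingSubmit(pool: ExecutorService, permits: Semaphore)(task: () => Unit): Unit = {
    permits.acquire()
    try {
      pool.execute(() => {
        try task() finally permits.release()
      })
    } catch {
      case e: Throwable =>
        permits.release()
        throw e
    }
  }

  // Returns true if all follow-up ("compaction") tasks finished within 10 seconds.
  def run(numLogs: Int): Boolean = {
    val replayPool = Executors.newFixedThreadPool(2)
    val compactPool = Executors.newFixedThreadPool(1)
    val replayPermits = new Semaphore(2 + 2)  // 2 threads + 2 queue slots
    val compactPermits = new Semaphore(1 + 2) // 1 thread + 2 queue slots
    val done = new CountDownLatch(numLogs)

    (1 to numLogs).foreach { _ =>
      blockingSubmit(replayPool, replayPermits) { () =>
        // Replay work would go here. The follow-up goes to the *other* pool,
        // so a replay worker never waits for a slot that only replay workers can free.
        blockingSubmit(compactPool, compactPermits) { () => done.countDown() }
      }
    }

    val ok = done.await(10, TimeUnit.SECONDS)
    replayPool.shutdown()
    compactPool.shutdown()
    ok
  }
}
```

A replay worker may still block briefly when the compaction pool is full, but the compaction workers drain independently, so progress is always possible.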

  if (!Utils.isTesting) {
-   ThreadUtils.newDaemonFixedThreadPool(NUM_PROCESSING_THREADS, "log-replay-executor")
+   BlockingThreadPoolExecutorService.newInstance(
+     numReplayThreads, 1024, 60L, TimeUnit.SECONDS, "log-replay-executor")
Member Author

@pan3793 pan3793 Sep 18, 2025

1024 - queue size. Since listing logDir may take some time, it's better to allow some tasks to be queued before the next scanning round starts.
60s - core thread idle time.

I don't think it's necessary to expose these as user-facing configs, but I'm open to other ideas.
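
As a rough illustration of what these two numbers mean, here is how they would map onto a plain `java.util.concurrent.ThreadPoolExecutor`. This is a sketch under that assumption, not the helper used in this PR; `ReplayPoolSketch.newPool` is a hypothetical name.

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

object ReplayPoolSketch {
  // Hypothetical factory: fixed-size pool, bounded queue, idle threads time out.
  def newPool(numThreads: Int, queueSize: Int, keepAliveSec: Long): ThreadPoolExecutor = {
    val pool = new ThreadPoolExecutor(
      numThreads, numThreads,                       // core size == max size
      keepAliveSec, TimeUnit.SECONDS,               // idle time before a thread exits
      new LinkedBlockingQueue[Runnable](queueSize)) // at most `queueSize` waiting tasks
    pool.allowCoreThreadTimeOut(true)               // apply the idle timeout to core threads too
    pool
  }
}
```

Note that a plain `ThreadPoolExecutor` with a bounded queue rejects tasks by default when the queue is full; blocking the submitter instead needs an extra mechanism on top.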


  if (updated.nonEmpty) {
-   logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.rootPath)}")
+   logInfo(log"New/updated attempts found: ${MDC(NUM_ATTEMPT, updated.size)}")
Member Author

@pan3793 pan3793 Sep 18, 2025

The ${updated.size} is useful, but the ${updated.map(_.rootPath)} is dangerous: it produces a huge string that may cause an OOM.
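
If some path detail were ever wanted in such a message, one possible alternative (not what this PR does, which logs only the count) is to cap the number of paths rendered. A minimal sketch, with `LogSummary.summarize` as a hypothetical helper:

```scala
object LogSummary {
  // Hypothetical helper: report the count plus at most `limit` sample paths,
  // so the message size stays bounded no matter how many paths there are.
  def summarize(paths: Seq[String], limit: Int = 3): String = {
    val sample = paths.take(limit).mkString(", ")
    val more = if (paths.size > limit) s", ... (${paths.size - limit} more)" else ""
    s"New/updated attempts found: ${paths.size} [$sample$more]"
  }
}
```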

Member Author

pan3793 commented Sep 18, 2025

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, SafeModeAction}
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.security.AccessControlException
+ import org.apache.hadoop.util.BlockingThreadPoolExecutorService
Member Author

@pan3793 pan3793 Sep 18, 2025

This class is marked as @Private, but its API has been stable since 2016, so using it directly should be low risk.

Member

Well, I'm not sure this is a good decision.

Contributor

The idea and implementation look good to me, but I share the concerns of @dongjoon-hyun.

Member Author

@dongjoon-hyun @peter-toth Thanks for the review. I understand your concerns; let me explain my thinking.

My initial idea was to implement a policy like this:

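import java.util.concurrent.{RejectedExecutionHandler, ThreadPoolExecutor}

// When the pool rejects a task because its queue is full, block the caller until there is room.
val BLOCK_POLICY = new RejectedExecutionHandler() {
  override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit = {
    executor.getQueue.put(r)
  }
}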

But after searching on Stack Overflow, this turns out to be trickier than I thought:

https://stackoverflow.com/questions/3446011/threadpoolexecutor-block-when-its-queue-is-full/3518588#3518588

https://stackoverflow.com/questions/4521983/java-executorservice-that-blocks-on-submission-after-a-certain-queue-size

Given that Hadoop already has a solid (I suppose) and stable implementation, I eventually chose to use it directly. In the worst case, for example, if Hadoop breaks the API or Spark decides to cut off the Hadoop deps in the future (both should be rare), we can fork the code from Hadoop.

Member

I understand the reasoning. However, I disagree with the point about cutting off the Hadoop deps. If that's tricky now, it will be just as tricky in the end.

Before moving forward, we need to check the following.

  1. Can we borrow Private-scope code on the Spark side stably, without increasing the maintenance burden?
  2. Can we ask the Apache Hadoop community to expose that class officially? That would let us remove our forked code at that time.

Historically, we spent a lot of effort migrating to Hadoop Client to avoid this kind of dependency issue. I don't think either the Apache Spark or the Apache Hadoop community wants this kind of hard dependency again.

Member Author

@dongjoon-hyun @peter-toth The Hadoop code comes from Apache S4. I wrote a simple "BlockingThreadPoolExecutorService" based on it, so we no longer have to worry about using a Hadoop private API.

Member Author

BTW, I also verified the updated code in the prod environment, and it has the same effect as the first version.

Member

@dongjoon-hyun dongjoon-hyun left a comment

We need to avoid introducing usage of private-scoped methods.

Member

@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM. Thank you, @pan3793 and @peter-toth .

Merged to master for Apache Spark 4.1.0-preview2 (next week).

@dongjoon-hyun dongjoon-hyun changed the title from "[SPARK-53631][CORE][SHS] Optimize memory and perf on SHS bootstrap" to "[SPARK-53631][CORE] Optimize memory and perf on SHS bootstrap" on Sep 24, 2025
dongjoon-hyun added a commit that referenced this pull request Oct 16, 2025
### What changes were proposed in this pull request?

This PR aims to document newly added `core` module configurations as a part of Apache Spark 4.1.0 preparation.

### Why are the changes needed?

To help the users use new features easily.

- #47856
- #51130
- #51163
- #51604
- #51630
- #51708
- #51885
- #52091
- #52382

### Does this PR introduce _any_ user-facing change?

No behavior change because this is a documentation update.

### How was this patch tested?

Manual review.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52626 from dongjoon-hyun/SPARK-53926.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
huangxiaopingRD pushed a commit to huangxiaopingRD/spark that referenced this pull request Nov 25, 2025

Closes apache#52382 from pan3793/SPARK-53631.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
huangxiaopingRD pushed a commit to huangxiaopingRD/spark that referenced this pull request Nov 25, 2025