[SPARK-53631][CORE] Optimize memory and perf on SHS bootstrap #52382
Conversation
```diff
  val notStale = mutable.HashSet[String]()
  val updated = Option(fs.listStatus(new Path(logDir)))
-   .map(_.toImmutableArraySeq).getOrElse(Nil)
+   .map(_.toImmutableArraySeq).getOrElse(Seq.empty)
```
IntelliJ IDEA cannot highlight this code correctly without this change.
```diff
  val newLastScanTime = clock.getTimeMillis()
- logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
+ logInfo(log"Scanning ${MDC(HISTORY_DIR, logDir)} with " +
+   log"lastScanTime=${MDC(LAST_SCAN_TIME, lastScanTime)}")
```
I escalated this log level to INFO; it's not noisy, and it's relatively important for users to understand the scanning frequency.
```diff
  // triggering another task for compaction task only if it succeeds
  if (succeeded) {
-   submitLogProcessTask(rootPath) { () => compact(reader) }
+   submitLogProcessTask(rootPath, compactExecutor) { () => compact(reader) }
```
This is actually a MUST change. Otherwise, submitting a task to the pool from one of its own worker threads would cause a deadlock once the queue is full.
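To illustrate the point, here is a minimal, hypothetical Scala sketch (names like `blockingSubmit` are made up and this is not FsHistoryProvider code): once submission blocks when the pool is saturated, a worker thread that re-submits into its own pool can never make progress.

```scala
import java.util.concurrent.{Executors, Semaphore}

object BlockingSubmitDeadlockSketch {
  // Hypothetical stand-in for a "blocking" pool: a semaphore caps the number
  // of in-flight tasks, and submission blocks until a permit is available.
  private val permits = new Semaphore(1) // capacity 1 keeps the example tiny
  private val pool    = Executors.newFixedThreadPool(1)

  def blockingSubmit(task: () => Unit): Unit = {
    permits.acquire() // blocks while the pool is "full"
    pool.execute { () =>
      try task() finally permits.release()
    }
  }

  def main(args: Array[String]): Unit = {
    // Deadlock: the outer task holds the only permit while it runs, then tries
    // to blockingSubmit a follow-up into the same pool. acquire() can never
    // succeed, because the permit is released only after the outer task
    // finishes, and the outer task cannot finish while blocked in acquire().
    // Submitting the compaction task to a separate executor breaks this cycle.
    blockingSubmit { () =>
      blockingSubmit(() => println("compact")) // hangs forever
    }
  }
}
```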
```diff
  if (!Utils.isTesting) {
-   ThreadUtils.newDaemonFixedThreadPool(NUM_PROCESSING_THREADS, "log-replay-executor")
+   BlockingThreadPoolExecutorService.newInstance(
+     numReplayThreads, 1024, 60L, TimeUnit.SECONDS, "log-replay-executor")
```
- 1024 is the queue size; since listing the logDir may take some time, it's better to allow some tasks to be queued before the next scanning round starts.
- 60s is the core thread idle time.

I don't think it's necessary to expose these as configs to users, but I'm open to different ideas.
```diff
  if (updated.nonEmpty) {
-   logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.rootPath)}")
+   logInfo(log"New/updated attempts found: ${MDC(NUM_ATTEMPT, updated.size)}")
```
The `${updated.size}` is useful, but the `${updated.map(_.rootPath)}` is dangerous: it produces a huge string that may cause an OOM.
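A hypothetical middle ground (not what the patch does, which simply drops the paths from the INFO line) would be to log only a bounded sample of the root paths; this assumes a suitable `LogKeys` entry such as `PATHS` exists, which may not be the case:

```scala
// Hypothetical: keep the line informative without ever building a huge string.
logInfo(log"New/updated attempts found: ${MDC(NUM_ATTEMPT, updated.size)}, " +
  log"e.g. ${MDC(PATHS, updated.take(10).map(_.rootPath).mkString(", "))}")
```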
```diff
  import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, SafeModeAction}
  import org.apache.hadoop.hdfs.DistributedFileSystem
  import org.apache.hadoop.security.AccessControlException
+ import org.apache.hadoop.util.BlockingThreadPoolExecutorService
```
This class is marked as `@Private`, but the API has been stable since 2016, so it should be low risk to use it directly.
Well, I'm not sure this is a good decision.
The idea and implementation look good to me, but I share the concerns of @dongjoon-hyun.
@dongjoon-hyun @peter-toth Thanks for the review. I understand your concerns; let me explain my thoughts.

My initial idea was to implement a blocking rejection policy like this:

```scala
val BLOCK_POLICY = new RejectedExecutionHandler() {
  override def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor): Unit = {
    executor.getQueue.put(r)
  }
}
```

but after searching on Stack Overflow, this seems to be trickier than I thought.

Given that Hadoop already has a solid (I suppose) and stable implementation, I eventually chose to use it directly. In the worst case, for example if Hadoop breaks the API or Spark decides to cut off the Hadoop deps in the future (both should be rare), we can fork the code from Hadoop.
I understand the reasoning. However, I disagree with the point about cutting off the Hadoop deps; if that's tricky now, it will be just as tricky in the end.
Before moving forward, we need to check the following:
- Can we borrow `Private`-scoped code on the Spark side stably, without increasing the maintenance burden further?
- Can we ask the Apache Hadoop community to expose that class officially? That would let us remove our forked code at that point.

Historically, we spent a lot of effort migrating to Hadoop Client to avoid this kind of dependency issue. I don't think either the Apache Spark or the Apache Hadoop community wants this kind of hard dependency again.
@dongjoon-hyun @peter-toth The Hadoop code originally comes from Apache S4; I wrote a simple `BlockingThreadPoolExecutorService` based on it, so we don't have to worry about using a Hadoop private API.
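For context, Hadoop's `BlockingThreadPoolExecutorService` is essentially a semaphore guarding a delegate thread pool. A simplified, hypothetical sketch of the same idea (not the code added to Spark, which also handles shutdown, daemon thread naming, and `submit` variants):

```scala
import java.util.concurrent.{ExecutorService, Executors, Semaphore}

// Sketch of a blocking executor: execute() blocks the caller once
// maxThreads + queueSize tasks are in flight, instead of letting an
// unbounded queue grow.
class SimpleBlockingExecutor(maxThreads: Int, queueSize: Int) {
  private val permits = new Semaphore(maxThreads + queueSize)
  private val delegate: ExecutorService = Executors.newFixedThreadPool(maxThreads)

  def execute(task: Runnable): Unit = {
    permits.acquire() // blocks the submitter when the pool is saturated
    try {
      delegate.execute { () =>
        try task.run() finally permits.release()
      }
    } catch {
      case e: Throwable =>
        permits.release() // submission failed, return the permit
        throw e
    }
  }

  def shutdown(): Unit = delegate.shutdown()
}
```

With this shape, the scanning thread naturally throttles itself: it can only list and submit as fast as the replay workers drain the bounded backlog.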
BTW, I also verified the updated code on the prod env, and it has the same effect as the first version.
dongjoon-hyun left a comment:
We need to avoid introducing the usage of a private-scoped method.
dongjoon-hyun left a comment:
+1, LGTM. Thank you, @pan3793 and @peter-toth.
Merged to master for Apache Spark 4.1.0-preview2 (next week).
### What changes were proposed in this pull request?
This PR aims to document newly added `core` module configurations as a part of Apache Spark 4.1.0 preparation.

### Why are the changes needed?
To help the users use new features easily.
- #47856
- #51130
- #51163
- #51604
- #51630
- #51708
- #51885
- #52091
- #52382

### Does this PR introduce _any_ user-facing change?
No behavior change because this is a documentation update.

### How was this patch tested?
Manual review.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #52626 from dongjoon-hyun/SPARK-53926.
Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?
Core ideas:
1. Change the log replay thread pool to have a bounded queue, and block task submission when the queue is full.
   Currently, the log replay thread pool uses an unbounded queue. When there are a large number (e.g., millions) of event logs under `spark.history.fs.logDirectory`, all tasks are queued in the thread pool queue without blocking the scanning thread, and on the next schedule they are enqueued again ... https://stackoverflow.com/questions/4521983/java-executorservice-that-blocks-on-submission-after-a-certain-queue-size
2. Move log compaction to a dedicated thread pool.
   Replaying and compaction are different types of workloads; isolating them from each other can improve resource utilization.

### Why are the changes needed?
Improve performance and reduce memory usage on SHS bootstrap with an empty KV cache, when there are tons of event logs.

### Does this PR introduce _any_ user-facing change?
No functionality changes, but it brings a new config `spark.history.fs.numCompactThreads`.

### How was this patch tested?
Tested on an internal cluster, starting SHS with an empty `spark.history.store.path` and ~650k event logs under `spark.history.fs.logDirectory`. The related configs are:
```
spark.history.fs.cleaner.maxNum 650000
spark.history.fs.logDirectory hdfs://foo/spark2-history
spark.history.fs.update.interval 5s
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
spark.history.store.maxDiskUsage 100GB
spark.history.store.path /foo/bar/historyStore
spark.history.fs.numReplayThreads 64
spark.history.fs.numCompactThreads 4
spark.history.store.hybridStore.enabled true
spark.history.store.hybridStore.maxMemoryUsage 16g
spark.history.store.hybridStore.diskBackend ROCKSDB
```
- `spark.history.store.path` is configured to an HDD path
- we disable `spark.eventLog.rolling.enabled`, so `numCompactThreads` has no heavy work

It's much faster than before, and metrics show better CPU utilization and lower memory usage.

[Metrics screenshot: before vs. after; the 3rd figure is "memory used"]

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#52382 from pan3793/SPARK-53631.
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>