@@ -328,6 +328,7 @@ public enum LogKeys implements LogKey {
   LAST_ACCESS_TIME,
   LAST_COMMITTED_CHECKPOINT_ID,
   LAST_COMMIT_BASED_CHECKPOINT_ID,
+  LAST_SCAN_TIME,
   LAST_VALID_TIME,
   LATEST_BATCH_ID,
   LATEST_COMMITTED_BATCH_ID,
@@ -102,7 +102,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
  private val CLEAN_INTERVAL_S = conf.get(History.CLEANER_INTERVAL_S)

  // Number of threads used to replay event logs.
-  private val NUM_PROCESSING_THREADS = conf.get(History.NUM_REPLAY_THREADS)
+  private val numReplayThreads = conf.get(History.NUM_REPLAY_THREADS)
+  // Number of threads used to compact rolling event logs.
+  private val numCompactThreads = conf.get(History.NUM_COMPACT_THREADS)

  private val logDir = conf.get(History.HISTORY_LOG_DIR)

@@ -209,7 +211,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   */
  private val replayExecutor: ExecutorService = {
    if (!Utils.isTesting) {
-      ThreadUtils.newDaemonFixedThreadPool(NUM_PROCESSING_THREADS, "log-replay-executor")
+      ThreadUtils.newDaemonBlockingThreadPoolExecutorService(
+        numReplayThreads, 1024, "log-replay-executor")
    } else {
      ThreadUtils.sameThreadExecutorService()
    }
  }
+
+  /**
+   * Fixed size thread pool to compact log files.
+   */
+  private val compactExecutor: ExecutorService = {
+    if (!Utils.isTesting) {
+      ThreadUtils.newDaemonBlockingThreadPoolExecutorService(
+        numCompactThreads, 1024, "log-compact-executor")
+    } else {
+      ThreadUtils.sameThreadExecutorService()
+    }
@@ -431,7 +446,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
      initThread.interrupt()
      initThread.join()
    }
-    Seq(pool, replayExecutor).foreach { executor =>
+    Seq(pool, replayExecutor, compactExecutor).foreach { executor =>
      executor.shutdown()
      if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
        executor.shutdownNow()
@@ -487,15 +502,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    var count: Int = 0
    try {
      val newLastScanTime = clock.getTimeMillis()
-      logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
+      logInfo(log"Scanning ${MDC(HISTORY_DIR, logDir)} with " +
+        log"lastScanTime=${MDC(LAST_SCAN_TIME, lastScanTime)}")
[Review comment by pan3793 (Member, Author), Sep 18, 2025]
I escalated this log level to info; it's not noisy, and it's relatively important for users to understand the scanning frequency.

      // Mark entries that are processing as not stale. Such entries do not have a chance to be
      // updated with the new 'lastProcessed' time and thus any entity that completes processing
      // right after this check and before the check for stale entities will be identified as stale
      // and will be deleted from the UI until the next 'checkForLogs' run.
      val notStale = mutable.HashSet[String]()
      val updated = Option(fs.listStatus(new Path(logDir)))
-        .map(_.toImmutableArraySeq).getOrElse(Nil)
+        .map(_.toImmutableArraySeq).getOrElse(Seq.empty)
[Review comment by pan3793 (Member, Author)]
IntelliJ IDEA cannot highlight this expression correctly without the change.

        .filter { entry => isAccessible(entry.getPath) }
        .filter { entry =>
          if (isProcessing(entry.getPath)) {
@@ -612,11 +628,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
      }

      if (updated.nonEmpty) {
-        logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.rootPath)}")
+        logInfo(log"New/updated attempts found: ${MDC(NUM_ATTEMPT, updated.size)}")
[Review comment by pan3793 (Member, Author), Sep 18, 2025]
${updated.size} is useful, but ${updated.map(_.rootPath)} is dangerous: it produces a huge string that may cause an OOM.

      }

      updated.foreach { entry =>
-        submitLogProcessTask(entry.rootPath) { () =>
+        submitLogProcessTask(entry.rootPath, replayExecutor) { () =>
          mergeApplicationListing(entry, newLastScanTime, true)
        }
      }
@@ -788,7 +804,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

    // triggering another task for compaction task only if it succeeds
    if (succeeded) {
-      submitLogProcessTask(rootPath) { () => compact(reader) }
+      submitLogProcessTask(rootPath, compactExecutor) { () => compact(reader) }
[Review comment by pan3793 (Member, Author), Sep 18, 2025]
This is actually a MUST change: otherwise, a task submitting back into its own pool will deadlock when the queue is full.

    }
  }
}
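The deadlock the comment describes is easiest to see with a concrete repro. Below is a minimal, standalone sketch (hypothetical names, not part of this PR) of how self-submission wedges a semaphore-throttled pool like the `BlockingThreadPoolExecutorService` added later in this diff: once all permits are taken, a worker that re-submits into its own pool blocks in `acquire()`, never completes, and therefore never releases a permit.

```scala
import java.util.concurrent._

object SelfSubmitDeadlockSketch {
  def main(args: Array[String]): Unit = {
    // threads = 1, queue capacity = 1  =>  2 permits, mirroring the pool's accounting
    val permits = new Semaphore(2)
    val delegate = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue[Runnable](2),
      (r: Runnable) => { val t = new Thread(r); t.setDaemon(true); t })

    def blockingSubmit(task: Runnable): Unit = {
      permits.acquire() // blocks once the pool and queue are "full"
      delegate.execute(() => try task.run() finally permits.release())
    }

    val fillerQueued = new CountDownLatch(1)
    blockingSubmit { () =>
      fillerQueued.await() // keep the interleaving deterministic for the demo
      // re-submit into the SAME pool: acquire() blocks, this task never completes,
      // so no permit is ever released and the pool is wedged
      blockingSubmit(() => println("compact"))
    }
    blockingSubmit(() => println("filler")) // takes the last permit, waits in the queue
    fillerQueued.countDown()

    delegate.awaitTermination(2, TimeUnit.SECONDS) // times out: nothing can progress
    println(s"permits available: ${permits.availablePermits()}") // prints 0
  }
}
```

Routing compaction to a separate `compactExecutor` breaks exactly this cycle: the replay worker's re-submission lands in a different pool with its own permits.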
@@ -1456,13 +1472,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
  }

  /** NOTE: 'task' should ensure it executes 'endProcessing' at the end */
-  private def submitLogProcessTask(rootPath: Path)(task: Runnable): Unit = {
+  private def submitLogProcessTask(
+      rootPath: Path, pool: ExecutorService)(task: Runnable): Unit = {
    try {
      processing(rootPath)
-      replayExecutor.submit(task)
+      pool.submit(task)
    } catch {
      // let the iteration over the updated entries break, since an exception on
-      // replayExecutor.submit (..) indicates the ExecutorService is unable
+      // pool.submit (..) indicates the ExecutorService is unable
      // to take any more submissions at this time
      case e: Exception =>
        logError(s"Exception while submitting task", e)
@@ -228,6 +228,12 @@ private[spark] object History {
    .intConf
    .createWithDefaultFunction(() => Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)

+  val NUM_COMPACT_THREADS = ConfigBuilder("spark.history.fs.numCompactThreads")
+    .version("4.1.0")
+    .doc("Number of threads that will be used by history server to compact event logs.")
+    .intConf
+    .createWithDefaultFunction(() => Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
+
  val RETAINED_APPLICATIONS = ConfigBuilder("spark.history.retainedApplications")
    .version("1.0.0")
    .doc("The number of applications to retain UI data for in the cache. If this cap is " +
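For reference, the `NUM_COMPACT_THREADS` default mirrors `NUM_REPLAY_THREADS`: a quarter of the available cores, rounded up. A quick sketch of how the default function resolves (core counts are illustrative):

```scala
// With 16 available processors: ceil(16 / 4f).toInt == 4 compaction threads;
// with 6 processors: ceil(6 / 4f).toInt == 2.
val defaultCompactThreads =
  Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt
```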
@@ -0,0 +1,124 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.util

import java.util
import java.util.concurrent._

import com.google.common.util.concurrent.Futures

// scalastyle:off
/**
 * This thread pool executor throttles the submission of new tasks by using a semaphore.
 * Task submissions require permits; task completions release permits.
 * <p>
 * NOTE: [[invoke*]] methods are not supported; use the [[submit]] methods or the
 * [[execute]] method instead.
 * <p>
 * This is inspired by
 * <a href="https://github.com/apache/incubator-retired-s4/blob/0.6.0-Final/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java">
 * Apache S4 BlockingThreadPoolExecutorService</a>.
 */
// scalastyle:on
private[spark] class BlockingThreadPoolExecutorService(
    nThreads: Int, workQueueSize: Int, threadFactory: ThreadFactory)
  extends ExecutorService {

  private val permits = new Semaphore(nThreads + workQueueSize)

  private val workQueue = new LinkedBlockingQueue[Runnable](nThreads + workQueueSize)

  private val delegate = new ThreadPoolExecutor(
    nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory)

  override def shutdown(): Unit = delegate.shutdown()

  override def shutdownNow(): util.List[Runnable] = delegate.shutdownNow()

  override def isShutdown: Boolean = delegate.isShutdown

  override def isTerminated: Boolean = delegate.isTerminated

  override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean =
    delegate.awaitTermination(timeout, unit)

  override def submit[T](task: Callable[T]): Future[T] = {
    try permits.acquire() catch {
      case e: InterruptedException =>
        Thread.currentThread.interrupt()
        return Futures.immediateFailedFuture(e)
    }
    delegate.submit(new CallableWithPermitRelease(task))
  }

  override def submit[T](task: Runnable, result: T): Future[T] = {
    try permits.acquire() catch {
      case e: InterruptedException =>
        Thread.currentThread.interrupt()
        return Futures.immediateFailedFuture(e)
    }
    delegate.submit(new RunnableWithPermitRelease(task), result)
  }

  override def submit(task: Runnable): Future[_] = {
    try permits.acquire() catch {
      case e: InterruptedException =>
        Thread.currentThread.interrupt()
        return Futures.immediateFailedFuture(e)
    }
    delegate.submit(new RunnableWithPermitRelease(task))
  }

  override def execute(command: Runnable): Unit = {
    try permits.acquire() catch {
      case _: InterruptedException =>
        Thread.currentThread.interrupt()
        // bail out: running the task anyway would release a permit that was never acquired
        return
    }
    delegate.execute(new RunnableWithPermitRelease(command))
  }

  override def invokeAll[T](
      tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] =
    throw new UnsupportedOperationException("Not implemented")

  override def invokeAll[T](
      tasks: util.Collection[_ <: Callable[T]],
      timeout: Long, unit: TimeUnit): util.List[Future[T]] =
    throw new UnsupportedOperationException("Not implemented")

  override def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T =
    throw new UnsupportedOperationException("Not implemented")

  override def invokeAny[T](
      tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T =
    throw new UnsupportedOperationException("Not implemented")

  /**
   * Releases a permit after the task is executed.
   */
  private class RunnableWithPermitRelease(delegate: Runnable) extends Runnable {
    override def run(): Unit = try delegate.run() finally permits.release()
  }

  /**
   * Releases a permit after the task is completed.
   */
  private class CallableWithPermitRelease[T](delegate: Callable[T]) extends Callable[T] {
    override def call(): T = try delegate.call() finally permits.release()
  }
}
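A short usage sketch of the throttling behavior (illustrative sizes and pool name; assumes a caller inside the `org.apache.spark` package, since `ThreadUtils` is `private[spark]`): with 2 threads and a 4-slot queue there are 6 permits, so the 7th `execute()` blocks the caller until a running task finishes and releases a permit.

```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.util.ThreadUtils

val pool = ThreadUtils.newDaemonBlockingThreadPoolExecutorService(2, 4, "demo-pool")
(1 to 7).foreach { _ =>
  // the 7th call parks here until one of the 6 in-flight tasks releases a permit
  pool.execute(() => TimeUnit.MILLISECONDS.sleep(100))
}
pool.shutdown()
pool.awaitTermination(5, TimeUnit.SECONDS)
```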
17 changes: 17 additions & 0 deletions core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -188,6 +188,23 @@ private[spark] object ThreadUtils {
      rejectedExecutionHandler)
  }

+  /**
+   * Similar to newDaemonFixedThreadPool, but with a bounded work queue: task submission
+   * blocks when the queue is full.
+   *
+   * @param nThreads the number of threads in the pool
+   * @param workQueueSize the capacity of the queue that holds tasks before they are
+   *                      executed; task submission blocks when the queue is full
+   * @param prefix thread names are formatted as prefix-ID, where ID is a unique,
+   *               sequentially assigned integer
+   * @return a BlockingThreadPoolExecutorService
+   */
+  def newDaemonBlockingThreadPoolExecutorService(
+      nThreads: Int, workQueueSize: Int, prefix: String): ExecutorService = {
+    val threadFactory = namedThreadFactory(prefix)
+    new BlockingThreadPoolExecutorService(nThreads, workQueueSize, threadFactory)
+  }
+
  /**
   * Wrapper over ScheduledThreadPoolExecutor the pool with daemon threads.
   */
34 changes: 34 additions & 0 deletions core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -98,6 +98,40 @@ class ThreadUtilsSuite extends SparkFunSuite {
    }
  }

test("newDaemonBlockingThreadPoolExecutorService") {
val nThread = 3
val workQueueSize = 5
val submithreadsLatch = new CountDownLatch(nThread + workQueueSize + 1)
val latch = new CountDownLatch(1)
val blockingPool = ThreadUtils.newDaemonBlockingThreadPoolExecutorService(
nThread, workQueueSize, "ThreadUtilsSuite-newDaemonBlockingThreadPoolExecutorService")

try {
val submitThread = new Thread(() => {
(0 until nThread + workQueueSize + 1).foreach { i =>
blockingPool.execute(() => {
latch.await(10, TimeUnit.SECONDS)
})
submithreadsLatch.countDown()
}
})
submitThread.setDaemon(true)
submitThread.start()

// the last one task submission will be blocked until previous tasks completed
eventually(timeout(10.seconds)) {
assert(submithreadsLatch.getCount === 1L)
}
latch.countDown()
eventually(timeout(10.seconds)) {
assert(submithreadsLatch.getCount === 0L)
assert(!submitThread.isAlive)
}
} finally {
blockingPool.shutdownNow()
}
}

test("sameThread") {
val callerThreadName = Thread.currentThread().getName()
val f = Future {
Expand Down
8 changes: 8 additions & 0 deletions docs/monitoring.md
@@ -329,6 +329,14 @@ Security options for the Spark History Server are covered more detail in the
    </td>
    <td>2.0.0</td>
  </tr>
+  <tr>
+    <td>spark.history.fs.numCompactThreads</td>
+    <td>25% of available cores</td>
+    <td>
+      Number of threads that will be used by history server to compact event logs.
+    </td>
+    <td>4.1.0</td>
+  </tr>
  <tr>
    <td>spark.history.store.maxDiskUsage</td>
    <td>10g</td>
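For operators, the new property is set like the other history server options, for example in conf/spark-defaults.conf (the value below is only an example, not a recommendation):

```
spark.history.fs.numCompactThreads   8
```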