Skip to content

Commit 0e318ac

Browse files
yogeshgjiangxb1987
authored andcommitted
[SPARK-25901][CORE] Use only one thread in BarrierTaskContext companion object
## What changes were proposed in this pull request? Now we use only one `timer` (and thus a backing thread) in `BarrierTaskContext` companion object, and the objects can add `timerTasks` to that `timer`. ## How was this patch tested? This was tested manually by generating logs and seeing that they look the same as ones before, namely, that is, a partition waiting on another partition for 5seconds generates 4-5 log messages when the frequency of logging is set to 1second. Closes #22912 from yogeshg/thread. Authored-by: Yogesh Garg <1059168+yogeshg@users.noreply.github.com> Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
1 parent ed0c57e commit 0e318ac

File tree

1 file changed

+5
-2
lines changed

1 file changed

+5
-2
lines changed

core/src/main/scala/org/apache/spark/BarrierTaskContext.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,14 @@ import org.apache.spark.util._
4141
class BarrierTaskContext private[spark] (
4242
taskContext: TaskContext) extends TaskContext with Logging {
4343

44+
import BarrierTaskContext._
45+
4446
// Find the driver side RPCEndpointRef of the coordinator that handles all the barrier() calls.
4547
private val barrierCoordinator: RpcEndpointRef = {
4648
val env = SparkEnv.get
4749
RpcUtils.makeDriverRef("barrierSync", env.conf, env.rpcEnv)
4850
}
4951

50-
private val timer = new Timer("Barrier task timer for barrier() calls.")
51-
5252
// Local barrierEpoch that identify a barrier() call from current task, it shall be identical
5353
// with the driver side epoch.
5454
private var barrierEpoch = 0
@@ -234,4 +234,7 @@ object BarrierTaskContext {
234234
@Experimental
235235
@Since("2.4.0")
236236
def get(): BarrierTaskContext = TaskContext.get().asInstanceOf[BarrierTaskContext]
237+
238+
private val timer = new Timer("Barrier task timer for barrier() calls.")
239+
237240
}

0 commit comments

Comments
 (0)