
Commit 324a904

[SPARK-13747][CORE] Add ThreadUtils.awaitReady and disallow Await.ready

## What changes were proposed in this pull request?

Add `ThreadUtils.awaitReady` similar to `ThreadUtils.awaitResult` and disallow `Await.ready`.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes apache#17763 from zsxwing/awaitready.

1 parent f8e0f0f commit 324a904
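
For orientation, a minimal sketch (not part of the diff) of the calling pattern this change standardizes; the helper name `waitForCompletion` and the future are hypothetical, while `ThreadUtils.awaitReady` is the utility added below:

```scala
import scala.concurrent.Future
import scala.concurrent.duration._

import org.apache.spark.util.ThreadUtils

// Hypothetical helper showing the migration. A bare Await.ready(future, timeout)
// is now flagged by scalastyle; callers go through ThreadUtils.awaitReady, which
// skips scala.concurrent.blocking (the problem tracked in SPARK-13747) and wraps
// unexpected non-timeout failures in SparkException.
def waitForCompletion[T](future: Future[T], timeout: Duration): Unit = {
  ThreadUtils.awaitReady(future, timeout) // instead of Await.ready(future, timeout)
  future.value.get match {                // the future is complete at this point
    case scala.util.Success(v) => println(s"finished with $v")
    case scala.util.Failure(e) => println(s"failed: $e")
  }
}
```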

File tree: 11 files changed (+49 / -18 lines)


core/src/main/scala/org/apache/spark/FutureAction.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -261,7 +261,7 @@ class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S
 
   private def getImpl(timeout: Duration): T = {
     // This will throw TimeoutException on timeout:
-    Await.ready(futureAction, timeout)
+    ThreadUtils.awaitReady(futureAction, timeout)
     futureAction.value.get match {
       case scala.util.Success(value) => converter(value)
       case scala.util.Failure(exception) =>
```

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 1 addition & 6 deletions
```diff
@@ -618,12 +618,7 @@ class DAGScheduler(
       properties: Properties): Unit = {
     val start = System.nanoTime
     val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
-    // Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`,
-    // which causes concurrent SQL executions to fail if a fork-join pool is used. Note that
-    // due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's
-    // safe to pass in null here. For more detail, see SPARK-13747.
-    val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
-    waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
+    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
     waiter.completionFuture.value.get match {
       case scala.util.Success(_) =>
         logInfo("Job %d finished: %s, took %f s".format
```

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 3 additions & 3 deletions
```diff
@@ -23,7 +23,7 @@ import java.nio.channels.Channels
 
 import scala.collection.mutable
 import scala.collection.mutable.HashMap
-import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
 import scala.reflect.ClassTag
 import scala.util.Random
@@ -334,7 +334,7 @@ private[spark] class BlockManager(
     val task = asyncReregisterTask
     if (task != null) {
       try {
-        Await.ready(task, Duration.Inf)
+        ThreadUtils.awaitReady(task, Duration.Inf)
       } catch {
         case NonFatal(t) =>
           throw new Exception("Error occurred while waiting for async. reregistration", t)
@@ -916,7 +916,7 @@ private[spark] class BlockManager(
     if (level.replication > 1) {
       // Wait for asynchronous replication to finish
       try {
-        Await.ready(replicationFuture, Duration.Inf)
+        ThreadUtils.awaitReady(replicationFuture, Duration.Inf)
       } catch {
         case NonFatal(t) =>
           throw new Exception("Error occurred while waiting for replication to finish", t)
```

core/src/main/scala/org/apache/spark/util/ThreadUtils.scala

Lines changed: 21 additions & 0 deletions
```diff
@@ -206,4 +206,25 @@ private[spark] object ThreadUtils {
     }
   }
   // scalastyle:on awaitresult
+
+  // scalastyle:off awaitready
+  /**
+   * Preferred alternative to `Await.ready()`.
+   *
+   * @see [[awaitResult]]
+   */
+  @throws(classOf[SparkException])
+  def awaitReady[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type = {
+    try {
+      // `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
+      // See SPARK-13747.
+      val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
+      awaitable.ready(atMost)(awaitPermission)
+    } catch {
+      // TimeoutException is thrown in the current thread, so no need to wrap the exception.
+      case NonFatal(t) if !t.isInstanceOf[TimeoutException] =>
+        throw new SparkException("Exception thrown in awaitResult: ", t)
+    }
+  }
+  // scalastyle:on awaitready
 }
```
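
A rough usage sketch of the new helper (toy futures and timeouts on the global `ExecutionContext`; not part of the commit): a wait that times out surfaces the `TimeoutException` directly, while a future that fails before the deadline is still "ready", so its failure is read from `value`, just as the callers above do.

```scala
import java.util.concurrent.TimeoutException

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

import org.apache.spark.util.ThreadUtils

object AwaitReadyExample {
  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global

    // Still running at the deadline: ready() times out and the TimeoutException
    // is thrown in the calling thread as-is (not wrapped in SparkException).
    val slow = Future { Thread.sleep(10000); 42 }
    try {
      ThreadUtils.awaitReady(slow, 50.millis)
    } catch {
      case _: TimeoutException => println("timed out as expected")
    }

    // Completed with a failure: the future counts as "ready", so awaitReady
    // returns normally and the error is observed via value, e.g. with
    // future.value.get match { ... } as in FutureAction and DAGScheduler above.
    val failed = Future[Int] { throw new IllegalStateException("boom") }
    ThreadUtils.awaitReady(failed, 1.second)
    assert(failed.value.get.isFailure)
  }
}
```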

core/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 2 additions & 3 deletions
```diff
@@ -23,7 +23,6 @@ import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeUnit
 
 import scala.concurrent.duration._
-import scala.concurrent.Await
 
 import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
@@ -35,7 +34,7 @@ import org.scalatest.concurrent.Eventually
 import org.scalatest.Matchers._
 
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskEnd, SparkListenerTaskStart}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 
 class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
@@ -315,7 +314,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
     val future = sc.parallelize(Seq(0)).foreachAsync(_ => {Thread.sleep(1000L)})
     sc.cancelJobGroup("nonExistGroupId")
-    Await.ready(future, Duration(2, TimeUnit.SECONDS))
+    ThreadUtils.awaitReady(future, Duration(2, TimeUnit.SECONDS))
 
     // In SPARK-6414, sc.cancelJobGroup will cause NullPointerException and cause
     // SparkContext to shutdown, so the following assertion will fail.
```

core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala

Lines changed: 3 additions & 2 deletions
```diff
@@ -22,7 +22,7 @@ import java.nio._
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeUnit
 
-import scala.concurrent.{Await, Promise}
+import scala.concurrent.Promise
 import scala.concurrent.duration._
 import scala.util.{Failure, Success, Try}
 
@@ -36,6 +36,7 @@ import org.apache.spark.network.{BlockDataManager, BlockTransferService}
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.shuffle.BlockFetchingListener
 import org.apache.spark.storage.{BlockId, ShuffleBlockId}
+import org.apache.spark.util.ThreadUtils
 
 class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar with ShouldMatchers {
   test("security default off") {
@@ -166,7 +167,7 @@
       }
     })
 
-    Await.ready(promise.future, FiniteDuration(10, TimeUnit.SECONDS))
+    ThreadUtils.awaitReady(promise.future, FiniteDuration(10, TimeUnit.SECONDS))
     promise.future.value.get
   }
 }
```

core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala

Lines changed: 2 additions & 2 deletions
```diff
@@ -21,7 +21,7 @@ import java.util.concurrent.{TimeoutException, TimeUnit}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.concurrent.{Await, Future}
+import scala.concurrent.Future
 import scala.concurrent.duration.{Duration, SECONDS}
 import scala.language.existentials
 import scala.reflect.ClassTag
@@ -260,7 +260,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
    */
  def awaitJobTermination(jobFuture: Future[_], duration: Duration): Unit = {
    try {
-      Await.ready(jobFuture, duration)
+      ThreadUtils.awaitReady(jobFuture, duration)
    } catch {
      case te: TimeoutException if backendException.get() != null =>
        val msg = raw"""
```

core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -152,7 +152,7 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
     // one should acquire the write lock. The second thread should block until the winner of the
     // write race releases its lock.
     val winningFuture: Future[Boolean] =
-      Await.ready(Future.firstCompletedOf(Seq(lock1Future, lock2Future)), 1.seconds)
+      ThreadUtils.awaitReady(Future.firstCompletedOf(Seq(lock1Future, lock2Future)), 1.seconds)
     assert(winningFuture.value.get.get)
     val winningTID = blockInfoManager.get("block").get.writerTask
     assert(winningTID === 1 || winningTID === 2)
```

external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -140,7 +140,9 @@ class KinesisCheckpointerSuite extends TestSuiteBase
       ExecutionContext.global)
 
     intercept[TimeoutException] {
+      // scalastyle:off awaitready
       Await.ready(f, 50 millis)
+      // scalastyle:on awaitready
     }
 
     clock.advance(checkpointInterval.milliseconds / 2)
```

scalastyle-config.xml

Lines changed: 11 additions & 0 deletions
```diff
@@ -203,6 +203,17 @@ This file is divided into 3 sections:
     ]]></customMessage>
   </check>
 
+  <check customId="awaitready" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">Await\.ready</parameter></parameters>
+    <customMessage><![CDATA[
+      Are you sure that you want to use Await.ready? In most cases, you should use ThreadUtils.awaitReady instead.
+      If you must use Await.ready, wrap the code block with
+      // scalastyle:off awaitready
+      Await.ready(...)
+      // scalastyle:on awaitready
+    ]]></customMessage>
+  </check>
+
   <!-- As of SPARK-9613 JavaConversions should be replaced with JavaConverters -->
   <check customId="javaconversions" level="error" class="org.scalastyle.scalariform.TokenChecker" enabled="true">
     <parameters><parameter name="regex">JavaConversions</parameter></parameters>
```
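
Since the new check is a plain regex over source files, any remaining intentional `Await.ready` has to be fenced exactly as the custom message suggests, mirroring the KinesisCheckpointerSuite change above. A minimal sketch with a throwaway future (names hypothetical):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object IntentionalAwaitReady {
  def main(args: Array[String]): Unit = {
    val f: Future[Unit] = Future { Thread.sleep(100) }

    // Deliberate use of Await.ready (for example, when a test exercises the raw
    // scala.concurrent behaviour); the fence keeps the awaitready check quiet.
    // scalastyle:off awaitready
    Await.ready(f, 1.second)
    // scalastyle:on awaitready
  }
}
```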
