Skip to content

Commit a2f745c

Browse files
committed
Remove sendMessageReliablySync; callers can wait themselves.
This makes the waiting more explicit.
1 parent c01c450 commit a2f745c

File tree

4 files changed

+17
-16
lines changed

4 files changed

+17
-16
lines changed

core/src/main/scala/org/apache/spark/network/ConnectionManager.scala

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import scala.collection.mutable.SynchronizedQueue
3535
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
3636
import scala.concurrent.duration._
3737
import scala.language.postfixOps
38-
import scala.util.Try
3938

4039
import org.apache.spark._
4140
import org.apache.spark.util.{SystemClock, Utils}
@@ -849,11 +848,6 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
849848
promise.future
850849
}
851850

852-
def sendMessageReliablySync(connectionManagerId: ConnectionManagerId,
853-
message: Message): Try[Message] = {
854-
Try(Await.result(sendMessageReliably(connectionManagerId, message), Duration.Inf))
855-
}
856-
857851
def onReceiveMessage(callback: (Message, ConnectionManagerId) => Option[Message]) {
858852
onReceiveCallback = callback
859853
}
@@ -911,7 +905,7 @@ private[spark] object ConnectionManager {
911905

912906
(0 until count).map(i => {
913907
val bufferMessage = Message.createBufferMessage(buffer.duplicate)
914-
manager.sendMessageReliablySync(manager.id, bufferMessage)
908+
Await.result(manager.sendMessageReliably(manager.id, bufferMessage), Duration.Inf)
915909
})
916910
println("--------------------------")
917911
println()

core/src/main/scala/org/apache/spark/network/SenderTest.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ package org.apache.spark.network
2020
import java.nio.ByteBuffer
2121
import org.apache.spark.{SecurityManager, SparkConf}
2222

23+
import scala.concurrent.Await
24+
import scala.concurrent.duration.Duration
25+
import scala.util.Try
26+
2327
private[spark] object SenderTest {
2428
def main(args: Array[String]) {
2529

@@ -51,7 +55,8 @@ private[spark] object SenderTest {
5155
val dataMessage = Message.createBufferMessage(buffer.duplicate)
5256
val startTime = System.currentTimeMillis
5357
/* println("Started timer at " + startTime) */
54-
val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage)
58+
val promise = manager.sendMessageReliably(targetConnectionManagerId, dataMessage)
59+
val responseStr: String = Try(Await.result(promise, Duration.Inf))
5560
.map { response =>
5661
val buffer = response.asInstanceOf[BufferMessage].buffers(0)
5762
new String(buffer.array, "utf-8")

core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ import org.apache.spark.Logging
2323
import org.apache.spark.network._
2424
import org.apache.spark.util.Utils
2525

26-
import scala.util.{Failure, Success}
26+
import scala.concurrent.Await
27+
import scala.concurrent.duration.Duration
28+
import scala.util.{Try, Failure, Success}
2729

2830
/**
2931
* A network interface for BlockManager. Each slave should have one
@@ -117,8 +119,8 @@ private[spark] object BlockManagerWorker extends Logging {
117119
val connectionManager = blockManager.connectionManager
118120
val blockMessage = BlockMessage.fromPutBlock(msg)
119121
val blockMessageArray = new BlockMessageArray(blockMessage)
120-
val resultMessage = connectionManager.sendMessageReliablySync(
121-
toConnManagerId, blockMessageArray.toBufferMessage)
122+
val resultMessage = Try(Await.result(connectionManager.sendMessageReliably(
123+
toConnManagerId, blockMessageArray.toBufferMessage), Duration.Inf))
122124
resultMessage.isSuccess
123125
}
124126

@@ -127,8 +129,8 @@ private[spark] object BlockManagerWorker extends Logging {
127129
val connectionManager = blockManager.connectionManager
128130
val blockMessage = BlockMessage.fromGetBlock(msg)
129131
val blockMessageArray = new BlockMessageArray(blockMessage)
130-
val responseMessage = connectionManager.sendMessageReliablySync(
131-
toConnManagerId, blockMessageArray.toBufferMessage)
132+
val responseMessage = Try(Await.result(connectionManager.sendMessageReliably(
133+
toConnManagerId, blockMessageArray.toBufferMessage), Duration.Inf))
132134
responseMessage match {
133135
case Success(message) => {
134136
val bufferMessage = message.asInstanceOf[BufferMessage]

core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class ConnectionManagerSuite extends FunSuite {
4747
buffer.flip
4848

4949
val bufferMessage = Message.createBufferMessage(buffer.duplicate)
50-
manager.sendMessageReliablySync(manager.id, bufferMessage)
50+
Await.ready(manager.sendMessageReliably(manager.id, bufferMessage), 10 seconds)
5151

5252
assert(receivedMessage == true)
5353

@@ -80,7 +80,7 @@ class ConnectionManagerSuite extends FunSuite {
8080

8181
(0 until count).map(i => {
8282
val bufferMessage = Message.createBufferMessage(buffer.duplicate)
83-
manager.sendMessageReliablySync(managerServer.id, bufferMessage)
83+
Await.ready(manager.sendMessageReliably(manager.id, bufferMessage), 10 seconds)
8484
})
8585

8686
assert(numReceivedServerMessages == 10)
@@ -119,7 +119,7 @@ class ConnectionManagerSuite extends FunSuite {
119119
val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
120120
buffer.flip
121121
val bufferMessage = Message.createBufferMessage(buffer.duplicate)
122-
manager.sendMessageReliablySync(managerServer.id, bufferMessage)
122+
Await.result(manager.sendMessageReliably(manager.id, bufferMessage), 10 seconds)
123123

124124
assert(numReceivedServerMessages == 0)
125125
assert(numReceivedMessages == 0)

0 commit comments

Comments
 (0)