[SPARK-21701][CORE] Enable RPC client to use SO_RCVBUF, SO_SNDBUF and SO_BACKLOG in SparkConf #18922

Closed · wants to merge 1 commit
@@ -210,6 +210,18 @@ private TransportClient createClient(InetSocketAddress address)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
.option(ChannelOption.ALLOCATOR, pooledAllocator);

if (conf.backLog() > 0) {
bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
Contributor:
Is this really used for the client bootstrap? AFAIK the backlog is mainly used on the server side, to queue incoming connections.

Author:

I see your point. I will remove this setting.

}

if (conf.receiveBuf() > 0) {
bootstrap.option(ChannelOption.SO_RCVBUF, conf.receiveBuf());
}

if (conf.sendBuf() > 0) {
bootstrap.option(ChannelOption.SO_SNDBUF, conf.sendBuf());
}

final AtomicReference<TransportClient> clientRef = new AtomicReference<>();
final AtomicReference<Channel> channelRef = new AtomicReference<>();
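As background for the review discussion above: SO_RCVBUF and SO_SNDBUF are per-socket buffer hints that apply on either end of a connection, while the backlog only has meaning for a listening (server-side) socket, where it caps the queue of connections accepted by the kernel but not yet picked up by accept(). A minimal sketch using plain java.net (not Spark or Netty code; the sizes are illustrative):

```java
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketOptionsDemo {
    public static void main(String[] args) throws Exception {
        // SO_RCVBUF / SO_SNDBUF are hints for the kernel socket buffers;
        // the OS may round the values, so read them back for the actual size.
        try (Socket client = new Socket()) {
            client.setReceiveBufferSize(256 * 1024);
            client.setSendBufferSize(256 * 1024);
            System.out.println("rcv=" + client.getReceiveBufferSize()
                + " snd=" + client.getSendBufferSize());
        }

        // The backlog is passed at bind time on the server side only;
        // there is no client-side equivalent, which is the reviewer's point.
        try (ServerSocket server = new ServerSocket()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0), 128 /* backlog */);
            System.out.println("listening on port " + server.getLocalPort());
        }
    }
}
```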

@@ -29,6 +29,52 @@
/**
 * Encoder used by the server side to encode server-to-client responses.
 * This encoder is stateless so it is safe to be shared by multiple threads.
 * <p></p>
 * The following is the wire format used by RPC messages: a typical
 * header+payload structure. The header carries the body length, which tells
 * {@link org.apache.spark.network.util.TransportFrameDecoder} how to assemble
 * a complete message out of the byte stream for downstream
 * {@link ChannelHandler}s to consume.
 * <p></p>
 * The underlying network I/O wraps messages in {@link MessageWithHeader} to
 * transport them between peers. The layout of an RPC message is shown below;
 * the header is {@link MessageWithHeader#header} and the body is
 * {@link MessageWithHeader#body}.
* <pre>
* Byte/ 0 | 1 | 2 | 3 |
* / | | | |
* |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
* +---------------+---------------+---------------+---------------+
* 0/ Header /
* / /
* / /
* / /
* +---------------+---------------+---------------+---------------+
* / Body (a.k.a payload) /
* +---------------+---------------+---------------+---------------+
* </pre>
 * The detailed header wire format is shown below. The header consists of:
 * <ul>
 * <li>1. frame length: the total byte size of header+payload, stored as a
 * 4-byte integer, which caps the maximum frame size at 2 GB</li>
 * <li>2. message type: which subclass of {@link Message} is wrapped; its
 * length is {@link Message.Type#encodedLength()}</li>
 * <li>3. encoded message: some {@link Message}s provide additional
 * information, which is included in the header</li>
 * </ul>
* <pre>
* Byte/ 0 | 1 | 2 | 3 |
* / | | | |
* |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
* +---------------+---------------+---------------+---------------+
* 0| frame length |
* +---------------+---------------+---------------+---------------+
* 4| message type | |
* +---------------+ encoded message |
* | |
* +---------------+---------------+---------------+---------------+
* </pre>
*/
@ChannelHandler.Sharable
public final class MessageEncoder extends MessageToMessageEncoder<Message> {
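To make the header+payload layout above concrete, here is a toy, self-contained sketch (hypothetical names, not Spark's actual MessageEncoder) that packs a frame as a 4-byte frame length covering header+payload, a 1-byte message type, and the payload, then reads it back:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FrameDemo {
    // Encode a frame: 4-byte frame length (covers the whole frame),
    // 1-byte message type, then the payload bytes.
    static ByteBuffer encode(byte type, byte[] payload) {
        int frameLength = 4 + 1 + payload.length; // length field + type + body
        ByteBuffer buf = ByteBuffer.allocate(frameLength);
        buf.putInt(frameLength);
        buf.put(type);
        buf.put(payload);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        byte[] body = "ping".getBytes(StandardCharsets.UTF_8);
        ByteBuffer frame = encode((byte) 3, body);

        // Decode: a frame decoder would read the length first, then wait
        // until that many bytes have arrived before emitting the message.
        int frameLength = frame.getInt();
        byte type = frame.get();
        byte[] decoded = new byte[frameLength - 5];
        frame.get(decoded);
        System.out.println(frameLength + " " + type + " "
            + new String(decoded, StandardCharsets.UTF_8));
        // prints: 9 3 ping
    }
}
```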
core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala (25 additions, 0 deletions)
@@ -190,6 +190,31 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
}

test("ask a non-existent endpoint remotely") {
env.setupEndpoint("ask-non-existent-remotely", new RpcEndpoint {
override val rpcEnv = env

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case msg: String =>
context.reply(msg)
}
})

val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0, clientMode = true)
try {
val e = intercept[SparkException] {
anotherEnv.setupEndpointRef(env.address, "Oops")
}
// The SparkException cause should be an RpcEndpointNotFoundException
assert(e.getCause.isInstanceOf[RpcEndpointNotFoundException])
assert(e.getCause.getMessage.contains("Cannot find endpoint"))
assert(e.getCause.getMessage.contains(env.address.toString))
} finally {
anotherEnv.shutdown()
anotherEnv.awaitTermination()
}
}

test("onStart and onStop") {
val stopLatch = new CountDownLatch(1)
val calledMethods = mutable.ArrayBuffer[String]()
@@ -35,15 +35,6 @@ class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar {
new NettyRpcEnvFactory().create(config)
}

test("non-existent endpoint") {
val uri = RpcEndpointAddress(env.address, "nonexist-endpoint").toString
val e = intercept[SparkException] {
env.setupEndpointRef(env.address, "nonexist-endpoint")
}
assert(e.getCause.isInstanceOf[RpcEndpointNotFoundException])
assert(e.getCause.getMessage.contains(uri))
}

test("advertise address different from bind address") {
val sparkConf = new SparkConf()
val config = RpcEnvConfig(sparkConf, "test", "localhost", "example.com", 0,