[SPARK-11140] [core] Transfer files using network lib when using NettyRpcEnv. #9530
Conversation
This change abstracts the code that serves jars / files to executors so that each RpcEnv can have its own implementation; the akka version uses the existing HTTP-based file serving mechanism, while the netty version uses the new stream support added to the network lib, which lets file transfers benefit from the network library's simpler security configuration and should also reduce overhead overall. The change includes a small fix to TransportChannelHandler so that it propagates user events to downstream handlers.
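As context for the TransportChannelHandler fix, here is a minimal, hypothetical sketch of what propagating user events in a Netty inbound handler looks like (the class name and surrounding pipeline are assumptions, not the PR's actual code):

import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}

// Minimal sketch: without forwarding, user events (e.g. idle-state events)
// stop at this handler and never reach handlers installed after it.
class ForwardingHandler extends ChannelInboundHandlerAdapter {
  override def userEventTriggered(ctx: ChannelHandlerContext, evt: Object): Unit = {
    // Let downstream handlers (such as a timeout handler) see the event too.
    ctx.fireUserEventTriggered(evt)
  }
}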
retest this please
Test build #45292 has finished for PR 9530 at commit
retest this please
Test build #45293 has finished for PR 9530 at commit
Test build #45383 has finished for PR 9530 at commit
retest this please
This reuses more of the existing code in Utils, and also makes some subsequent changes built on top of this API possible, at the expense of some efficiency (using input streams instead of channels). If desired, the Utils class can later be changed to use channels to regain the lost efficiency.
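For context, a rough, illustrative sketch of the trade-off mentioned here (helper names are made up, not the Utils API): copying through an InputStream goes through user-space buffers, while a channel-to-channel transfer can hand more of the work to the OS.

import java.io.{File, FileOutputStream, InputStream}
import java.nio.channels.ReadableByteChannel

// Stream-based copy: simple, but every byte passes through a user-space buffer.
def copyStream(in: InputStream, dest: File): Unit = {
  val out = new FileOutputStream(dest)
  try {
    val buf = new Array[Byte](64 * 1024)
    var n = in.read(buf)
    while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
  } finally out.close()
}

// Channel-based copy: the destination file channel pulls directly from the
// source channel; transferFrom returns 0 once the source reaches end-of-stream.
def copyChannel(in: ReadableByteChannel, dest: File): Unit = {
  val out = new FileOutputStream(dest).getChannel
  try {
    var pos = 0L
    var n = out.transferFrom(in, pos, 64 * 1024)
    while (n > 0) { pos += n; n = out.transferFrom(in, pos, 64 * 1024) }
  } finally out.close()
}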
Test build #45402 has finished for PR 9530 at commit
retest this please
override def exceptionCaught(cause: Throwable, client: TransportClient): Unit = {
  val addr = client.getChannel.remoteAddress().asInstanceOf[InetSocketAddress]
  if (addr != null) {
    val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
Why did you revert this back to the old code?
Sorry, I don't follow. The line was moved inside the new conditions, nothing's really changing.
You added a new check: if (clients.containsKey(client)) { ... }. There was similar logic here: https://github.com/apache/spark/pull/9210/files#diff-0c89b4a60c30a7cd2224bb64d93da942L477, but you removed it. Now you are adding it back.
It's explained in the following comment:
https://github.com/apache/spark/pull/9530/files#diff-0c89b4a60c30a7cd2224bb64d93da942R85
Got it. Thanks!
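As an aside, a self-contained sketch of the guard pattern being discussed (the types and the error-reporting call are placeholders, not the actual NettyRpcHandler code):

import java.net.InetSocketAddress
import java.util.concurrent.ConcurrentHashMap

// Placeholder stand-ins for TransportClient / RpcAddress.
final case class Address(host: String, port: Int)
final class Client(val remote: InetSocketAddress)

final class ErrorReporter {
  // Only clients registered through the normal RPC path live in this map;
  // clients opened purely for file downloads are never added.
  private val clients = new ConcurrentHashMap[Client, Address]()

  def register(client: Client): Unit =
    clients.put(client, Address(client.remote.getHostName, client.remote.getPort))

  def exceptionCaught(cause: Throwable, client: Client): Unit = {
    val addr = client.remote
    if (addr != null) {
      val clientAddr = Address(addr.getHostName, addr.getPort)
      // The containsKey guard: only report errors for tracked RPC connections.
      if (clients.containsKey(client)) {
        println(s"network error from $clientAddr: ${cause.getMessage}")
      }
    }
  }
}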
LGTM. I noticed that:

private class TimeoutHandler(client: TransportClient) extends ChannelInboundHandlerAdapter {

  override def userEventTriggered(ctx: ChannelHandlerContext, evt: Object): Unit = {
So, this is a little bit racy, since the timeout might trigger when another thread is preparing to download a file. I'll fix this and update the PR.
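To make the race concrete, here is a purely illustrative sketch (not the code that ended up in the PR, which instead reuses the transport library's idle-connection handling): the timeout path and the download path have to agree, under one lock, on whether the cached client is still usable.

// Hypothetical wrapper around a cached download client.
final class CachedDownloadClient {
  private val lock = new Object
  private var closed = false
  private var inFlight = 0

  // Idle-timeout path: close only if nothing is using (or about to use) the client.
  def closeIfIdle(): Boolean = lock.synchronized {
    if (!closed && inFlight == 0) { closed = true; true } else false
  }

  // Download path: claim the client before starting; fails if it was already closed.
  def tryAcquire(): Boolean = lock.synchronized {
    if (closed) false else { inFlight += 1; true }
  }

  def release(): Unit = lock.synchronized { inFlight -= 1 }
}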
  source
}

private def fileDownloadClient(host: String, port: Int): TransportClient = synchronized {
So if different tasks in the same executor download from different hosts, they need to connect one by one. Right?
Downloading from different hosts means different clients. But different tasks will still all download from the same host (the driver).
Oh, right. All tasks will download from the same driver.
Just found another case: fetchFiles is also used in DriverRunner in the Worker.
That's fine. What's your worry here?
On second thought I think that's fine, since downloads still happen in parallel.
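For what it's worth, a hedged sketch of the caching pattern under discussion (names are made up; the real method returns a TransportClient): only the creation of a missing client is serialized by the synchronized block, while downloads over already-created clients proceed in parallel.

import scala.collection.mutable

final class DownloadClientCache[C](create: (String, Int) => C) {
  // One cached client per remote endpoint; different hosts get different clients.
  private val clients = mutable.Map.empty[(String, Int), C]

  def clientFor(host: String, port: Int): C = synchronized {
    clients.getOrElseUpdate((host, port), create(host, port))
  }
}

So two tasks fetching from the same driver share one connection, while a DriverRunner on a Worker fetching from somewhere else gets its own client.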
Test build #45446 has finished for PR 9530 at commit
val socketAddr = client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress]
val address = RpcAddress(socketAddr.getHostName(), socketAddr.getPort())
ctx.close()
NettyRpcEnv.this.synchronized {
The lock order here is the reverse of the order in fileDownloadClient. A potential deadlock issue?
Good catch, fixed.
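A tiny, illustrative example of the deadlock pattern and the usual fix (a fixed lock order); the lock names below are placeholders for NettyRpcEnv.this and the download-client lock, not the PR's actual code:

object LockOrderExample {
  private val envLock = new Object    // stands in for NettyRpcEnv.this
  private val clientLock = new Object // stands in for the download-client lock

  // Both code paths take the locks in the same order, so they cannot deadlock.
  def timeoutPath(): Unit = envLock.synchronized {
    clientLock.synchronized { /* close the idle client */ }
  }

  def downloadPath(): Unit = envLock.synchronized {
    clientLock.synchronized { /* create or reuse the client */ }
  }
  // If one path took clientLock first and the other took envLock first, two
  // threads could each hold one lock and wait forever for the other.
}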
Test build #45540 has finished for PR 9530 at commit
Test build #45552 has finished for PR 9530 at commit
Test build #45547 has finished for PR 9530 at commit
Test build #45904 has finished for PR 9530 at commit
retest this please
Test build #46041 has finished for PR 9530 at commit
Alright, I looked a bit closer at this. Since it is not mission critical but it touches some mission-critical parts, I think it's best not to merge this into 1.6. Let's merge this once 1.6 has an RC?
What's the problem with merging to master? Isn't that why we have branch-1.6? I have changes built on top of this that I'm waiting to send for review.
I'm mostly thinking it'd make it harder if we found bugs that need to be fixed.
Do you mean bugs in this code? Or in 1.6? If in this code, master will eventually go through stabilization also, and at that point if there are bugs, they'll surface. Whether this code goes in now or later doesn't change much in that regard.
There is still some network-related code going into 1.6 (one piece is the config naming thing). I'm concerned it might conflict. Maybe you can just submit more PRs based on the previous ones for review? We should be pretty close to cutting an RC anyway.
I don't know of any clean way of doing that on GitHub; they'd end up including all the code in this PR, which is rather ugly / sub-optimal. I can wait for the config change (I want to push it soon anyway), but I'm not super worried about conflicts. Those are generally not hard to resolve.
Test build #46261 has finished for PR 9530 at commit
Test build #46265 has finished for PR 9530 at commit
This makes the code friendlier to SPARK-11097 (if and when it's implemented), and also avoids custom timeout handling code by reusing features of the transport library. Need to test on a real cluster, though...
Test build #46289 has finished for PR 9530 at commit
But use the rpc config as the default. Also now properly tested on a real cluster, and verified idle connections are closed.
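Roughly what "use the rpc config as the default" means, as a hedged sketch with made-up configuration keys (the real names may differ): a files-specific timeout falls back to the RPC-level setting, which in turn falls back to a hard-coded default.

// Hypothetical keys, for illustration only.
def downloadTimeout(conf: Map[String, String]): String =
  conf.getOrElse("spark.files.io.connectionTimeout",
    conf.getOrElse("spark.rpc.io.connectionTimeout", "120s"))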
Test build #46349 has finished for PR 9530 at commit
sigh... retest this please.
Test build #46361 has finished for PR 9530 at commit
Test build #46388 has finished for PR 9530 at commit
LGTM. But let's just merge this one to master since it's too risky to add a new feature to 1.6 now.
retest this please
Test build #46542 has finished for PR 9530 at commit
Merging to master.
The following two tests have failed for the past three builds in the Hadoop 2.4 Maven Jenkins job:
According to Jenkins, these failures first started in https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-Maven-with-YARN/HADOOP_PROFILE=hadoop-2.4,label=spark-test/4223/. Given the set of patches in that changeset, my hunch is that this might have somehow been caused by either #9853 or #9530 (this patch). Branch 1.6 only includes #9853 and seems to be passing these tests, so I therefore believe that this patch may be the culprit (since it's only present in master and not 1.6). @vanzin, could you help me look into this?
Hi @JoshRosen, I was just looking into this. With some local testing, I could confirm that this patch did lead to those failures, but that it is fixed by #9923 (in particular the handling of an empty stream in
(or #9941, which is just the bug fixes w/out the other changes ...)