[SPARK-11140] [core] Transfer files using network lib when using NettyRpcEnv. #9530


Closed · wanted to merge 14 commits into master from vanzin:SPARK-11140

Conversation

@vanzin (Contributor) commented Nov 6, 2015

This change abstracts the code that serves jars / files to executors so that
each RpcEnv can have its own implementation: the akka version uses the existing
HTTP-based file serving mechanism, while the netty version uses the new
stream support added to the network lib. This lets file transfers benefit
from the network library's simpler security configuration, and should also
reduce overall overhead.

The change includes a small fix to TransportChannelHandler so that it propagates
user events to downstream handlers.
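The shape of that abstraction can be sketched as a small trait (illustrative only — the trait and class names below are assumptions, not the actual Spark API):

```scala
import java.io.File

// Hypothetical sketch of the abstraction (names are illustrative, not the
// actual Spark API): each RpcEnv supplies its own file server, and callers
// only see a URI they can hand to executors.
trait RpcEnvFileServer {
  def addFile(file: File): String  // returns a URI executors can fetch
  def addJar(file: File): String
}

// Akka-style env: serve files over plain HTTP.
class HttpFileServer(host: String, port: Int) extends RpcEnvFileServer {
  override def addFile(file: File): String = s"http://$host:$port/files/${file.getName}"
  override def addJar(file: File): String = s"http://$host:$port/jars/${file.getName}"
}

// Netty-style env: serve files through the RPC transport's stream support,
// so transfers inherit the network library's security configuration.
class NettyStreamFileServer(host: String, port: Int) extends RpcEnvFileServer {
  override def addFile(file: File): String = s"spark://$host:$port/files/${file.getName}"
  override def addJar(file: File): String = s"spark://$host:$port/jars/${file.getName}"
}
```

Callers only ever see the returned URI, so swapping one serving mechanism for the other requires no change on the fetching side beyond understanding the new URI scheme.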

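The user-event part of the fix can be pictured with a toy pipeline (a sketch under assumed names — the real patch makes TransportChannelHandler call the Netty equivalent of `ctx.fireUserEventTriggered(evt)`):

```scala
// Toy model, not the actual Netty API: a handler that swallows user events
// starves downstream handlers, such as an idle-timeout handler, of the
// events they depend on.
trait ToyHandler {
  def userEventTriggered(evt: Any): Unit
}

// Stands in for a downstream handler that reacts to events.
class RecordingHandler extends ToyHandler {
  var seen: List[Any] = Nil
  override def userEventTriggered(evt: Any): Unit = seen = evt :: seen
}

// Before the fix: the event stops here; downstream never sees it.
class SwallowingHandler(next: ToyHandler) extends ToyHandler {
  override def userEventTriggered(evt: Any): Unit = ()
}

// After the fix: the event is propagated to the next handler in the chain.
class ForwardingHandler(next: ToyHandler) extends ToyHandler {
  override def userEventTriggered(evt: Any): Unit = next.userEventTriggered(evt)
}
```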
@vanzin (Contributor, Author) commented Nov 6, 2015

/cc @rxin @zsxwing

@vanzin (Contributor, Author) commented Nov 7, 2015

retest this please

@SparkQA commented Nov 7, 2015

Test build #45292 has finished for PR 9530 at commit 3f8209f.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor, Author) commented Nov 8, 2015

retest this please

@SparkQA commented Nov 8, 2015

Test build #45293 has finished for PR 9530 at commit 3f8209f.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 9, 2015

Test build #45383 has finished for PR 9530 at commit 87b7a91.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental): public class JavaAFTSurvivalRegressionExample

@zsxwing (Member) commented Nov 9, 2015

retest this please

This reuses more of the existing code in Utils, and also makes some
subsequent changes built on top of this API possible, at the expense
of some efficiency (using input streams instead of channels).

If desired the Utils class can later be changed to use channels to
regain the lost efficiency.
@SparkQA commented Nov 9, 2015

Test build #45402 has finished for PR 9530 at commit 87b7a91.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor, Author) commented Nov 9, 2015

retest this please


override def exceptionCaught(cause: Throwable, client: TransportClient): Unit = {
  val addr = client.getChannel.remoteAddress().asInstanceOf[InetSocketAddress]
  if (addr != null) {
    val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
Member: Why was this reverted back to the old code?

Author: Sorry, I don't follow. The line was moved inside the new conditions; nothing's really changing.

Member: You added a new check, if (clients.containsKey(client)) {. There was similar logic here: https://github.com/apache/spark/pull/9210/files#diff-0c89b4a60c30a7cd2224bb64d93da942L477 but you removed it. Now you are adding it back.

Member: Got it. Thanks!

@zsxwing (Member) commented Nov 9, 2015

LGTM. I noticed that Utils.fetchFile cannot handle special chars (e.g., %, ) in file names. But since the HTTP server has the same issue, it should not block this PR.
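For reference, one conventional way to make such names URI-safe is percent-encoding; this is a general sketch, not what Spark's Utils.fetchFile actually does:

```scala
import java.net.{URLDecoder, URLEncoder}

// General technique, not Spark's code: percent-encoding makes file names
// containing characters such as '%' or spaces safe to embed in a URI path;
// decoding on the receiving side restores the original name.
def encodeName(name: String): String = URLEncoder.encode(name, "UTF-8")
def decodeName(name: String): String = URLDecoder.decode(name, "UTF-8")
```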


private class TimeoutHandler(client: TransportClient) extends ChannelInboundHandlerAdapter {

  override def userEventTriggered(ctx: ChannelHandlerContext, evt: Object): Unit = {
Author: So, this is a little bit racy, since the timeout might trigger when another thread is preparing to download a file. I'll fix this and update the PR.

source
}

private def fileDownloadClient(host: String, port: Int): TransportClient = synchronized {
Member: So if downloading from different hosts (different tasks in the same executor), they need to connect one by one, right?

Author: Downloading from different hosts means different clients. But different tasks will still all download from the same host (the driver).

Member: Oh, right. All tasks will download from the same driver.

Member: Just found another case: fetchFiles is also used in DriverRunner in the Worker.

Author: That's fine. What's your worry here?

Member: On second thought, I think that's fine since downloads still happen in parallel.

@SparkQA commented Nov 10, 2015

Test build #45446 has finished for PR 9530 at commit a222c03.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val socketAddr = client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress]
val address = RpcAddress(socketAddr.getHostName(), socketAddr.getPort())
ctx.close()
NettyRpcEnv.this.synchronized {
Member: The lock order here is the reverse of fileDownloadClient. A potential deadlock issue?

Author: Good catch, fixed.

@SparkQA commented Nov 10, 2015

Test build #45540 has finished for PR 9530 at commit a89e665.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 11, 2015

Test build #45552 has finished for PR 9530 at commit 11f61a8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 11, 2015

Test build #45547 has finished for PR 9530 at commit 71ac0cf.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 14, 2015

Test build #45904 has finished for PR 9530 at commit 908a051.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental): case class AppendColumns[T, U](

@vanzin (Contributor, Author) commented Nov 17, 2015

retest this please

@SparkQA commented Nov 17, 2015

Test build #46041 has finished for PR 9530 at commit 908a051.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin (Contributor) commented Nov 18, 2015

Alright, I looked a bit closer at this. Since it is not mission critical but touches some mission-critical parts, I think it's best not to merge this into 1.6.

Let's merge this once 1.6 has an rc?

@vanzin (Contributor, Author) commented Nov 18, 2015

What's the problem with merging to master? Isn't that why we have branch-1.6?

I have changes built on top of this that I'm waiting to send for review.

@rxin (Contributor) commented Nov 18, 2015

I'm mostly thinking it'd make things harder if we found bugs that need fixing.

@vanzin (Contributor, Author) commented Nov 18, 2015

Do you mean bugs in this code? Or in 1.6? If in this code, master will eventually go through stabilization also, and at that point if there are bugs, they'll surface. Whether this code goes in now or later doesn't change much in that regard.

@rxin (Contributor) commented Nov 18, 2015

There is still some network-related code going into 1.6 (one piece is the config naming thing). I'm concerned they might conflict. Maybe you can just submit more PRs based on the previous ones for review?

We should be pretty close to cutting a rc anyway.

@vanzin (Contributor, Author) commented Nov 18, 2015

> Maybe you can just submit more prs basing on previous ones

I don't know of any clean way of doing that in GitHub; they'd end up including all the code in this PR, which is rather ugly / suboptimal.

I can wait for the config change (I want to push it soon anyway), but I'm not super worried about conflicts. Those are generally not hard to resolve.

@SparkQA commented Nov 18, 2015

Test build #46261 has finished for PR 9530 at commit d10fa97.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental): class ALS(override val uid: String) extends Estimator[ALSModel] with ALSParams with Writable

@SparkQA commented Nov 19, 2015

Test build #46265 has finished for PR 9530 at commit 7cc83e7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental): class ALS(override val uid: String) extends Estimator[ALSModel] with ALSParams with Writable

This makes the code friendlier to SPARK-11097 (if and when
it's implemented), and also avoids custom timeout handling
code by reusing features of the transport library.

Need to test on a real cluster, though...
@SparkQA commented Nov 19, 2015

Test build #46289 has finished for PR 9530 at commit cfd01bd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

But use the rpc config as the default. Also now properly tested on
a real cluster, and verified idle connections are closed.
@SparkQA commented Nov 19, 2015

Test build #46349 has finished for PR 9530 at commit 0e8e4bb.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor, Author) commented Nov 19, 2015

sigh... retest this please.

@SparkQA commented Nov 20, 2015

Test build #46361 has finished for PR 9530 at commit 0e8e4bb.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 20, 2015

Test build #46388 has finished for PR 9530 at commit c5a218d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing (Member) commented Nov 20, 2015

LGTM. But let's just merge this one to master since it's too risky to add a new feature to 1.6 now.

@vanzin (Contributor, Author) commented Nov 23, 2015

retest this please

@SparkQA commented Nov 23, 2015

Test build #46542 has finished for PR 9530 at commit c5a218d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor, Author) commented Nov 23, 2015

Merging to master.

@vanzin vanzin closed this Nov 23, 2015
@vanzin vanzin deleted the SPARK-11140 branch November 23, 2015 21:55
asfgit pushed a commit that referenced this pull request Nov 23, 2015
[SPARK-11140] [core] Transfer files using network lib when using NettyRpcEnv.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #9530 from vanzin/SPARK-11140.
@JoshRosen (Contributor)

The following two tests have failed for the past three builds in the Hadoop 2.4 Maven Jenkins job:

  • org.apache.spark.deploy.yarn.YarnClusterSuite.run Spark in yarn-client mode
  • org.apache.spark.deploy.yarn.YarnClusterSuite.user class path first in client mode

According to Jenkins, these failures first started in https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-Maven-with-YARN/HADOOP_PROFILE=hadoop-2.4,label=spark-test/4223/

Given the set of patches in that changeset, my hunch is that this might have somehow been caused by either #9853 or #9530 (this patch). Branch 1.6 only includes #9853 and seems to be passing these tests, so I therefore believe that this patch may be the culprit (since it's only present in master and not 1.6). @vanzin, could you help me look into this?

vanzin pushed a commit to vanzin/spark that referenced this pull request Nov 24, 2015
[SPARK-11140] [core] Transfer files using network lib when using NettyRpcEnv.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes apache#9530 from vanzin/SPARK-11140.

(cherry picked from commit c2467da)
@squito (Contributor) commented Nov 25, 2015

Hi @JoshRosen, I was just looking into this. With some local testing, I could confirm that this patch did lead to those failures, but that it is fixed by #9923 (in particular the handling of an empty stream in TransportResponseHandler).

@squito (Contributor) commented Nov 25, 2015

(or #9941, which is just the bug fixes w/out the other changes ...)
