Skip to content

[SPARK-20564][Deploy] Reduce massive executor failures when executor count is large (>2000) #17854

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from

Conversation

mariahualiu
Copy link

What changes were proposed in this pull request?

In applications that use over 2000 executors, we noticed a large number of failed executors due to driver overloading with too many executor RPCs within a short period of time (for example, retrieve spark properties, executor registration). This patch adds an extra configuration spark.yarn.launchContainer.count.simultaneously, which caps the maximal number of containers that driver can ask for and launch in every spark.yarn.scheduler.heartbeat.interval-ms. As a result, the number of executors grows steadily. The number of executor failures is reduced and applications can reach the desired number of executors faster.

How was this patch tested?

  1. Didn't break relevant unit tests
  2. Tested with a spark application (2500 executors) on a Yarn cluster with 3000 machines.

A gentle ping to the contributors of YarnAllocator: @srowen @foxish @jinxing64 @squito
A JIRA is opened: https://issues.apache.org/jira/browse/SPARK-20564

@squito
Copy link
Contributor

squito commented May 5, 2017

Jenkins, ok to test

@SparkQA
Copy link

SparkQA commented May 5, 2017

Test build #76494 has finished for PR 17854 at commit e1cc521.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Copy link
Contributor

squito commented May 5, 2017

It looks to me like this is actually making 2 behavior changes:

  1. throttle the requests for new containers, as you describe in your description
  2. drop newly received containers if they are over the limit (not in the description).

is that correct? Did you find that (2) was necessary as well?

I understand the problem you are describing, but I'm surprised this really helps the driver scale up to more executors. Maybe this will let the executors start, but won't it just lead to the driver getting swamped when you've got 2500 executors sending heartbeats and task updates? I'm not saying its bad to make this improvement, just trying to understand. I'd feel better about just doing (1) -- if you found (2) is necessary, I would want to think through the implications a bit more.

@squito
Copy link
Contributor

squito commented May 5, 2017

also cc @tgravescs @vanzin

@tgravescs
Copy link
Contributor

to slow down launching you could just set spark.yarn.containerLauncherMaxThreads to be smaller. that isn't guaranteed but neither is this really. Just an alternative or something you can do immediately.

I don't see any reason to drop the containers yarn gives you unless you are really slowing it down such that is wasting a lot of resource, it will just cause more overhead. Asking for less to start with could be ok although again its just going to slow down the entire thing. How long is it taking you to launch these?

Also can you put some more details about exactly what you are seeing? I assume its getting timeout exceptions? Exactly where is it timing out and why. It would be nice to really fix or improve that as well longer term. What is your timeout set to? I want to see details so we can determine if other things should be done, like make the registration retry more, are current timeout's sufficient, etc.

@tgravescs
Copy link
Contributor

I took a quick look at the registerExecutor call in CoarseGrainedExecutorBackend and its not retrying at all. We should change that to retry. We retry heartbeats and many other things so it makes sense to retry this. If that has to retry and takes a bit longer its no worse then you artificially delaying the launch and its better because if they can get registered it will be faster then you delaying the launch. Between that and changing the max launcher threads I would think that would be sufficient. If not you should have problems during other times. thoughts?

@vanzin
Copy link
Contributor

vanzin commented May 5, 2017

What do you mean by "not retrying"? Do you mean this line:

ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))

If that's what you mean, there's no need for retrying. No RPC calls retry anymore. See #16503 (comment) for an explanation.

@tgravescs
Copy link
Contributor

If that's what you mean, there's no need for retrying. No RPC calls retry anymore. See #16503 (comment) for an explanation.

I see, I guess with the way we have the rpc implemented it just sitting in the outbox or inbox of receiver anyway, so you are saying it makes more sense to just increase the timeout.
Although looking at it maybe I'm missing how its supposed to handle network failure? I don't see the netty layer retrying on a network connection failure.
call ask -> puts in outbox -> tries to connect -> fails -> removes from outbox and calls onFailure -> executor dies.
am I missing that somehwere? I don't see where we guarantee it got into the remote side inbox.

I definitely agree with you that if things retry they need to be idempotent.

I don't in general agree that we shouldn't retry. If the rpc layer is doing it for us that is fine. There are also special cases where you may want do something like a exponential backoff on waiting between tries, etc. But those would be case by case basis. Lots of things retry connections from hadoop to cell phones, etc. Sometimes weird things happen in large clusters. In a large network things might just take a different route. I try to connect once and it either fails or is slow, try again and it works fine because took different route.

if you have more raw data that says the retries are bad I would be interested.

@vanzin
Copy link
Contributor

vanzin commented May 5, 2017

Although looking at it maybe I'm missing how its supposed to handle network failure?

Spark has never really handled network failure. If the connection between the driver and the executor is cut, Spark sees that as the executor dying.

I don't in general agree that we shouldn't retry... But those would be case by case basis.

Yes, code that wants to retry should to do that explicitly. The old "retry" existed not because of needs of the code making the call, but because Akka could lose messages. The new RPC layer doesn't lose messages (ignoring the TCP reset case), so that old-style retry is not needed anymore.

The connection itself dying is a bigger issue that needs to be handled in the RPC layer if it's really a problem, and the caller retrying isn't really the solution (IMO).

@mariahualiu
Copy link
Author

@squito yes, I capped the number of resources in updateResourceRequests so that YarnAllocator asks for less number of resources in each iteration. When allocation fails one iteration, the request is then added back and YarnAllocator will try to allocate the leftover (from the previous iteration) plus the new requests in the next iteration, which can result a lot of allocated containers. The second change, as you pointed out, is used to address this possibility. On a second thought, maybe it is a better solution to change AMRMClientImpl::allocate where it does not add all resource requests from ask to askList.

@tgravescs I tried reducing spark.yarn.containerLauncherMaxThreads but it didn't help much. My understanding is that these threads send container launch commands to node managers and immediately return, which is very light weight and can be extremely fast. Launching container on NM side is an async operation.

@mariahualiu
Copy link
Author

Let me describe what I've seen when using 2500 executors.

  1. In the first a few (2~3) requests, AM received all (in this case 2500) containers from Yarn.
  2. In a few seconds, 2500 launch container commands were sent out.
  3. It took 3~4 minutes to start an executor on an NM (most of the time was spent on container localization: downloading spark jar, application jar and etc. from the hdfs staging folder).
  4. A large number of executors tried to retrieve spark properties from driver but failed to connect. A massive removing failed executors happened. It seems to me RemoveExecutor is handled by the same single thread that responds to RetrieveSparkProps and RegisterExecutor. As a result, this thread was even busier, and more executors cannot connect/register/etc.
  5. YarnAllocator requested more containers to make up for the failed ones. More executors tried to retrieve spark props and register. However the thread was still overwhelmed by the previous round of executors and cannot respond.

In some cases, we got 5000 executor failures and the application retried and eventually failed.

@mariahualiu
Copy link
Author

I re-ran the same application adding these configurations "--conf spark.yarn.scheduler.heartbeat.interval-ms=15000 --conf spark.yarn.launchContainer.count.simultaneously=50". Though it took 50 iterations to get 2500 containers from Yarn, it was faster to reach 2500 executors since there was much less executor failures and as a result little overhead of removing failed executors and less allocation requests to Yarn.

@mariahualiu
Copy link
Author

Now I can comfortably use 2500 executors. But when I pushed the executor count to 3000, I saw a lot of heartbeat timeout errors. It is something else we can improve, probably another jira.

@tgravescs
Copy link
Contributor

what is your network timeout (spark.network.timeout) set to?

@tgravescs
Copy link
Contributor

also what is the exact error/stack trace you see when you say "failed to connect"?

@squito
Copy link
Contributor

squito commented May 8, 2017

It took 3~4 minutes to start an executor on an NM (most of the time was spent on container localization: downloading spark jar, application jar and etc. from the hdfs staging folder).

I think the biggest improvements might in your cluster setup. I'd ensure that the spark jars (and all dependencies) are already on the local file systems of each node, and keep the application jar as small as possible, by also pushing dependencies of your application onto the local filesystems of each node. That usually keeps the code of your application jar that needs to be shipped around pretty small. Even with it on hdfs, one of the copies is probably on the driver which will still put a lot of pressure on that node.

@mariahualiu
Copy link
Author

@tgravescs I used the default spark.network.timeout (120s). When an executor cannot connect the driver, here is the executor log:
17/05/01 11:18:25 INFO [main] spark.SecurityManager: Changing view acls to: BN2BAPS197CCCC7$,hadoop
17/05/01 11:18:25 INFO [main] spark.SecurityManager: Changing modify acls to: BN2BAPS197CCCC7$,hadoop
17/05/01 11:18:25 INFO [main] spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(BN2BAPS197CCCC7$, hadoop); users with modify permissions: Set(BN2BAPS197CCCC7$, hadoop)
Exception in thread "main" java.io.IOException: Failed to connect to /10.65.97.223:32739
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:200)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:187)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:183)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection timed out: no further information: /10.65.97.223:32739
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 more

Here is the related driver log:
17/05/01 11:18:48 INFO [Reporter] yarn.YarnAllocator: Completed container container_e126_1493399217501_14096_01_000577 on host: BN2BAPS197CCCC7 (state: COMPLETE, exit status: 1)
17/05/01 11:18:48 WARN [Reporter] yarn.YarnAllocator: Container marked as failed: container_e126_1493399217501_14096_01_000577 on host: BN2BAPS197CCCC7. Exit status: 1. Diagnostics: Exception from container-launch.
Container id: container_e126_1493399217501_14096_01_000577
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:576)
at org.apache.hadoop.util.Shell.run(Shell.java:487)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:753)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:329)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:86)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Container exited with a non-zero exit code 1

@foxish
Copy link
Contributor

foxish commented May 9, 2017

In Kubernetes/Spark, we see fairly similar behavior in the scenario described. When the simultaneous container launching is not throttled, it is capable of DOSing the system. Our solution so far is to allocate in rounds (similar to the throttling mechanism proposed here), and to wait for readiness of previously launched containers (registration of those executors) before proceeding.

@vanzin
Copy link
Contributor

vanzin commented May 9, 2017

The reason why spark.yarn.containerLauncherMaxThreads does not work here is because it only controls how many threads simultaneously send a container start command to YARN; that is usually a much quicker operation than the container itself starting. Which means that if you're starting a bunch of containers, that setting will only help with sending the requests more quickly (or more slowly) to YARN, but not in throttling how many containers start at the same time.

To achieve the second, you could do something like this change, which adds a new configuration (something I don't like) and also has the issues people have asked about (ignoring containers that YARN has allocated for you).

Or you could change the way YarnAllocator starts containers by having the threads controlled by spark.yarn.containerLauncherMaxThreads actually wait until the containers are launched (not just until the request is sent to YARN). This would properly implement throttling based on the existing config parameter, but is a slightly more complicated change (I think).

@vanzin
Copy link
Contributor

vanzin commented Jul 12, 2017

@mariahualiu do you plan to address any of the feedback here? If not, this should probably be closed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants