[SPARK-20564][Deploy] Reduce massive executor failures when executor count is large (>2000) #17854
Conversation
…tors to be launched simultaneously
Jenkins, ok to test
Test build #76494 has finished for PR 17854 at commit
It looks to me like this is actually making 2 behavior changes:

1. capping how many containers are requested from YARN in each allocation cycle, and
2. ignoring containers that YARN has already allocated beyond that cap.

Is that correct? Did you find that (2) was necessary as well? I understand the problem you are describing, but I'm surprised this really helps the driver scale up to more executors. Maybe this will let the executors start, but won't it just lead to the driver getting swamped when you've got 2500 executors sending heartbeats and task updates? I'm not saying it's bad to make this improvement, just trying to understand. I'd feel better about just doing (1) -- if you found (2) is necessary, I would want to think through the implications a bit more.
also cc @tgravescs @vanzin
To slow down launching you could just set spark.yarn.containerLauncherMaxThreads to be smaller. That isn't guaranteed, but neither is this really; it's just an alternative, or something you can do immediately. I don't see any reason to drop the containers YARN gives you unless you are really slowing it down so much that it wastes a lot of resources; it will just cause more overhead. Asking for less to start with could be OK, although again it's just going to slow down the entire thing. How long is it taking you to launch these? Also, can you give some more details about exactly what you are seeing? I assume it's getting timeout exceptions? Exactly where is it timing out and why? It would be nice to really fix or improve that as well longer term. What is your timeout set to? I want to see details so we can determine if other things should be done, like making the registration retry more, whether the current timeouts are sufficient, etc.
I took a quick look at the registerExecutor call in CoarseGrainedExecutorBackend and it's not retrying at all. We should change that to retry. We retry heartbeats and many other things, so it makes sense to retry this. If it has to retry and takes a bit longer, that's no worse than artificially delaying the launch, and it's better because if the executors can get registered it will be faster than delaying the launch. Between that and changing the max launcher threads, I would think that would be sufficient. If not, you would likely see problems at other times as well. Thoughts?
What do you mean by "not retrying"? Do you mean this line:
If that's what you mean, there's no need for retrying. No RPC calls retry anymore. See #16503 (comment) for an explanation.
I see; I guess with the way we have the RPC implemented it's just sitting in the outbox or inbox of the receiver anyway, so you are saying it makes more sense to just increase the timeout. I definitely agree with you that if things retry they need to be idempotent. I don't in general agree that we shouldn't retry; if the RPC layer is doing it for us, that is fine. There are also special cases where you may want to do something like an exponential backoff on the wait between tries, etc., but those would be handled on a case-by-case basis. Lots of things retry connections, from Hadoop to cell phones. Sometimes weird things happen in large clusters. In a large network things might just take a different route: I try to connect once and it either fails or is slow; I try again and it works fine because it took a different route. If you have more raw data that says retries are bad, I would be interested.
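To make the backoff idea concrete, here is a minimal, hypothetical sketch of a retry helper with exponential backoff between attempts. Nothing in it is Spark API; the names (`RetryWithBackoffExample`, `retryWithBackoff`) are invented for illustration only.

```scala
import scala.util.{Failure, Success, Try}

// Illustrative only: a generic retry helper with exponential backoff between attempts.
// This is not Spark's RPC API; it just sketches the kind of case-by-case policy
// discussed above.
object RetryWithBackoffExample {
  def retryWithBackoff[T](attempts: Int, initialDelayMs: Long)(op: => T): T = {
    Try(op) match {
      case Success(result) => result
      case Failure(_) if attempts > 1 =>
        Thread.sleep(initialDelayMs)                            // wait before the next attempt
        retryWithBackoff(attempts - 1, initialDelayMs * 2)(op)  // double the delay each retry
      case Failure(e) => throw e
    }
  }
}
```

A caller would wrap whatever operation it wants to protect, e.g. `retryWithBackoff(4, 500) { someFlakyCall() }` (with `someFlakyCall` standing in for a hypothetical operation); whether such a policy belongs in the caller or in the RPC layer is exactly the question debated in the replies below.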
Spark has never really handled network failure. If the connection between the driver and the executor is cut, Spark sees that as the executor dying.
Yes, code that wants to retry should do that explicitly. The old "retry" existed not because of the needs of the code making the call, but because Akka could lose messages. The new RPC layer doesn't lose messages (ignoring the TCP reset case), so that old-style retry is not needed anymore. The connection itself dying is a bigger issue that needs to be handled in the RPC layer if it's really a problem, and the caller retrying isn't really the solution (IMO).
@squito yes, I capped the number of resources in updateResourceRequests so that YarnAllocator asks for fewer resources in each iteration. When allocation fails in one iteration, the request is added back and YarnAllocator tries to allocate the leftover (from the previous iteration) plus the new requests in the next iteration, which can result in a lot of allocated containers. The second change, as you pointed out, is used to address this possibility. On second thought, maybe a better solution is to change AMRMClientImpl::allocate so that it does not add all resource requests from ask to askList.

@tgravescs I tried reducing spark.yarn.containerLauncherMaxThreads but it didn't help much. My understanding is that these threads send container launch commands to the node managers and immediately return, which is very lightweight and can be extremely fast. Launching the container on the NM side is an async operation.
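As a rough illustration of the capping described above (a sketch, not the actual patch), the number of containers requested per allocation cycle could be clamped, with the leftover carried over to later cycles; `maxPerCycle` here stands in for the proposed spark.yarn.launchContainer.count.simultaneously.

```scala
// Hypothetical sketch of the capping idea (not the actual patch): clamp how many
// containers are requested from YARN in a single allocation cycle; the remainder
// stays pending and is requested in later cycles.
def containersToRequestThisCycle(missing: Int, maxPerCycle: Int): Int = {
  if (maxPerCycle <= 0) missing            // a non-positive cap means "no throttling"
  else math.min(missing, maxPerCycle)
}
```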
Let me describe what I've seen when using 2500 executors.
In some cases, we got 5000 executor failures and the application retried and eventually failed.

I re-ran the same application adding these configurations: "--conf spark.yarn.scheduler.heartbeat.interval-ms=15000 --conf spark.yarn.launchContainer.count.simultaneously=50". Though it took 50 iterations to get 2500 containers from YARN, it was faster to reach 2500 executors since there were far fewer executor failures and, as a result, less overhead from removing failed executors and fewer allocation requests to YARN.

Now I can comfortably use 2500 executors. But when I pushed the executor count to 3000, I saw a lot of heartbeat timeout errors. That is something else we can improve, probably in another JIRA.
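For reference, a minimal sketch of how the two settings quoted above might be supplied through SparkConf instead of --conf flags (assuming a YARN deployment and that the new config proposed by this PR is available):

```scala
import org.apache.spark.SparkConf

// Illustrative only: the same settings shown as --conf flags above, set programmatically.
// spark.yarn.launchContainer.count.simultaneously is the new config proposed by this PR.
val conf = new SparkConf()
  .set("spark.yarn.scheduler.heartbeat.interval-ms", "15000")    // slow down allocation rounds
  .set("spark.yarn.launchContainer.count.simultaneously", "50")  // cap containers per round
```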
What is your network timeout (spark.network.timeout) set to?
Also, what is the exact error/stack trace you see when you say "failed to connect"?
I think the biggest improvements might be in your cluster setup. I'd ensure that the Spark jars (and all dependencies) are already on the local file systems of each node, and keep the application jar as small as possible, by also pushing dependencies of your application onto the local filesystems of each node. That usually keeps the part of your application jar that needs to be shipped around pretty small. Even with it on HDFS, one of the copies is probably on the driver, which will still put a lot of pressure on that node.
@tgravescs I used the default spark.network.timeout (120s). When an executor cannot connect to the driver, here is the executor log:

Here is the related driver log:

Container exited with a non-zero exit code 1
In Kubernetes/Spark, we see fairly similar behavior in the scenario described. When simultaneous container launching is not throttled, it is capable of DoSing the system. Our solution so far has been to allocate in rounds (similar to the throttling mechanism proposed here) and to wait for readiness of previously launched containers (registration of those executors) before proceeding.
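A simplified sketch of that "allocate in rounds, wait for readiness" loop, with all names invented for illustration (this is not the actual Kubernetes scheduler backend code):

```scala
// Illustrative sketch: launch executors in bounded rounds and wait for the previous
// round to register with the driver before launching the next one.
def launchInRounds(
    target: Int,
    roundSize: Int,
    launch: Int => Unit,          // asks the cluster manager for `n` more executors
    registeredCount: () => Int    // how many executors have registered with the driver
  ): Unit = {
  var requested = 0
  while (requested < target) {
    val batch = math.min(roundSize, target - requested)
    launch(batch)
    requested += batch
    // A real implementation would also time out and account for failed launches here.
    while (registeredCount() < requested) {
      Thread.sleep(1000)
    }
  }
}
```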
The reason why …

To achieve the second, you could do something like this change, which adds a new configuration (something I don't like) and also has the issues people have asked about (ignoring containers that YARN has allocated for you). Or you could change the way …
@mariahualiu do you plan to address any of the feedback here? If not, this should probably be closed.
What changes were proposed in this pull request?
In applications that use over 2000 executors, we noticed a large number of failed executors because the driver was overloaded by too many executor RPCs within a short period of time (for example, retrieving Spark properties and executor registration). This patch adds an extra configuration, spark.yarn.launchContainer.count.simultaneously, which caps the maximum number of containers the driver can ask for and launch in every spark.yarn.scheduler.heartbeat.interval-ms. As a result, the number of executors grows steadily, the number of executor failures is reduced, and applications can reach the desired number of executors faster.
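As a back-of-the-envelope check, using the numbers reported earlier in this thread (a cap of 50 containers per round and a 15 s heartbeat interval), the ramp-up time for 2500 executors under the throttle works out roughly as follows:

```scala
// Rough lower bound on ramp-up time under the proposed throttle (illustrative only).
val targetExecutors     = 2500
val capPerHeartbeat     = 50      // spark.yarn.launchContainer.count.simultaneously
val heartbeatIntervalMs = 15000L  // spark.yarn.scheduler.heartbeat.interval-ms
val rounds        = math.ceil(targetExecutors.toDouble / capPerHeartbeat).toInt  // 50 rounds
val rampUpSeconds = rounds * heartbeatIntervalMs / 1000                          // ~750 s, ignoring launch latency
```

This is consistent with the 50 allocation iterations observed above; the trade-off is a slower but much more stable ramp-up.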
How was this patch tested?
A gentle ping to the contributors of YarnAllocator: @srowen @foxish @jinxing64 @squito
A JIRA has been opened: https://issues.apache.org/jira/browse/SPARK-20564