[SPARK-4087] use broadcast for task only when task is large enough #2933
Conversation
Test build #22161 has started for PR 2933 at commit
Test build #22161 has finished for PR 2933 at commit
Test FAILed.
@ScrapCodes It looks like the Scala style checks failed due to a line that contained 104 characters, but the scalastyle output didn't list the actual cause of the failure:
Any idea why it's not displaying the cause of the failure?
Test build #418 has started for PR 2933 at commit
Test build #22169 has started for PR 2933 at commit
Test build #418 has finished for PR 2933 at commit
Test build #22169 has finished for PR 2933 at commit
Test FAILed.
Do you mind opening a JIRA for this? You can link it to the other broadcast optimization one.
@@ -124,6 +123,10 @@ class DAGScheduler(
/** If enabled, we may run certain actions like take() and first() locally. */
private val localExecutionEnabled = sc.getConf.getBoolean("spark.localExecution.enabled", false)

/** Broadcast the serialized tasks only when they are bigger than this threshold. */
private val broadcastTaskMinSize =
  sc.getConf.getInt("spark.scheduler.broadcastTaskMinSize", 8) * 1024
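As a rough illustration of what this threshold gates (a sketch only; the object and helper names below are hypothetical and not part of the patch), the decision reduces to a size comparison on the serialized task bytes:

```scala
// Sketch: gate the broadcast decision on serialized task size.
// BroadcastThresholdSketch and shouldBroadcast are hypothetical names.
object BroadcastThresholdSketch {
  // Mirrors the default in the patch: 8 KB, configurable via
  // spark.scheduler.broadcastTaskMinSize (in KB).
  val broadcastTaskMinSize: Int = 8 * 1024

  /** Broadcast only when the serialized task exceeds the threshold. */
  def shouldBroadcast(serializedTask: Array[Byte]): Boolean =
    serializedTask.length > broadcastTaskMinSize

  def main(args: Array[String]): Unit = {
    println(shouldBroadcast(new Array[Byte](1024)))      // small task: false
    println(shouldBroadcast(new Array[Byte](64 * 1024))) // large task: true
  }
}
```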
I think that the serialized task ends up being sent in an Akka message, so there could be problems if a user configures this to be higher than the capacity of the Akka frame.
As discussed offline, users take the risk if they change it to unreasonable values.
Perhaps call this broadcastTaskMinSizeKB? Should we document this flag? Either way, there should be some mention that your jobs will literally stop working silently if you change this to be similar to the Akka frame size. It is not clear that this is sent via Akka.
I think it's better to keep this internal; it's a tradeoff between 1.0 and 1.1, and most users do not need to touch this.
We could document it later if users really need it.
If the user changes the Akka frame size to a small value, jobs will also stop working silently even without this patch.
I think we should have good default values for these, and assume that users know the risk if they want to change some configs; it's not easy to make sure they stay consistent across all possible values for all the configs.
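To make the interaction with the Akka frame size concrete, a startup-time sanity check could look like the sketch below. This validation is hypothetical and not part of the patch; it assumes `spark.akka.frameSize` is expressed in MB, as it was in Spark of this era:

```scala
// Sketch: validate that inline (non-broadcast) tasks can fit in an Akka
// frame. This check is hypothetical and not part of the patch.
object FrameSizeCheckSketch {
  def validate(broadcastTaskMinSizeKB: Int, akkaFrameSizeMB: Int): Unit = {
    val thresholdBytes = broadcastTaskMinSizeKB * 1024L
    val frameBytes = akkaFrameSizeMB * 1024L * 1024L
    require(thresholdBytes < frameBytes,
      s"spark.scheduler.broadcastTaskMinSize (${thresholdBytes}B) must stay " +
      s"below the Akka frame size (${frameBytes}B); otherwise tasks sent " +
      "inline can silently exceed the frame and jobs will hang")
  }

  def main(args: Array[String]): Unit = {
    validate(8, 10) // default 8 KB threshold vs a 10 MB frame: fine
    println("ok")
  }
}
```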
@@ -69,6 +70,10 @@ private[spark] class Stage(
var resultOfJob: Option[ActiveJob] = None
var pendingTasks = new HashSet[Task[_]]

/** This is used to track the life cycle of broadcast,
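The idea of tying the broadcast's life cycle to the stage can be sketched roughly as follows (the class, field, and method names here are hypothetical, not the patch's actual code):

```scala
// Sketch: a stage holds a reference to the broadcast of its serialized
// tasks (when one was created) so it can be destroyed once the stage
// completes. All names here are hypothetical.
class StageBroadcastTracker {
  private var taskBroadcastId: Option[Long] = None

  /** Record the broadcast created for this stage's tasks, if any. */
  def trackBroadcast(id: Long): Unit = taskBroadcastId = Some(id)

  /** On stage completion, hand back the broadcast id to destroy. */
  def releaseBroadcast(): Option[Long] = {
    val id = taskBroadcastId
    taskBroadcastId = None
    id
  }
}
```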
Super-minor style nit, but I think our usual style is to not place comments on the first /** line.
Test build #427 has started for PR 2933 at commit
Test build #22196 has started for PR 2933 at commit
Test build #427 has finished for PR 2933 at commit
@JoshRosen
Test build #22196 timed out for PR 2933 at commit
Test FAILed.
Test build #450 has started for PR 2933 at commit
Test build #450 has finished for PR 2933 at commit
Test build #22223 has started for PR 2933 at commit
Test build #22223 has finished for PR 2933 at commit
Test PASSed.
I've been thinking about this some more and I wonder about the motivation for this change: how much of a performance benefit does this buy us for typical workloads? This (and the other TorrentBroadcast inlining patch) adds extra code paths and complexity, but does it buy us measurable performance benefits? I'm concerned about adding extra branches to already-complicated code.
@JoshRosen The motivation is not performance, it's stability. Sending tasks to executors is a critical part of Spark; it should be as stable as possible. Using broadcast to send tasks brings much runtime complexity, and it actually introduced some problems for us (we did not have them in 1.0). The motivation of this patch is to remove the complexity of broadcast in most cases, using it only when broadcast can bring performance benefits (when the tasks are large enough). In the future, maybe we could increase broadcastTaskMinSizeKB to 100 or even more. This adds some complexity to the code (not much), but actually simplifies the runtime behavior. It will also bring some performance gain (no RPC or cache at all).
I find this a little bit hacky. If the broadcast implementation has bugs or performance issues, we should just fix them, and it will stabilize over time like any other new feature we add. Having this mode where we might do one thing or might do another will make debugging and measuring things trickier. And we'll expose this configuration option which it seems like we will ultimately want to remove. IMO this would only be justified if we had a well-documented performance issue that we felt we simply couldn't solve within the broadcast architecture; then you would give people a latch here to avoid broadcasting.
Broadcast (especially TorrentBroadcast) is designed for large objects; using it to send out small shared variables is like using a tank to shoot a mosquito. It was not a good approach to begin with, and it makes simple things complicated. The motivation of broadcasting tasks is to solve the performance for
I don't see fundamentally why the broadcast mechanism can't be done as efficiently as task launching itself. Do you have a reproducible workload where this caused a performance regression and we couldn't optimize the broadcast sufficiently?
We've been fighting the problem of failures while deserializing a task for days (failing in TorrentBroadcast); they cannot be reproduced easily. Hope we can fix it before the 1.2 release.
I thought we had fixed this issue; can you point me to new occurrences of it?
@JoshRosen I think we still have it (in tests tonight):
I think this exception is triggered in TorrentBroadcast.
This is really strange; I thought that the "unexpected exception type" would have been addressed by #2932 |
Can you point me to the commit that produced that stacktrace?
@JoshRosen @pwendell The test branch (internal) did not have that commit. That commit only improved the logging; it did not solve the stability problem.
Test build #486 has started for PR 2933 at commit
Test build #486 has finished for PR 2933 at commit
Conflicts: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Test build #22746 has started for PR 2933 at commit
Test build #22746 has finished for PR 2933 at commit
Test PASSed.
I agree with @pwendell. It seems like the right thing to do is just fix Broadcast ... and if we can't, then wouldn't you also want to turn off Broadcast even for big closures?
What's the status on this PR / JIRA? As far as I know, it seems that TorrentBroadcast has been more stable lately, so if the only motivation here was stability then I think we might be able to close this.
Close this now.
Using broadcast for small tasks has no benefit and can even cause regressions (several extra RPCs); there are also some stability issues with broadcast. So we should use broadcast for tasks only when the serialized tasks are large enough (larger than 8K by default; this may change in the future).
In practice, most tasks are small, so this should improve stability for most use cases, especially for tests, which start and stop the context multiple times.
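The dispatch behavior described above can be sketched roughly as follows (all types and names are hypothetical; the real code lives in the scheduler and differs in detail):

```scala
// Sketch of the described behavior: serialize the task, then either
// broadcast it (large tasks) or ship the bytes inline with the task
// description (small tasks). Types and names here are hypothetical.
sealed trait TaskPayload
case class InlineBytes(bytes: Array[Byte]) extends TaskPayload
case class BroadcastRef(broadcastId: Long) extends TaskPayload

object TaskDispatchSketch {
  val broadcastTaskMinSize: Int = 8 * 1024 // 8 KB default, as in the patch

  // Stand-in for creating a real Broadcast and returning its id.
  private var nextId = 0L
  private def broadcastBytes(bytes: Array[Byte]): Long = { nextId += 1; nextId }

  /** Choose between inline shipping and broadcast based on size. */
  def payloadFor(serialized: Array[Byte]): TaskPayload =
    if (serialized.length > broadcastTaskMinSize)
      BroadcastRef(broadcastBytes(serialized))
    else
      InlineBytes(serialized)
}
```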