[SPARK-14475] Propagate user-defined context from driver to executors #12248
Conversation
Test build #55276 has finished for PR 12248 at commit
The MiMa failure is because
```diff
@@ -206,6 +210,11 @@ private[spark] object Task {
     dataOut.writeLong(timestamp)
   }

+    // Write the task properties separately so it is available before full task deserialization.
```
Since the properties aren't transient in `Task`, I guess this means that we'll write them out twice. If we want to avoid this, we can make `localProperties` into a `@transient var` which is `private[spark]`, then re-set the field after deserializing the task. Tasks are sent to executors using broadcast variables, so the extra space only makes a difference for the first task from a stage that's run on an executor. As a result, if we think that these serialized properties will typically be small, then the extra space savings probably aren't a huge deal, but if we want to heavily optimize then we can do the `var` trick.
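For illustration, a minimal sketch of the `@transient var` pattern being described, with illustrative names (this is not the actual `Task` class):

```scala
package org.apache.spark.example // placed under org.apache.spark only so private[spark] compiles

import java.util.Properties

// The field is skipped by Java serialization of the task body, so the properties are
// not written twice; the deserializing side (the executor) re-sets it from the
// separately written properties.
private[spark] class ExampleTask(val stageId: Int, val partitionId: Int) extends Serializable {
  @transient private[spark] var localProperties: Properties = new Properties
}

// On the executor, after deserializing the task body and the properties separately:
//   task.localProperties = separatelyDeserializedProps
```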
Done
To fix MiMa, add the ignores to spark/project/MimaExcludes.scala (line 613 at 49fb237).
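For reference, entries in that file look roughly like the sketch below; the exact problem type and signature needed for this PR may differ, so treat this as illustrative only:

```scala
// project/MimaExcludes.scala -- illustrative exclusion entry; the concrete rule
// required for this change may use a different problem type or signature.
import com.typesafe.tools.mima.core._

ProblemFilters.exclude[MissingMethodProblem](
  "org.apache.spark.TaskContext.getLocalProperty")
```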
Test build #55287 has finished for PR 12248 at commit
Test build #55285 has finished for PR 12248 at commit
Test build #55290 has finished for PR 12248 at commit
Test build #55291 has finished for PR 12248 at commit
Test build #55320 has finished for PR 12248 at commit
Backing up, how does this differ from using broadcast variables for data, or from simply sending small properties objects in a closure? Does it need all this complexity of yet another mechanism?
The main difference is that propagation is transparent to user code.
Propagating references from a closure is already transparent; you just reference whatever you want, like a Properties object, and it goes with the task. What's the use case for something more than this?
It's not, though: if you want to propagate something new without manually passing it through all your closures, this cannot be done today. For example, consider a Spark library that wants to implement a per-job
What you have to do now is something more like:
which is more verbose and hard to maintain.
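For illustration, the kind of manual plumbing being described looks roughly like the sketch below; the property key `myapp.logLevel` and the job logic are assumptions, not code from this PR. Every closure (and every library call made from inside it) has to capture or be handed the settings object explicitly.

```scala
import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}

object ManualThreadingSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))

    // Per-job settings the caller wants visible inside its tasks.
    val jobProps = new Properties()
    jobProps.setProperty("myapp.logLevel", "DEBUG")

    // Without automatic propagation, every closure must reference jobProps directly,
    // and any library code called from it must accept the settings as a parameter.
    val labeled = sc.parallelize(1 to 10).map { x =>
      val level = jobProps.getProperty("myapp.logLevel")
      s"[$level] processed $x"
    }.collect()

    labeled.foreach(println)
    sc.stop()
  }
}
```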
Your change is about passing around a … Your example, however, seems to be about configuring some global per-function behavior, not sending props. In this example, why would the library not call …?
That's not true; if you access a static …
It's more about configuring behavior based on some property set by some upstream caller of the function. The idea is that the user wants to configure the log level just for this job, without impacting any other jobs potentially running on the cluster.
Sorry, I should have made the example more explicit. setLogLevel would be implemented on the driver side as … I think, more generally, that this adds a mechanism for passing values implicitly without requiring the user (who is writing Spark code) to manually reference it in each of their closures. You are right that this can be achieved via other mechanisms, but those may not be convenient or practical for the use case, e.g. if you want to integrate with something like X-trace (which is out of the scope of this PR, but would be easy to add once we have the mechanism).
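A minimal sketch of the propagation path being argued for, assuming an illustrative property key: the library's driver-side `setLogLevel` helper reduces to a `setLocalProperty` call, and executor-side code reads the value through the new `TaskContext.getLocalProperty` without any closure plumbing.

```scala
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object PerJobLogLevelSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))

    // Driver side: what a library's per-job setLogLevel helper could do -- set a
    // local property scoped to jobs submitted from this thread.
    sc.setLocalProperty("myapp.logLevel", "DEBUG")

    // Executor side: task code (including library code the user never modified) can
    // read the property without it being threaded through every closure.
    val out = sc.parallelize(1 to 10).map { x =>
      val level = Option(TaskContext.get().getLocalProperty("myapp.logLevel")).getOrElse("INFO")
      s"[$level] processed $x"
    }.collect()

    out.foreach(println)
    sc.stop()
  }
}
```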
@srowen, I think that the main use-case for this feature is associating metadata with a Spark action / execution and making that metadata accessible in that action's tasks. For instance, let's say that I run a Spark SQL query and want to propagate some metadata related to that query execution from the driver to the executors for use in tracing / debugging / instrumentation. Maybe I want to propagate a label associated with all tasks launched from the job, such as a job group name, and read that label in a custom log appender so that my log messages from those tasks contain that metadata. In this case, the actual RDD code isn't controlled by the user and they don't really have a place to interpose broadcast variables or other custom code for propagating this metadata. Even if the user's library code were to use broadcast variables and define thread-local variables, etc., they'd have to worry about some subtleties related to Spark's internal threading model: for example, thread-locals need to be handled carefully to make sure that they're correctly propagated across thread boundaries in PythonRDD, RRDD, ScriptTransformation, PipedRDD, etc., and the set of places where you'd need to do that propagation corresponds exactly to the set of places where we already happen to be propagating the TaskContext thread-local. Given that …
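As a hedged illustration of the log-appender use case (the appender class and property key below are hypothetical, not part of this PR), an executor-side log4j 1.x appender could pull the job label out of the running task's local properties:

```scala
import org.apache.log4j.AppenderSkeleton
import org.apache.log4j.spi.LoggingEvent
import org.apache.spark.TaskContext

// Hypothetical appender: prefixes each executor-side log line with a label read
// from the task's local properties.
class JobLabelAppender extends AppenderSkeleton {
  override def append(event: LoggingEvent): Unit = {
    // TaskContext.get() is null when logging happens outside a running task.
    val label = Option(TaskContext.get())
      .flatMap(tc => Option(tc.getLocalProperty("myapp.jobGroupLabel")))
      .getOrElse("no-label")
    System.err.println(s"[$label] ${event.getLevel} - ${event.getRenderedMessage}")
  }

  override def close(): Unit = {}

  override def requiresLayout(): Boolean = false
}
```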
I mean making a … I understand Josh's use case more. There are certainly tasks and RDDs entirely internal to some Spark process. But those also won't know anything about what to do with some custom user properties. Maybe eventually they invoke a UDF that could use these properties. In many cases that UDF could still just refer to whatever config you like directly (right?), but I'm probably not thinking of some case where this fails to work. I take the point about this already being an API for the caller anyway.
```diff
   with Logging {

   /** A constructor used only in test suites. This does not require passing in an RDD. */
   def this(partitionId: Int) {
-    this(0, 0, null, new Partition { override def index: Int = 0 }, null, null)
+    this(0, 0, null, new Partition { override def index: Int = 0 }, null, null, new Properties)
```
I wonder if we can avoid making empty `Properties` all over ... an `Option[Properties]`? A setter that is called only where needed?
`Properties` objects are kind of analogous to `Map`s, and I think that `Option[Map]` would be kind of a weird type, in the same sense that `Option[Set]` (or any other collection type) is usually kind of a code smell. So this is fine with me as-is.
It seemed safer to make it required. I can change this to an option if you think creating a Properties each time is too much overhead.
Fair enough, I suppose allocating the empty map/properties object isn't that expensive.
@srowen, suppose you have an existing service running Spark jobs that read from a custom datasource. You want to add log4j trace annotations in order to attribute datasource logs back to the original caller of the service. However, you want to avoid invasive changes to the existing code. This is a two-line change with the proposed API.
The alternative is to explicitly reference …
This can as easily be ...
I get that if … I think the fact that it's already an API reduces the cost of a change like this in comparison, so I can see the argument for it.
Yes, exactly: this is for implementing functionality such as tracing, where any existing-code modification may be too burdensome for users due to, e.g., too much plumbing or libraries they cannot modify. It's the same argument as for thread-locals, but in this case spanning driver -> worker interactions.
It seems like we agree that this API is easy to support in Spark and hard/impossible to implement as cleanly in client code. As a result, I think this is okay to merge, so I'm going to run this one more time and will merge a bit after Jenkins passes. If anyone thinks that we need more discussion before accepting this API, let me know.
Jenkins, retest this please.
Test build #55545 has finished for PR 12248 at commit
Going to merge this into master. Thanks.
@ericl @JoshRosen @srowen |
What changes were proposed in this pull request?
This adds a new API call, `TaskContext.getLocalProperty`, for getting properties set in the driver from executors. These local properties are automatically propagated from the driver to executors. For streaming, the context for streaming tasks will be the initial driver context when ssc.start() is called.
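For illustration, a minimal streaming usage sketch under assumed names (the property key and socket source are not part of this patch): the local property is set before `ssc.start()`, so it is part of the driver context propagated to the streaming tasks.

```scala
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingLocalPropertySketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Set before ssc.start(), so it is captured in the initial driver context.
    ssc.sparkContext.setLocalProperty("myapp.jobLabel", "nightly-ingest")

    val lines = ssc.socketTextStream("localhost", 9999)
    lines.foreachRDD { rdd =>
      rdd.foreach { line =>
        // Runs on executors; reads the property propagated from the driver.
        val label = TaskContext.get().getLocalProperty("myapp.jobLabel")
        println(s"[$label] $line")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```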
How was this patch tested?
Unit tests.
cc @JoshRosen