[SPARK-14475] Propagate user-defined context from driver to executors #12248
Changes from all commits: a8b07f1, 9104f0c, b2fc541, 37f269e, 3bf34b7, 82646a9, 2542d01, 964ee4b, 0e46c58
scheduler/Task.scala:

@@ -19,6 +19,7 @@ package org.apache.spark.scheduler
 import java.io.{DataInputStream, DataOutputStream}
 import java.nio.ByteBuffer
+import java.util.Properties

 import scala.collection.mutable.HashMap
@@ -46,12 +47,14 @@ import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Uti
  * @param initialAccumulators initial set of accumulators to be used in this task for tracking
  *                            internal metrics. Other accumulators will be registered later when
  *                            they are deserialized on the executors.
+ * @param localProperties copy of thread-local properties set by the user on the driver side.
  */
 private[spark] abstract class Task[T](
     val stageId: Int,
     val stageAttemptId: Int,
     val partitionId: Int,
-    val initialAccumulators: Seq[Accumulator[_]]) extends Serializable {
+    val initialAccumulators: Seq[Accumulator[_]],
+    @transient var localProperties: Properties) extends Serializable {

   /**
    * Called by [[org.apache.spark.executor.Executor]] to run this task.
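For context, here is a minimal sketch of the behaviour this new parameter enables end to end, assuming the `TaskContext.getLocalProperty` accessor that the executor side gains elsewhere in this PR (not shown in this diff). The example program and the `user.context` key are illustrative, not part of the patch.

```scala
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object LocalPropertiesExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("local-props-demo").setMaster("local[2]"))

    // Thread-local property set by the user on the driver thread.
    sc.setLocalProperty("user.context", "request-42")

    val seen = sc.parallelize(1 to 4, 2).map { _ =>
      // Inside the task, read the property that was shipped along with the task.
      TaskContext.get().getLocalProperty("user.context")
    }.collect()

    seen.foreach(println)  // expected to print "request-42" once per element
    sc.stop()
  }
}
```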
@@ -71,6 +74,7 @@ private[spark] abstract class Task[T](
       taskAttemptId,
       attemptNumber,
       taskMemoryManager,
+      localProperties,
       metricsSystem,
       initialAccumulators)
     TaskContext.setTaskContext(context)
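A hedged sketch of how the executor-side context can serve those properties once they are passed into the constructor above; `TaskContextSketch` is a made-up stand-in, and the real `TaskContextImpl` takes many more parameters than shown here.

```scala
import java.util.Properties

// Illustrative stand-in for the executor-side TaskContext: it simply answers
// getLocalProperty lookups from the Properties object handed to it by Task.run.
class TaskContextSketch(localProperties: Properties) {
  def getLocalProperty(key: String): String = localProperties.getProperty(key)
}

object TaskContextSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("user.context", "request-42")
    println(new TaskContextSketch(props).getLocalProperty("user.context"))  // request-42
  }
}
```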
@@ -206,6 +210,11 @@ private[spark] object Task {
       dataOut.writeLong(timestamp)
     }

+    // Write the task properties separately so it is available before full task deserialization.
+    val propBytes = Utils.serialize(task.localProperties)
+    dataOut.writeInt(propBytes.length)
+    dataOut.write(propBytes)
+
     // Write the task itself and finish
     dataOut.flush()
     val taskBytes = serializer.serialize(task)

Review discussion on this hunk:

Reviewer (on writing the properties separately): Since the properties aren't transient in [...]. As a result, if we think that these serialized properties will typically be small then the extra space savings probably aren't a huge deal, but if we want to heavily optimize then we can do the [...]
Author: Done.

Reviewer (on `Utils.serialize(task.localProperties)`): Just curious, why not [...]?
Author: Wasn't sure how to deserialize on the Executor side. Perhaps env.serializer there?
Reviewer: Hmm, good point.
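The review thread above is about double serialization: because `localProperties` is marked `@transient` on `Task`, Java serialization skips the field, so the only copy on the wire is the length-prefixed block written here, and the executor has to reattach the properties after deserializing the task. A small self-contained sketch of that behaviour, using plain Java serialization in place of Spark's serializers (the `Payload`/`TransientDemo` names are illustrative):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.Properties

// A field marked @transient is not written by Java serialization, mirroring
// how Task.localProperties is kept out of the serialized Task object.
class Payload(@transient var props: Properties) extends Serializable

object TransientDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("user.context", "request-42")

    // Serialize a Payload whose props field is @transient.
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(new Payload(props))
    oos.close()

    val copy = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
      .readObject().asInstanceOf[Payload]

    println(copy.props)  // null: the transient field did not travel with the object
    copy.props = props   // the receiver must reattach it, as the executor does with taskProps
    println(copy.props.getProperty("user.context"))  // request-42
  }
}
```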
@@ -221,7 +230,7 @@ private[spark] object Task {
    * @return (taskFiles, taskJars, taskBytes)
    */
   def deserializeWithDependencies(serializedTask: ByteBuffer)
-    : (HashMap[String, Long], HashMap[String, Long], ByteBuffer) = {
+    : (HashMap[String, Long], HashMap[String, Long], Properties, ByteBuffer) = {

     val in = new ByteBufferInputStream(serializedTask)
     val dataIn = new DataInputStream(in)
@@ -240,8 +249,13 @@ private[spark] object Task {
       taskJars(dataIn.readUTF()) = dataIn.readLong()
     }

+    val propLength = dataIn.readInt()
+    val propBytes = new Array[Byte](propLength)
+    dataIn.readFully(propBytes, 0, propLength)
+    val taskProps = Utils.deserialize[Properties](propBytes)
+
     // Create a sub-buffer for the rest of the data, which is the serialized Task object
     val subBuffer = serializedTask.slice()  // ByteBufferInputStream will have read just up to task
-    (taskFiles, taskJars, subBuffer)
+    (taskFiles, taskJars, taskProps, subBuffer)
   }
 }
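To make the framing concrete, here is a self-contained round trip of the length-prefixed layout used by `serializeWithDependencies` and `deserializeWithDependencies`, with plain Java serialization standing in for `Utils.serialize`/`Utils.deserialize`; the `PropsFramingDemo` name and helper methods are illustrative.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream,
  ObjectInputStream, ObjectOutputStream}
import java.util.Properties

object PropsFramingDemo {
  // Stand-in for Utils.serialize: plain Java serialization to a byte array.
  private def toBytes(props: Properties): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(props)
    oos.close()
    bos.toByteArray
  }

  // Stand-in for Utils.deserialize[Properties].
  private def fromBytes(bytes: Array[Byte]): Properties =
    new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject().asInstanceOf[Properties]

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("user.context", "request-42")

    // Write side: length prefix followed by the serialized Properties bytes.
    val buffer = new ByteArrayOutputStream()
    val dataOut = new DataOutputStream(buffer)
    val propBytes = toBytes(props)
    dataOut.writeInt(propBytes.length)
    dataOut.write(propBytes)
    dataOut.flush()

    // Read side: mirror of the readInt/readFully calls in deserializeWithDependencies.
    val dataIn = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val propLength = dataIn.readInt()
    val readBack = new Array[Byte](propLength)
    dataIn.readFully(readBack, 0, propLength)

    println(fromBytes(readBack).getProperty("user.context"))  // request-42
  }
}
```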
Review discussion on the new localProperties parameter:

Reviewer: I wonder if we can avoid making empty Properties all over ... an Option[Properties]? A setter that is called only where needed?
Reviewer: Properties objects are kind of analogous to Maps, and I think that Option[Map] would be a weird type in the same sense that Option[Set] (or any other collection type) is usually a bit of a code smell. So this is fine with me as is.
Author: It seemed safer to make it required. I can change this to an option if you think creating a Properties each time is too much overhead.
Reviewer: Fair enough, I suppose allocating the empty map/properties object isn't that expensive.
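A small sketch of the two shapes being debated, purely illustrative (the class names are made up): a required, possibly empty Properties keeps every read site trivial, while an Option[Properties] makes absence explicit but forces unwrapping on every access.

```scala
import java.util.Properties

// The constructor-required form the PR uses: callers with nothing to propagate
// just pass an empty instance, and readers never have to unwrap anything.
class RequiredPropsTask(@transient var localProperties: Properties) {
  def getLocalProperty(key: String): String = localProperties.getProperty(key)
}

// The Option-based alternative floated in the review: absence is explicit,
// but every access point has to deal with None.
class OptionalPropsTask(@transient var localProperties: Option[Properties]) {
  def getLocalProperty(key: String): String =
    localProperties.map(_.getProperty(key)).orNull
}

object PropsStyleDemo {
  def main(args: Array[String]): Unit = {
    val required = new RequiredPropsTask(new Properties())  // empty instance, no special case
    val optional = new OptionalPropsTask(None)              // explicit absence, unwrapped on read
    println(required.getLocalProperty("anything"))          // null (key not present)
    println(optional.getLocalProperty("anything"))          // null (no properties at all)
  }
}
```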