[SPARK-34942][API][CORE] Abstract Location in MapStatus to enable support for custom storage #31876
Conversation
Also cc @mridulm @tgravescs @jiangxb1987 for more inputs.
```diff
@@ -28,16 +28,23 @@ import org.apache.spark.internal.config
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.Utils
 
+trait Location extends Externalizable {
```
`Externalizable` would require the implementers to provide a zero-parameter constructor. Do you think that's acceptable, or do you have a better idea? @hiboyang @attilapiros
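For illustration, here is a minimal sketch of what that zero-parameter constructor requirement looks like for an implementer (`HdfsLocation` and its fields are hypothetical, not part of this PR):

```scala
import java.io.{ObjectInput, ObjectOutput}

// Hypothetical custom location. Java deserialization of an Externalizable
// first invokes the zero-arg constructor, then calls readExternal() to
// restore state, so every implementer must provide one.
class HdfsLocation(private var path: String) extends Location {
  def this() = this(null) // the mandatory zero-parameter constructor

  override def host: String = "*" // no single host for HDFS
  override def port: Int = -1
  override def hostPort: String = path

  override def writeExternal(out: ObjectOutput): Unit = out.writeUTF(path)
  override def readExternal(in: ObjectInput): Unit = { path = in.readUTF() }
}
```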
The original `BlockManagerId` extends `Externalizable`, so I think `Externalizable` here for `Location` is OK.
I might have missed some context here, but why `Externalizable` and not `Serializable`?
Because `BlockManagerId` already uses `Externalizable`, and we want to make `BlockManagerId` the default implementation of `Location` for Spark.

A possible way to let `Location` extend `Serializable` is to let `BlockManagerId` extend `Serializable` directly too. But we'd need to figure out whether serializers could handle `topologyInfo_: Option[String]` properly, as `topologyInfo_` is currently handled manually in `readExternal`/`writeExternal`:
```scala
override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
  out.writeUTF(executorId_)
  out.writeUTF(host_)
  out.writeInt(port_)
  out.writeBoolean(topologyInfo_.isDefined)
  // we only write topologyInfo if we have it
  topologyInfo.foreach(out.writeUTF(_))
}

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
  executorId_ = in.readUTF()
  host_ = in.readUTF()
  port_ = in.readInt()
  val isTopologyInfoAvailable = in.readBoolean()
  topologyInfo_ = if (isTopologyInfoAvailable) Option(in.readUTF()) else None
}
```
Using `Externalizable` here is totally fine.
To clarify: `Externalizable` takes precedence over `Serializable`. We can continue to use `Externalizable` for `BlockManagerId` while not requiring other implementations to use it.
```scala
def host: String
def port: Int
def hostPort: String
def executorId: String = "unknown"
```
I added this only for convenience; it doesn't mean I have any preference for the interface.
Yeah, adding this here helps for this initial version. Could we add something like `storageInfo: Option[Serializable]`, which could be used to store extra information for different disaggregated shuffle solutions? E.g., `storageInfo` could be remote storage file path(s) or remote shuffle server(s).
For "store extra information", I think implementors can add whatever they want only if they're serializable. e.g., extra info can be Option[Serializable]
or Map[String, String]
.
But I did think about adding a common StorageType
class to Location
. For example, a valid use case is that we could know from it whether the storage is reliable (e.g., location.storageType.isReliable
) so we can decide whether to apply "decommission" on such storage.
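A rough sketch of that idea (`StorageType` and `isReliable` are assumed names, not this PR's API):

```scala
import java.io.Externalizable

// Sketch: a common descriptor for the storage behind a location, so Spark
// can reason about it generically.
trait StorageType {
  /** True if data survives executor loss (e.g., HDFS, S3); false for local disk. */
  def isReliable: Boolean
}

trait Location extends Externalizable {
  def storageType: StorageType
}

// Example use: during decommission, shuffle blocks on reliable storage
// would need no migration:
//   if (location.storageType.isReliable) { /* skip block migration */ }
```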
Or could we add `topologyInfo: Option[String]` to `Location`, like the other fields (`host`/`port`/...) that come from `BlockManagerId`?

`location.storageType.isReliable` is a good idea for "decommission".
Based on the discussion, if we are going to mirror everything in `BlockManagerId`, why not expose that directly, with the ability to subclass it, and avoid `Location` altogether?
I think it (exposing `BlockManagerId`) depends on the use cases. We could probably expose it if `BlockManagerId` generally satisfied all the common use cases (e.g., BlockManager, HDFS, file server). That being said, I think the concept of `BlockManagerId` is not suitable for every case. For example, each executor corresponds to one `BlockManagerId`; but in the case of HDFS, all the executors would share a single location.

Adding `Location` would be more flexible for users if they have specific requirements that `BlockManagerId` can't satisfy. Besides, I think it's safer for Spark, as `BlockManagerId` is not only used in the shuffle phase but is also widely used in RDD caching.
> Or could we add topologyInfo: Option[String] in Location, like other fields host/port/... which are from BlockManagerId?

Do you know of other cases that need it?
Uber Remote Shuffle Service uses `topologyInfo` to store shuffle server information. Since we kind of mirror things in `BlockManagerId`, I suggest adding `topologyInfo` inside `Location` as well.
I think specifying these methods in `Location` violates the abstraction. I know it's convenient and avoids casting, but we should still avoid meaningless methods. In a disaggregated storage solution the `executorId` has no value, and host and port are probably neither sufficient nor ever needed as separate entities; a URL-like construct would be more useful (or something else, but that is the responsibility of the specific subclass).
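A sketch of the URL-style alternative being suggested here (assumed shape, not part of this PR):

```scala
import java.io.Externalizable

// Sketch: a single opaque identifier instead of host/port/executorId
// accessors, leaving the interpretation to the specific subclass.
trait Location extends Externalizable {
  /** e.g. "spark://executor-12@host:7337" or "hdfs://nn:9000/shuffle/app-1" */
  def uri: String
}
```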
Kubernetes integration test starting
Kubernetes integration test status failure
Thanks for making the PR!
@Ngone51 thanks for doing this. I am currently busy with some other stuff, but next week I will do the review.
Took an initial pass through it. Thx for working on it @Ngone51 !
first round
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #136856 has finished for PR 31876 at commit …
(Sorry for the delay, I was busy with internal stuff.) So I have removed all the methods from the interface. The remaining places with casting are:

a) ShuffleBlockFetcherIterator: castings here should be extracted to a Spark-native shuffle reader, so it should be fine.
b) DAGScheduler/MapOutputTracker
c) TaskSetManager
d) JsonProtocol

For cases b, c, d, I'll try to get rid of the casting in later commits. One feasible way is to use pattern matching to skip other Locations (see the sketch below). At the same time, I'm still thinking about whether there is a better way to unify the behavior of locations, e.g., for storage like HDFS, which doesn't have a specific host, we could probably use "*" to represent it. And for …
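For cases b/c/d, the pattern-match approach could look roughly like this (a sketch; `addPendingTaskForHost` stands in for the real TaskSetManager logic):

```scala
// Sketch: only BlockManagerId locations carry host locality; other
// location types are simply skipped.
status.location match {
  case bmId: BlockManagerId =>
    addPendingTaskForHost(bmId.host) // host affinity only makes sense here
  case _ =>
    // custom location (HDFS, remote shuffle service, ...): no host affinity
}
```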
Thanks for the change! It might be tricky to deal with the casting in b, c, d and skip other location types. Echoing the previous suggestion to expose `BlockManagerId` directly.
According to #31876 (comment) and #31876 (comment), I think we're less in favor of exposing `BlockManagerId` directly.
Sorry about the confusion: I tried to remove my requested changes flag.
Took an initial pass.
```diff
@@ -81,7 +81,7 @@ case object Resubmitted extends TaskFailedReason {
  */
 @DeveloperApi
 case class FetchFailed(
-    bmAddress: BlockManagerId, // Note that bmAddress can be null
+    bmAddress: Location, // Note that bmAddress can be null
```
This is a backward-incompatible change, which also impacts event files. Unfortunately, it looks like most folks who worked on this in the past are no longer active.

+CC @tgravescs, @dongjoon-hyun for additional review.
For the event files, we still log the JSON of `BlockManagerId`, so I think it's still compatible, no?

Besides, are you worried about possible backward incompatibility due to references from outside Spark?
There are two changes here: a) a change to the public API (programmatic, REST API, etc.), and b) a change to generated/persisted data. Both need to be addressed.

Specifically about (b): with `bmAddress` changing to `Location` (since it need not be a `BlockManagerId` anymore), the JSON serde needs to account for it. Currently, it is a TODO here. The Spark history server, REST consumers, and other apps depending on event files will need a way to identify the Location type and serde all valid Location types.
> a) change to public api (programmatic, REST api, etc)

Technically, it's a real problem, although I can't imagine how users would actually use it as an API. And I have a new idea: we could introduce a new fetch-failed class for custom locations and leave this one unchanged. For example, we could have `CustomStorageFetchFailed`: if the location is a `BlockManagerId`, we use `FetchFailed`; otherwise, we use `CustomStorageFetchFailed` (sketched below). WDYT?

> b) change to generated/persisted data.

I think we won't change the data of `BlockManagerId` here. If we find the `Location` is a `BlockManagerId`, we'd still output it as `("Block Manager Address" -> blockManagerAddress)`. So, IIUC, it won't cause problems. The only issue is the custom location, which is new data, e.g., `("XXXLocation" -> XXXLocationJson)`. So it can be a problem if users use an old version of Spark to load the event files, although I think that's really an unexpected usage.
> And I have a new idea that we can introduce a new fetch failed class for the custom location and leave this one unchanged. For example, we can have CustomStorageFetchFailed. Thus, if the location is BlockManagerId then we use FetchFailed, otherwise, use CustomStorageFetchFailed. WDYT?

`CustomStorageFetchFailed` looks like a promising approach. We will need to think through what its implications would be, but on the face of it, it should address the immediate concerns IMO.

Thoughts @attilapiros, @tgravescs ?
> The only problem is the custom location. It's new data, e.g., ("XXXLocation" -> XXXLocationJson). So it can be a problem if users use the old version Spark to load event files. Although, I think this's really an unexpected usage.

There are a couple of issues here:
- A simpler question of how to handle a custom location, from both a programmatic and a data point of view.
- How to handle different shuffle impls being in play for the same event directory:
  - If deployments have multiple shuffle infra in use over the course of time (or different clusters with different configs and a shared history event dir), each with their own Locations.
  - How will the SHS/REST API, etc., understand which location class is being used and how to parse it?

I actually don't have good solutions for this, other than adding some metadata per location record to indicate the 'type'. Any other thoughts?
+1 for a new class without introducing a breaking change. Thank you for pinging me, @mridulm .
I think we'll have to add interfaces to `Location` in order to support JSON serde, e.g.,

```java
public JValue serializeToJson();
public Location deserializeFromJson(JValue json);
```

> How to handle different shuffle impls being in play for the same event directory.

Adding metadata is a good idea. We could have a format like:

```json
"Mapstatus Location": {
  "type": "xx.yy.zz",
  "content": {
    "aaa": "bbb"
  }
}
```

where `type` is the qualified class name and `content` is generated by `Location.serializeToJson`. With a constant format, end users and the SHS would be able to parse them as well.

(I had an idea for the SHS: add the location type as an extension of the event log file name, the way compression does today. But that doesn't solve the problem for the REST case.)

BTW, I have added https://issues.apache.org/jira/browse/SPARK-35188 for this support.
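In `JsonProtocol`, that metadata wrapper could translate to something like this sketch (json4s; `serializeToJson`/`deserializeFromJson` are the assumed `Location` methods from above):

```scala
import org.json4s._
import org.json4s.JsonDSL._

implicit val formats: Formats = DefaultFormats

// Wrap every location with its qualified class name so the SHS and REST
// consumers can pick the right deserializer.
def locationToJson(loc: Location): JValue =
  ("type" -> loc.getClass.getName) ~
    ("content" -> loc.serializeToJson())

def locationFromJson(json: JValue): Location = {
  val clazz = Utils.classForName((json \ "type").extract[String])
    .asSubclass(classOf[Location])
  val location = clazz.getConstructor().newInstance()
  location.deserializeFromJson(json \ "content")
}
```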
```scala
  conf
).head
loc.readExternal(in)
loc
```
Do we want to explore a `LocationFactory` in addition to `Location`, to be used here? `loadExtensions` is not cheap, and doing this for every instance creation is going to be very expensive.

A factory would also allow implementations to cache `Location`s; otherwise we will have GC issues on the driver.
I may not fully understand the intended usage of `LocationFactory` here. But the problem is, we need a fresh `Location` instance here every time, which is then turned into a specific one by `readExternal(...)`. So I'm not sure what we'd want to cache here. Or do you mean we can cache the reflected class?
Why do we need a fresh Location instance?

To answer the query: something like `SparkEnv.get.getShuffleLocationFactory.deserialize(in)` (or some such, which defaults to using BlockManagerId) should make it a method dispatch, with the ability for LocationFactory to cache the (immutable) Location object.
By "fresh Location instance", I mean, we always need to create the instance using 0-arg constructor first (this's the "fresh" one) and then make it to a specific one by "readExternal".
Follow your idea, I think we cache two things in LocationFactory
:
-
cache the loaded class of the custom location so we don't need to load it every time
-
cache the location instances so we don't keep the references of multiple instances for the same location
If so, I think we can do it by ourselves instead of exposing LocationFactory
to users.
Does it sound good to you?
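A sketch of that two-level caching (class and instance), with assumed names; the real change is in the linked prototype:

```scala
import java.io.ObjectInput
import java.util.concurrent.ConcurrentHashMap

object Locations {
  // 1) Load the configured Location class once, instead of calling
  //    loadExtensions on every deserialization.
  private lazy val locationClass: Class[_ <: Location] =
    Utils.classForName("org.example.MyLocation") // assumed: resolved from conf
      .asSubclass(classOf[Location])

  // 2) Canonicalize instances so equal locations share one object,
  //    mirroring what blockManagerIdCache does for BlockManagerId.
  private val locationCache = new ConcurrentHashMap[Location, Location]()

  def apply(in: ObjectInput): Location = {
    val loc = locationClass.getConstructor().newInstance()
    loc.readExternal(in)
    locationCache.computeIfAbsent(loc, (l: Location) => l)
  }
}
```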
I made a prototype at 66977e0. Does it look good to you? (If yes, I'd like to optimize it further, to unify the `locationCache` and `blockManagerIdCache` if possible and reduce some duplicate code...)
Yeah, in this case, we need to deal with those …
Really sorry about the delay here :(. I'll try to address all the comments tomorrow.
Kubernetes integration test unable to build dist. exiting with code: 1
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #137811 has finished for PR 31876 at commit …
```
@@ -129,21 +129,11 @@ private[spark] object BlockManagerId {
  def apply(in: ObjectInput): BlockManagerId = {
    val obj = new BlockManagerId()
    obj.readExternal(in)
    getCachedBlockManagerId(obj)
```
A reminder for myself: the cache is still needed for the use case in `UpdateBlockInfo`.
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #137815 has finished for PR 31876 at commit …
I have to circle back to this PR and review.
@mridulm Sorry, I missed your last comment... I think we can go ahead and update to the latest master, as long as we can put SPARK-35188 aside for now.
I am fine with that... we will need to solve it for analysis tooling based on Spark events to work (Spark UI, REST API, etc.).
Sure. Let me try to update it when I get a chance.
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
Hi @Ngone51! I am working on adding metadata to the map status to better support 3rd-party shuffle solutions. My prototype is available at attilapiros#4. I think it would be nice to continue this PR, so may I ask you to update it with the current master and reopen it?
@Ngone51! Can I help you by refreshing this PR with the current master?
```java
 * @since 3.2.0
 */
@Private
public interface Location extends Externalizable {
```
This looks good if we consider only one Location per map task. However, it does not address the concern with hybrid storage as originally proposed in this design doc, where a few of the blocks are written to local disk and a few are written to HDFS, S3, etc.
Technically, a single abstract `Location` interface, which can be implemented however you like, can handle multiple real locations. IMHO it is just naming (Location => Locations), and since mostly single real locations are represented, I would even keep the current name.
```java
 * {@link ShuffleMapOutputWriter} after writing a shuffle data file and used by ShuffleMapOutputReader
 * to read the shuffle data.
 *
 * Since the location is returned by {@link ShuffleMapOutputWriter#commitAllPartitions()} at executor
```
I think we need to add the `Location` to `MapOutputCommitMessage` and slightly modify the `ShuffleWriter`s to propagate the `Location` to the driver. Could that be a follow-up PR? Or are there other design choices for propagating the location information to the driver? (Right now it is hardcoded to `blockManager.blockManagerId`.)
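A sketch of that propagation (assumed shape; today the message essentially carries only the partition lengths):

```scala
// Sketch: let the writer report where the committed map output actually
// lives, instead of the driver assuming blockManager.blockManagerId.
case class MapOutputCommitMessage(
    partitionLengths: Array[Long],
    location: Option[Location]) // None => default BlockManagerId location
```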
What changes were proposed in this pull request?

This PR proposes (as mentioned in #30763) to abstract the location (which is currently fixed to `BlockManagerId`) in `MapStatus` into a base class `Location`, in order to support custom storage, e.g., HDFS, S3. Custom storage backends can implement their own `Location`s to provide storage-specific information, so custom shuffle readers can use their own locations to fetch shuffle blocks.

This PR tries to give a whole picture of the real impact of the abstraction on the default storage (`BlockManager`) and let people discuss the concrete code precisely.

Why are the changes needed?

Does this PR introduce any user-facing change?

How was this patch tested?