[SPARK-34942][API][CORE] Abstract Location in MapStatus to enable support for custom storage #31876

Closed
wants to merge 27 commits

Conversation

Member

@Ngone51 Ngone51 commented Mar 18, 2021

What changes were proposed in this pull request?

This PR proposes (as mentioned in #30763) to abstract the location in MapStatus (which is currently fixed to BlockManagerId) into a base class, Location, in order to support custom storage, e.g., HDFS, S3. Custom storage backends can implement their own Locations to provide storage-specific information, so custom shuffle readers can use their own location type to fetch shuffle blocks.

This PR tries to give a whole picture of the abstraction's real impact on the default storage (BlockManager) and to let people discuss the code precisely.
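To make the proposal concrete, here is a minimal sketch. The trait body matches the snippet under review below; HdfsLocation is a hypothetical implementation (not part of this PR) showing what a custom storage might provide:

import java.io.{Externalizable, ObjectInput, ObjectOutput}

trait Location extends Externalizable {
  def host: String
  def port: Int
  def hostPort: String
  def executorId: String = "unknown"
}

// Hypothetical custom location for an HDFS-backed shuffle store.
// Externalizable deserialization requires a zero-arg constructor.
class HdfsLocation(private var path: String) extends Location {
  def this() = this(null)
  override def host: String = "*" // no single host for HDFS
  override def port: Int = -1
  override def hostPort: String = path
  override def writeExternal(out: ObjectOutput): Unit = out.writeUTF(path)
  override def readExternal(in: ObjectInput): Unit = { path = in.readUTF() }
}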

Why are the changes needed?

Does this PR introduce any user-facing change?

How was this patch tested?

@github-actions github-actions bot added the CORE label Mar 18, 2021
@Ngone51
Member Author

Ngone51 commented Mar 18, 2021

cc @attilapiros @hiboyang

Also cc @mridulm @tgravescs @jiangxb1987 for more inputs.

@@ -28,16 +28,23 @@ import org.apache.spark.internal.config
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.Utils

trait Location extends Externalizable {
Member Author

Externalizable would require implementers to provide a zero-parameter constructor. Do you think that's acceptable, or do you have a better idea? @hiboyang @attilapiros

hiboyang

The original BlockManagerId extends Externalizable, so I think Externalizable here for Location is fine.

mridulm
Contributor

I might have missed some context here, but why Externalizable and not Serializable?

Member Author

Because BlockManagerId already uses Externalizable, and we want to make BlockManagerId the default implementation of Location for Spark.

A possible way to let Location extend Serializable is to have BlockManagerId extend Serializable directly too. But we'd need to figure out whether serializers can handle topologyInfo_: Option[String] properly, since topologyInfo_ is currently handled manually in readExternal/writeExternal:

override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
  out.writeUTF(executorId_)
  out.writeUTF(host_)
  out.writeInt(port_)
  out.writeBoolean(topologyInfo_.isDefined)
  // we only write topologyInfo if we have it
  topologyInfo.foreach(out.writeUTF(_))
}

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
  executorId_ = in.readUTF()
  host_ = in.readUTF()
  port_ = in.readInt()
  val isTopologyInfoAvailable = in.readBoolean()
  topologyInfo_ = if (isTopologyInfoAvailable) Option(in.readUTF()) else None
}

attilapiros
Contributor

Using Externalizable here is totally fine.

Contributor

To clarify: Externalizable takes precedence over Serializable.
We can continue to use Externalizable for BlockManagerId while not requiring other implementations to use it.
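A rough illustration of that relaxed contract (the trait body is a sketch, not this PR's code): java.io.Externalizable itself extends Serializable, so BlockManagerId could keep its hand-rolled readExternal/writeExternal while other implementations rely on plain Java serialization.

trait Location extends Serializable {
  def hostPort: String
}

// BlockManagerId stays Externalizable and still satisfies Location;
// a custom implementation can simply be a serializable case class.
case class SimpleLocation(hostPort: String) extends Location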

def host: String
def port: Int
def hostPort: String
def executorId: String = "unknown"
Member Author

I added this only for convenience; it doesn't mean I have any preference for the interface.

hiboyang

Yeah, adding this here helps for this initial version. Could we add something like storageInfo: Option[Serializable], which could be used to store extra information for different disaggregated shuffle solutions? e.g. storageInfo could be remote storage file path(s) or remote shuffle server(s).

Member Author

For "store extra information", I think implementors can add whatever they want only if they're serializable. e.g., extra info can be Option[Serializable] or Map[String, String].

But I did think about adding a common StorageType class to Location. For example, a valid use case is that we could know from it whether the storage is reliable (e.g., location.storageType.isReliable) so we can decide whether to apply "decommission" on such storage.
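A sketch of that idea (all names here are illustrative; none of this is in the PR):

sealed trait StorageType { def isReliable: Boolean }
case object LocalDiskStorage extends StorageType { val isReliable = false }
case object ReliableRemoteStorage extends StorageType { val isReliable = true }

// The scheduler could then skip migrating shuffle blocks on decommission
// when location.storageType.isReliable, since the data survives executor loss.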

@hiboyang hiboyang Mar 18, 2021

Or could we add topologyInfo: Option[String] in Location, like other fields host/port/... which are from BlockManagerId?

location.storageType.isReliable is a good idea for "decommission".

mridulm
Contributor

Based on the discussion: if we are going to mirror everything in BlockManagerId, why not expose that directly, with the ability to subclass it, and avoid Location altogether?

Member Author

I think it (exposing BlockManagerId) depends on the use cases. We could probably expose it if BlockManagerId generally satisfied all the common use cases (e.g., BlockManager, HDFS, file server). That being said, I think the concept of BlockManagerId is not suitable for every case. For example, each executor corresponds to a BlockManagerId. But in the case of HDFS, all the executors would really have only one corresponding location.

Adding Location would be more flexible for users who have specific requirements that BlockManagerId can't satisfy. Besides, I think it's safer for Spark, as BlockManagerId is not only used in the shuffle phase but also widely used in RDD caching.

Member Author

Or could we add topologyInfo: Option[String] in Location, like other fields host/port/... which are from BlockManagerId?

Do you know other cases that need it?

hiboyang

Uber Remote Shuffle Service uses topologyInfo to store shuffle server information. Since we kind of mirror things in BlockManagerId, I'd suggest adding topologyInfo inside Location as well.

attilapiros
Contributor

I think specifying these methods in Location violates the abstraction. I know this is convenient and avoids casting, but we should still avoid meaningless methods. In a disaggregated storage solution, the executorId has no value, and host and port are probably not enough and never needed as separate entities; a URL-like construct would be more useful (or something else, but that is the responsibility of the specific subclass).

@SparkQA

SparkQA commented Mar 18, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40770/

@SparkQA

SparkQA commented Mar 18, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40770/

@hiboyang hiboyang left a comment

Thanks for making the PR!


@attilapiros
Contributor

@Ngone51 thanks for doing this. I am currently busy with some other stuff, but next week I will do the review.

Contributor

@mridulm mridulm left a comment

Took an initial pass through it. Thx for working on it @Ngone51 !


Contributor

@attilapiros attilapiros left a comment

first round


@Ngone51 Ngone51 changed the title [WIP][SPARK-XXXX][API][CORE] Abstract Location in MapStatus to enable support for custom storage [SPARK-34942][API][CORE] Abstract Location in MapStatus to enable support for custom storage Apr 2, 2021
@SparkQA

SparkQA commented Apr 2, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41434/

@SparkQA

SparkQA commented Apr 2, 2021

Test build #136856 has finished for PR 31876 at commit 24f20c7.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • .doc(\"Qualified name of the class that used to initiate plugin location instance. \" +

@Ngone51
Member Author

Ngone51 commented Apr 2, 2021

(Sorry for the delay, was busy with internal stuff..)

So I have removed all the methods from the interface Location. And now, the casting to BlockManagerId happens in these 4 places:

a) ShuffleBlockFetcherIterator

Casts here should be extracted to a Spark-native shuffle reader, so it should be fine.

b) DAGScheduler/MapOutputTracker

  • use the host or executorId from BlockManagerId to manage shuffle map outputs, e.g.,

removeOutputsOnHost(...)
removeOutputsOnExecutor(...)

  • use the host from BlockManagerId as the preferred location, e.g.,
    getPreferredLocationsForShuffle
    getMapLocation

c) TaskSetManager
Using both host and executorId to update the HealthTracker

d) JsonProtocol
converts the BlockManagerId into JSON

For cases b, c, d, I'll try to get rid of the casting in later commits. One feasible way is to use pattern matching to skip other Locations (see the sketch below). At the same time, I'm still thinking about whether there's a better way to unify the behavior of locations. E.g., for storage like HDFS, which doesn't have a specific host, we could probably use "*" to represent it. And for executorId, although some storage doesn't have a meaningful value, each map task actually does have a corresponding executorId (but I kind of agree that adding executorId would be confusing).
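For case (b), a sketch of what that pattern matching could look like (an illustrative helper, not Spark's API, assuming MapStatus.location returns a Location as in this PR):

import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId

// Keep only the outputs whose location is a BlockManagerId on the given
// host; custom Locations carry no executor host, so they are skipped.
def outputsOnHost(statuses: Seq[MapStatus], host: String): Seq[MapStatus] =
  statuses.filter { status =>
    status.location match {
      case bmId: BlockManagerId => bmId.host == host
      case _ => false
    }
  }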

@hiboyang

hiboyang commented Apr 4, 2021

(Sorry for the delay, was busy with internal stuff..)

So I have removed all the methods from the interface Location. And now, the casting to BlockManagerId happens in these 4 places:

a) ShuffleBlockFetcherIterator

Casts here should be extracted to a Spark-native shuffle reader, so it should be fine.

b) DAGScheduler/MapOutputTracker

  • use the host or executorId from BlockManagerId to manage shuffle map outputs, e.g.,

removeOutputsOnHost(...)
removeOutputsOnExecutor(...)

  • use the host from BlockManagerId as the preferred location, e.g.,
    getPreferredLocationsForShuffle
    getMapLocation

c) TaskSetManager
Using both host and executorId to update the HealthTracker

d) JsonProtocol
converts the BlockManagerId into JSON

For cases b, c, d, I'll try to get rid of the casting in later commits. One feasible way is to use pattern matching to skip other Locations. At the same time, I'm still thinking about whether there's a better way to unify the behavior of locations. E.g., for storage like HDFS, which doesn't have a specific host, we could probably use "*" to represent it. And for executorId, although some storage doesn't have a meaningful value, each map task actually does have a corresponding executorId (but I kind of agree that adding executorId would be confusing).

Thanks for the change! It might be tricky to deal with the casting in b, c, d and to skip other location types.

Echoing the previous suggestion to expose BlockManagerId and make it extensible: do we want to try that approach? We could rename BlockManagerId to some other generic name like BlockLocation.

@Ngone51
Member Author

Ngone51 commented Apr 6, 2021

Echoing the previous suggestion to expose BlockManagerId and make it extensible: do we want to try that approach? We could rename BlockManagerId to some other generic name like BlockLocation.

According to #31876 (comment), #31876 (comment), I think we're less inclined to the approach of exposing BlockManagerId.

attilapiros
attilapiros previously approved these changes Apr 6, 2021
@attilapiros attilapiros dismissed their stale review April 6, 2021 12:21

Sorry about the confusion: I tried to remove my requested changes flag.

Contributor

@mridulm mridulm left a comment

Took an initial pass.

@@ -81,7 +81,7 @@ case object Resubmitted extends TaskFailedReason {
*/
@DeveloperApi
case class FetchFailed(
bmAddress: BlockManagerId, // Note that bmAddress can be null
bmAddress: Location, // Note that bmAddress can be null
Contributor

This is a backward-incompatible change, which also impacts event files.
Unfortunately, it looks like most folks who worked on this in the past are no longer active.
+CC @tgravescs, @dongjoon-hyun for additional review.

Member Author

For the event files, we still log the JSON of BlockManagerId. So I think it's still compatible, no?

Or are you worried about possible backward incompatibility due to references outside of Spark?

Contributor

There are two changes here: a) a change to the public API (programmatic, REST API, etc.), b) a change to generated/persisted data.
Both need to be addressed.

Specifically about (b): with bmAddress changing to Location, since it need not be a BlockManagerId anymore, the JSON serde needs to account for it. Currently, it is a TODO here.

The Spark history server, REST consumers, and other apps depending on event files will need a way to identify the Location type and serde all valid Location types.

Member Author

a) a change to the public API (programmatic, REST API, etc.)

Technically, that is indeed a problem. Although, I can't imagine how users would use it as an API.

And I have a new idea: we could introduce a new fetch-failed class for the custom location and leave this one unchanged. For example, we could have CustomStorageFetchFailed (sketched below). Thus, if the location is a BlockManagerId, we use FetchFailed; otherwise, we use CustomStorageFetchFailed. WDYT?

b) a change to generated/persisted data

I think we won't change the data of BlockManagerId here. If we find the Location is a BlockManagerId, we'd still output it as ("Block Manager Address" -> blockManagerAddress). So, IIUC, it won't cause problems.

The only problem is the custom location. It's new data, e.g., ("XXXLocation" -> XXXLocationJson). So it can be a problem if users use an older version of Spark to load the event files. Although, I think that's really an unexpected usage.
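For reference, a sketch of what CustomStorageFetchFailed could look like; this class does not exist in Spark, and the field list simply mirrors FetchFailed with a Location in place of bmAddress:

case class CustomStorageFetchFailed(
    location: Location, // like bmAddress, this may be null
    shuffleId: Int,
    mapId: Long,
    mapIndex: Int,
    reduceId: Int,
    message: String) extends TaskFailedReason {
  override def toErrorString: String =
    s"FetchFailed($location, shuffleId=$shuffleId, mapIndex=$mapIndex, " +
      s"mapId=$mapId, reduceId=$reduceId, message=$message)"
}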

Contributor

And I have a new idea: we could introduce a new fetch-failed class for the custom location and leave this one unchanged. For example, we could have CustomStorageFetchFailed. Thus, if the location is a BlockManagerId, we use FetchFailed; otherwise, we use CustomStorageFetchFailed. WDYT?

CustomStorageFetchFailed looks like a promising approach. We will need to think through what its implications would be, but on the face of it, it should address the immediate concerns IMO.
Thoughts @attilapiros, @tgravescs ?

The only problem is the custom location. It's new data, e.g., ("XXXLocation" -> XXXLocationJson). So it can be a problem if users use an older version of Spark to load the event files. Although, I think that's really an unexpected usage.

There are a couple of issues here:

  • A simpler question of how to handle a custom location, from a programmatic and data point of view.
  • How to handle different shuffle impls being in play for the same event directory.
    • If deployments have multiple shuffle infras in use over the course of time (or different clusters with different configs and a shared history event dir), each with their own Locations.
    • How will the SHS/REST API, etc., understand which location class is being used and how to parse them?

I actually don't have good solutions for this, other than adding some metadata per location record to indicate the 'type'.
Any other thoughts?

Member

+1 for a new class without introducing a breaking change. Thank you for pinging me, @mridulm.

Member Author

I think we'll have to add interfaces to Location in order to support JSON serde, e.g.,

public JValue serializeToJson();

public Location deserializeFromJson(JValue json);

How to handle different shuffle impls being in play for the same event directory.

Adding metadata is a good idea; we can have a format like:

"Mapstatus Location": {
  "type": "xx.yy.zz", // qualified class name
  "content": { // content is generated by `Location.serializeToJson`
    "aaa": "bbb"
  }
}

With this constant format, end users and the SHS are able to parse them as well.

(I had an idea for the SHS: add the location type as the extension of the event log file, the way compression is handled now. But I think that doesn't solve the problem for the REST case.)

BTW, I have added https://issues.apache.org/jira/browse/SPARK-35188 for this support.
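A sketch of how a reader (SHS, REST consumer) could consume that envelope. The reflective lookup is an assumption about how the "type" field would be resolved, and deserializeFromJson is the interface proposed above:

import org.json4s._

implicit val formats: Formats = DefaultFormats

def deserializeLocation(json: JValue): Location = {
  val envelope = json \ "Mapstatus Location"
  val className = (envelope \ "type").extract[String]
  // Fresh instance via the zero-arg constructor, then fill it from JSON.
  val location = Class.forName(className)
    .getDeclaredConstructor().newInstance().asInstanceOf[Location]
  location.deserializeFromJson(envelope \ "content")
}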

conf
).head
loc.readExternal(in)
loc
Contributor

@mridulm mridulm Apr 7, 2021

Do we want to explore a LocationFactory in addition to Location, which would be used here?
loadExtensions is not cheap, and doing this for every instance creation is going to be very expensive.

A factory will also allow implementations to cache Locations; otherwise we will have GC issues at the driver.

Member Author

I may not fully understand the intended usage of LocationFactory here. But the problem is that we need a fresh Location instance here every time, and then turn it into a specific one via readExternal(...). So I'm not sure what we'd want to cache here. Or do you mean we can cache the reflected class?

Contributor

@mridulm mridulm Apr 21, 2021

Why do we need a fresh Location instance?

To answer the query: something like SparkEnv.get.getShuffleLocationFactory.deserialize(in) (or some such, which defaults to using BlockManagerId) should make it a method dispatch, with the ability for LocationFactory to cache the (immutable) Location object.

Member Author

By "fresh Location instance", I mean, we always need to create the instance using 0-arg constructor first (this's the "fresh" one) and then make it to a specific one by "readExternal".

Follow your idea, I think we cache two things in LocationFactory:

  1. cache the loaded class of the custom location so we don't need to load it every time

  2. cache the location instances so we don't keep the references of multiple instances for the same location

If so, I think we can do it by ourselves instead of exposing LocationFactory to users.

Does it sound good to you?
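A sketch of those two caches (names are illustrative); the instance cache mirrors the Guava-based blockManagerIdCache that BlockManagerId already keeps:

import java.util.concurrent.ConcurrentHashMap
import com.google.common.cache.{CacheBuilder, CacheLoader}

object LocationCache {
  // 1. Cache the loaded custom Location class by name.
  private val classCache = new ConcurrentHashMap[String, Class[_]]()
  def locationClass(name: String): Class[_] =
    classCache.computeIfAbsent(name, (n: String) => Class.forName(n))

  // 2. Dedup instances so equal locations share a single reference.
  private val instanceCache = CacheBuilder.newBuilder()
    .maximumSize(10000)
    .build(new CacheLoader[Location, Location] {
      override def load(loc: Location): Location = loc
    })
  def getCached(loc: Location): Location = instanceCache.get(loc)
}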

Member Author

I made a prototype at 66977e0. Does it look good to you? (If yes, I'd like to optimize it further to unify locationCache and blockManagerIdCache if possible and reduce some duplicated code...)

@hiboyang

hiboyang commented Apr 8, 2021

Echoing the previous suggestion to expose BlockManagerId and make it extensible: do we want to try that approach? We could rename BlockManagerId to some other generic name like BlockLocation.

According to #31876 (comment), #31876 (comment), I think we're less inclined to the approach of exposing BlockManagerId.

Yeah, in this case we need to deal with those asInstanceOf[BlockManagerId] type casts so they don't fail for other location types. Any thoughts there?

@Ngone51
Member Author

Ngone51 commented Apr 13, 2021

Really sorry about the delay here :(. I'll try to address all the comments tomorrow.

@SparkQA

SparkQA commented Apr 19, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42164/

@SparkQA

SparkQA commented Apr 22, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42341/

@SparkQA

SparkQA commented Apr 22, 2021

Test build #137811 has finished for PR 31876 at commit b6f7a12.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -129,21 +129,11 @@ private[spark] object BlockManagerId {
def apply(in: ObjectInput): BlockManagerId = {
val obj = new BlockManagerId()
obj.readExternal(in)
getCachedBlockManagerId(obj)
Member Author

A reminder for myself: the cache is still needed for the use case in UpdateBlockInfo.

@SparkQA

SparkQA commented Apr 22, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42345/

@SparkQA

SparkQA commented Apr 22, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42345/

@SparkQA

SparkQA commented Apr 22, 2021

Test build #137815 has finished for PR 31876 at commit 02b51bf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mridulm
Contributor

mridulm commented May 5, 2021

I have to circle back to this PR and review.
@Ngone51 can we update to the latest, please? Or are we waiting for some other changes to go in before doing it?

@Ngone51
Member Author

Ngone51 commented May 19, 2021

@mridulm Sorry, I missed your last comment... I think we can go ahead and update to the latest, as long as we can put SPARK-35188 aside for now.

@mridulm
Contributor

mridulm commented May 19, 2021

I think we can go ahead and update to the latest, as long as we can put SPARK-35188 aside for now.

I am fine with that ... we will need to solve it for the analysis tooling based off of Spark events to work (Spark UI, REST API, etc.).

@Ngone51
Member Author

Ngone51 commented May 24, 2021

Sure. Let me try to update it when I get a chance.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@attilapiros
Contributor

Hi @Ngone51!

I am working on adding metadata to the map status to better support 3rd-party shuffle solutions. My prototype is available at attilapiros#4.
But I think your PR (this one) can be used to solve the same problem, and it seems more elegant for the purpose.
In my case the location is untouched and the metadata is an extra addition; as blocks are retrieved grouped by their location, that adds extra complexity (and redundant code) on my side. Meanwhile, I think in real situations we do need the metadata to support custom block coordinates. And even when that is not the case, a custom location can be used to store the extra info, too. So one customization in the MapStatus will definitely be enough. WDYT?

I think it would be nice to continue this PR, so may I ask you to update it against the current master and reopen it?
I would be happy to help by reviewing it.

@attilapiros
Contributor

@Ngone51! Can I help you by refreshing this PR with the current master?

* @since 3.2.0
*/
@Private
public interface Location extends Externalizable {


This looks good if we consider only one Location for a map task. However, it does not address the concern with hybrid storage as originally proposed in the design doc, where some of the blocks are written to local disk and some are written to HDFS or S3, etc.

Contributor

Technically, a single abstract Location interface which can be implemented as you like can handle multiple real locations. IMHO it is just naming (Location => Locations), and as mostly single real locations are represented, I would even keep the current name.
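A sketch of that point (entirely illustrative, assuming the reduced interface above where Location only extends Externalizable): one Location value can carry several physical locations.

import java.io.{ObjectInput, ObjectOutput}

// Some blocks sit on the executor's local disk, the rest in a remote
// store such as HDFS or S3; a single Location describes both.
class HybridLocation(var localHostPort: String, var remotePath: String)
  extends Location {
  def this() = this("", "") // zero-arg constructor for Externalizable
  override def writeExternal(out: ObjectOutput): Unit = {
    out.writeUTF(localHostPort)
    out.writeUTF(remotePath)
  }
  override def readExternal(in: ObjectInput): Unit = {
    localHostPort = in.readUTF()
    remotePath = in.readUTF()
  }
}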

* {@link ShuffleMapOutputWriter} after writing a shuffle data file and used by ShuffleMapOutputReader
* to read the shuffle data.
*
* Since the location is returned by {@link ShuffleMapOutputWriter#commitAllPartitions()} at executor


I think we need to add the Location to MapOutputCommitMessage and slightly modify the ShuffleWriters to propagate the Location to the driver. Could that be a follow-up PR? Or are there any other design choices for propagating the Location information to the driver (right now it is hardcoded to blockManager.blockManagerId)?
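A sketch of that propagation path (the shapes below are assumptions, not Spark's API; Spark's MapOutputCommitMessage today carries only the partition lengths):

// Hypothetical commit result extended with the writer's Location, so a
// writer that places data off-executor can tell the driver where the
// blocks actually live, instead of the driver assuming
// blockManager.blockManagerId.
case class CommitResult(partitionLengths: Array[Long], location: Location)

class RemoteShuffleMapOutputWriter(location: Location) {
  def commitAllPartitions(partitionLengths: Array[Long]): CommitResult =
    CommitResult(partitionLengths, location)
}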
