-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-34942][API][CORE] Abstract Location in MapStatus to enable support for custom storage #31876
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
bd3f5c3
b040a14
1e01dc8
24f20c7
805e1f3
b01252a
eb10171
cb46210
451f963
0286260
66977e0
7ae8116
9bbcd73
2b7833a
9f33047
79d82e5
b010e94
5c0f7ff
e600fc5
701e73c
c3b49db
45b4679
7d411b2
f1d0817
69fb5ed
b6f7a12
02b51bf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.shuffle.api; | ||
|
||
import org.apache.spark.annotation.Private; | ||
|
||
/** | ||
* :: Private :: | ||
* A type of {@link Location} which based on the executor. | ||
* | ||
* @since 3.2.0 | ||
*/ | ||
@Private | ||
public interface ExecutorLocation extends HostLocation { | ||
String executorId(); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.shuffle.api; | ||
|
||
import org.apache.spark.annotation.Private; | ||
|
||
/** | ||
* :: Private :: | ||
* A type of {@link Location} which based on the host. | ||
* | ||
* @since 3.2.0 | ||
*/ | ||
@Private | ||
public interface HostLocation extends Location { | ||
String host(); | ||
|
||
int port(); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.shuffle.api; | ||
|
||
import org.apache.spark.annotation.Private; | ||
|
||
import java.io.Externalizable; | ||
import java.io.ObjectInput; | ||
import java.io.ObjectOutput; | ||
|
||
|
||
/** | ||
* :: Private :: | ||
* An interface for plugging in the location of shuffle files, in order to support store shuffle | ||
* data in different storage, e.g., BlockManager, HDFS, S3. It would be generated by | ||
* {@link ShuffleMapOutputWriter} after writing a shuffle data file and used by ShuffleMapOutputReader | ||
* to read the shuffle data. | ||
* | ||
* Since the location is returned by {@link ShuffleMapOutputWriter#commitAllPartitions()} at executor | ||
* and would be sent to driver, users must ensure the location is serializable by | ||
* | ||
* - implement a 0-arg constructor | ||
* - implement {@link java.io.Externalizable#readExternal(ObjectInput)} for deserialization | ||
* - implement {@link java.io.Externalizable#writeExternal(ObjectOutput)} for serialization | ||
* | ||
Ngone51 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* Since the location will be used as keys in maps or comparing with others, users must ensure that | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit. This is annotated with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This follows the other shuffle APIs (e.g., And |
||
* invoking {@link java.lang.Object#equals(Object)} or {@link java.lang.Object#hashCode()} on the | ||
* {@link Location} instances would distinguish the different locations. | ||
* | ||
Ngone51 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* Spark has its own default implementation of {@link Location} as | ||
* {@link org.apache.spark.storage.BlockManagerId}, which is a subclass of {@link ExecutorLocation} | ||
* since each {@link org.apache.spark.storage.BlockManager} must belong to a certain executor. | ||
* And {@link ExecutorLocation} is a subclass of {@link HostLocation} since each executor must | ||
* belong to a certain host. Users should choose the appropriate location interface according to their | ||
* own use cases. | ||
* | ||
* :: Caution :: | ||
* Spark would reuse the same location instance for locations which are equal due to the | ||
* performance concern. Thus, users should also guarantee the implemented {@link Location} | ||
* is IMMUTABLE. | ||
* | ||
* @since 3.2.0 | ||
*/ | ||
@Private | ||
public interface Location extends Externalizable { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks good if we consider only one Location for a map task. However, this is not addressing the concern with hybrid storage as originally proposed in this design doc, where few of the blocks written in local disk and few are written to HDFS or S3 etc. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Technically a single abstract |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1217,6 +1217,24 @@ package object config { | |
.stringConf | ||
.createWithDefault(classOf[LocalDiskShuffleDataIO].getName) | ||
|
||
private[spark] val SHUFFLE_LOCATION_PLUGIN_CLASS = | ||
ConfigBuilder("spark.shuffle.sort.location.plugin.class") | ||
.doc("Qualified name of the class that used to initiate plugin location instance. " + | ||
"If not specified, Spark will use its native location (a.k.a BlockManagerId) by default.") | ||
.version("3.2.0") | ||
.stringConf | ||
.createOptional | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I used the I tried the way of the default value ( |
||
|
||
private[spark] val SHUFFLE_LOCATION_CACHE_SIZE = | ||
ConfigBuilder("spark.shuffle.sort.location.cacheSize") | ||
.doc("The cache size for the location instances. Bigger size means that Spark will have " + | ||
"more chances to reuse the location instance for the same location but takes more memory.") | ||
.version("3.2.0") | ||
.intConf | ||
// In the case of `BlockManagerId`, which takes 48B for each, the total memory cost should | ||
// be below 1MB which is feasible. | ||
.createWithDefault(10000) | ||
|
||
private[spark] val SHUFFLE_FILE_BUFFER_SIZE = | ||
ConfigBuilder("spark.shuffle.file.buffer") | ||
.doc("Size of the in-memory buffer for each shuffle file output stream, in KiB unless " + | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to add the
Location
inMapOutputCommitMessage
and slightly modify theShuffleWriter
s to propagate Location to the driver. Could be a follow up PR? Or are there any other design choices to propagate the Location information to the driver (right now it is hardcoded to blockManager.blockManagerId).