Skip to content

Commit

Permalink
[apache#1024] improvement(tez): Optimize user switch to shuffle mode …
Browse files Browse the repository at this point in the history
…local/remote. (apache#1397)

### What changes were proposed in this pull request?

Currently, users need to set the parameter tez.am.launch.cmd-opts to switch between local shuffle and remote shuffle, but this parameter is difficult for users to understand.
We need a simpler, easier-to-understand parameter, such as tez.shuffle.mode = local/remote.
### Why are the changes needed?

> [Improvement] [tez]:Optimize user switch to shuffle mode local/remote. apache#1024


Fix:  apache#1024

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

unit test or cluster prd test
  • Loading branch information
lifeSo authored Jan 3, 2024
1 parent 114ec52 commit bd98c44
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ public class RssTezConfig {
public static final int RSS_REDUCE_REMOTE_SPILL_RETRIES_DEFAULT = 5;
public static final String RSS_REMOTE_SPILL_STORAGE_PATH =
TEZ_RSS_CONFIG_PREFIX + "rss.remote.spill.storage.path";
// Config key selecting the shuffle mode; RssDAGAppMaster accepts "remote" or "local".
public static final String RSS_SHUFFLE_MODE = TEZ_RSS_CONFIG_PREFIX + "shuffle.mode";
// Shuffle mode used when RSS_SHUFFLE_MODE is not set.
public static final String DEFAULT_RSS_SHUFFLE_MODE = "remote";

public static RssConf toRssConf(Configuration jobConf) {
RssConf rssConf = new RssConf();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,15 @@ public static void main(String[] args) {
try {
// We use trick way to introduce RssDAGAppMaster by the config tez.am.launch.cmd-opts.
// This means properties set on the command line would be ignored, so we must reload them.
Configuration conf = new Configuration(new YarnConfiguration());
DAGProtos.ConfigurationProto confProto =
TezUtilsInternal.readUserSpecifiedTezConfiguration(
System.getenv(ApplicationConstants.Environment.PWD.name()));
TezUtilsInternal.addUserSpecifiedTezConfiguration(conf, confProto.getConfKeyValuesList());

boolean sessionModeCliOption = false;
boolean rollBackToLocalShuffle = false;
String[] rollBackRemainingArgs = null;
for (int i = 0; i < args.length; i++) {
if (args[i].startsWith("-D")) {
String[] property = args[i].split("=");
Expand All @@ -337,10 +345,24 @@ public static void main(String[] args) {
} else if (args[i].contains("--session") || args[i].contains("-s")) {
sessionModeCliOption = true;
}
if (args[i].contains(DAGAppMaster.class.getName()) && isLocalShuffleMode(conf)) {
rollBackToLocalShuffle = true;
rollBackRemainingArgs = Arrays.copyOfRange(args, i + 1, args.length);
}
}

// Load the log4j config is only init in static code block of LogManager, so we must
// reconfigure.
reconfigureLog4j();
// if set tez.shuffle.mode = local then degenerates to the native way.
if (rollBackToLocalShuffle) {
// rollback to local shuffle mode.
LOG.info(
"Rollback to local shuffle mode, since tez.shuffle.mode = {}",
conf.get(RssTezConfig.RSS_SHUFFLE_MODE, RssTezConfig.DEFAULT_RSS_SHUFFLE_MODE));
DAGAppMaster.main(rollBackRemainingArgs);
return;
}

// Install the tez class loader, which can be used add new resources
TezClassLoader.setupTezClassLoader();
Expand Down Expand Up @@ -382,13 +404,6 @@ public static void main(String[] args) {
+ ", logDirs="
+ System.getenv(ApplicationConstants.Environment.LOG_DIRS.name()));

Configuration conf = new Configuration(new YarnConfiguration());

DAGProtos.ConfigurationProto confProto =
TezUtilsInternal.readUserSpecifiedTezConfiguration(
System.getenv(ApplicationConstants.Environment.PWD.name()));
TezUtilsInternal.addUserSpecifiedTezConfiguration(conf, confProto.getConfKeyValuesList());

AMPluginDescriptorProto amPluginDescriptorProto = null;
if (confProto.hasAmPluginDescriptor()) {
amPluginDescriptorProto = confProto.getAmPluginDescriptor();
Expand Down Expand Up @@ -453,6 +468,20 @@ public static void main(String[] args) {
}
}

/**
 * Tells whether the job is configured to use local shuffle instead of remote shuffle.
 *
 * <p>Reads {@link RssTezConfig#RSS_SHUFFLE_MODE} from the given configuration, falling back to
 * {@link RssTezConfig#DEFAULT_RSS_SHUFFLE_MODE} when the key is not set.
 *
 * @param conf the configuration to read the shuffle mode from
 * @return {@code true} if the mode is {@code "local"}, {@code false} if it is {@code "remote"}
 * @throws RssException if the configured mode is neither {@code "local"} nor {@code "remote"}
 */
private static boolean isLocalShuffleMode(Configuration conf) {
  String shuffleMode =
      conf.get(RssTezConfig.RSS_SHUFFLE_MODE, RssTezConfig.DEFAULT_RSS_SHUFFLE_MODE);
  switch (shuffleMode) {
    case "remote":
      return false;
    case "local":
      return true;
    default:
      // Separate the offending value from the fixed text so the message is readable.
      throw new RssException(
          "Unsupported shuffle mode: "
              + shuffleMode
              + ", ensure that it is set to local/remote.");
  }
}

static void mayCloseTezSlowStart(Configuration conf) {
if (!conf.getBoolean(
RssTezConfig.RSS_AM_SLOW_START_ENABLE, RssTezConfig.RSS_AM_SLOW_START_ENABLE_DEFAULT)) {
Expand Down
1 change: 1 addition & 0 deletions docs/client_guide/tez_client_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Note that the RssDAGAppMaster will automatically disable slow start (i.e., `tez.
| tez.rss.client.max.buffer.size | 3k | The max buffer size in map side. Control the size of each segment(WrappedBuffer) in the buffer. |
| tez.rss.client.batch.trigger.num | 50 | The max batch of buffers to send data in map side. Affect the number of blocks sent to the server in each batch, and may affect rss_worker_used_buffer_size |
| tez.rss.client.send.thread.num | 5 | The thread pool size for the client to send data to the server. |
| tez.shuffle.mode | remote | The shuffle mode. Set to 'remote' (the default) to use Remote Shuffle, or set to 'local' to fall back to local shuffle. |


### Remote Spill (Experimental)
Expand Down

0 comments on commit bd98c44

Please sign in to comment.