# [CELEBORN-1648] Refine AppUniqueId with UUID suffix

### What changes were proposed in this pull request?

Append a random UUID suffix to the application id, so that the app unique id Celeborn uses is guaranteed to be unique.
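
For illustration, a minimal sketch of the suffixing behavior this patch introduces (the logic mirrors the new `CelebornConf.appUniqueIdWithUUIDSuffix` helper in the diff below; the example application id is made up):

```scala
import java.util.UUID

// Sketch of the new behavior: when the feature is enabled, a random UUID
// (dashes stripped) is appended to the engine-provided application id.
def withUuidSuffix(appId: String, enabled: Boolean): String =
  if (enabled) appId + "-" + UUID.randomUUID().toString.replaceAll("-", "")
  else appId

// e.g. "application_1700000000000_0001-0f8fad5bd9cb469fa16570867728950e"
println(withUuidSuffix("application_1700000000000_0001", enabled = true))
```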

### Why are the changes needed?

Currently we cannot guarantee that the application id is truly unique; duplicate application ids may lead to data correctness issues.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?
Tested locally.

Closes apache#2810 from chenkovsky/feature/uuid_appid.

Authored-by: Chongchen Chen <chenkovsky@qq.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
chenkovsky authored and RexXiong committed Oct 17, 2024
1 parent fdff494 commit eb49ed7
Showing 5 changed files with 31 additions and 11 deletions.
@@ -66,16 +66,16 @@ public MRAppMasterWithCeleborn(
     int numReducers = jobConf.getInt(MRJobConfig.NUM_REDUCES, 0);
     if (numReducers > 0) {
       CelebornConf conf = HadoopUtils.fromYarnConf(jobConf);
-      LifecycleManager lifecycleManager =
-          new LifecycleManager(applicationAttemptId.toString(), conf);
+      String appUniqueId = conf.appUniqueIdWithUUIDSuffix(applicationAttemptId.toString());
+      LifecycleManager lifecycleManager = new LifecycleManager(appUniqueId, conf);
       String lmHost = lifecycleManager.getHost();
       int lmPort = lifecycleManager.getPort();
-      logger.info("MRAppMaster initialized with {} {} {}", lmHost, lmPort, applicationAttemptId);
+      logger.info("MRAppMaster initialized with {} {} {}", lmHost, lmPort, appUniqueId);
       JobConf lmConf = new JobConf();
       lmConf.clear();
       lmConf.set(HadoopUtils.MR_CELEBORN_LM_HOST, lmHost);
       lmConf.set(HadoopUtils.MR_CELEBORN_LM_PORT, lmPort + "");
-      lmConf.set(HadoopUtils.MR_CELEBORN_APPLICATION_ID, applicationAttemptId.toString());
+      lmConf.set(HadoopUtils.MR_CELEBORN_APPLICATION_ID, appUniqueId);
       writeLifecycleManagerConfToTask(jobConf, lmConf);
     }
   }
@@ -95,7 +95,8 @@ private void initializeLifecycleManager(String appId) {
     if (isDriver && lifecycleManager == null) {
       synchronized (this) {
         if (lifecycleManager == null) {
-          lifecycleManager = new LifecycleManager(appId, celebornConf);
+          appUniqueId = celebornConf.appUniqueIdWithUUIDSuffix(appId);
+          lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
           if (celebornConf.clientFetchThrowsFetchFailure()) {
             MapOutputTrackerMaster mapOutputTracker =
                 (MapOutputTrackerMaster) SparkEnv.get().mapOutputTracker();
@@ -113,8 +114,8 @@ public <K, V, C> ShuffleHandle registerShuffle(
     // Note: generate app unique id at driver side, make sure dependency.rdd.context
     // is the same SparkContext among different shuffleIds.
     // This method may be called many times.
-    appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context());
-    initializeLifecycleManager(appUniqueId);
+    String appId = SparkUtils.appUniqueId(dependency.rdd().context());
+    initializeLifecycleManager(appId);

     lifecycleManager.registerAppShuffleDeterminate(
         shuffleId,
@@ -129,14 +129,15 @@ private SortShuffleManager sortShuffleManager() {
     return _sortShuffleManager;
   }

-  private void initializeLifecycleManager() {
+  private void initializeLifecycleManager(String appId) {
     // Only create LifecycleManager singleton in Driver. When register shuffle multiple times, we
     // need to ensure that LifecycleManager will only be created once. Parallelism needs to be
     // considered in this place, because if there is one RDD that depends on multiple RDDs
     // at the same time, it may bring parallel `register shuffle`, such as Join in Sql.
     if (isDriver && lifecycleManager == null) {
       synchronized (this) {
         if (lifecycleManager == null) {
+          appUniqueId = celebornConf.appUniqueIdWithUUIDSuffix(appId);
           lifecycleManager = new LifecycleManager(appUniqueId, celebornConf);
           if (celebornConf.clientFetchThrowsFetchFailure()) {
             MapOutputTrackerMaster mapOutputTracker =
@@ -156,8 +157,8 @@ public <K, V, C> ShuffleHandle registerShuffle(
     // Note: generate app unique id at driver side, make sure dependency.rdd.context
     // is the same SparkContext among different shuffleIds.
     // This method may be called many times.
-    appUniqueId = SparkUtils.appUniqueId(dependency.rdd().context());
-    initializeLifecycleManager();
+    String appId = SparkUtils.appUniqueId(dependency.rdd().context());
+    initializeLifecycleManager(appId);

     lifecycleManager.registerAppShuffleDeterminate(
         shuffleId,
@@ -18,7 +18,7 @@
 package org.apache.celeborn.common

 import java.io.{File, IOException}
-import java.util.{Collection => JCollection, Collections, HashMap => JHashMap, Locale, Map => JMap}
+import java.util.{Collection => JCollection, Collections, HashMap => JHashMap, Locale, Map => JMap, UUID}
 import java.util.concurrent.TimeUnit

 import scala.collection.JavaConverters._
@@ -907,6 +907,15 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def clientExcludeReplicaOnFailureEnabled: Boolean =
     get(CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED)
   def clientMrMaxPushData: Long = get(CLIENT_MR_PUSH_DATA_MAX)
+  def clientApplicationUUIDSuffixEnabled: Boolean = get(CLIENT_APPLICATION_UUID_SUFFIX_ENABLED)
+
+  def appUniqueIdWithUUIDSuffix(appId: String): String = {
+    if (clientApplicationUUIDSuffixEnabled) {
+      appId + "-" + UUID.randomUUID().toString.replaceAll("-", "")
+    } else {
+      appId
+    }
+  }

   // //////////////////////////////////////////////////////
   // Shuffle Compression //
@@ -5065,6 +5074,14 @@
       .checkValue(v => v > 0.0 && v <= 1.0, "Value must be between 0 and 1 (inclusive)")
       .createWithDefault(0.4)

+  val CLIENT_APPLICATION_UUID_SUFFIX_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.client.application.uuidSuffix.enabled")
+      .categories("client")
+      .version("0.6.0")
+      .doc("Whether to add UUID suffix for application id for unique. When `true`, add UUID suffix for unique application id. Currently, this only applies to Spark and MR.")
+      .booleanConf
+      .createWithDefault(false)
+
   val TEST_ALTERNATIVE: OptionalConfigEntry[String] =
     buildConf("celeborn.test.alternative.key")
       .withAlternative("celeborn.test.alternative.deprecatedKey")
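
As a hedged usage sketch of the new helper (assuming `CelebornConf` offers a no-arg constructor and a chainable `set(key, value)`, as the surrounding class suggests; the application id is made up):

```scala
import org.apache.celeborn.common.CelebornConf

// Sketch only: enable the new flag and derive the unique id, mirroring the
// Spark and MR call sites in this patch.
val conf = new CelebornConf()
  .set("celeborn.client.application.uuidSuffix.enabled", "true")
val appUniqueId = conf.appUniqueIdWithUUIDSuffix("application_1700000000000_0001")
// With the flag off (the default), the id is returned unchanged.
```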
docs/configuration/client.md (1 addition, 0 deletions)
@@ -21,6 +21,7 @@ license: |
 | --- | ------- | --------- | ----------- | ----- | ---------- |
 | celeborn.client.application.heartbeatInterval | 10s | false | Interval for client to send heartbeat message to master. | 0.3.0 | celeborn.application.heartbeatInterval |
 | celeborn.client.application.unregister.enabled | true | false | When true, Celeborn client will inform celeborn master the application is already shutdown during client exit, this allows the cluster to release resources immediately, resulting in resource savings. | 0.3.2 | |
+| celeborn.client.application.uuidSuffix.enabled | false | false | Whether to add UUID suffix for application id for unique. When `true`, add UUID suffix for unique application id. Currently, this only applies to Spark and MR. | 0.6.0 | |
 | celeborn.client.chunk.prefetch.enabled | false | false | Whether to enable chunk prefetch when creating CelebornInputStream. | 0.6.0 | |
 | celeborn.client.closeIdleConnections | true | false | Whether client will close idle connections. | 0.3.0 | |
 | celeborn.client.commitFiles.ignoreExcludedWorker | false | false | When true, LifecycleManager will skip workers which are in the excluded list. | 0.3.0 | |
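
For Spark users, a hedged example of turning the feature on, assuming Celeborn's usual convention of exposing client settings through the `spark.celeborn.*` prefix (check the deployment docs before relying on the exact key):

```scala
import org.apache.spark.SparkConf

// Sketch only: pass the new flag through SparkConf; Celeborn's Spark client
// reads its own settings from keys carrying the "spark." prefix.
val sparkConf = new SparkConf()
  .set("spark.celeborn.client.application.uuidSuffix.enabled", "true")
```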
