Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ nohup.out
#claude
.claude

nul
nul
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,8 @@ object EntranceConfiguration {

// Default Python interpreter version used when submitting Spark3 jobs.
var SPARK3_PYTHON_VERSION = CommonVars.apply("spark.python.version", "python3");

// Global switch for applying dynamic-allocation defaults (disabled by default).
// NOTE(review): this diff appears to replace SPARK_DYNAMIC_ALLOCATION_ENABLED with
// SPARK_DYNAMIC_CONF_USER_ENABLED — confirm all callers were migrated to the new key.
var SPARK_DYNAMIC_ALLOCATION_ENABLED =
CommonVars.apply("spark.dynamic.allocation.enabled", false).getValue
// When true, user-template confs take precedence and only the python version
// is pinned by the entrance (see EntranceUtils).
var SPARK_DYNAMIC_CONF_USER_ENABLED =
CommonVars.apply("spark.dynamic.conf.user.enabled", false).getValue

// Extra confs applied together with dynamic allocation, expressed as a
// comma-separated list of key=value pairs; empty by default.
var SPARK_DYNAMIC_ALLOCATION_ADDITIONAL_CONFS =
CommonVars.apply("spark.dynamic.allocation.additional.confs", "").getValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,58 +237,82 @@ object EntranceUtils extends Logging {
// deal with spark3 dynamic allocation conf
// 1.只有spark3需要处理动态规划参数 2.用户未指定模板名称,则设置默认值与spark底层配置保持一致,否则使用用户模板中指定的参数
val properties = new util.HashMap[String, AnyRef]()
val label: EngineTypeLabel = LabelUtil.getEngineTypeLabel(jobRequest.getLabels)
val sparkDynamicAllocationEnabled: Boolean =
EntranceConfiguration.SPARK_DYNAMIC_ALLOCATION_ENABLED
if (
sparkDynamicAllocationEnabled && label.getEngineType.equals(
EngineType.SPARK.toString
) && label.getVersion.contains(LabelCommonConfig.SPARK3_ENGINE_VERSION.getValue)
) {
properties.put(
EntranceConfiguration.SPARK_EXECUTOR_CORES.key,
EntranceConfiguration.SPARK_EXECUTOR_CORES.getValue
)
properties.put(
EntranceConfiguration.SPARK_EXECUTOR_MEMORY.key,
EntranceConfiguration.SPARK_EXECUTOR_MEMORY.getValue
)
properties.put(
EntranceConfiguration.SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS.key,
EntranceConfiguration.SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS.getValue
)
properties.put(
EntranceConfiguration.SPARK_EXECUTOR_INSTANCES.key,
EntranceConfiguration.SPARK_EXECUTOR_INSTANCES.getValue
)
properties.put(
EntranceConfiguration.SPARK_EXECUTOR_MEMORY_OVERHEAD.key,
EntranceConfiguration.SPARK_EXECUTOR_MEMORY_OVERHEAD.getValue
)
properties.put(
EntranceConfiguration.SPARK3_PYTHON_VERSION.key,
EntranceConfiguration.SPARK3_PYTHON_VERSION.getValue
)
Utils.tryAndWarn {
val extraConfs: String =
EntranceConfiguration.SPARK_DYNAMIC_ALLOCATION_ADDITIONAL_CONFS
if (StringUtils.isNotBlank(extraConfs)) {
val confs: Array[String] = extraConfs.split(",")
for (conf <- confs) {
val confKey: String = conf.split("=")(0)
val confValue: String = conf.split("=")(1)
properties.put(confKey, confValue)
}
// True only for spark jobs whose version matches the configured Spark3 version.
val isSpark3 = LabelUtil.isTargetEngine(
jobRequest.getLabels,
EngineType.SPARK.toString,
LabelCommonConfig.SPARK3_ENGINE_VERSION.getValue
)
try {
if (isSpark3) {
logger.info(s"Task :${jobRequest.getId} using dynamic conf ")
if (EntranceConfiguration.SPARK_DYNAMIC_CONF_USER_ENABLED) {
// User-level dynamic conf is enabled: pin only the python version and
// let the user's template supply the remaining spark confs.
properties.put(
EntranceConfiguration.SPARK3_PYTHON_VERSION.key,
EntranceConfiguration.SPARK3_PYTHON_VERSION.getValue
)
} else {
// Otherwise fill in the spark3 dynamic-allocation defaults.
setSparkDynamicAllocationDefaultConfs(properties, logAppender)
}
}
// NOTE(review): this line is appended for every job, even non-spark3 and the
// user-template branch — confirm it should be inside the if(isSpark3) block.
logAppender.append(
LogUtils
.generateInfo(s"use spark3 default conf. \n")
)
} catch {
case e: Exception =>
// On any resolution failure, fall back to the built-in defaults.
// NOTE(review): passing e.getMessage instead of e drops the stack trace.
logger.error(
s"Task :${jobRequest.getId} using default dynamic conf, message {} ",
e.getMessage
)
setSparkDynamicAllocationDefaultConfs(properties, logAppender)
} finally {
// Always merge whatever confs were collected into the job's startup params.
TaskUtils.addStartupMap(params, properties)
}
}

/**
* Set spark dynamic allocation default confs
*/
/**
 * Populate the default Spark3 dynamic-allocation confs (executor cores, memory,
 * max executors, instances, memory overhead and python version) so the job stays
 * consistent with the underlying spark configuration, then merge in any extra
 * confs configured via SPARK_DYNAMIC_ALLOCATION_ADDITIONAL_CONFS.
 *
 * @param properties  mutable map the conf key/value pairs are written into
 * @param logAppender task log buffer that receives a single info line
 */
private def setSparkDynamicAllocationDefaultConfs(
properties: util.HashMap[String, AnyRef],
logAppender: lang.StringBuilder
): Unit = {
properties.put(
EntranceConfiguration.SPARK_EXECUTOR_CORES.key,
EntranceConfiguration.SPARK_EXECUTOR_CORES.getValue
)
properties.put(
EntranceConfiguration.SPARK_EXECUTOR_MEMORY.key,
EntranceConfiguration.SPARK_EXECUTOR_MEMORY.getValue
)
properties.put(
EntranceConfiguration.SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS.key,
EntranceConfiguration.SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS.getValue
)
properties.put(
EntranceConfiguration.SPARK_EXECUTOR_INSTANCES.key,
EntranceConfiguration.SPARK_EXECUTOR_INSTANCES.getValue
)
properties.put(
EntranceConfiguration.SPARK_EXECUTOR_MEMORY_OVERHEAD.key,
EntranceConfiguration.SPARK_EXECUTOR_MEMORY_OVERHEAD.getValue
)
properties.put(
EntranceConfiguration.SPARK3_PYTHON_VERSION.key,
EntranceConfiguration.SPARK3_PYTHON_VERSION.getValue
)
// Extra confs are a comma-separated list of key=value pairs. The previous
// implementation called conf.split("=")(1), which threw
// ArrayIndexOutOfBoundsException for entries without '=' and truncated
// values that themselves contain '='. Split on the FIRST '=' only and skip
// malformed entries instead of aborting the whole batch.
Utils.tryAndWarn {
val extraConfs: String =
EntranceConfiguration.SPARK_DYNAMIC_ALLOCATION_ADDITIONAL_CONFS
if (StringUtils.isNotBlank(extraConfs)) {
extraConfs.split(",").foreach { conf =>
val kv: Array[String] = conf.split("=", 2)
if (kv.length == 2 && StringUtils.isNotBlank(kv(0))) {
properties.put(kv(0).trim, kv(1).trim)
} else {
logger.warn(s"Ignore invalid spark dynamic allocation conf entry: $conf")
}
}
}
}
logInfo(s"use spark3 default conf. \n", logAppender)
}

/**
* 敏感信息SQL检查
*/
Expand Down Expand Up @@ -482,7 +506,7 @@ object EntranceUtils extends Logging {
}

/**
* 记录日志信息
* 管理台任务日志info信息打印
*/
private def logInfo(message: String, logAppender: java.lang.StringBuilder): Unit = {
logAppender.append(LogUtils.generateInfo(s"$message\n"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,21 @@ object LabelUtil {
}
}

/**
 * Check whether the job's labels target the given engine type, optionally
 * requiring the engine version to contain the given version string.
 *
 * Rewritten as a single expression: the original used early `return`
 * statements, which are discouraged in Scala.
 *
 * @param labels  job labels to inspect; null is treated as no match
 * @param engine  expected engine type (e.g. "spark"); blank never matches
 * @param version optional version substring; null/blank matches any version
 * @return true when an engine-type label exists and matches engine (and version)
 */
def isTargetEngine(
labels: util.List[Label[_]],
engine: String,
version: String = null
): Boolean = {
if (null == labels || StringUtils.isBlank(engine)) {
false
} else {
val engineTypeLabel = getEngineTypeLabel(labels)
// A missing engine-type label means no match.
null != engineTypeLabel &&
engineTypeLabel.getEngineType.equals(engine) &&
(StringUtils.isBlank(version) || engineTypeLabel.getVersion.contains(version))
}
}

}
Loading