Merged

Changes from all commits (26 commits)
614171b
code optimization
v-kkhuang Dec 29, 2025
fcaa94b
Merge branch 'dev-1.18.0-webank' into dev-1.18.0-code-review-fix
v-kkhuang Dec 31, 2025
5101c90
code optimization
v-kkhuang Dec 31, 2025
348b683
code optimization
v-kkhuang Jan 4, 2026
c68899a
code optimization
v-kkhuang Jan 5, 2026
eec8115
Merge branch 'dev-1.18.0-webank' into dev-1.18.0-code-review-fix
v-kkhuang Jan 7, 2026
8709051
code optimization
v-kkhuang Jan 7, 2026
374d8d6
code optimization
v-kkhuang Jan 7, 2026
cbac517
code optimization
v-kkhuang Jan 7, 2026
91a62e6
code optimization
v-kkhuang Jan 7, 2026
8564790
Merge branch 'dev-1.18.0-webank' into dev-1.18.0-code-review-fix
v-kkhuang Jan 8, 2026
9a8b833
code optimization
v-kkhuang Jan 8, 2026
352a515
Merge branch 'dev-1.18.0-webank' into dev-1.18.0-code-review-fix
v-kkhuang Jan 9, 2026
bbd2164
code optimization
v-kkhuang Jan 9, 2026
a15d800
code optimization
v-kkhuang Jan 9, 2026
a4f8b91
Merge branch 'dev-1.18.0-webank' into dev-1.18.0-code-review-fix
v-kkhuang Jan 13, 2026
07bcf9b
code optimization
v-kkhuang Jan 13, 2026
0094510
code optimization
v-kkhuang Jan 13, 2026
3966040
code optimization
v-kkhuang Jan 13, 2026
d82a481
Merge branch 'dev-1.18.0-webank' into dev-1.18.0-code-review-fix
v-kkhuang Jan 15, 2026
f2db79f
code optimization
v-kkhuang Jan 15, 2026
b18a19e
Merge branch 'dev-1.18.0-webank' into dev-1.18.0-code-review-fix
v-kkhuang Jan 20, 2026
ee90663
code optimization
v-kkhuang Jan 20, 2026
db4a871
code optimization
v-kkhuang Jan 20, 2026
ff526c7
code optimization
v-kkhuang Jan 20, 2026
fc2254b
code optimization
v-kkhuang Jan 20, 2026
@@ -104,6 +104,12 @@ object Configuration extends Logging {
val METRICS_INCREMENTAL_UPDATE_ENABLE =
CommonVars[Boolean]("linkis.jobhistory.metrics.incremental.update.enable", false)

val EXECUTE_ERROR_CODE_INDEX =
CommonVars("execute.error.code.index", "-1")

val EXECUTE_RESULTSET_ALIAS_NUM =
CommonVars("execute.resultset.alias.num", "0")

val GLOBAL_CONF_CHN_NAME = "全局设置"

val GLOBAL_CONF_CHN_OLDNAME = "通用设置"
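For context, the two new keys default to "-1" (no recorded error index) and "0" (no result sets emitted yet). Below is a minimal, self-contained Scala sketch of the lookup pattern the executor uses for these keys; the HashMap is a stand-in for engineConnTask.getProperties, and the object name is hypothetical.

import java.util

object ConfigKeyDefaultsDemo extends App {
  val props = new util.HashMap[String, AnyRef]()
  // Absent key falls back to the "-1" default, meaning "no prior failure".
  val errorIndex: Int =
    Integer.valueOf(props.getOrDefault("execute.error.code.index", "-1").toString)
  // Absent key falls back to "0", meaning "start result-set numbering at 0".
  val aliasNum: Int =
    Integer.valueOf(props.getOrDefault("execute.resultset.alias.num", "0").toString)
  println(s"errorIndex=$errorIndex, aliasNum=$aliasNum") // -1 and 0 on a first run
}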
@@ -18,6 +18,7 @@
package org.apache.linkis.engineconn.computation.executor.execute

import org.apache.linkis.DataWorkCloudApplication
import org.apache.linkis.common.conf.Configuration
import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.engineconn.acessible.executor.entity.AccessibleExecutor
@@ -262,10 +263,13 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
}
var executeFlag = true
val errorIndex: Int = Integer.valueOf(
engineConnTask.getProperties.getOrDefault("execute.error.code.index", "-1").toString
engineConnTask.getProperties
.getOrDefault(Configuration.EXECUTE_ERROR_CODE_INDEX.key, "-1")
.toString
)
engineExecutionContext.getProperties.put("execute.error.code.index", errorIndex.toString)
// If execution fails, decrement the failed index by 1; the retry will increment it by 1 again, hence the -1 here.
engineExecutionContext.getProperties
.put(Configuration.EXECUTE_ERROR_CODE_INDEX.key, errorIndex.toString)
// When a JDBC task is retried, SQL that has had SET statements spliced in can shift the statement index. The index printed in the log is decremented by 1 so the user sees the expected index, and the retry errorIndex is incremented by 1 so the retry resumes at the correct position.
var newIndex = index
var newErrorIndex = errorIndex
if (adjustErrorIndexForSetScenarios(engineConnTask)) {
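To make the comment above concrete, here is a hedged sketch of the adjustment; adjustErrorIndexForSetScenarios is the real check in the PR, while the helper below is a hypothetical stand-in inferred from the comment, not the exact implementation.

// Hypothetical sketch of the index adjustment for injected SET statements.
def adjustForInjectedSet(index: Int, errorIndex: Int, setInjected: Boolean): (Int, Int) =
  if (setInjected) (index - 1, errorIndex + 1) // (index shown in logs, index used for retry)
  else (index, errorIndex)

// e.g. a failure at statement 3 with one injected SET:
// adjustForInjectedSet(3, 3, setInjected = true) == (2, 4)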
@@ -322,7 +326,8 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
s"task execute failed, with index: ${index} retryNum: ${retryNum}, and will retry"
)
)
engineConnTask.getProperties.put("execute.error.code.index", index.toString)
engineConnTask.getProperties
.put(Configuration.EXECUTE_ERROR_CODE_INDEX.key, index.toString)
return ErrorRetryExecuteResponse(e.message, index, e.t)
} else {
failedTasks.increase()
@@ -341,7 +346,10 @@
engineExecutionContext.appendStdout(output)
if (StringUtils.isNotBlank(e.getOutput)) {
engineConnTask.getProperties
.put("execute.resultset.alias.num", engineExecutionContext.getAliasNum.toString)
.put(
Configuration.EXECUTE_RESULTSET_ALIAS_NUM.key,
engineExecutionContext.getAliasNum.toString
)
engineExecutionContext.sendResultSet(e)
}
case _: IncompleteExecuteResponse =>
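The alias number recorded in this hunk is what lets a retried task keep numbering result sets where the failed attempt stopped, instead of restarting at 0 and colliding with already-written sets. A small hedged sketch of the round trip, with a plain map standing in for the task properties:

import java.util

object AliasNumRoundTripDemo extends App {
  val taskProps = new util.HashMap[String, AnyRef]()
  // On a successful send, remember how many result sets exist so far.
  taskProps.put("execute.resultset.alias.num", "2")
  // On retry, restore the counter instead of starting from zero.
  val restored: Int =
    Integer.valueOf(taskProps.getOrDefault("execute.resultset.alias.num", "0").toString)
  assert(restored == 2) // two sets already written; the retry's next set is numbered 2
}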
@@ -403,7 +411,9 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
)

val currentAliasNum = Integer.valueOf(
engineConnTask.getProperties.getOrDefault("execute.resultset.alias.num", "0").toString
engineConnTask.getProperties
.getOrDefault(Configuration.EXECUTE_RESULTSET_ALIAS_NUM.key, "0")
.toString
)

ComputationEngineUtils.sendToEntrance(
@@ -491,11 +501,15 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
engineExecutionContext.setLabels(engineConnTask.getLables)

val errorIndex: Int = Integer.valueOf(
engineConnTask.getProperties.getOrDefault("execute.error.code.index", "-1").toString
engineConnTask.getProperties
.getOrDefault(Configuration.EXECUTE_ERROR_CODE_INDEX.key, "-1")
.toString
)
if (errorIndex > 0) {
val savedAliasNum = Integer.valueOf(
engineConnTask.getProperties.getOrDefault("execute.resultset.alias.num", "0").toString
engineConnTask.getProperties
.getOrDefault(Configuration.EXECUTE_RESULTSET_ALIAS_NUM.key, "0")
.toString
)
engineExecutionContext.setResultSetNum(savedAliasNum)
logger.info(s"Restore aliasNum to $savedAliasNum for retry task")
@@ -157,6 +157,8 @@ public ExecuteRequest jobToExecuteRequest() throws EntranceErrorException {
String resultSetPathRoot = GovernanceCommonConf.RESULT_SET_STORE_PATH().getValue(runtimeMapTmp);

if (!runtimeMapTmp.containsKey(GovernanceCommonConf.RESULT_SET_STORE_PATH().key())) {
// Fix: under task retry, a task submitted at 10:59 may be retried at 11:00; the retry would regenerate the result directory, so result sets produced before the retry were lost when querying.
// New check: before generating the result directory, check whether the task already produced result sets; if so, reuse the existing directory.
if (org.apache.commons.lang3.StringUtils.isNotEmpty(jobRequest.getResultLocation())) {
resultSetPathRoot = jobRequest.getResultLocation();
} else {
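The fix above boils down to a reuse-or-create rule: if a previous attempt already recorded a result location, the retry reuses it so earlier result sets remain visible, and only a first attempt generates a fresh, timestamp-based directory. A hedged one-function sketch in Scala, where generateFreshPath stands in for the original path-generation branch:

def resolveResultSetPath(previousLocation: String, generateFreshPath: () => String): String =
  if (previousLocation != null && previousLocation.nonEmpty) previousLocation // reuse on retry
  else generateFreshPath() // first attempt: create a new directory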
@@ -322,19 +322,26 @@ abstract class EntranceServer extends Logging {
Utils.defaultScheduler.scheduleAtFixedRate(
new Runnable() {
override def run(): Unit = {
val undoneTask = getAllUndoneTask(null, null)
// Added task diagnosis detection logic
if (EntranceConfiguration.TASK_DIAGNOSIS_ENABLE) {
logger.info("Start to check tasks for diagnosis")
val undoneTask = getAllUndoneTask(null, null)
val diagnosisTime = System.currentTimeMillis() - new TimeType(
EntranceConfiguration.TASK_DIAGNOSIS_TIMEOUT
).toLong
undoneTask
.filter { job =>
val engineType = LabelUtil.getEngineType(job.getJobRequest.getLabels)
val jobMetrics = Option(job.jobRequest.getMetrics)
val startTime =
if (jobMetrics.exists(_.containsKey(TaskConstant.JOB_RUNNING_TIME))) {
jobMetrics.get.get(TaskConstant.JOB_RUNNING_TIME).toString.toLong
} else {
0L
}
engineType.contains(
EntranceConfiguration.TASK_DIAGNOSIS_ENGINE_TYPE
) && job.createTime < diagnosisTime && !diagnosedJobs.containsKey(
) && startTime != 0 && startTime < diagnosisTime && !diagnosedJobs.containsKey(
job.getJobRequest.getId.toString
)
}
@@ -390,17 +397,17 @@
}
logger.info("Finished to check Spark tasks for diagnosis")
}
}
// Periodically clean diagnosedJobs, keeping only entries for unfinished tasks
val undoneJobIds = undoneTask.map(_.getJobRequest.getId.toString()).toSet
val iterator = diagnosedJobs.keySet().iterator()
while (iterator.hasNext) {
val jobId = iterator.next()
if (!undoneJobIds.contains(jobId)) {
iterator.remove()
// Periodically clean diagnosedJobs, keeping only entries for unfinished tasks
val undoneJobIds = undoneTask.map(_.getJobRequest.getId.toString()).toSet
val iterator = diagnosedJobs.keySet().iterator()
while (iterator.hasNext) {
val jobId = iterator.next()
if (!undoneJobIds.contains(jobId)) {
iterator.remove()
}
}
logger.info(s"Cleaned diagnosedJobs cache, current size: ${diagnosedJobs.size()}")
}
logger.info(s"Cleaned diagnosedJobs cache, current size: ${diagnosedJobs.size()}")
}
},
new TimeType(EntranceConfiguration.TASK_DIAGNOSIS_TIMEOUT_SCAN).toLong,
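Two behavioral changes in this file are worth calling out: diagnosis now keys off the job's actual running start time (startTime != 0 && startTime < cutoff) instead of createTime, so queued-but-not-started jobs are no longer flagged; and the diagnosedJobs cleanup moved inside the TASK_DIAGNOSIS_ENABLE branch, where undoneTask is now scoped. A hedged sketch with simplified stand-in types (the Job case class and timeout value are illustrative, not the PR's types):

import java.util.concurrent.ConcurrentHashMap

object DiagnosisScanDemo extends App {
  final case class Job(id: String, engineType: String, startTime: Long)

  val diagnosedJobs = new ConcurrentHashMap[String, java.lang.Boolean]()
  val cutoff = System.currentTimeMillis() - 5 * 60 * 1000L // illustrative 5m timeout

  def shouldDiagnose(job: Job, targetEngine: String): Boolean =
    job.engineType.contains(targetEngine) &&
      job.startTime != 0 && job.startTime < cutoff &&
      !diagnosedJobs.containsKey(job.id)

  // Keep only cache entries whose job is still undone, as in the moved cleanup loop.
  def cleanCache(undoneIds: Set[String]): Unit = {
    val it = diagnosedJobs.keySet().iterator()
    while (it.hasNext) if (!undoneIds.contains(it.next())) it.remove()
  }

  val notStarted = Job("1", "spark", 0L)
  assert(!shouldDiagnose(notStarted, "spark")) // queued jobs are now skipped
}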
@@ -420,6 +420,7 @@ object EntranceConfiguration {
var SPARK_DYNAMIC_ALLOCATION_ENABLED =
CommonVars.apply("spark.dynamic.allocation.enabled", false).getValue


var SPARK_DYNAMIC_ALLOCATION_ADDITIONAL_CONFS =
CommonVars.apply("spark.dynamic.allocation.additional.confs", "").getValue

@@ -450,6 +451,6 @@
val TASK_DIAGNOSIS_TIMEOUT = CommonVars[String]("linkis.task.diagnosis.timeout", "5m").getValue

val TASK_DIAGNOSIS_TIMEOUT_SCAN =
CommonVars("linkis.task.diagnosis.timeout.scan", "1m").getValue
CommonVars("linkis.task.diagnosis.timeout.scan", "2m").getValue

}
@@ -17,6 +17,7 @@

package org.apache.linkis.entrance.execute

import org.apache.linkis.common.conf.Configuration
import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.entrance.exception.{EntranceErrorCode, EntranceErrorException}
@@ -252,8 +253,8 @@ class DefaultEntranceExecutor(id: Long)
if (rte.errorIndex >= 0) {
logger.info(s"tasks execute error with error index: ${rte.errorIndex}")
val newParams: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]()
newParams.put("execute.error.code.index", rte.errorIndex.toString)
newParams.put("execute.resultset.alias.num", rte.aliasNum.toString)
newParams.put(Configuration.EXECUTE_ERROR_CODE_INDEX.key, rte.errorIndex.toString)
newParams.put(Configuration.EXECUTE_RESULTSET_ALIAS_NUM.key, rte.aliasNum.toString)
LogUtils.generateInfo(
s"tasks execute error with error index: ${rte.errorIndex} and will retry."
)
@@ -53,30 +53,17 @@ class TaskRetryInterceptor extends EntranceInterceptor with Logging {
// Handle only when the global retry switch is enabled
if (TASK_RETRY_SWITCH.getValue) {
val startMap: util.Map[String, AnyRef] = TaskUtils.getStartupMap(jobRequest.getParams)

// Handle by type: AI SQL tasks, or task types supported via configuration
if (LANGUAGE_TYPE_AI_SQL.equals(codeType)) {
// AI SQL tasks require both the feature to be enabled and creator permission
if (aiSqlEnable && supportAISQLCreator.contains(creator.toLowerCase())) {
logAppender.append(
LogUtils.generateWarn(s"The AI SQL task will initiate a failed retry \n")
)
startMap.put(TASK_RETRY_SWITCH.key, TASK_RETRY_SWITCH.getValue.asInstanceOf[AnyRef])
}
} else {
TASK_RETRY_CODE_TYPE
.split(",")
.foreach(codeTypeConf => {
if (codeTypeConf.equals(codeType)) {
// Ordinary tasks only need their code type to be supported
logAppender.append(
LogUtils.generateWarn(s"The StarRocks task will initiate a failed retry \n")
)
startMap.put(TASK_RETRY_SWITCH.key, TASK_RETRY_SWITCH.getValue.asInstanceOf[AnyRef])
startMap.put(RETRY_NUM_KEY.key, RETRY_NUM_KEY.getValue.asInstanceOf[AnyRef])
}
})
}
TASK_RETRY_CODE_TYPE
.split(",")
.foreach(codeTypeConf => {
if (codeTypeConf.equals(codeType)) {
// Ordinary tasks only need their code type to be supported
logAppender
.append(LogUtils.generateWarn(s"The StarRocks task will initiate a failed retry \n"))
startMap.put(TASK_RETRY_SWITCH.key, TASK_RETRY_SWITCH.getValue.asInstanceOf[AnyRef])
startMap.put(RETRY_NUM_KEY.key, RETRY_NUM_KEY.getValue.asInstanceOf[AnyRef])
}
})
// Update job parameters
TaskUtils.addStartupMap(jobRequest.getParams, startMap)
}
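Net effect of this hunk: the AI SQL special case (aiSqlEnable plus a creator allow-list) is removed, and any code type listed in the comma-separated TASK_RETRY_CODE_TYPE now receives both the retry switch and the retry count. A hedged sketch of the remaining predicate; the config string here is an illustrative stand-in for the real TASK_RETRY_CODE_TYPE value:

object RetryCodeTypeDemo extends App {
  val taskRetryCodeTypes = "sql,starrocks" // stand-in for TASK_RETRY_CODE_TYPE
  def retryEnabled(codeType: String): Boolean =
    taskRetryCodeTypes.split(",").contains(codeType)

  assert(retryEnabled("starrocks"))
  assert(!retryEnabled("python")) // unlisted types keep the default behavior
}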
@@ -249,18 +249,23 @@ object EntranceUtils extends Logging {
LabelCommonConfig.SPARK3_ENGINE_VERSION.getValue
)
try {
if (isSpark3 && sparkDynamicAllocationEnabled) {
logger.info(s"Task :${jobRequest.getId} using dynamic conf ")
// If dynamic allocation is disabled, only set python version
properties.put(
EntranceConfiguration.SPARK3_PYTHON_VERSION.key,
EntranceConfiguration.SPARK3_PYTHON_VERSION.getValue
)
if (isSpark3) {
if (!sparkDynamicAllocationEnabled) {
logger.info(s"Task :${jobRequest.getId} using user dynamic conf ")
// If dynamic allocation is disabled, only set python version
properties.put(
EntranceConfiguration.SPARK3_PYTHON_VERSION.key,
EntranceConfiguration.SPARK3_PYTHON_VERSION.getValue
)
} else {
logger.info(s"Task :${jobRequest.getId} using default dynamic conf ")
setSparkDynamicAllocationDefaultConfs(properties, logAppender)
}
}
} catch {
case e: Exception =>
logger.error(
s"Task :${jobRequest.getId} using default dynamic conf, message {} ",
s"Task error :${jobRequest.getId} using default dynamic conf, message {} ",
e.getMessage
)
setSparkDynamicAllocationDefaultConfs(properties, logAppender)
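The restructured branch now covers both Spark 3 cases: with dynamic allocation disabled it only sets the Python version (resources stay user-managed), and with it enabled it applies the platform's default dynamic-allocation confs. A hedged, shape-only sketch; the two function parameters are stand-ins for the property writes and setSparkDynamicAllocationDefaultConfs:

def applySpark3Conf(
    isSpark3: Boolean,
    dynamicAllocationEnabled: Boolean,
    setPythonVersionOnly: () => Unit,
    setDynamicDefaults: () => Unit
): Unit =
  if (isSpark3) {
    if (!dynamicAllocationEnabled) setPythonVersionOnly() // user-managed resources
    else setDynamicDefaults() // platform default dynamic confs
  }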
@@ -310,10 +310,7 @@ class DefaultEngineCreateService
}
}
} { case e: Exception =>
logger.error(
s"Failed to update metrics for taskId: $taskId",
e
)
logger.error(s"Failed to update metrics for taskId: $taskId", e)
}
// 9. Add the Label of EngineConn, and add the Alias of engineConn
val engineConnAliasLabel = labelBuilderFactory.createLabel(classOf[AliasServiceInstanceLabel])
@@ -236,7 +236,6 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe
StringUtils.isNotBlank(templateName) && AMConfiguration.EC_REUSE_WITH_TEMPLATE_RULE_ENABLE
) {
engineScoreList = engineScoreList
.filter(engine => engine.getNodeStatus == NodeStatus.Unlock)
.filter(engine => {
val oldTemplateName: String =
getValueByKeyFromProps(confTemplateNameKey, parseParamsToMap(engine.getParams))
@@ -276,7 +275,6 @@

// Filter out engines whose resources are insufficient
engineScoreList = engineScoreList
.filter(engine => engine.getNodeStatus == NodeStatus.Unlock)
.filter(engine => {
val enginePythonVersion: String = getPythonVersion(parseParamsToMap(engine.getParams))
var pythonVersionMatch: Boolean = true
@@ -405,10 +403,7 @@

}
} { case e: Exception =>
logger.error(
s"Failed to update metrics for taskId: $taskId",
e
)
logger.error(s"Failed to update metrics for taskId: $taskId", e)
}
engine
}
@@ -189,6 +189,9 @@ object SparkConfiguration extends Logging {
val SPARK_ENGINE_EXTENSION_CONF =
CommonVars("linkis.spark.engine.extension.conf", "spark.sql.shuffle.partitions=200").getValue

val SPARK_PROHIBITS_DYNAMIC_RESOURCES_SWITCH =
CommonVars[Boolean]("linkis.spark.dynamic.resource.switch", false).getValue

private def getMainJarName(): String = {
val somePath = ClassUtils.jarOfClass(classOf[SparkEngineConnFactory])
if (somePath.isDefined) {
@@ -17,6 +17,7 @@

package org.apache.linkis.engineplugin.spark.executor

import org.apache.linkis.common.conf.Configuration
import org.apache.linkis.common.utils.{ByteTimeUtils, Logging, Utils}
import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
import org.apache.linkis.engineplugin.spark.config.SparkConfiguration
@@ -136,7 +137,9 @@ object SQLSession extends Logging {

// Handle result sets when a failed task is retried
val errorIndex: Integer = Integer.valueOf(
engineExecutionContext.getProperties.getOrDefault("execute.error.code.index", "-1").toString
engineExecutionContext.getProperties
.getOrDefault(Configuration.EXECUTE_ERROR_CODE_INDEX.key, "-1")
.toString
)
val hasSetResultSetNum: Boolean = engineExecutionContext.getProperties
.getOrDefault("hasSetResultSetNum", "true")
@@ -17,6 +17,7 @@

package org.apache.linkis.engineplugin.spark.executor

import org.apache.linkis.common.conf.Configuration
import org.apache.linkis.common.log.LogUtils
import org.apache.linkis.common.utils.{ByteTimeUtils, CodeAndRunTypeUtils, Logging, Utils}
import org.apache.linkis.engineconn.common.conf.{EngineConnConf, EngineConnConstant}
@@ -203,7 +204,9 @@ abstract class SparkEngineConnExecutor(val sc: SparkContext, id: Long)

// print job configuration, only the first paragraph or retry
val errorIndex: Integer = Integer.valueOf(
engineExecutionContext.getProperties.getOrDefault("execute.error.code.index", "-1").toString
engineExecutionContext.getProperties
.getOrDefault(Configuration.EXECUTE_ERROR_CODE_INDEX.key, "-1")
.toString
)
if (isFirstParagraph || (errorIndex + 1 == engineExecutorContext.getCurrentParagraph)) {
Utils.tryCatch({
@@ -197,7 +197,10 @@ class SparkEngineConnFactory extends MultiExecutorEngineConnFactory with Logging
// Check the Spark version after all configuration has been loaded
// If the version is not 3.4.4, disable dynamic allocation (this is the last point where configuration can be set)
val sparkVersion = Utils.tryQuietly(EngineUtils.sparkSubmitVersion())
if (!LabelCommonConfig.SPARK3_ENGINE_VERSION.getValue.equals(sparkVersion)) {
if (
SparkConfiguration.SPARK_PROHIBITS_DYNAMIC_RESOURCES_SWITCH && (!LabelCommonConfig.SPARK3_ENGINE_VERSION.getValue
.equals(sparkVersion))
) {
logger.info(
s"Spark version is $sparkVersion, not 3.4.4, disabling spark.dynamicAllocation.enabled"
)
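With the new SPARK_PROHIBITS_DYNAMIC_RESOURCES_SWITCH guard, force-disabling dynamic allocation requires both the switch to be on and the detected spark-submit version to differ from the configured Spark 3 version. A hedged sketch of the predicate; the default version string is illustrative and stands in for LabelCommonConfig.SPARK3_ENGINE_VERSION:

def shouldDisableDynamicAllocation(
    prohibitSwitch: Boolean, // SPARK_PROHIBITS_DYNAMIC_RESOURCES_SWITCH
    detectedVersion: String,
    supportedVersion: String = "3.4.4" // stand-in for the configured engine version
): Boolean = prohibitSwitch && !supportedVersion.equals(detectedVersion)

// shouldDisableDynamicAllocation(prohibitSwitch = true, detectedVersion = "3.2.1")  == true
// shouldDisableDynamicAllocation(prohibitSwitch = false, detectedVersion = "3.2.1") == false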
@@ -17,6 +17,7 @@

package org.apache.linkis.orchestrator.computation.service

import org.apache.linkis.common.conf.Configuration
import org.apache.linkis.common.utils.Logging
import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
import org.apache.linkis.governance.common.protocol.task._
@@ -97,8 +98,11 @@ class ComputationTaskExecutionReceiver extends TaskExecutionReceiver with Loggin
taskStatus match {
case rte: ResponseTaskStatusWithExecuteCodeIndex =>
logger.info(s"execute error with index: ${rte.errorIndex}")
task.updateParams("execute.error.code.index", rte.errorIndex.toString)
task.updateParams("execute.resultset.alias.num", rte.aliasNum.toString)
task.updateParams(Configuration.EXECUTE_ERROR_CODE_INDEX.key, rte.errorIndex.toString)
task.updateParams(
Configuration.EXECUTE_RESULTSET_ALIAS_NUM.key,
rte.aliasNum.toString
)
case _ =>
}
// Mark that this method has executed; it is asynchronous, and failed-task handling must wait for it to finish