Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class EntranceUserParallelOrchestratorPlugin extends UserParallelOrchestratorPlu
private val configCache: LoadingCache[String, Integer] = CacheBuilder
.newBuilder()
.maximumSize(1000)
.expireAfterAccess(1, TimeUnit.HOURS)
.expireAfterAccess(EntranceConfiguration.USER_PARALLEL_REFLESH_TIME.getValue, TimeUnit.MINUTES)
.expireAfterWrite(EntranceConfiguration.USER_PARALLEL_REFLESH_TIME.getValue, TimeUnit.MINUTES)
.build(new CacheLoader[String, Integer]() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,18 @@ public class AMConfiguration {
public static final boolean AM_USER_RESET_RESOURCE =
CommonVars.apply("linkis.am.user.reset.resource.enable", true).getValue();

public static final CommonVars<Boolean> ENGINE_REUSE_ENABLE_CACHE =
CommonVars.apply("wds.linkis.manager.am.engine.reuse.enable.cache", false);

public static final CommonVars<TimeType> ENGINE_REUSE_CACHE_EXPIRE_TIME =
CommonVars.apply("wds.linkis.manager.am.engine.reuse.cache.expire.time", new TimeType("5s"));

public static final CommonVars<Long> ENGINE_REUSE_CACHE_MAX_SIZE =
CommonVars.apply("wds.linkis.manager.am.engine.reuse.cache.max.size", 1000L);

public static final CommonVars<String> ENGINE_REUSE_CACHE_SUPPORT_ENGINES =
CommonVars.apply("wds.linkis.manager.am.engine.reuse.cache.support.engines", "shell");

public static String getDefaultMultiEngineUser() {
String jvmUser = Utils.getJvmUser();
return String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,20 @@ import org.apache.linkis.manager.am.selector.NodeSelector
import org.apache.linkis.manager.am.utils.AMUtils
import org.apache.linkis.manager.common.constant.AMConstant
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
import org.apache.linkis.manager.common.entity.node.EngineNode
import org.apache.linkis.manager.common.entity.node.{EngineNode, ScoreServiceInstance}
import org.apache.linkis.manager.common.protocol.engine.{EngineReuseRequest, EngineStopRequest}
import org.apache.linkis.manager.common.utils.ManagerUtils
import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
import org.apache.linkis.manager.engineplugin.common.conf.EngineConnPluginConf.{
PYTHON_VERSION_KEY,
SPARK_PYTHON_VERSION_KEY
}
import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext
import org.apache.linkis.manager.label.entity.{EngineNodeLabel, Label}
import org.apache.linkis.manager.label.entity.engine.ReuseExclusionLabel
import org.apache.linkis.manager.label.entity.engine.{
EngineTypeLabel,
ReuseExclusionLabel,
UserCreatorLabel
}
import org.apache.linkis.manager.label.entity.node.AliasServiceInstanceLabel
import org.apache.linkis.manager.label.service.{NodeLabelService, UserLabelService}
import org.apache.linkis.manager.label.utils.{LabelUtil, LabelUtils}
Expand All @@ -58,6 +61,8 @@ import java.util.concurrent.{TimeoutException, TimeUnit}
import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration

import com.google.common.cache.{Cache, CacheBuilder}

@Service
class DefaultEngineReuseService extends AbstractEngineService with EngineReuseService with Logging {

Expand Down Expand Up @@ -85,6 +90,26 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe
@Autowired
private var nodeManagerPersistence: NodeManagerPersistence = _

private val instanceCache: Cache[String, util.Map[ScoreServiceInstance, util.List[Label[_]]]] =
CacheBuilder
.newBuilder()
.maximumSize(AMConfiguration.ENGINE_REUSE_CACHE_MAX_SIZE.getValue)
.expireAfterWrite(
AMConfiguration.ENGINE_REUSE_CACHE_EXPIRE_TIME.getValue.toLong,
TimeUnit.MILLISECONDS
)
.build()

private val engineNodesCache: Cache[String, Array[EngineNode]] =
CacheBuilder
.newBuilder()
.maximumSize(AMConfiguration.ENGINE_REUSE_CACHE_MAX_SIZE.getValue)
.expireAfterWrite(
AMConfiguration.ENGINE_REUSE_CACHE_EXPIRE_TIME.getValue.toLong,
TimeUnit.MILLISECONDS
)
.build()

/**
* 1. Obtain the EC corresponding to all labels 2. Judging reuse exclusion tags and fixed engine
* labels 3. Select the EC with the lowest load available 4. Lock the corresponding EC
Expand All @@ -99,11 +124,9 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe
val taskId = JobUtils.getJobIdFromStringMap(engineReuseRequest.getProperties)
logger.info(s"Task $taskId Start to reuse Engine for request: $engineReuseRequest")
val labelBuilderFactory = LabelBuilderFactoryContext.getLabelBuilderFactory
val labels: util.List[Label[_]] = labelBuilderFactory.getLabels(engineReuseRequest.getLabels)
val labelList = LabelUtils
.distinctLabel(
labelBuilderFactory.getLabels(engineReuseRequest.getLabels),
userLabelService.getUserLabels(engineReuseRequest.getUser)
)
.distinctLabel(labels, userLabelService.getUserLabels(engineReuseRequest.getUser))
.asScala

val exclusionInstances: Array[String] =
Expand Down Expand Up @@ -138,7 +161,23 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe
}
}

val instances = nodeLabelService.getScoredNodeMapsByLabels(filterLabelList)
val userCreatorLabel: UserCreatorLabel = LabelUtil.getUserCreatorLabel(labels)
val engineTypeLabel: EngineTypeLabel = LabelUtil.getEngineTypeLabel(labels)
val cacheKey: String = userCreatorLabel.getStringValue + "_" + engineTypeLabel.getEngineType

val cacheEnable: Boolean = AMConfiguration.ENGINE_REUSE_CACHE_SUPPORT_ENGINES.getValue.contains(
engineTypeLabel.getEngineType
) && AMConfiguration.ENGINE_REUSE_ENABLE_CACHE.getValue

val instances = if (cacheEnable) {
var localInstances: util.Map[ScoreServiceInstance, util.List[Label[_]]] =
instanceCache.getIfPresent(cacheKey)
if (localInstances == null) {
localInstances = nodeLabelService.getScoredNodeMapsByLabels(filterLabelList)
instanceCache.put(cacheKey, localInstances)
}
localInstances
} else nodeLabelService.getScoredNodeMapsByLabels(filterLabelList)

if (null != instances && null != exclusionInstances && exclusionInstances.nonEmpty) {
val instancesKeys = instances.asScala.keys.toArray
Expand All @@ -159,8 +198,15 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe
s"No engine can be reused, cause from db is null"
)
}
var engineScoreList =
getEngineNodeManager.getEngineNodes(instances.asScala.keys.toSeq.toArray)

var engineScoreList = if (cacheEnable) {
var localEngineList: Array[EngineNode] = engineNodesCache.getIfPresent(cacheKey)
if (localEngineList == null) {
localEngineList = getEngineNodeManager.getEngineNodes(instances.asScala.keys.toSeq.toArray)
engineNodesCache.put(cacheKey, localEngineList)
}
localEngineList
} else getEngineNodeManager.getEngineNodes(instances.asScala.keys.toSeq.toArray)

// reuse EC according to template name
val confTemplateNameKey = "ec.resource.name"
Expand Down
Loading