Skip to content

Commit

Permalink
Linkis support to divide publicService into mutiple micro-services.
Browse files Browse the repository at this point in the history
closes apache#69
  • Loading branch information
wushengyeyouya committed Oct 9, 2019
1 parent 8825fb7 commit 238440c
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.util

import com.webank.wedatasphere.linkis.DataWorkCloudApplication
import com.webank.wedatasphere.linkis.common.ServiceInstance
import com.webank.wedatasphere.linkis.rpc.conf.RPCConfiguration
import com.webank.wedatasphere.linkis.rpc.sender.SpringMVCRPCSender

import scala.concurrent.duration.Duration
Expand Down Expand Up @@ -69,6 +70,8 @@ object Sender {
private val serviceInstanceToSenders = new util.HashMap[ServiceInstance, Sender]
def getSender(applicationName: String): Sender = getSender(ServiceInstance(applicationName, null))
def getSender(serviceInstance: ServiceInstance): Sender = {
if(RPCConfiguration.ENABLE_PUBLIC_SERVICE.getValue && RPCConfiguration.PUBLIC_SERVICE_LIST.contains(serviceInstance.getApplicationName))
serviceInstance.setApplicationName(RPCConfiguration.PUBLIC_SERVICE_APPLICATION_NAME.getValue)
if(!serviceInstanceToSenders.containsKey(serviceInstance)) serviceInstanceToSenders synchronized {
if(!serviceInstanceToSenders.containsKey(serviceInstance))
serviceInstanceToSenders.put(serviceInstance, new SpringMVCRPCSender(serviceInstance))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,8 @@ object RPCConfiguration {
val BDP_RPC_SENDER_ASYN_CONSUMER_THREAD_FREE_TIME_MAX = CommonVars("wds.linkis.rpc.sender.asyn.consumer.freeTime.max", new TimeType("2m"))
val BDP_RPC_SENDER_ASYN_QUEUE_CAPACITY = CommonVars("wds.linkis.rpc.sender.asyn.queue.size.max", 300)

val ENABLE_PUBLIC_SERVICE = CommonVars("wds.linkis.gateway.conf.enable.publicservice", true)
val PUBLIC_SERVICE_APPLICATION_NAME = CommonVars("wds.linkis.gateway.conf.publicservice.name", "publicservice")
val PUBLIC_SERVICE_LIST = CommonVars("wds.linkis.gateway.conf.publicservice.list", "query,jobhistory,application,configuration,filesystem,udf,variable").getValue.split(",")

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.webank.wedatasphere.linkis.DataWorkCloudApplication
import com.webank.wedatasphere.linkis.common.ServiceInstance
import com.webank.wedatasphere.linkis.common.utils.Logging
import com.webank.wedatasphere.linkis.gateway.http.{GatewayContext, GatewayRoute}
import com.webank.wedatasphere.linkis.rpc.conf.RPCConfiguration
import com.webank.wedatasphere.linkis.rpc.interceptor.ServiceInstanceUtils
import com.webank.wedatasphere.linkis.server.Message
import com.webank.wedatasphere.linkis.server.conf.ServerConfiguration
Expand Down Expand Up @@ -92,7 +93,9 @@ class DefaultGatewayParser(gatewayParsers: Array[GatewayParser]) extends Abstrac
responseHeartbeat(gatewayContext)
case COMMON_REGEX(version, serviceId) =>
if(sendResponseWhenNotMatchVersion(gatewayContext, version)) return
gatewayContext.getGatewayRoute.setServiceInstance(ServiceInstance(serviceId, null))
val applicationName = if(RPCConfiguration.ENABLE_PUBLIC_SERVICE.getValue && RPCConfiguration.PUBLIC_SERVICE_LIST.contains(serviceId))
RPCConfiguration.PUBLIC_SERVICE_APPLICATION_NAME.getValue else serviceId
gatewayContext.getGatewayRoute.setServiceInstance(ServiceInstance(applicationName, null))
case p if p.startsWith("/dws/") =>
//TODO add version support
val params = gatewayContext.getGatewayRoute.getParams
Expand Down

0 comments on commit 238440c

Please sign in to comment.