Skip to content

Commit

Permalink
support white list
Browse files Browse the repository at this point in the history
  • Loading branch information
turboFei committed Dec 10, 2022
1 parent 940b76b commit 55db79e
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 6 deletions.
4 changes: 4 additions & 0 deletions docs/deployment/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -452,10 +452,14 @@ kyuubi.operation.status.polling.timeout|PT5S|Timeout(ms) for long polling asynch

Key | Default | Meaning | Type | Since
--- | --- | --- | --- | ---
kyuubi.server.batch.limit.connections.per.ipaddress|<undefined>|Maximum kyuubi server batch connections per ipaddress. Any user exceeding this limit will not be allowed to connect.|int|1.7.0
kyuubi.server.batch.limit.connections.per.user|<undefined>|Maximum kyuubi server batch connections per user. Any user exceeding this limit will not be allowed to connect.|int|1.7.0
kyuubi.server.batch.limit.connections.per.user.ipaddress|<undefined>|Maximum kyuubi server batch connections per user:ipaddress combination. Any user-ipaddress exceeding this limit will not be allowed to connect.|int|1.7.0
kyuubi.server.info.provider|ENGINE|The server information provider name, some clients may rely on this information to check the server compatibilities and functionalities. <li>SERVER: Return Kyuubi server information.</li> <li>ENGINE: Return Kyuubi engine information.</li>|string|1.6.1
kyuubi.server.limit.connections.per.ipaddress|&lt;undefined&gt;|Maximum kyuubi server connections per ipaddress. Any user exceeding this limit will not be allowed to connect.|int|1.6.0
kyuubi.server.limit.connections.per.user|&lt;undefined&gt;|Maximum kyuubi server connections per user. Any user exceeding this limit will not be allowed to connect.|int|1.6.0
kyuubi.server.limit.connections.per.user.ipaddress|&lt;undefined&gt;|Maximum kyuubi server connections per user:ipaddress combination. Any user-ipaddress exceeding this limit will not be allowed to connect.|int|1.6.0
kyuubi.server.limit.connections.user.white.list||The maximin connections of the user in the white list will not be limited.|seq|1.7.0
kyuubi.server.name|&lt;undefined&gt;|The name of Kyuubi Server.|string|1.5.0
kyuubi.server.redaction.regex|&lt;undefined&gt;|Regex to decide which Kyuubi contain sensitive information. When this regex matches a property key or value, the value is redacted from the various logs.||1.6.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2132,6 +2132,15 @@ object KyuubiConf {
.intConf
.createOptional

val SERVER_LIMIT_CONNECTIONS_USER_WHITE_LIST: ConfigEntry[Seq[String]] =
buildConf("kyuubi.server.limit.connections.user.white.list")
.doc("The maximin connections of the user in the white list will not be limited.")
.version("1.7.0")
.serverOnly
.stringConf
.toSequence()
.createWithDefault(Nil)

val SERVER_LIMIT_BATCH_CONNECTIONS_PER_USER: OptionalConfigEntry[Int] =
buildConf("kyuubi.server.batch.limit.connections.per.user")
.doc("Maximum kyuubi server batch connections per user." +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,20 +280,26 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
val userLimit = conf.get(SERVER_LIMIT_CONNECTIONS_PER_USER).getOrElse(0)
val ipAddressLimit = conf.get(SERVER_LIMIT_CONNECTIONS_PER_IPADDRESS).getOrElse(0)
val userIpAddressLimit = conf.get(SERVER_LIMIT_CONNECTIONS_PER_USER_IPADDRESS).getOrElse(0)
limiter = applySessionLimiter(userLimit, ipAddressLimit, userIpAddressLimit)
val userWhiteList = conf.get(SERVER_LIMIT_CONNECTIONS_USER_WHITE_LIST)
limiter = applySessionLimiter(userLimit, ipAddressLimit, userIpAddressLimit, userWhiteList)

val userBatchLimit = conf.get(SERVER_LIMIT_BATCH_CONNECTIONS_PER_USER).getOrElse(0)
val ipAddressBatchLimit = conf.get(SERVER_LIMIT_BATCH_CONNECTIONS_PER_IPADDRESS).getOrElse(0)
val userIpAddressBatchLimit =
conf.get(SERVER_LIMIT_BATCH_CONNECTIONS_PER_USER_IPADDRESS).getOrElse(0)
batchLimiter = applySessionLimiter(userBatchLimit, ipAddressBatchLimit, userIpAddressBatchLimit)
batchLimiter = applySessionLimiter(
userBatchLimit,
ipAddressBatchLimit,
userIpAddressBatchLimit,
userWhiteList)
}

private def applySessionLimiter(
userLimit: Int,
ipAddressLimit: Int,
userIpAddressLimit: Int): Option[SessionLimiter] = {
userIpAddressLimit: Int,
userWhitelist: Seq[String]): Option[SessionLimiter] = {
Seq(userLimit, ipAddressLimit, userIpAddressLimit).find(_ > 0).map(_ =>
SessionLimiter(userLimit, ipAddressLimit, userIpAddressLimit))
SessionLimiter(userLimit, ipAddressLimit, userIpAddressLimit, userWhitelist.toSet))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,37 @@ class SessionLimiterImpl(userLimit: Int, ipAddressLimit: Int, userIpAddressLimit
}
}

class SessionLimiterWithUserWhitelistImpl(
userLimit: Int,
ipAddressLimit: Int,
userIpAddressLimit: Int,
userWhitelist: Set[String])
extends SessionLimiterImpl(userLimit, ipAddressLimit, userIpAddressLimit) {
override def increment(userIpAddress: UserIpAddress): Unit = {
if (!userWhitelist.contains(userIpAddress.user)) {
super.increment(userIpAddress)
}
}

override def decrement(userIpAddress: UserIpAddress): Unit = {
if (!userWhitelist.contains(userIpAddress.user)) {
super.decrement(userIpAddress)
}
}
}

object SessionLimiter {

def apply(userLimit: Int, ipAddressLimit: Int, userIpAddressLimit: Int): SessionLimiter = {
new SessionLimiterImpl(userLimit, ipAddressLimit, userIpAddressLimit)
def apply(
userLimit: Int,
ipAddressLimit: Int,
userIpAddressLimit: Int,
userWhiteList: Set[String] = Set.empty): SessionLimiter = {
new SessionLimiterWithUserWhitelistImpl(
userLimit,
ipAddressLimit,
userIpAddressLimit,
userWhiteList)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,25 @@ class SessionLimiterSuite extends KyuubiFunSuite {
limiter.asInstanceOf[SessionLimiterImpl].counters().asScala.values
.foreach(c => assert(c.get() == 0))
}

test("test session limiter with user white list") {
val user = "user001"
val ipAddress = "127.0.0.1"
val userLimit = 30
val ipAddressLimit = 20
val userIpAddressLimit = 10
val limiter = SessionLimiter(userLimit, ipAddressLimit, userIpAddressLimit, Set(user))
for (i <- 0 until 50) {
val userIpAddress = UserIpAddress(user, ipAddress)
limiter.increment(userIpAddress)
}
limiter.asInstanceOf[SessionLimiterImpl].counters().asScala.values
.foreach(c => assert(c.get() == 0))
for (i <- 0 until 50) {
val userIpAddress = UserIpAddress(user, ipAddress)
limiter.decrement(userIpAddress)
}
limiter.asInstanceOf[SessionLimiterImpl].counters().asScala.values
.foreach(c => assert(c.get() == 0))
}
}

0 comments on commit 55db79e

Please sign in to comment.