Skip to content

Commit b24d94e

Browse files
zwangshengpan3793
andcommitted
[KYUUBI #5328] Batch supports priority scheduling
### _Why are the changes needed?_ Follow #5329 and close #5328: 1. Add new config `kyuubi.metadata.store.jdbc.priority.enabled` to control whether enable priority scheduling, due to users may experience performance issues when using MySQL5.7 as metastore backend and enabling kyuubi batch v2 priority feature. 2. When priority scheduling is enabled, `KyuubiBatchService` picks metadata job with `ORDER BY priority DESC, create_time ASC`. 3. Insert metadata with priority field, default priority value is `10`. 4. Add new config `kyuubi.batch.priority` for each batch priority. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request ### _Was this patch authored or co-authored using generative AI tooling?_ No Closes #5352 from zwangsheng/KYUUBI#5328. Closes #5328 687ed1e [Cheng Pan] Update kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala 58621b5 [zwangsheng] fix comments 1bf81e7 [zwangsheng] fix style 7ed2551 [zwangsheng] update default priority desc & improve UT 21ceccb [zwangsheng] fix doc 27fc5e8 [zwangsheng] enrich desc c0bbc0d [zwangsheng] fix style 6b8d0f0 [zwangsheng] fix comment 67eb252 [zwangsheng] fix comment e1705c3 [zwangsheng] Add config to control whether pick order by priority or not 129a467 [zwangsheng] Add unit test for pickBatchForSubmitting fcaf85d [zwangsheng] Fix unit test f7ca221 [zwangsheng] Fix unit test 8d4b276 [wangsheng] fix code style 4c6b990 [wangsheng] fix comments 654ad84 [zwangsheng] [KYUUBI #5328][V2] Kyuubi Server Pick Metadata job with priority Lead-authored-by: zwangsheng <binjieyang@apache.org> Co-authored-by: wangsheng <2213335496@qq.com> Co-authored-by: Cheng Pan <pan3793@gmail.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent 5940fd1 commit b24d94e

File tree

8 files changed

+96
-11
lines changed

8 files changed

+96
-11
lines changed

docs/configuration/settings.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
347347
| kyuubi.metadata.store.jdbc.database.type | SQLITE | The database type for server jdbc metadata store.<ul> <li>(Deprecated) DERBY: Apache Derby, JDBC driver `org.apache.derby.jdbc.AutoloadedDriver`.</li> <li>SQLITE: SQLite3, JDBC driver `org.sqlite.JDBC`.</li> <li>MYSQL: MySQL, JDBC driver `com.mysql.jdbc.Driver`.</li> <li>CUSTOM: User-defined database type, need to specify corresponding JDBC driver.</li> Note that: The JDBC datasource is powered by HiKariCP, for datasource properties, please specify them with the prefix: kyuubi.metadata.store.jdbc.datasource. For example, kyuubi.metadata.store.jdbc.datasource.connectionTimeout=10000. | string | 1.6.0 |
348348
| kyuubi.metadata.store.jdbc.driver | &lt;undefined&gt; | JDBC driver class name for server jdbc metadata store. | string | 1.6.0 |
349349
| kyuubi.metadata.store.jdbc.password || The password for server JDBC metadata store. | string | 1.6.0 |
350+
| kyuubi.metadata.store.jdbc.priority.enabled | false | Whether to enable the priority scheduling for batch impl v2. When false, ignore kyuubi.batch.priority and use the FIFO ordering strategy for batch job scheduling. Note: this feature may cause significant performance issues when using MySQL 5.7 as the metastore backend due to the lack of support for mixed order index. See more details at KYUUBI #5329. | boolean | 1.8.0 |
350351
| kyuubi.metadata.store.jdbc.url | jdbc:sqlite:kyuubi_state_store.db | The JDBC url for server JDBC metadata store. By default, it is a SQLite database url, and the state information is not shared across kyuubi instances. To enable high availability for multiple kyuubi instances, please specify a production JDBC url. | string | 1.6.0 |
351352
| kyuubi.metadata.store.jdbc.user || The username for server JDBC metadata store. | string | 1.6.0 |
352353

kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ object KyuubiReservedKeys {
2626
final val KYUUBI_SESSION_USER_SIGN = "kyuubi.session.user.sign"
2727
final val KYUUBI_SESSION_REAL_USER_KEY = "kyuubi.session.real.user"
2828
final val KYUUBI_SESSION_CONNECTION_URL_KEY = "kyuubi.session.connection.url"
29+
// default priority is 10, higher priority will be scheduled first
30+
// when enabled metadata store priority feature
31+
final val KYUUBI_BATCH_PRIORITY = "kyuubi.batch.priority"
2932
final val KYUUBI_BATCH_RESOURCE_UPLOADED_KEY = "kyuubi.batch.resource.uploaded"
3033
final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id"
3134
final val KYUUBI_ENGINE_ID = "kyuubi.engine.id"

kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/api/Metadata.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ case class Metadata(
7878
engineState: String = null,
7979
engineError: Option[String] = None,
8080
endTime: Long = 0L,
81+
// keep consistent with table creation DDL
82+
// find why we set 10 as default in KYUUBI #5329
83+
priority: Int = 10,
8184
peerInstanceClosed: Boolean = false) {
8285
def appMgrInfo: ApplicationManagerInfo = {
8386
ApplicationManagerInfo(

kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStore.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
6161
case CUSTOM => new GenericDatabaseDialect
6262
}
6363

64+
private val priorityEnabled = conf.get(METADATA_STORE_JDBC_PRIORITY_ENABLED)
65+
6466
private val datasourceProperties =
6567
JDBCMetadataStoreConf.getMetadataStoreJDBCDataSourceProperties(conf)
6668
private val hikariConfig = new HikariConfig(datasourceProperties)
@@ -167,9 +169,10 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
167169
|request_args,
168170
|create_time,
169171
|engine_type,
170-
|cluster_manager
172+
|cluster_manager,
173+
|priority
171174
|)
172-
|VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
175+
|VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
173176
|""".stripMargin
174177

175178
JdbcUtils.withConnection { connection =>
@@ -190,15 +193,16 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
190193
valueAsString(metadata.requestArgs),
191194
metadata.createTime,
192195
Option(metadata.engineType).map(_.toUpperCase(Locale.ROOT)).orNull,
193-
metadata.clusterManager.orNull)
196+
metadata.clusterManager.orNull,
197+
metadata.priority)
194198
}
195199
}
196200

197201
override def pickMetadata(kyuubiInstance: String): Option[Metadata] = synchronized {
198202
JdbcUtils.executeQueryWithRowMapper(
199203
s"""SELECT identifier FROM $METADATA_TABLE
200204
|WHERE state=?
201-
|ORDER BY create_time ASC LIMIT 1
205+
|ORDER BY ${if (priorityEnabled) "priority DESC, " else ""}create_time ASC LIMIT 1
202206
|""".stripMargin) { stmt =>
203207
stmt.setString(1, OperationState.INITIALIZED.toString)
204208
} { resultSet =>

kyuubi-server/src/main/scala/org/apache/kyuubi/server/metadata/jdbc/JDBCMetadataStoreConf.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,16 @@ object JDBCMetadataStoreConf {
9393
.serverOnly
9494
.stringConf
9595
.createWithDefault("")
96+
97+
val METADATA_STORE_JDBC_PRIORITY_ENABLED: ConfigEntry[Boolean] =
98+
buildConf("kyuubi.metadata.store.jdbc.priority.enabled")
99+
.doc("Whether to enable the priority scheduling for batch impl v2. " +
100+
"When false, ignore kyuubi.batch.priority and use the FIFO ordering strategy " +
101+
"for batch job scheduling. Note: this feature may cause significant performance issues " +
102+
"when using MySQL 5.7 as the metastore backend due to the lack of support " +
103+
"for mixed order index. See more details at KYUUBI #5329.")
104+
.version("1.8.0")
105+
.serverOnly
106+
.booleanConf
107+
.createWithDefault(false)
96108
}

kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSession.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
2323

2424
import org.apache.kyuubi.client.util.BatchUtils._
2525
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
26+
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_BATCH_PRIORITY
2627
import org.apache.kyuubi.engine.KyuubiApplicationManager
2728
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
2829
import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent}
@@ -181,7 +182,8 @@ class KyuubiBatchSession(
181182
requestArgs = batchArgs,
182183
createTime = createTime,
183184
engineType = batchType,
184-
clusterManager = batchJobSubmissionOp.builder.clusterManager())
185+
clusterManager = batchJobSubmissionOp.builder.clusterManager(),
186+
priority = conf.get(KYUUBI_BATCH_PRIORITY).map(_.toInt).getOrElse(10))
185187

186188
// there is a chance that operation failed w/ duplicated key error
187189
sessionManager.insertMetadata(newMetadata)

kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import org.apache.kyuubi.client.api.v1.dto.{Batch, BatchRequest}
3030
import org.apache.kyuubi.client.util.BatchUtils.KYUUBI_BATCH_ID_KEY
3131
import org.apache.kyuubi.config.KyuubiConf
3232
import org.apache.kyuubi.config.KyuubiConf._
33-
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_REAL_USER_KEY
33+
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_BATCH_PRIORITY, KYUUBI_SESSION_REAL_USER_KEY}
3434
import org.apache.kyuubi.credentials.HadoopCredentialsManager
3535
import org.apache.kyuubi.engine.KyuubiApplicationManager
3636
import org.apache.kyuubi.metrics.MetricsConstants._
@@ -237,7 +237,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
237237
requestConf = conf,
238238
requestArgs = batchRequest.getArgs.asScala.toSeq,
239239
createTime = System.currentTimeMillis(),
240-
engineType = batchRequest.getBatchType)
240+
engineType = batchRequest.getBatchType,
241+
priority = conf.get(KYUUBI_BATCH_PRIORITY).map(_.toInt).getOrElse(10))
241242

242243
// there is a chance that operation failed w/ duplicated key error
243244
metadataManager.foreach(_.insertMetadata(metadata, asyncRetryOnError = false))

kyuubi-server/src/test/scala/org/apache/kyuubi/server/metadata/MetadataManagerSuite.scala

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ import org.apache.kyuubi.config.KyuubiConf
2828
import org.apache.kyuubi.config.KyuubiConf._
2929
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
3030
import org.apache.kyuubi.metrics.MetricsConstants._
31+
import org.apache.kyuubi.operation.OperationState
3132
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
33+
import org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf.METADATA_STORE_JDBC_PRIORITY_ENABLED
3234
import org.apache.kyuubi.session.SessionType
3335

3436
class MetadataManagerSuite extends KyuubiFunSuite {
@@ -142,6 +144,58 @@ class MetadataManagerSuite extends KyuubiFunSuite {
142144
}
143145
}
144146

147+
test("[KYUUBI #5328] Test MetadataManager#pickBatchForSubmitting in order") {
148+
// build mock batch jobs
149+
val mockKyuubiInstance = "mock_kyuubi_instance"
150+
val time = System.currentTimeMillis()
151+
val mockBatchJob1 = newMetadata(
152+
identifier = "mock_batch_job_1",
153+
state = OperationState.INITIALIZED.toString,
154+
createTime = time + 10000,
155+
// larger than default priority 10
156+
priority = 20)
157+
val mockBatchJob2 = newMetadata(
158+
identifier = "mock_batch_job_2",
159+
state = OperationState.INITIALIZED.toString,
160+
createTime = time)
161+
val mockBatchJob3 = newMetadata(
162+
identifier = "mock_batch_job_3",
163+
state = OperationState.INITIALIZED.toString,
164+
createTime = time + 5000)
165+
166+
withMetadataManager(Map(METADATA_STORE_JDBC_PRIORITY_ENABLED.key -> "true")) {
167+
metadataManager =>
168+
metadataManager.insertMetadata(mockBatchJob1, asyncRetryOnError = false)
169+
metadataManager.insertMetadata(mockBatchJob2, asyncRetryOnError = false)
170+
metadataManager.insertMetadata(mockBatchJob3, asyncRetryOnError = false)
171+
172+
// pick the highest priority batch job
173+
val metadata1 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
174+
assert(metadata1.exists(m => m.identifier === "mock_batch_job_1"))
175+
176+
// pick the oldest batch job when same priority
177+
val metadata2 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
178+
assert(metadata2.exists(m => m.identifier === "mock_batch_job_2"))
179+
180+
val metadata3 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
181+
assert(metadata3.exists(m => m.identifier === "mock_batch_job_3"))
182+
}
183+
184+
withMetadataManager(Map(METADATA_STORE_JDBC_PRIORITY_ENABLED.key -> "false")) {
185+
metadataManager =>
186+
metadataManager.insertMetadata(mockBatchJob1, asyncRetryOnError = false)
187+
metadataManager.insertMetadata(mockBatchJob2, asyncRetryOnError = false)
188+
metadataManager.insertMetadata(mockBatchJob3, asyncRetryOnError = false)
189+
190+
// pick the oldest batch job
191+
val metadata2 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
192+
assert(metadata2.exists(m => m.identifier === "mock_batch_job_2"))
193+
194+
val metadata3 = metadataManager.pickBatchForSubmitting(mockKyuubiInstance)
195+
assert(metadata3.exists(m => m.identifier === "mock_batch_job_3"))
196+
}
197+
}
198+
145199
private def withMetadataManager(
146200
confOverlay: Map[String, String],
147201
newMetadataMgr: () => MetadataManager = () => new MetadataManager())(
@@ -169,22 +223,27 @@ class MetadataManagerSuite extends KyuubiFunSuite {
169223
}
170224
}
171225

172-
private def newMetadata(): Metadata = {
226+
private def newMetadata(
227+
identifier: String = UUID.randomUUID().toString,
228+
state: String = OperationState.PENDING.toString,
229+
createTime: Long = System.currentTimeMillis(),
230+
priority: Int = 10): Metadata = {
173231
Metadata(
174-
identifier = UUID.randomUUID().toString,
232+
identifier = identifier,
175233
sessionType = SessionType.BATCH,
176234
realUser = "kyuubi",
177235
username = "kyuubi",
178236
ipAddress = "127.0.0.1",
179237
kyuubiInstance = "localhost:10009",
180-
state = "PENDING",
238+
state = state,
181239
resource = "intern",
182240
className = "org.apache.kyuubi.SparkWC",
183241
requestName = "kyuubi_batch",
184242
requestConf = Map("spark.master" -> "local"),
185243
requestArgs = Seq("100"),
186-
createTime = System.currentTimeMillis(),
244+
createTime = createTime,
187245
engineType = "spark",
246+
priority = priority,
188247
clusterManager = Some("local"))
189248
}
190249
}

0 commit comments

Comments
 (0)