17
17
18
18
package org .apache .kyuubi .server .metadata
19
19
20
+ import com .google .common .annotations .VisibleForTesting
21
+
20
22
import java .util .concurrent .{ConcurrentHashMap , ThreadPoolExecutor , TimeUnit }
21
23
import java .util .concurrent .atomic .AtomicInteger
22
-
23
24
import scala .collection .JavaConverters ._
24
-
25
25
import org .apache .kyuubi .{KyuubiException , Logging }
26
26
import org .apache .kyuubi .client .api .v1 .dto .Batch
27
27
import org .apache .kyuubi .config .KyuubiConf
@@ -240,27 +240,7 @@ class MetadataManager extends AbstractService("MetadataManager") {
240
240
val batchInterval = conf.get(KyuubiConf .METADATA_CLEANER_BATCH_INTERVAL )
241
241
val cleanerTask : Runnable = () => {
242
242
try {
243
- var needToCleanMetadata = true
244
- var needToCleanKubernetesInfo = true
245
-
246
- while (needToCleanMetadata || needToCleanKubernetesInfo) {
247
- if (needToCleanMetadata) {
248
- needToCleanMetadata =
249
- withMetadataRequestMetrics(_metadataStore.cleanupMetadataByAge(
250
- stateMaxAge,
251
- batchSize)) >= batchSize
252
- }
253
- if (needToCleanKubernetesInfo) {
254
- needToCleanKubernetesInfo =
255
- withMetadataRequestMetrics(_metadataStore.cleanupKubernetesEngineInfoByAge(
256
- stateMaxAge,
257
- batchSize)) >= batchSize
258
- }
259
- if (needToCleanMetadata || needToCleanKubernetesInfo) {
260
- info(" Sleep for " + batchInterval + " ms before next metadata cleanup batch" )
261
- Thread .sleep(batchInterval)
262
- }
263
- }
243
+ cleanupMetadata(stateMaxAge, batchSize, batchInterval)
264
244
} catch {
265
245
case e : Throwable => error(" Error cleaning up the metadata by age" , e)
266
246
}
@@ -274,6 +254,31 @@ class MetadataManager extends AbstractService("MetadataManager") {
274
254
TimeUnit .MILLISECONDS )
275
255
}
276
256
257
+ @ VisibleForTesting
258
+ private [metadata] def cleanupMetadata (maxAge : Long , batchSize : Int , batchInterval : Long ): Unit = {
259
+ var needToCleanMetadata = true
260
+ var needToCleanKubernetesInfo = true
261
+
262
+ while (needToCleanMetadata || needToCleanKubernetesInfo) {
263
+ if (needToCleanMetadata) {
264
+ needToCleanMetadata =
265
+ withMetadataRequestMetrics(_metadataStore.cleanupMetadataByAge(
266
+ maxAge,
267
+ batchSize)) >= batchSize
268
+ }
269
+ if (needToCleanKubernetesInfo) {
270
+ needToCleanKubernetesInfo =
271
+ withMetadataRequestMetrics(_metadataStore.cleanupKubernetesEngineInfoByAge(
272
+ maxAge,
273
+ batchSize)) >= batchSize
274
+ }
275
+ if (needToCleanMetadata || needToCleanKubernetesInfo) {
276
+ info(" Sleep for " + batchInterval + " ms before next metadata cleanup batch" )
277
+ Thread .sleep(batchInterval)
278
+ }
279
+ }
280
+ }
281
+
277
282
def addMetadataRetryRequest (request : MetadataRequest ): Unit = {
278
283
val maxRequestsAsyncRetryRefs : Int =
279
284
conf.get(KyuubiConf .METADATA_REQUEST_ASYNC_RETRY_QUEUE_SIZE )
0 commit comments