Skip to content

Commit 4fe0046

Browse files
committed
withClosingOperationLog
1 parent 4c029f9 commit 4fe0046

File tree

2 files changed

+18
-8
lines changed

2 files changed

+18
-8
lines changed

kyuubi-common/src/main/scala/org/apache/kyuubi/operation/AbstractOperation.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.kyuubi.operation
1919

20+
import java.io.IOException
2021
import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
2122
import java.util.concurrent.locks.ReentrantLock
2223

@@ -247,4 +248,19 @@ abstract class AbstractOperation(session: Session) extends Operation with Loggin
247248
ok.setInfoMessages(hints.asJava)
248249
ok
249250
}
251+
252+
/**
253+
* Close the OperationLog, after running the block
254+
*/
255+
def withClosingOperationLog[T](f: => T): T = {
256+
try {
257+
f
258+
} finally {
259+
try {
260+
getOperationLog.foreach(_.close())
261+
} catch {
262+
case e: IOException => error(e.getMessage, e)
263+
}
264+
}
265+
}
250266
}

kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ class BatchJobSubmission(
336336
}
337337
}
338338

339-
override def close(): Unit = withLockRequired {
339+
override def close(): Unit = withLockRequired(withClosingOperationLog {
340340
if (!isClosedOrCanceled) {
341341
MetricsSystem.tracing(_.decCount(MetricRegistry.name(OPERATION_OPEN, opType)))
342342

@@ -373,13 +373,7 @@ class BatchJobSubmission(
373373
}
374374
}
375375
}
376-
377-
try {
378-
getOperationLog.foreach(_.close())
379-
} catch {
380-
case e: IOException => error(e.getMessage, e)
381-
}
382-
}
376+
})
383377

384378
override def cancel(): Unit = {
385379
throw new IllegalStateException("Use close instead.")

0 commit comments

Comments
 (0)