Skip to content

Commit a94508f

Browse files
committed
Catch a few more cases
1 parent 205ca98 commit a94508f

File tree

2 files changed

+13
-18
lines changed

2 files changed

+13
-18
lines changed

external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala

Lines changed: 3 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,6 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorC
2727
import org.mockito.ArgumentMatchers._
2828
import org.mockito.Mockito._
2929
import org.mockito.invocation.InvocationOnMock
30-
import org.mockito.stubbing.Answer
3130
import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
3231
import org.scalatest.concurrent.Eventually
3332
import org.scalatest.mockito.MockitoSugar
@@ -124,11 +123,9 @@ class KinesisCheckpointerSuite extends TestSuiteBase
124123
test("if checkpointing is going on, wait until finished before removing and checkpointing") {
125124
when(receiverMock.getLatestSeqNumToCheckpoint(shardId))
126125
.thenReturn(someSeqNum).thenReturn(someOtherSeqNum)
127-
when(checkpointerMock.checkpoint(anyString)).thenAnswer(new Answer[Unit] {
128-
override def answer(invocations: InvocationOnMock): Unit = {
129-
clock.waitTillTime(clock.getTimeMillis() + checkpointInterval.milliseconds / 2)
130-
}
131-
})
126+
when(checkpointerMock.checkpoint(anyString)).thenAnswer((_: InvocationOnMock) =>
127+
clock.waitTillTime(clock.getTimeMillis() + checkpointInterval.milliseconds / 2)
128+
)
132129

133130
kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
134131
clock.advance(checkpointInterval.milliseconds)

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala

Lines changed: 10 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,7 @@ import org.apache.commons.lang3.StringUtils
2929
import org.apache.commons.logging.LogFactory
3030
import org.apache.hadoop.conf.Configuration
3131
import org.apache.hadoop.hive.cli.{CliDriver, CliSessionState, OptionsProcessor}
32-
import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils}
32+
import org.apache.hadoop.hive.common.HiveInterruptUtils
3333
import org.apache.hadoop.hive.conf.HiveConf
3434
import org.apache.hadoop.hive.ql.Driver
3535
import org.apache.hadoop.hive.ql.exec.Utilities
@@ -65,16 +65,14 @@ private[hive] object SparkSQLCLIDriver extends Logging {
6565
* a command is being processed by the current thread.
6666
*/
6767
def installSignalHandler() {
68-
HiveInterruptUtils.add(new HiveInterruptCallback {
69-
override def interrupt() {
70-
// Handle remote execution mode
71-
if (SparkSQLEnv.sparkContext != null) {
72-
SparkSQLEnv.sparkContext.cancelAllJobs()
73-
} else {
74-
if (transport != null) {
75-
// Force closing of TCP connection upon session termination
76-
transport.getSocket.close()
77-
}
68+
HiveInterruptUtils.add(() => {
69+
// Handle remote execution mode
70+
if (SparkSQLEnv.sparkContext != null) {
71+
SparkSQLEnv.sparkContext.cancelAllJobs()
72+
} else {
73+
if (transport != null) {
74+
// Force closing of TCP connection upon session termination
75+
transport.getSocket.close()
7876
}
7977
}
8078
})
@@ -208,7 +206,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
208206
reader.setBellEnabled(false)
209207
reader.setExpandEvents(false)
210208
// reader.setDebug(new PrintWriter(new FileWriter("writer.debug", true)))
211-
CliDriver.getCommandCompleter.foreach((e) => reader.addCompleter(e))
209+
CliDriver.getCommandCompleter.foreach(reader.addCompleter)
212210

213211
val historyDirectory = System.getProperty("user.home")
214212

0 commit comments

Comments (0)