Skip to content

Commit 36d05d2

Browse files
committed
Review fixes:
* Removed kafka.version from external modules * Some minor beautifying
1 parent 30df8f1 commit 36d05d2

File tree

4 files changed

+21
-24
lines changed

4 files changed

+21
-24
lines changed

external/kafka-0-10-sql/pom.xml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@
2929
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
3030
<properties>
3131
<sbt.project.name>sql-kafka-0-10</sbt.project.name>
32-
<!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
33-
<kafka.version>2.1.0</kafka.version>
3432
</properties>
3533
<packaging>jar</packaging>
3634
<name>Kafka 0.10+ Source for Structured Streaming</name>

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,15 @@ class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach {
4242
sparkConf = new SparkConf()
4343
}
4444

45-
private def addTokenToUGI: Unit = {
45+
override def afterEach(): Unit = {
46+
try {
47+
resetUGI
48+
} finally {
49+
super.afterEach()
50+
}
51+
}
52+
53+
private def addTokenToUGI(): Unit = {
4654
val token = new Token[KafkaDelegationTokenIdentifier](
4755
tokenId.getBytes,
4856
tokenPassword.getBytes,
@@ -64,31 +72,23 @@ class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach {
6472
}
6573

6674
test("getTokenJaasParams with token no service should throw exception") {
67-
try {
68-
addTokenToUGI
69-
70-
val thrown = intercept[IllegalArgumentException] {
71-
KafkaSecurityHelper.getTokenJaasParams(sparkConf)
72-
}
75+
addTokenToUGI
7376

74-
assert(thrown.getMessage contains "Kerberos service name must be defined")
75-
} finally {
76-
resetUGI
77+
val thrown = intercept[IllegalArgumentException] {
78+
KafkaSecurityHelper.getTokenJaasParams(sparkConf)
7779
}
80+
81+
assert(thrown.getMessage contains "Kerberos service name must be defined")
7882
}
7983

8084
test("getTokenJaasParams with token should return scram module") {
81-
try {
82-
addTokenToUGI
83-
sparkConf.set(KAFKA_KERBEROS_SERVICE_NAME, kerberosServiceName)
85+
addTokenToUGI
86+
sparkConf.set(KAFKA_KERBEROS_SERVICE_NAME, kerberosServiceName)
8487

85-
val jaasParams = KafkaSecurityHelper.getTokenJaasParams(sparkConf)
88+
val jaasParams = KafkaSecurityHelper.getTokenJaasParams(sparkConf)
8689

87-
assert(jaasParams.get.contains("ScramLoginModule"))
88-
assert(jaasParams.get.contains(tokenId))
89-
assert(jaasParams.get.contains(tokenPassword))
90-
} finally {
91-
resetUGI
92-
}
90+
assert(jaasParams.get.contains("ScramLoginModule"))
91+
assert(jaasParams.get.contains(tokenId))
92+
assert(jaasParams.get.contains(tokenPassword))
9393
}
9494
}

external/kafka-0-10/pom.xml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
2929
<properties>
3030
<sbt.project.name>streaming-kafka-0-10</sbt.project.name>
31-
<!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
32-
<kafka.version>2.1.0</kafka.version>
3331
</properties>
3432
<packaging>jar</packaging>
3533
<name>Spark Integration for Kafka 0.10</name>

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@
128128
<hive.version>1.2.1.spark2</hive.version>
129129
<!-- Version used for internal directory structure -->
130130
<hive.version.short>1.2.1</hive.version.short>
131+
<!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
131132
<kafka.version>2.1.0</kafka.version>
132133
<derby.version>10.12.1.1</derby.version>
133134
<parquet.version>1.10.0</parquet.version>

0 commit comments

Comments
 (0)