
Commit 62653b9

Revert "[SPARK-45502][BUILD] Upgrade Kafka to 3.6.0"
This reverts commit d1bd21a.

### What changes were proposed in this pull request?

This PR reverts SPARK-45502 to make the test case `KafkaSourceStressSuite` stable.

### Why are the changes needed?

The test case `KafkaSourceStressSuite` has become very unstable since SPARK-45502 was merged, with 10 of the last 22 runs failing because of it. Revert it for now; Kafka can be upgraded again once the test issues are resolved.

- https://github.com/apache/spark/actions/runs/6497999347/job/17648385705
- https://github.com/apache/spark/actions/runs/6502219014/job/17660900989
- https://github.com/apache/spark/actions/runs/6502591917/job/17661861797
- https://github.com/apache/spark/actions/runs/6503144598/job/17663199041
- https://github.com/apache/spark/actions/runs/6503233514/job/17663413817
- https://github.com/apache/spark/actions/runs/6504416528/job/17666334238
- https://github.com/apache/spark/actions/runs/6509796846/job/17682130466
- https://github.com/apache/spark/actions/runs/6510877112/job/17685502094
- https://github.com/apache/spark/actions/runs/6512948316/job/17691625228
- https://github.com/apache/spark/actions/runs/6516366232/job/17699813649

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#43379 from LuciferYang/Revert-SPARK-45502.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
1 parent 6994bad commit 62653b9

File tree

5 files changed (+25, -26 lines)


connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala

Lines changed: 2 additions & 2 deletions
@@ -28,6 +28,7 @@ import scala.io.Source
 import scala.jdk.CollectionConverters._
 
 import com.google.common.io.Files
+import kafka.api.Request
 import kafka.server.{HostedPartition, KafkaConfig, KafkaServer}
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.zk.KafkaZkClient
@@ -39,7 +40,6 @@ import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.security.auth.SecurityProtocol.{PLAINTEXT, SASL_PLAINTEXT}
 import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.kafka.common.utils.SystemTime
@@ -597,7 +597,7 @@ class KafkaTestUtils(
       .getPartitionInfo(topic, partition) match {
         case Some(partitionState) =>
           zkClient.getLeaderForPartition(new TopicPartition(topic, partition)).isDefined &&
-            FetchRequest.isValidBrokerId(partitionState.leader) &&
+            Request.isValidBrokerId(partitionState.leader) &&
             !partitionState.replicas.isEmpty
 
         case _ =>
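Both `org.apache.kafka.common.requests.FetchRequest.isValidBrokerId` (the Kafka 3.6 location) and `kafka.api.Request.isValidBrokerId` (the internal object it replaced, present through 3.4.x) express the same readiness check: the partition's leader field must hold a real broker id rather than Kafka's "no leader" sentinel of -1. A minimal sketch of the predicate this utility polls, assuming that convention (the helper names below are illustrative, not part of the patch):

```scala
// Sketch, assuming Kafka's convention that broker ids are non-negative
// and -1 is the "no leader" sentinel (as kafka.api.Request encodes it).
def isValidBrokerId(brokerId: Int): Boolean = brokerId >= 0

// The condition the suite waits for on a (topic, partition): an elected
// leader with a valid id, plus at least one assigned replica.
def partitionReady(leaderId: Int, replicaIds: Seq[Int]): Boolean =
  isValidBrokerId(leaderId) && replicaIds.nonEmpty
```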

connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala

Lines changed: 7 additions & 9 deletions
@@ -24,14 +24,12 @@ import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
 import scala.util.Random
 
-import kafka.log.{LogCleaner, UnifiedLog}
-import kafka.server.BrokerTopicStats
+import kafka.log.{CleanerConfig, LogCleaner, LogConfig, ProducerStateManagerConfig, UnifiedLog}
+import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils.Pool
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.serialization.StringDeserializer
-import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
 import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
 
 import org.apache.spark._
@@ -92,13 +90,13 @@ class KafkaRDDSuite extends SparkFunSuite {
     val dir = new File(logDir, topic + "-" + partition)
     dir.mkdirs()
     val logProps = new ju.Properties()
-    logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
-    logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, java.lang.Float.valueOf(0.1f))
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f))
     val logDirFailureChannel = new LogDirFailureChannel(1)
     val topicPartition = new TopicPartition(topic, partition)
     val producerIdExpirationMs = Int.MaxValue
-    val producerStateManagerConfig = new ProducerStateManagerConfig(producerIdExpirationMs, false)
-    val logConfig = new LogConfig(logProps)
+    val producerStateManagerConfig = new ProducerStateManagerConfig(producerIdExpirationMs)
+    val logConfig = LogConfig(logProps)
     val log = UnifiedLog(
       dir,
       logConfig,
@@ -122,7 +120,7 @@ class KafkaRDDSuite extends SparkFunSuite {
     log.roll()
     logs.put(topicPartition, log)
 
-    val cleaner = new LogCleaner(new CleanerConfig(false), Array(dir), logs, logDirFailureChannel)
+    val cleaner = new LogCleaner(CleanerConfig(), Array(dir), logs, logDirFailureChannel)
     cleaner.startup()
     cleaner.awaitCleaned(new TopicPartition(topic, partition), log.activeSegment.baseOffset, 1000)
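The substance of this file's revert is the move from Kafka 3.6's Java-side storage classes (`org.apache.kafka.storage.internals.log.*` plus the `TopicConfig` constants) back to the Scala-side equivalents that ship with 3.4.x. A condensed sketch of the restored 3.4-style construction, lifted from the diff above (it needs the kafka_2.13 test artifacts on the classpath to compile):

```scala
import java.util.Properties

import kafka.log.{CleanerConfig, LogConfig}

// In Kafka 3.4.x the topic-config keys are fields on the LogConfig
// companion object, and both configs are built via companion apply
// methods rather than the Java constructors that 3.6 expects.
val logProps = new Properties()
logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) // "cleanup.policy" -> "compact"
logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f))

val logConfig = LogConfig(logProps) // companion apply, vs. new LogConfig(logProps) in 3.6
val cleanerConfig = CleanerConfig() // all defaults, vs. new CleanerConfig(false) in 3.6
```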

connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala

Lines changed: 2 additions & 2 deletions
@@ -27,12 +27,12 @@ import scala.annotation.tailrec
 import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
 
+import kafka.api.Request
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.kafka.common.utils.{Time => KTime}
 import org.apache.zookeeper.client.ZKClientConfig
@@ -304,7 +304,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
         val leader = partitionState.leader
         val isr = partitionState.isr
         zkClient.getLeaderForPartition(new TopicPartition(topic, partition)).isDefined &&
-          FetchRequest.isValidBrokerId(leader) && !isr.isEmpty
+          Request.isValidBrokerId(leader) && !isr.isEmpty
       case _ =>
         false
     }
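This is the same substitution as in the SQL connector's `KafkaTestUtils`, this time guarding the ISR instead of the replica list. In both utilities a predicate like this is polled until the broker settles; a sketch of that polling pattern with ScalaTest's `Eventually` (the helper name and timeouts are illustrative, not taken from this patch):

```scala
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
import org.scalatest.time.SpanSugar._

// Illustrative: retry a readiness predicate (like the one in the diff
// above) until it holds or the timeout expires.
def waitUntilPartitionReady(partitionReady: => Boolean): Unit =
  eventually(timeout(10.seconds), interval(100.milliseconds)) {
    assert(partitionReady, "partition never got a valid leader and a non-empty ISR")
  }
```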

connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala

Lines changed: 13 additions & 12 deletions
@@ -19,8 +19,8 @@ package org.apache.spark.streaming.kafka010.mocks
 
 import java.util.concurrent.{ScheduledFuture, TimeUnit}
 
+import kafka.utils.Scheduler
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.server.util.Scheduler
 import org.jmock.lib.concurrent.DeterministicScheduler
 
 /**
@@ -42,6 +42,8 @@ private[kafka010] class MockScheduler(val time: Time) extends Scheduler {
 
   val scheduler = new DeterministicScheduler()
 
+  def isStarted: Boolean = true
+
   def startup(): Unit = {}
 
   def shutdown(): Unit = synchronized {
@@ -54,18 +56,17 @@ private[kafka010] class MockScheduler(val time: Time) extends Scheduler {
 
   def schedule(
       name: String,
-      task: Runnable,
-      delayMs: Long = 0,
-      periodMs: Long = -1): ScheduledFuture[_] = synchronized {
-    if (periodMs >= 0) {
-      scheduler.scheduleAtFixedRate(task, delayMs, periodMs, TimeUnit.MILLISECONDS)
+      fun: () => Unit,
+      delay: Long = 0,
+      period: Long = -1,
+      unit: TimeUnit = TimeUnit.MILLISECONDS): ScheduledFuture[_] = synchronized {
+    val runnable = new Runnable {
+      override def run(): Unit = fun()
+    }
+    if (period >= 0) {
+      scheduler.scheduleAtFixedRate(runnable, delay, period, unit)
     } else {
-      scheduler.schedule(task, delayMs, TimeUnit.MILLISECONDS)
+      scheduler.schedule(runnable, delay, unit)
     }
   }
-
-  override def resizeThreadPool(i: Int): Unit = {
-
-  }
-
 }

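The trait being mocked changed shape between the two Kafka lines: 3.6's `org.apache.kafka.server.util.Scheduler` schedules a `Runnable` and adds `resizeThreadPool`, while 3.4's `kafka.utils.Scheduler` takes a Scala thunk plus a `TimeUnit` and exposes `isStarted`. Either way the mock delegates to jMock's `DeterministicScheduler`, which runs scheduled work only when the test advances the clock. A small sketch of that underlying behaviour (assuming jMock's documented `tick` semantics):

```scala
import java.util.concurrent.TimeUnit

import org.jmock.lib.concurrent.DeterministicScheduler

// DeterministicScheduler implements ScheduledExecutorService but executes
// tasks only when the test explicitly advances time, making the schedule
// reproducible regardless of wall-clock timing.
val scheduler = new DeterministicScheduler()
var runs = 0
scheduler.scheduleAtFixedRate(() => runs += 1, 0, 10, TimeUnit.MILLISECONDS)

scheduler.tick(25, TimeUnit.MILLISECONDS) // fires at t = 0, 10, 20
assert(runs == 3)
```
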
pom.xml

Lines changed: 1 addition & 1 deletion
@@ -137,7 +137,7 @@
     <!-- Version used for internal directory structure -->
     <hive.version.short>2.3</hive.version.short>
     <!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
-    <kafka.version>3.6.0</kafka.version>
+    <kafka.version>3.4.1</kafka.version>
     <!-- After 10.15.1.3, the minimum required version is JDK9 -->
     <derby.version>10.14.2.0</derby.version>
     <parquet.version>1.13.1</parquet.version>
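Because `kafka.version` is an ordinary Maven property, the 3.6.0 client can still be trialled locally without re-patching the pom, e.g. by passing `-Dkafka.version=3.6.0` to `./build/mvn`; user-supplied `-D` properties take precedence over the values declared in `<properties>`.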
