
Commit 9911fab

KAFKA-15432: RLM Stop partitions should not be invoked for non-tiered storage topics (apache#14667)
Reviewers: Christo Lolov <lolovc@amazon.com>, Divij Vaidya <diviv@amazon.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
1 parent 7b0c076 commit 9911fab

File tree: 5 files changed, +55 -10 lines

core/src/main/java/kafka/log/remote/RemoteLogManager.java

Lines changed: 2 additions & 3 deletions
@@ -365,9 +365,6 @@ public void stopPartitions(Set<StopPartition> stopPartitions,
         for (StopPartition stopPartition: stopPartitions) {
             TopicPartition tp = stopPartition.topicPartition();
             try {
-                // We are assuming that if the topic exists in topicIdByPartitionMap then it has active archival
-                // otherwise not. Ideally, `stopPartitions` should not be called for internal and non-tiered-storage
-                // topics. See KAFKA-15432 for more details.
                 if (topicIdByPartitionMap.containsKey(tp)) {
                     TopicIdPartition tpId = new TopicIdPartition(topicIdByPartitionMap.get(tp), tp);
                     RLMTaskWithFuture task = leaderOrFollowerTasks.remove(tpId);
@@ -379,6 +376,8 @@ public void stopPartitions(Set<StopPartition> stopPartitions,
                         LOGGER.info("Deleting the remote log segments task for partition: {}", tpId);
                         deleteRemoteLogPartition(tpId);
                     }
+                } else {
+                    LOGGER.warn("StopPartition call is not expected for partition: {}", tp);
                 }
             } catch (Exception ex) {
                 errorHandler.accept(tp, ex);

core/src/main/scala/kafka/server/ReplicaManager.scala

Lines changed: 4 additions & 1 deletion
@@ -630,13 +630,16 @@ class ReplicaManager(val config: KafkaConfig,

     // Third delete the logs and checkpoint.
     val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+    val remotePartitionsToStop = partitionsToStop.filter {
+      sp => logManager.getLog(sp.topicPartition).exists(unifiedLog => unifiedLog.remoteLogEnabled())
+    }
     if (partitionsToDelete.nonEmpty) {
       // Delete the logs and checkpoint.
       logManager.asyncDelete(partitionsToDelete, isStray = false, (tp, e) => errorMap.put(tp, e))
     }
     remoteLogManager.foreach { rlm =>
       // exclude the partitions with offline/error state
-      val partitions = partitionsToStop.filterNot(sp => errorMap.contains(sp.topicPartition)).toSet.asJava
+      val partitions = remotePartitionsToStop.filterNot(sp => errorMap.contains(sp.topicPartition)).toSet.asJava
       if (!partitions.isEmpty) {
         rlm.stopPartitions(partitions, (tp, e) => errorMap.put(tp, e))
       }
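
The core idea of this hunk is to drop non-tiered partitions before they ever reach RemoteLogManager.stopPartitions. A minimal, self-contained Scala sketch of that caller-side filtering pattern follows; the StopRequest and LogState types and the stopInRemoteLogManager hook are illustrative stand-ins, not Kafka's actual classes.

// Sketch only: stand-in types illustrating the "filter before stopping" pattern above.
object StopPartitionFilterSketch {
  final case class StopRequest(topicPartition: String, deleteLocalLog: Boolean)
  final case class LogState(remoteLogEnabled: Boolean)

  // Only partitions whose log exists and has remote storage enabled are
  // forwarded to the (hypothetical) remote-log stop hook.
  def stopRemotePartitions(requests: Set[StopRequest],
                           getLog: String => Option[LogState],
                           stopInRemoteLogManager: Set[StopRequest] => Unit): Unit = {
    val remoteOnly = requests.filter(r => getLog(r.topicPartition).exists(_.remoteLogEnabled))
    if (remoteOnly.nonEmpty) stopInRemoteLogManager(remoteOnly)
  }

  def main(args: Array[String]): Unit = {
    val logs = Map("tiered-0" -> LogState(remoteLogEnabled = true),
                   "plain-0"  -> LogState(remoteLogEnabled = false))
    stopRemotePartitions(
      Set(StopRequest("tiered-0", deleteLocalLog = true), StopRequest("plain-0", deleteLocalLog = true)),
      logs.get,
      stopped => println(s"RLM asked to stop: ${stopped.map(_.topicPartition)}")
    ) // prints only tiered-0
  }
}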

core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala

Lines changed: 45 additions & 3 deletions
@@ -67,7 +67,7 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
 import com.yammer.metrics.core.Gauge
 import kafka.log.remote.RemoteLogManager
-import org.apache.kafka.common.config.AbstractConfig
+import org.apache.kafka.common.config.{AbstractConfig, TopicConfig}
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig}
 import org.apache.kafka.server.util.timer.MockTimer
@@ -3118,7 +3118,8 @@ class ReplicaManagerTest {
     isShuttingDown: AtomicBoolean = new AtomicBoolean(false),
     enableRemoteStorage: Boolean = false,
     shouldMockLog: Boolean = false,
-    remoteLogManager: Option[RemoteLogManager] = None
+    remoteLogManager: Option[RemoteLogManager] = None,
+    defaultTopicRemoteLogStorageEnable: Boolean = true
   ): ReplicaManager = {
     val props = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
     val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath
@@ -3127,8 +3128,11 @@
     propsModifier.apply(props)
     val config = KafkaConfig.fromProps(props)
     val logProps = new Properties()
+    if (enableRemoteStorage && defaultTopicRemoteLogStorageEnable) {
+      logProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+    }
     val mockLog = setupMockLog(path1)
-    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(logProps), log = if (shouldMockLog) Some(mockLog) else None)
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), new LogConfig(logProps), log = if (shouldMockLog) Some(mockLog) else None, remoteStorageSystemEnable = enableRemoteStorage)
     val aliveBrokers = aliveBrokerIds.map(brokerId => new Node(brokerId, s"host$brokerId", brokerId))

     val metadataCache: MetadataCache = mock(classOf[MetadataCache])
@@ -5668,6 +5672,44 @@ class ReplicaManagerTest {

     verify(spyRm).checkpointHighWatermarks()
   }
+
+  @Test
+  def testNotCallStopPartitionsForNonTieredTopics(): Unit = {
+    val mockTimer = new MockTimer(time)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, aliveBrokerIds = Seq(0, 1),
+      enableRemoteStorage = true, defaultTopicRemoteLogStorageEnable = false)
+
+    try {
+      val tp0 = new TopicPartition(topic, 0)
+      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
+      val partition = replicaManager.createPartition(tp0)
+      // The unified log created is not tiered because `defaultTopicRemoteLogStorageEnable` is set to false
+      partition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+
+      val leaderAndIsr = LeaderAndIsr(0, 1, List(0, 1), LeaderRecoveryState.RECOVERED, LeaderAndIsr.InitialPartitionEpoch)
+      val becomeLeaderRequest = makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), leaderAndIsr)
+
+      replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
+      verifyRLMOnLeadershipChange(Collections.singleton(partition), Collections.emptySet())
+
+      val requestLeaderEpoch = 1
+      val deleteLocalLog = true
+      val partitionStates = Map(tp0 -> new StopReplicaPartitionState()
+        .setPartitionIndex(tp0.partition)
+        .setLeaderEpoch(requestLeaderEpoch)
+        .setDeletePartition(deleteLocalLog)
+      )
+
+      val (result, error) = replicaManager.stopReplicas(1, 0, 0, partitionStates)
+
+      assertEquals(Errors.NONE, error)
+      assertEquals(Map(tp0 -> Errors.NONE), result)
+      assertEquals(HostedPartition.None, replicaManager.getPartition(tp0))
+      verifyNoMoreInteractions(mockRemoteLogManager)
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
 }

 class MockReplicaSelector extends ReplicaSelector {

core/src/test/scala/unit/kafka/utils/TestUtils.scala

Lines changed: 3 additions & 2 deletions
@@ -1415,7 +1415,8 @@ object TestUtils extends Logging {
                        interBrokerProtocolVersion: MetadataVersion = MetadataVersion.latest,
                        recoveryThreadsPerDataDir: Int = 4,
                        transactionVerificationEnabled: Boolean = false,
-                       log: Option[UnifiedLog] = None): LogManager = {
+                       log: Option[UnifiedLog] = None,
+                       remoteStorageSystemEnable: Boolean = false): LogManager = {
     val logManager = new LogManager(logDirs = logDirs.map(_.getAbsoluteFile),
       initialOfflineDirs = Array.empty[File],
       configRepository = configRepository,
@@ -1435,7 +1436,7 @@ object TestUtils extends Logging {
       logDirFailureChannel = new LogDirFailureChannel(logDirs.size),
       keepPartitionMetadataFile = true,
       interBrokerProtocolVersion = interBrokerProtocolVersion,
-      remoteStorageSystemEnable = false)
+      remoteStorageSystemEnable = remoteStorageSystemEnable)

     if (log.isDefined) {
       val spyLogManager = Mockito.spy(logManager)
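
With the new remoteStorageSystemEnable parameter, a test can opt its LogManager into system-level remote storage support without touching the other defaults. A hedged usage fragment, mirroring the call shape visible in ReplicaManagerTest above; the tempDir helper and empty LogConfig are assumptions for illustration, and this is not standalone code.

// Fragment only: assumes the usual Kafka test scaffolding (TestUtils, LogConfig) is available.
val logDir = TestUtils.tempDir()                 // assumed helper for a throwaway log dir
val mockLogMgr = TestUtils.createLogManager(
  Seq(logDir),
  new LogConfig(new Properties()),
  remoteStorageSystemEnable = true               // new flag added in this commit; defaults to false
)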

jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java

Lines changed: 1 addition & 1 deletion
@@ -109,7 +109,7 @@ public void setup() {
                 JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
         this.logManager = TestUtils.createLogManager(JavaConverters.asScalaBuffer(files),
                 new LogConfig(new Properties()), new MockConfigRepository(), new CleanerConfig(1, 4 * 1024 * 1024L, 0.9d,
-                        1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true), time, MetadataVersion.latest(), 4, false, Option.empty());
+                        1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true), time, MetadataVersion.latest(), 4, false, Option.empty(), false);
         scheduler.startup();
         final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
         final MetadataCache metadataCache =
