Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static org.apache.kafka.common.config.TopicConfig.INKLESS_ENABLE_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG;
import static org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
Expand Down Expand Up @@ -700,6 +701,15 @@ ControllerResult<ApiError> updateFeatures(
boolean uncleanLeaderElectionEnabledForTopic(String topicName) {
String uncleanLeaderElection = getTopicConfig(topicName, UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG).value();
if (!uncleanLeaderElection.isEmpty()) {
final String inklessEnabled = getTopicConfig(topicName, INKLESS_ENABLE_CONFIG).value();
if (!inklessEnabled.isEmpty() && Boolean.parseBoolean(inklessEnabled)) {
// Inkless topics have unclean leader election enabled by default.
// Inkless topics always have a single partition and leadership is dynamically defined,
// while data consistency is guaranteed by the Inkless batch coordinator.
// In order to keep KRaft metadata consistent, allow the unclean leader election
// so stored leadership is refreshed on every controller restart.
return true;
}
return Boolean.parseBoolean(uncleanLeaderElection);
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public class FakeKafkaConfigSchema {
define("min.insync.replicas", INT, "1", HIGH, ""));
CONFIGS.put(TOPIC, new ConfigDef().
define("unclean.leader.election.enable", BOOLEAN, "false", HIGH, "").
define("min.insync.replicas", INT, "1", HIGH, ""));
define("min.insync.replicas", INT, "1", HIGH, "").
define("inkless.enable", BOOLEAN, "false", HIGH, ""));
}

public static final Map<String, List<ConfigSynonym>> SYNONYMS = new HashMap<>();
Expand All @@ -53,6 +54,8 @@ public class FakeKafkaConfigSchema {
List.of(new ConfigSynonym("unclean.leader.election.enable")));
SYNONYMS.put("min.insync.replicas",
List.of(new ConfigSynonym("min.insync.replicas")));
SYNONYMS.put("inkless.enable",
List.of(new ConfigSynonym("inkless.enable")));
}

public static final KafkaConfigSchema INSTANCE = new KafkaConfigSchema(CONFIGS, SYNONYMS);
Expand Down