Skip to content

Commit a8a8dc7

Browse files
author
Boris S
committed
SAMZA-1888: Kafka consumer improvements
Author: Boris S <bshkolnik@linkedin.com>
Author: Boris S <boryas@apache.org>
Author: Boris Shkolnik <bshkolni@linkedin.com>
Reviewers: bharathkk <codin.martial@gmail.com>
Closes apache#738 from sborya/KafkaConsumerImprovements
1 parent 6d20ee7 commit a8a8dc7

File tree

5 files changed

+60
-339
lines changed

5 files changed

+60
-339
lines changed

samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java

Lines changed: 53 additions & 37 deletions
Original file line number | Diff line number | Diff line change
@@ -80,9 +80,9 @@ public class KafkaSystemAdmin implements ExtendedSystemAdmin {
8080

8181
protected final String systemName;
8282
protected final Consumer metadataConsumer;
83+
protected final Config config;
8384

84-
// get ZkUtils object to connect to Kafka's ZK.
85-
private final Supplier<ZkUtils> getZkConnection;
85+
protected AdminClient adminClient = null;
8686

8787
// Custom properties to create a new coordinator stream.
8888
private final Properties coordinatorStreamProperties;
@@ -96,52 +96,21 @@ public class KafkaSystemAdmin implements ExtendedSystemAdmin {
9696
// Kafka properties for intermediate topics creation
9797
private final Map<String, Properties> intermediateStreamProperties;
9898

99-
// adminClient is required for deleteCommittedMessages operation
100-
private final AdminClient adminClient;
101-
10299
// used for intermediate streams
103-
private final boolean deleteCommittedMessages;
100+
protected final boolean deleteCommittedMessages;
104101

105102
private final AtomicBoolean stopped = new AtomicBoolean(false);
106103

107104
public KafkaSystemAdmin(String systemName, Config config, Consumer metadataConsumer) {
108105
this.systemName = systemName;
106+
this.config = config;
109107

110108
if (metadataConsumer == null) {
111109
throw new SamzaException(
112110
"Cannot construct KafkaSystemAdmin for system " + systemName + " with null metadataConsumer");
113111
}
114112
this.metadataConsumer = metadataConsumer;
115113

116-
// populate brokerList from either consumer or producer configs
117-
Properties props = new Properties();
118-
String brokerList = config.get(
119-
String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
120-
if (brokerList == null) {
121-
brokerList = config.get(String.format(KafkaConfig.PRODUCER_CONFIGS_CONFIG_KEY(), systemName,
122-
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
123-
}
124-
if (brokerList == null) {
125-
throw new SamzaException(
126-
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " is required for systemAdmin for system " + systemName);
127-
}
128-
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
129-
130-
// kafka.admin.AdminUtils requires zkConnect
131-
// this will change after we move to the new org.apache..AdminClient
132-
String zkConnect =
133-
config.get(String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ZOOKEEPER_CONNECT));
134-
if (StringUtils.isBlank(zkConnect)) {
135-
throw new SamzaException("Missing zookeeper.connect config for admin for system " + systemName);
136-
}
137-
props.put(ZOOKEEPER_CONNECT, zkConnect);
138-
139-
adminClient = AdminClient.create(props);
140-
141-
getZkConnection = () -> {
142-
return ZkUtils.apply(zkConnect, 6000, 6000, false);
143-
};
144-
145114
KafkaConfig kafkaConfig = new KafkaConfig(config);
146115
coordinatorStreamReplicationFactor = Integer.valueOf(kafkaConfig.getCoordinatorReplicationFactor());
147116
coordinatorStreamProperties = KafkaSystemAdminUtilsScala.getCoordinatorTopicProperties(kafkaConfig);
@@ -197,6 +166,8 @@ public void stop() {
197166
} catch (Exception e) {
198167
LOG.warn("metadataConsumer.close for system " + systemName + " failed with exception.", e);
199168
}
169+
}
170+
if (adminClient != null) {
200171
try {
201172
adminClient.close();
202173
} catch (Exception e) {
@@ -546,14 +517,14 @@ public Integer offsetComparator(String offset1, String offset2) {
546517
public boolean createStream(StreamSpec streamSpec) {
547518
LOG.info("Creating Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName());
548519

549-
return KafkaSystemAdminUtilsScala.createStream(toKafkaSpec(streamSpec), getZkConnection);
520+
return KafkaSystemAdminUtilsScala.createStream(toKafkaSpec(streamSpec), getZkConnection());
550521
}
551522

552523
@Override
553524
public boolean clearStream(StreamSpec streamSpec) {
554525
LOG.info("Creating Kafka topic: {} on system: {}", streamSpec.getPhysicalName(), streamSpec.getSystemName());
555526

556-
KafkaSystemAdminUtilsScala.clearStream(streamSpec, getZkConnection);
527+
KafkaSystemAdminUtilsScala.clearStream(streamSpec, getZkConnection());
557528

558529
Map<String, List<PartitionInfo>> topicsMetadata = getTopicMetadata(ImmutableSet.of(streamSpec.getPhysicalName()));
559530
return topicsMetadata.get(streamSpec.getPhysicalName()).isEmpty();
@@ -630,11 +601,56 @@ Map<String, List<PartitionInfo>> getTopicMetadata(Set<String> topics) {
630601
@Override
631602
public void deleteMessages(Map<SystemStreamPartition, String> offsets) {
632603
if (deleteCommittedMessages) {
604+
if (adminClient == null) {
605+
adminClient = AdminClient.create(createAdminClientProperties());
606+
}
633607
KafkaSystemAdminUtilsScala.deleteMessages(adminClient, offsets);
634608
deleteMessageCalled = true;
635609
}
636610
}
637611

612+
protected Properties createAdminClientProperties() {
613+
// populate brokerList from either consumer or producer configs
614+
Properties props = new Properties();
615+
// included SSL settings if needed
616+
617+
props.putAll(config.subset(String.format("systems.%s.consumer.", systemName), true));
618+
619+
//validate brokerList
620+
String brokerList = config.get(
621+
String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
622+
if (brokerList == null) {
623+
brokerList = config.get(String.format(KafkaConfig.PRODUCER_CONFIGS_CONFIG_KEY(), systemName,
624+
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
625+
}
626+
if (brokerList == null) {
627+
throw new SamzaException(
628+
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " is required for systemAdmin for system " + systemName);
629+
}
630+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
631+
632+
633+
// kafka.admin.AdminUtils requires zkConnect
634+
// this will change after we move to the new org.apache..AdminClient
635+
String zkConnect =
636+
config.get(String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ZOOKEEPER_CONNECT));
637+
if (StringUtils.isBlank(zkConnect)) {
638+
throw new SamzaException("Missing zookeeper.connect config for admin for system " + systemName);
639+
}
640+
props.put(ZOOKEEPER_CONNECT, zkConnect);
641+
642+
return props;
643+
}
644+
645+
private Supplier<ZkUtils> getZkConnection() {
646+
String zkConnect =
647+
config.get(String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ZOOKEEPER_CONNECT));
648+
if (StringUtils.isBlank(zkConnect)) {
649+
throw new SamzaException("Missing zookeeper.connect config for admin for system " + systemName);
650+
}
651+
return () -> ZkUtils.apply(zkConnect, 6000, 6000, false);
652+
}
653+
638654
/**
639655
* Container for metadata about offsets.
640656
*/

samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java

Lines changed: 7 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -77,8 +77,10 @@ public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements Sy
7777

7878
/**
7979
* Create a KafkaSystemConsumer for the provided {@code systemName}
80+
* @param kafkaConsumer kafka Consumer object to be used by this system consumer
8081
* @param systemName system name for which we create the consumer
8182
* @param config application config
83+
* @param clientId clientId from the kafka consumer to be used in the KafkaConsumerProxy
8284
* @param metrics metrics for this KafkaSystemConsumer
8385
* @param clock system clock
8486
*/
@@ -106,12 +108,13 @@ public KafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, Conf
106108

107109
/**
108110
* Create internal kafka consumer object, which will be used in the Proxy.
111+
* @param <K> key type for the consumer
112+
* @param <V> value type for the consumer
109113
* @param systemName system name for which we create the consumer
110114
* @param kafkaConsumerConfig config object for Kafka's KafkaConsumer
111-
* @return KafkaConsumer object
115+
* @return KafkaConsumer newly created kafka consumer object
112116
*/
113-
public static <K,V> KafkaConsumer<K, V> createKafkaConsumerImpl(String systemName,
114-
HashMap<String, Object> kafkaConsumerConfig) {
117+
public static <K, V> KafkaConsumer<K, V> createKafkaConsumerImpl(String systemName, HashMap<String, Object> kafkaConsumerConfig) {
115118

116119
LOG.info("Instantiating KafkaConsumer for systemName {} with properties {}", systemName, kafkaConsumerConfig);
117120
return new KafkaConsumer<>(kafkaConsumerConfig);
@@ -176,7 +179,7 @@ void startConsumer() {
176179
throw new SamzaException(msg, e);
177180
}
178181

179-
LOG.info("{}: Changing consumer's starting offset for tp = %s to %s", this, tp, startingOffsetString);
182+
LOG.info("{}: Changing consumer's starting offset for tp = {} to {}", this, tp, startingOffsetString);
180183

181184
// add the partition to the proxy
182185
proxy.addTopicPartition(topicPartitionsToSSP.get(tp), startingOffset);
@@ -310,16 +313,10 @@ public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
310313
return super.poll(systemStreamPartitions, timeout);
311314
}
312315

313-
/**
314-
* convert from TopicPartition to TopicAndPartition
315-
*/
316316
public static TopicAndPartition toTopicAndPartition(TopicPartition tp) {
317317
return new TopicAndPartition(tp.topic(), tp.partition());
318318
}
319319

320-
/**
321-
* convert to TopicPartition from SystemStreamPartition
322-
*/
323320
public static TopicPartition toTopicPartition(SystemStreamPartition ssp) {
324321
return new TopicPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
325322
}

samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala

Lines changed: 0 additions & 66 deletions
This file was deleted.

samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala

Lines changed: 0 additions & 116 deletions
This file was deleted.

0 commit comments

Comments (0)