Skip to content

Commit 63d33fa

Browse files
author
Boris S
committed
SAMZA-1868: Create new SamzaAmdmin for Kafka
This Request is a copy of apache#647(got garbled). This PR already addresses all the comments brought up in the other request. Author: Boris S <bshkolnik@linkedin.com> Author: Boris S <boryas@apache.org> Author: Boris Shkolnik <bshkolni@linkedin.com> Reviewers: Shanthoosh Venkatraman <svenkatr@linkedin.com>, Prateek Maheshwari <pmaheshwari@apache.org> Closes apache#662 from sborya/NewConsumerAdmin2
1 parent 3c78e06 commit 63d33fa

File tree

19 files changed

+1630
-882
lines changed

19 files changed

+1630
-882
lines changed

samza-core/src/main/java/org/apache/samza/application/ApplicationUtil.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,5 +59,4 @@ public static SamzaApplication fromConfig(Config config) {
5959
}
6060
return new LegacyTaskApplication(taskClassOption.get());
6161
}
62-
6362
}

samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging {
5050

5151
def getDefaultSystemOffset(systemName: String) = getOption(SystemConfig.CONSUMER_OFFSET_DEFAULT format (systemName))
5252

53-
def deleteCommittedMessages(systemName: String) = getOption(SystemConfig.DELETE_COMMITTED_MESSAGES format (systemName))
53+
def deleteCommittedMessages(systemName: String) = getBoolean(SystemConfig.DELETE_COMMITTED_MESSAGES format (systemName), false)
5454

5555
/**
5656
* Returns a list of all system names from the config file. Useful for

samza-kafka/src/main/scala/org/apache/samza/config/KafkaConsumerConfig.java renamed to samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java

Lines changed: 31 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import java.util.HashMap;
2525
import java.util.Map;
2626
import org.apache.commons.lang3.StringUtils;
27+
import org.apache.commons.lang3.tuple.ImmutablePair;
28+
import org.apache.commons.lang3.tuple.Pair;
2729
import org.apache.kafka.clients.consumer.ConsumerConfig;
2830
import org.apache.kafka.clients.consumer.RangeAssignor;
2931
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -40,35 +42,32 @@ public class KafkaConsumerConfig extends HashMap<String, Object> {
4042

4143
public static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerConfig.class);
4244

43-
static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer";
44-
static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer";
45-
static final String ADMIN_CLIENT_ID_PREFIX = "samza-admin";
45+
public static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
4646

47+
private final String systemName;
4748
/*
4849
* By default, KafkaConsumer will fetch some big number of available messages for all the partitions.
4950
* This may cause memory issues. That's why we will limit the number of messages per partition we get on EACH poll().
5051
*/
5152
static final String DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS = "100";
5253

53-
private KafkaConsumerConfig(Map<String, Object> map) {
54-
super(map);
54+
private KafkaConsumerConfig(Map<String, Object> props, String systemName) {
55+
super(props);
56+
this.systemName = systemName;
5557
}
5658

5759
/**
58-
* Helper method to create configs for use in Kafka consumer.
59-
* The values are based on the "consumer" subset of the configs provided by the app and Samza overrides.
60-
*
61-
* @param config config provided by the app.
62-
* @param systemName system name to get the consumer configuration for.
63-
* @param clientId client id to be used in the Kafka consumer.
60+
* Create kafka consumer configs, based on the subset of global configs.
61+
* @param config application config
62+
* @param systemName system name
63+
* @param clientId client id provided by the caller
6464
* @return KafkaConsumerConfig
6565
*/
6666
public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config, String systemName, String clientId) {
6767

6868
Config subConf = config.subset(String.format("systems.%s.consumer.", systemName), true);
6969

70-
//Kafka client configuration
71-
String groupId = getConsumerGroupId(config);
70+
final String groupId = createConsumerGroupId(config);
7271

7372
Map<String, Object> consumerProps = new HashMap<>();
7473
consumerProps.putAll(subConf);
@@ -113,48 +112,42 @@ public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config, St
113112
consumerProps.computeIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
114113
(k) -> DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS);
115114

116-
return new KafkaConsumerConfig(consumerProps);
115+
return new KafkaConsumerConfig(consumerProps, systemName);
117116
}
118117

119-
// group id should be unique per job
120-
static String getConsumerGroupId(Config config) {
121-
JobConfig jobConfig = new JobConfig(config);
122-
Option jobNameOption = jobConfig.getName();
123-
if (jobNameOption.isEmpty()) {
124-
throw new ConfigException("Missing job name");
118+
public String getClientId() {
119+
String clientId = (String) get(ConsumerConfig.CLIENT_ID_CONFIG);
120+
if (StringUtils.isBlank(clientId)) {
121+
throw new SamzaException("client Id is not set for consumer for system=" + systemName);
125122
}
126-
String jobName = (String) jobNameOption.get();
123+
return clientId;
124+
}
127125

128-
String jobId = jobConfig.getJobId();
126+
// group id should be unique per job
127+
static String createConsumerGroupId(Config config) {
128+
Pair<String, String> jobNameId = getJobNameAndId(config);
129129

130-
return String.format("%s-%s", jobName, jobId);
130+
return String.format("%s-%s", jobNameId.getLeft(), jobNameId.getRight());
131131
}
132132

133133
// client id should be unique per job
134-
public static String getConsumerClientId(Config config) {
135-
return getConsumerClientId(CONSUMER_CLIENT_ID_PREFIX, config);
136-
}
137-
138-
public static String getProducerClientId(Config config) {
139-
return getConsumerClientId(PRODUCER_CLIENT_ID_PREFIX, config);
140-
}
134+
public static String createClientId(String prefix, Config config) {
141135

142-
public static String getAdminClientId(Config config) {
143-
return getConsumerClientId(ADMIN_CLIENT_ID_PREFIX, config);
136+
Pair<String, String> jobNameId = getJobNameAndId(config);
137+
String jobName = jobNameId.getLeft();
138+
String jobId = jobNameId.getRight();
139+
return String.format("%s-%s-%s", prefix.replaceAll("\\W", "_"), jobName.replaceAll("\\W", "_"),
140+
jobId.replaceAll("\\W", "_"));
144141
}
145142

146-
static String getConsumerClientId(String id, Config config) {
143+
public static Pair<String, String> getJobNameAndId(Config config) {
147144
JobConfig jobConfig = new JobConfig(config);
148145
Option jobNameOption = jobConfig.getName();
149146
if (jobNameOption.isEmpty()) {
150147
throw new ConfigException("Missing job name");
151148
}
152149
String jobName = (String) jobNameOption.get();
153-
154-
String jobId = jobConfig.getJobId();
155-
156-
return String.format("%s-%s-%s", id.replaceAll("\\W", "_"), jobName.replaceAll("\\W", "_"),
157-
jobId.replaceAll("\\W", "_"));
150+
return new ImmutablePair<>(jobName, jobConfig.getJobId());
158151
}
159152

160153
/**

0 commit comments

Comments
 (0)