Skip to content

Commit

Permalink
KYLIN-4115 Always load kafkaConsumerProperties
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzhx authored and nichunen committed Jul 29, 2019
1 parent 32044d9 commit ed20926
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
Expand All @@ -45,6 +46,7 @@
import org.apache.kylin.source.SourcePartition;
import org.apache.kylin.source.hive.HiveMetadataExplorer;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.kylin.source.kafka.config.KafkaConsumerProperties;
import org.apache.kylin.source.kafka.util.KafkaClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -106,7 +108,8 @@ public SourcePartition enrichSourcePartitionBeforeBuild(IBuildable buildable, So
.getKafkaConfig(cube.getRootFactTable());
final String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
final String topic = kafkaConfig.getTopic();
try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
Properties property = KafkaConsumerProperties.getInstanceFromEnv().extractKafkaConfigToProperties();
try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), property)) {
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
logger.info("Get {} partitions for topic {} ", partitionInfos.size(), topic);
for (PartitionInfo partitionInfo : partitionInfos) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@ public static KafkaConsumer getKafkaConsumer(String brokers, String consumerGrou

private static Properties constructDefaultKafkaConsumerProperties(String brokers, String consumerGroup, Properties properties) {
Properties props = new Properties();
if (properties != null) {
for (Map.Entry entry : properties.entrySet()) {
props.put(entry.getKey(), entry.getValue());
}
}
props.put("bootstrap.servers", brokers);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("group.id", consumerGroup);
props.put("enable.auto.commit", "false");
if (properties != null) {
for (Map.Entry entry : properties.entrySet()) {
props.put(entry.getKey(), entry.getValue());
}
}
return props;
}

Expand Down

0 comments on commit ed20926

Please sign in to comment.