Skip to content

Commit

Permalink
kafka消费者代码
Browse files Browse the repository at this point in the history
  • Loading branch information
wangzhiwubigdata committed Jul 12, 2020
1 parent ea76f1c commit 2860e7e
Showing 1 changed file with 2 additions and 12 deletions.
14 changes: 2 additions & 12 deletions src/main/java/org/myorg/quickstart/shizhan01/KafkaConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,14 @@ public static void main(String[] args) throws Exception {

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
// 如果你是0.8版本的Kafka,需要配置
//properties.setProperty("zookeeper.connect", "localhost:2181");
//设置消费组
properties.setProperty("group.id", "group_test");

properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10");
//FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(Pattern.compile("^test_([A-Za-z0-9]*)$"), new SimpleStringSchema(), properties);
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);

//设置从最早的ffset消费
consumer.setStartFromEarliest();
//还可以手动指定相应的 topic, partition,offset,然后从指定好的位置开始消费
//HashMap<KafkaTopicPartition, Long> map = new HashMap<>();
//map.put(new KafkaTopicPartition("test", 1), 10240L);
//假如partition有多个,可以指定每个partition的消费位置
//map.put(new KafkaTopicPartition("test", 2), 10560L);
//然后各个partition从指定位置消费
//consumer.setStartFromSpecificOffsets(map);

env.addSource(consumer).flatMap(new FlatMapFunction<String, String>() {
@Override
Expand All @@ -50,4 +40,4 @@ public void flatMap(String value, Collector<String> out) throws Exception {

env.execute("start consumer...");
}
}//
}

0 comments on commit 2860e7e

Please sign in to comment.