|
| 1 | +package com.lei.sinktest; |
| 2 | + |
| 3 | +import com.lei.domain.J_SensorReading; |
| 4 | +import com.lei.util.J_MyKafkaUtil; |
| 5 | +import org.apache.flink.streaming.api.datastream.DataStream; |
| 6 | +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; |
| 7 | + |
| 8 | +/** |
| 9 | + * @Author: Lei |
| 10 | + * @E-mail: 843291011@qq.com |
| 11 | + * @Date: 2020-05-22 17:41 |
| 12 | + * @Version: 1.0 |
| 13 | + * @Modified By: |
| 14 | + * @Description: |
| 15 | + */ |
| 16 | +// 4、从Kafka中读取数据 |
| 17 | +/* |
| 18 | +# 查看kafka主题列表 |
| 19 | + kafka-topics --list --zookeeper node-01:2181,node-02:2181,node-03:2181 |
| 20 | +# 创建topic |
| 21 | + kafka-topics --create --zookeeper node-01:2181,node-02:2181,node-03:2181 --replication-factor 3 --partitions 3 --topic flink_topic_test |
| 22 | +# 查看topic |
| 23 | + kafka-topics --zookeeper node-01:2181,node-02:2181,node-03:2181 --topic flink_topic_test --describe |
| 24 | +# 创建生产者 |
| 25 | + kafka-console-producer --broker-list node-01:9092,node-02:9092,node-03:9092 --topic flink_topic_test |
| 26 | +# 创建消费者 |
| 27 | + kafka-console-consumer --bootstrap-server node-01:9092,node-02:9092,node-03:9092 --topic flink_topic_test --group flink_topic_test_g1 --from-beginning |
| 28 | +# 检查消费者消费数据情况 |
| 29 | + kafka-consumer-groups --bootstrap-server node-01:9092,node-02:9092,node-03:9092 --describe --group flink_topic_test_g1 |
| 30 | +# 列出所有消费者的情况 |
| 31 | + kafka-consumer-groups --bootstrap-server node-01:9092,node-02:9092,node-03:9092 --list |
| 32 | +# 修改topic分区 |
| 33 | + kafka-topics --zookeeper node-01:2181,node-02:2181,node-03:2181 --alter --topic flink_topic_test --partitions 4 |
| 34 | +# 删除消费者组 |
| 35 | + kafka-consumer-groups --bootstrap-server node-01:9092,node-02:9092,node-03:9092 --delete --group flink_topic_test_g1 |
| 36 | +# 删除topic |
| 37 | + kafka-topics --delete --zookeeper node-01:2181,node-02:2181,node-03:2181 --topic flink_topic_test |
| 38 | +
|
| 39 | + */ |
| 40 | +public class J01_KafkaSinkTest_DianShang3C { |
| 41 | + public static void main(String[] args) throws Exception { |
| 42 | + |
| 43 | + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| 44 | + env.setParallelism(1); |
| 45 | + |
| 46 | + // source |
| 47 | + DataStream<String> inputStream = env.readTextFile("input_dir/dianshang_3c.txt"); |
| 48 | + //DataStream<String> inputStream = env.addSource(J_MyKafkaUtil.getConsumer("flink_topic_test")); |
| 49 | + |
| 50 | + // Transform操作 |
| 51 | + /* |
| 52 | + 数据源示例:sensor_1, 1547718205, 27.1 |
| 53 | + 数据源示例:sensor_2, 1547718206, 28.1 |
| 54 | + 数据源示例:sensor_3, 1547718207, 29.1 |
| 55 | + */ |
| 56 | +// DataStream<String> dataStream = inputStream.map(data -> { |
| 57 | +// String[] dataArray = data.split(","); |
| 58 | +// // 转成String 方便序列化输出 |
| 59 | +// return new J_SensorReading(dataArray[0].trim(), Long.valueOf(dataArray[1].trim()), Double.valueOf(dataArray[2].trim())).toString(); |
| 60 | +// }); |
| 61 | + |
| 62 | + // sink |
| 63 | + // dataStream.addSink(new FlinkKafkaProducer011[String]("node-01:9092", "gmall", new SimpleStringSchema())) |
| 64 | + inputStream.addSink(J_MyKafkaUtil.getProducer("dianshang_3c")); // topic_news |
| 65 | + /* |
| 66 | + # 创建topic |
| 67 | + kafka-topics --create --zookeeper node-01:2181,node-02:2181,node-03:2181 --replication-factor 3 --partitions 3 --topic flink_topic_test |
| 68 | + # 查看topic |
| 69 | + kafka-topics --zookeeper node-01:2181,node-02:2181,node-03:2181 --topic flink_topic_test --describe |
| 70 | + # 创建生产者 |
| 71 | + kafka-console-producer --broker-list node-01:9092,node-02:9092,node-03:9092 --topic flink_topic_test |
| 72 | + # 创建消费者 |
| 73 | + kafka-console-consumer --bootstrap-server node-01:9092,node-02:9092,node-03:9092 --topic flink_topic_test --group flink_topic_test_g1 --from-beginning |
| 74 | + */ |
| 75 | + inputStream.print(); |
| 76 | + |
| 77 | + env.execute("kafka sink test"); |
| 78 | + } |
| 79 | +} |
| 80 | + |
| 81 | +//class SensorReading { |
| 82 | +// String id; |
| 83 | +// Long timestamp; |
| 84 | +// Double temperature; |
| 85 | +// |
| 86 | +// public SensorReading(String id, Long timestamp, Double temperature) { |
| 87 | +// this.id = id; |
| 88 | +// this.timestamp = timestamp; |
| 89 | +// this.temperature = temperature; |
| 90 | +// } |
| 91 | +// |
| 92 | +// @Override |
| 93 | +// public String toString() { |
| 94 | +// return "J_SensorReading{" + |
| 95 | +// "id='" + id + '\'' + |
| 96 | +// ", timestamp=" + timestamp + |
| 97 | +// ", temperature=" + temperature + |
| 98 | +// '}'; |
| 99 | +// } |
| 100 | +//} |
0 commit comments