diff --git a/src/main/java/org/myorg/quickstart/RedisSink27/RedisConnector.java b/src/main/java/org/myorg/quickstart/RedisSink27/RedisConnector.java new file mode 100644 index 0000000..05eafdd --- /dev/null +++ b/src/main/java/org/myorg/quickstart/RedisSink27/RedisConnector.java @@ -0,0 +1,32 @@ +package org.myorg.quickstart.RedisSink27; + + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.redis.RedisSink; +import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; + +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.HashSet; + +public class RedisConnector { + + public static void main(String[] args) throws Exception{ + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream> stream = env.fromElements("Flink","Spark","Storm").map(new MapFunction>() { + @Override + public Tuple2 map(String s) throws Exception { + return new Tuple2<>(s, s+"_sink2"); + } + }); + + FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build(); + stream.addSink(new RedisSink<>(conf, new RedisSink02())); + env.execute("redis sink01"); + } +} diff --git a/src/main/java/org/myorg/quickstart/RedisSink27/RedisSink01.java b/src/main/java/org/myorg/quickstart/RedisSink27/RedisSink01.java new file mode 100644 index 0000000..b1fe12a --- /dev/null +++ b/src/main/java/org/myorg/quickstart/RedisSink27/RedisSink01.java @@ -0,0 +1,34 @@ +package org.myorg.quickstart.RedisSink27; + + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +public class RedisSink01 implements RedisMapper>{ + + /** + * 设置redis数据类型 + */ + @Override + public RedisCommandDescription getCommandDescription() { + return new RedisCommandDescription(RedisCommand.SET); + } + + /** + * 设置Key + */ + @Override + public String getKeyFromData(Tuple2 data) { + return data.f0; + } + + /** + * 设置value + */ + @Override + public String getValueFromData(Tuple2 data) { + return data.f1; + } +} diff --git a/src/main/java/org/myorg/quickstart/RedisSink27/RedisSink02.java b/src/main/java/org/myorg/quickstart/RedisSink27/RedisSink02.java new file mode 100644 index 0000000..ba6d633 --- /dev/null +++ b/src/main/java/org/myorg/quickstart/RedisSink27/RedisSink02.java @@ -0,0 +1,34 @@ +package org.myorg.quickstart.RedisSink27; + + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription; +import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper; + +public class RedisSink02 implements RedisMapper> { + + /** + * 设置redis数据类型 + */ + @Override + public RedisCommandDescription getCommandDescription() { + return new RedisCommandDescription(RedisCommand.SET); + } + + /** + * 设置Key + */ + @Override + public String getKeyFromData(Tuple2 data) { + return data.f0; + } + + /** + * 设置value + */ + @Override + public String getValueFromData(Tuple2 data) { + return data.f1; + } +} diff --git a/src/main/java/org/myorg/quickstart/RedisSink27/SelfRedisSink.java b/src/main/java/org/myorg/quickstart/RedisSink27/SelfRedisSink.java new file mode 100644 index 0000000..7815148 --- /dev/null +++ b/src/main/java/org/myorg/quickstart/RedisSink27/SelfRedisSink.java @@ -0,0 +1,30 @@ +package org.myorg.quickstart.RedisSink27; + + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import redis.clients.jedis.Jedis; + +public class SelfRedisSink extends RichSinkFunction { + + + private transient Jedis jedis; + + public void open(Configuration config) { + jedis = new Jedis("localhost", 6379); + } + + public void invoke(Tuple2 value, Context context) throws Exception { + if (!jedis.isConnected()) { + jedis.connect(); + } + jedis.set(value.f0, value.f1); + } + + @Override + public void close() throws Exception { + jedis.close(); + } + +}