Skip to content

Commit

Permalink
redis sink
Browse files Browse the repository at this point in the history
  • Loading branch information
wangzhiwubigdata committed Jul 19, 2020
1 parent 6182e2d commit 41582bf
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 0 deletions.
32 changes: 32 additions & 0 deletions src/main/java/org/myorg/quickstart/RedisSink27/RedisConnector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.myorg.quickstart.RedisSink27;


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;

import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashSet;

public class RedisConnector {

public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, String>> stream = env.fromElements("Flink","Spark","Storm").map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String s) throws Exception {
return new Tuple2<>(s, s+"_sink2");
}
});

FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();
stream.addSink(new RedisSink<>(conf, new RedisSink02()));
env.execute("redis sink01");
}
}
34 changes: 34 additions & 0 deletions src/main/java/org/myorg/quickstart/RedisSink27/RedisSink01.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.myorg.quickstart.RedisSink27;


import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class RedisSink01 implements RedisMapper<Tuple2<String, String>>{

/**
* 设置redis数据类型
*/
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.SET);
}

/**
* 设置Key
*/
@Override
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
}

/**
* 设置value
*/
@Override
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
}
}
34 changes: 34 additions & 0 deletions src/main/java/org/myorg/quickstart/RedisSink27/RedisSink02.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.myorg.quickstart.RedisSink27;


import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class RedisSink02 implements RedisMapper<Tuple2<String, String>> {

/**
* 设置redis数据类型
*/
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.SET);
}

/**
* 设置Key
*/
@Override
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
}

/**
* 设置value
*/
@Override
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
}
}
30 changes: 30 additions & 0 deletions src/main/java/org/myorg/quickstart/RedisSink27/SelfRedisSink.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.myorg.quickstart.RedisSink27;


import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

public class SelfRedisSink extends RichSinkFunction {


private transient Jedis jedis;

public void open(Configuration config) {
jedis = new Jedis("localhost", 6379);
}

public void invoke(Tuple2<String, String> value, Context context) throws Exception {
if (!jedis.isConnected()) {
jedis.connect();
}
jedis.set(value.f0, value.f1);
}

@Override
public void close() throws Exception {
jedis.close();
}

}

0 comments on commit 41582bf

Please sign in to comment.