Skip to content

Commit

Permalink
维表关联
Browse files Browse the repository at this point in the history
  • Loading branch information
wangzhiwubigdata committed Jun 17, 2020
1 parent a56fc80 commit d5dc3fc
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 0 deletions.
48 changes: 48 additions & 0 deletions src/main/java/org/myorg/quickstart/Dim19/DimSync.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.myorg.quickstart.Dim19;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class DimSync extends RichMapFunction<String,Order> {

private static final Logger LOGGER = LoggerFactory.getLogger(DimSync.class);

private Connection conn = null;
public void open(Configuration parameters) throws Exception {
super.open(parameters);
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim?characterEncoding=UTF-8", "admin", "admin");
}

public Order map(String in) throws Exception {

JSONObject jsonObject = JSONObject.parseObject(in);
Integer cityId = jsonObject.getInteger("city_id");
String userName = jsonObject.getString("user_name");
String items = jsonObject.getString("items");

//根据city_id 查询 city_name
PreparedStatement pst = conn.prepareStatement("select city_name from info where city_id = ?");
pst.setInt(1,cityId);
ResultSet resultSet = pst.executeQuery();
String cityName = null;
while (resultSet.next()){
cityName = resultSet.getString(1);
}
pst.close();
return new Order(cityId,userName,items,cityName);
}

public void close() throws Exception {
super.close();
conn.close();
}

}//
75 changes: 75 additions & 0 deletions src/main/java/org/myorg/quickstart/Dim19/LRU.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.myorg.quickstart.Dim19;

import com.alibaba.fastjson.JSONObject;
import com.stumbleupon.async.Callback;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.hbase.async.GetRequest;
import org.hbase.async.HBaseClient;
import org.hbase.async.KeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class LRU extends RichAsyncFunction<String,Order> {

private static final Logger LOGGER = LoggerFactory.getLogger(LRU.class);
String table = "info";
Cache<String, String> cache = null;
private HBaseClient client = null;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//创建hbase客户端
client = new HBaseClient("127.0.0.1","7071");
cache = CacheBuilder.newBuilder()
//最多存储10000条
.maximumSize(10000)
//过期时间为1分钟
.expireAfterWrite(60, TimeUnit.SECONDS)
.build();
}

@Override
public void asyncInvoke(String input, ResultFuture<Order> resultFuture) throws Exception {

JSONObject jsonObject = JSONObject.parseObject(input);
Integer cityId = jsonObject.getInteger("city_id");
String userName = jsonObject.getString("user_name");
String items = jsonObject.getString("items");
//读缓存
String cacheCityName = cache.getIfPresent(cityId);
//如果缓存获取失败再从hbase获取维度数据
if(cacheCityName != null){
Order order = new Order();
order.setCityId(cityId);
order.setItems(items);
order.setUserName(userName);
order.setCityName(cacheCityName);
resultFuture.complete(Collections.singleton(order));
}else {

client.get(new GetRequest(table,String.valueOf(cityId))).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> {
for (KeyValue kv : arg) {
String value = new String(kv.value());
Order order = new Order();
order.setCityId(cityId);
order.setItems(items);
order.setUserName(userName);
order.setCityName(value);
resultFuture.complete(Collections.singleton(order));
cache.put(String.valueOf(cityId), value);
}
return null;
});

}
}

}
60 changes: 60 additions & 0 deletions src/main/java/org/myorg/quickstart/Dim19/Order.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package org.myorg.quickstart.Dim19;

public class Order {
private Integer cityId;
private String userName;
private String items;
private String cityName;

public Order(Integer cityId, String userName, String items, String cityName) {
this.cityId = cityId;
this.userName = userName;
this.items = items;
this.cityName = cityName;
}

public Order() {
}

public Integer getCityId() {
return cityId;
}

public void setCityId(Integer cityId) {
this.cityId = cityId;
}

public String getUserName() {
return userName;
}

public void setUserName(String userName) {
this.userName = userName;
}

public String getItems() {
return items;
}

public void setItems(String items) {
this.items = items;
}

public String getCityName() {
return cityName;
}

public void setCityName(String cityName) {
this.cityName = cityName;
}

@Override
public String toString() {
return "Order{" +
"cityId=" + cityId +
", userName='" + userName + '\'' +
", items='" + items + '\'' +
", cityName='" + cityName + '\'' +
'}';
}
}
59 changes: 59 additions & 0 deletions src/main/java/org/myorg/quickstart/Dim19/WholeLoad.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.myorg.quickstart.Dim19;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class WholeLoad extends RichMapFunction<String,Order> {


private static final Logger LOGGER = LoggerFactory.getLogger(WholeLoad.class);
ScheduledExecutorService executor = null;
private Map<String,String> cache;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
load();
} catch (Exception e) {
e.printStackTrace();
}
}
},5,5, TimeUnit.MINUTES);
}

@Override
public Order map(String value) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(value);
Integer cityId = jsonObject.getInteger("city_id");
String userName = jsonObject.getString("user_name");
String items = jsonObject.getString("items");
String cityName = cache.get(cityId);
return new Order(cityId,userName,items,cityName);
}

public void load() throws Exception {
Class.forName("com.mysql.jdbc.Driver");
Connection con = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim?characterEncoding=UTF-8", "admin", "admin");
PreparedStatement statement = con.prepareStatement("select city_id,city_name from info");
ResultSet rs = statement.executeQuery();
//全量更新维度数据到内存
while (rs.next()) {
String cityId = rs.getString("city_id");
String cityName = rs.getString("city_name");
cache.put(cityId, cityName);
}
con.close();
}
}

0 comments on commit d5dc3fc

Please sign in to comment.